tembo-io/pgmq

improve management of queue archives

pgmq.archive() deletes the message from the queue and inserts it into the queue's archive table. Messages remain in the archive table indefinitely, so depending on volume and available storage, the archive can grow until storage becomes a real problem. There's nothing preventing users from managing the queue archives themselves, but I think ideally the PGMQ API would provide this for users.
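
For reference, managing an archive by hand today might look roughly like the sketch below. It assumes a hypothetical queue named my_queue, whose archive table would be pgmq.a_my_queue under the a_<queue> naming that PGMQ uses, and an arbitrary 30-day cutoff. The message id is a placeholder.

```sql
-- Archive one message: removes it from the queue and inserts it into
-- the archive table. 'my_queue' and msg_id 1 are placeholders.
SELECT pgmq.archive('my_queue', 1);

-- Manual retention: delete archived rows older than 30 days.
-- The 30-day cutoff is an assumption for illustration.
DELETE FROM pgmq.a_my_queue
WHERE archived_at < now() - interval '30 days';
```

On a large unpartitioned archive, a bulk DELETE like this leaves dead tuples behind for vacuum to clean up, which is part of why dropping whole partitions is attractive.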

In the Tembo platform, we partition the archive table manually with pg_partman. Below is how we convert an existing archive table into a partitioned table, using the queue name "data_plane_events". Note: this does not migrate existing archive data into the new partitioned archive table; the old rows stay in the renamed a_data_plane_events_old table.

BEGIN;

ALTER TABLE pgmq.a_data_plane_events RENAME TO a_data_plane_events_old;

-- DDL from existing table / same as PGMQ def but with partitioning
CREATE TABLE pgmq.a_data_plane_events (
	msg_id bigint NOT NULL,
	read_ct int4 NULL DEFAULT 0,
	enqueued_at timestamptz NULL DEFAULT now(),
	archived_at timestamptz NULL DEFAULT now(),
	vt timestamptz NULL,
	message jsonb NULL
)
PARTITION BY RANGE (archived_at);
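
-- rename the old index so its original name can be reused
ALTER INDEX pgmq.archived_at_idx_data_plane_events RENAME TO archived_at_idx_data_plane_events_old;
-- the index is created in the table's schema (pgmq); CREATE INDEX takes an unqualified index name
CREATE INDEX archived_at_idx_data_plane_events ON pgmq.a_data_plane_events (archived_at);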

-- initialize partman
SELECT public.create_parent(
    'pgmq.a_data_plane_events',
    'archived_at',
    'native',
    'daily'
);
-- set retention
UPDATE public.part_config
    SET retention = '30 days',
        retention_keep_table = false,
        retention_keep_index = false,
        infinite_time_partitions = true
    WHERE parent_table = 'pgmq.a_data_plane_events';

SELECT public.run_maintenance();

COMMIT;
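
Since the migration above leaves existing rows in a_data_plane_events_old, a follow-up backfill might look like the sketch below. This is not something we run today. It assumes only rows inside the 30-day retention window are worth copying, and that a matching (or default) partition exists for each row; otherwise the INSERT fails because no partition is found for the row. public.show_partitions() is pg_partman's helper for listing child tables, assumed here to be installed in the public schema like the other pg_partman functions in this issue.

```sql
-- List the child partitions pg_partman has created for the new archive table.
SELECT * FROM public.show_partitions('pgmq.a_data_plane_events');

BEGIN;

-- Copy rows still inside the retention window from the renamed table.
-- Assumption: a matching (or default) partition exists for every archived_at value.
INSERT INTO pgmq.a_data_plane_events
    (msg_id, read_ct, enqueued_at, archived_at, vt, message)
SELECT msg_id, read_ct, enqueued_at, archived_at, vt, message
FROM pgmq.a_data_plane_events_old
WHERE archived_at >= now() - interval '30 days';

COMMIT;

-- Once the copy has been verified, the old table can be dropped.
-- DROP TABLE pgmq.a_data_plane_events_old;
```

Rows older than the retention window are skipped on purpose, since the next maintenance run would drop their partitions anyway.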

Once data ages out, the aged partitions are simply dropped and that data is lost. I think a better solution would be to have the option to move the partitions to S3 instead of dropping them. We could do this by integrating with pg_tier: aged partitions would move to S3 and remain queryable (albeit more slowly). This would effectively provide "bottomless" storage for the queue archives.