Public Functions

The queue is used by a client in the following steps

1.  Register the client (a queue consumer)

pgq.register_consumer(queue_name, consumer_id)

2. run a loop createing, consuming and closing batches

2a. pgq.get_batch_events(batch_id int8)returns an int8 batch handle
2b. pgq.get_batch_events(batch_id int8)returns a set of events for current batch

the event structure is :(ev_id int8, ev_time timestamptz, ev_txid int8, ev_retry int4, ev_type text, ev_data text, ev_extra1, ev_extra2, ev_extra3, ev_extra4)

2c. if any of the events need to be tagged as failed, use a the function

pgq.event_failed(batch_id int8, event_id int8, reason text)

2d.  if you want the event to be re-inserted in the main queue afrer N seconds, use

pgq.event_retry(batch_id int8, event_id int8, retry_seconds int4)

2e.  To finish processing and release the batch, use

pgq.finish_batch(batch_id int8)

Until this is not done, the consumer will get same batch again.

After calling finish_batch consumer cannot do any operations with events of that batch.  All operations must be done before.

-- ----------------------------------------------------------------------

Summary
Public FunctionsThe queue is used by a client in the following steps
Queue creation
pgq.create_queue(1)Creates new queue with given name.
pgq.drop_queue(2)Drop queue and all associated tables.
pgq.drop_queue(1)Drop queue and all associated tables.
pgq.set_queue_config(3)Set configuration for specified queue.
Event publishing
pgq.insert_event(3)Insert a event into queue.
pgq.insert_event(7)Insert a event into queue with all the extra fields.
pgq.current_event_table(1)Return active event table for particular queue.
Subscribing to queue
pgq.register_consumer(2)Subscribe consumer on a queue.
pgq.register_consumer_at(3)Extended registration, allows to specify tick_id.
pgq.unregister_consumer(2)Unsubscribe consumer from the queue.
Batch processing
pgq.next_batch_info(2)Makes next block of events active.
pgq.next_batch(2)Old function that returns just batch_id.
pgq.next_batch_custom(5)Makes next block of events active.
pgq.get_batch_events(1)Get all events in batch.
pgq.get_batch_cursor(4)Get events in batch using a cursor.
pgq.get_batch_cursor(3)Get events in batch using a cursor.
pgq.event_retry(3a)Put the event into retry queue, to be processed again later.
pgq.event_retry(3b)Put the event into retry queue, to be processed later again.
pgq.batch_retry(2)Put whole batch into retry queue, to be processed again later.
pgq.finish_batch(1)Closes a batch.
General info functions
pgq.get_queue_info(0)Get info about all queues.
pgq.get_queue_info(1)Get info about particular queue.
pgq.get_consumer_info(0)Returns info about all consumers on all queues.
pgq.get_consumer_info(1)Returns info about all consumers on single queue.
pgq.get_consumer_info(2)Get info about particular consumer on particular queue.
pgq.version(0)Returns version string for pgq.
pgq.get_batch_info(1)Returns detailed info about a batch.

Queue creation

pgq.create_queue(1)

pgq.create_queue(i_queue_name text) returns integer

Creates new queue with given name.

Returns

0queue already exists
1queue created Calls: pgq.grant_perms(i_queue_name); pgq.ticker(i_queue_name); pgq.tune_storage(i_queue_name); Tables directly manipulated:
insertpgq.queue
createpgq.event_N () inherits (pgq.event_template)
createpgq.event_N_0 .. pgq.event_N_M () inherits (pgq.event_N)

pgq.drop_queue(2)

pgq.drop_queue(x_queue_name text,
x_force bool) returns integer

Drop queue and all associated tables.

Parameters

x_queue_namequeue name
x_forceignore (drop) existing consumers Returns:
1success Calls: pgq.unregister_consumer(queue_name, consumer_name) perform pgq.ticker(i_queue_name); perform pgq.tune_storage(i_queue_name); Tables directly manipulated:
deletepgq.queue
droppgq.event_N (), pgq.event_N_0 .. pgq.event_N_M

pgq.drop_queue(1)

pgq.drop_queue(x_queue_name text) returns integer

Drop queue and all associated tables.  No consumers must be listening on the queue.

pgq.set_queue_config(3)

pgq.set_queue_config(x_queue_name text,
x_param_name text,
x_param_value text) returns integer

Set configuration for specified queue.

Parameters

x_queue_nameName of the queue to configure.
x_param_nameConfiguration parameter name.
x_param_valueConfiguration parameter value.

Returns

0 if event was already in queue, 1 otherwise.  Calls: None Tables directly manipulated: update - pgq.queue

Event publishing

pgq.insert_event(3)

pgq.insert_event(queue_name text,
ev_type text,
ev_data text) returns bigint

Insert a event into queue.

Parameters

queue_nameName of the queue
ev_typeUser-specified type for the event
ev_dataUser data for the event

Returns

Event ID Calls: pgq.insert_event(7)

pgq.insert_event(7)

pgq.insert_event(queue_name text,
ev_type text,
ev_data text,
ev_extra1 text,
ev_extra2 text,
ev_extra3 text,
ev_extra4 text) returns bigint

Insert a event into queue with all the extra fields.

Parameters

queue_nameName of the queue
ev_typeUser-specified type for the event
ev_dataUser data for the event
ev_extra1Extra data field for the event
ev_extra2Extra data field for the event
ev_extra3Extra data field for the event
ev_extra4Extra data field for the event

Returns

Event ID Calls: pgq.insert_event_raw(11) Tables directly manipulated: insert - pgq.insert_event_raw(11), a C function, inserts into current event_N_M table

pgq.current_event_table(1)

pgq.current_event_table(x_queue_name text) returns text

Return active event table for particular queue.  Event can be added to it without going via functions, e.g. by COPY.

If queue is disabled and GUC session_replication_role <> ‘replica’ then raises exception.

or expressed in a different wayan even table of a disabled queue is returned only on replica

Note

The result is valid only during current transaction.

Permissions

Actual insertion requires superuser access.

Parameters

x_queue_nameQueue name.

Subscribing to queue

pgq.register_consumer(2)

pgq.register_consumer(x_queue_name text,
x_consumer_id text) returns integer

Subscribe consumer on a queue.

From this moment forward, consumer will see all events in the queue.

Parameters

x_queue_nameName of queue
x_consumer_nameName of consumer

Returns

0if already registered
1if new registration Calls: pgq.register_consumer_at(3) Tables directly manipulated: None

pgq.register_consumer_at(3)

pgq.register_consumer_at(x_queue_name text,
x_consumer_name text,
x_tick_pos bigint) returns integer

Extended registration, allows to specify tick_id.

Note

For usage in special situations.

Parameters

x_queue_nameName of a queue
x_consumer_nameName of consumer
x_tick_posTick ID

Returns

0/1 whether consumer has already registered.  Calls: None Tables directly manipulated: update/insert - pgq.subscription

pgq.unregister_consumer(2)

pgq.unregister_consumer(x_queue_name text,
x_consumer_name text) returns integer

Unsubscribe consumer from the queue.  Also consumer’s retry events are deleted.

Parameters

x_queue_nameName of the queue
x_consumer_nameName of the consumer

Returns

number of (sub)consumers unregistered Calls: None Tables directly manipulated: delete - pgq.retry_queue delete - pgq.subscription

Batch processing

pgq.next_batch_info(2)

pgq.next_batch_info(in i_queue_name text,
in i_consumer_name text,
out batch_id int8,
out cur_tick_id int8,
out prev_tick_id int8,
out cur_tick_time timestamptz,
out prev_tick_time timestamptz,
out cur_tick_event_seq int8,
out prev_tick_event_seq int8) as

Makes next block of events active.

If it returns NULL, there is no events available in queue.  Consumer should sleep then.

The values from event_id sequence may give hint how big the batch may be.  But they are inexact, they do not give exact size.  Client MUST NOT use them to detect whether the batch contains any events at all - the values are unfit for that purpose.

Parameters

i_queue_nameName of the queue
i_consumer_nameName of the consumer

Returns

batch_idBatch ID or NULL if there are no more events available.
cur_tick_idEnd tick id.
cur_tick_timeEnd tick time.
cur_tick_event_seqValue from event id sequence at the time tick was issued.
prev_tick_idStart tick id.
prev_tick_timeStart tick time.
prev_tick_event_seqvalue from event id sequence at the time tick was issued.  Calls: pgq.next_batch_custom(5) Tables directly manipulated: None

pgq.next_batch(2)

pgq.next_batch(in i_queue_name text,
in i_consumer_name text) returns int8

Old function that returns just batch_id.

Parameters

i_queue_nameName of the queue
i_consumer_nameName of the consumer

Returns

Batch ID or NULL if there are no more events available.

pgq.next_batch_custom(5)

pgq.next_batch_custom(in i_queue_name text,
in i_consumer_name text,
in i_min_lag interval,
in i_min_count int4,
in i_min_interval interval,
out batch_id int8,
out cur_tick_id int8,
out prev_tick_id int8,
out cur_tick_time timestamptz,
out prev_tick_time timestamptz,
out cur_tick_event_seq int8,
out prev_tick_event_seq int8) as

Makes next block of events active.  Block size can be tuned with i_min_count, i_min_interval parameters.  Events age can be tuned with i_min_lag.

If it returns NULL, there is no events available in queue.  Consumer should sleep then.

The values from event_id sequence may give hint how big the batch may be.  But they are inexact, they do not give exact size.  Client MUST NOT use them to detect whether the batch contains any events at all - the values are unfit for that purpose.

Note

i_min_lag together with i_min_interval/i_min_count is inefficient.

Parameters

i_queue_nameName of the queue
i_consumer_nameName of the consumer
i_min_lagConsumer wants events older than that
i_min_countConsumer wants batch to contain at least this many events
i_min_intervalConsumer wants batch to cover at least this much time

Returns

batch_idBatch ID or NULL if there are no more events available.
cur_tick_idEnd tick id.
cur_tick_timeEnd tick time.
cur_tick_event_seqValue from event id sequence at the time tick was issued.
prev_tick_idStart tick id.
prev_tick_timeStart tick time.
prev_tick_event_seqvalue from event id sequence at the time tick was issued.  Calls: pgq.insert_event_raw(11) Tables directly manipulated:
updatepgq.subscription

pgq.get_batch_events(1)

pgq.get_batch_events(in x_batch_id bigint,
out ev_id bigint,
out ev_time timestamptz,
out ev_txid bigint,
out ev_retry int4,
out ev_type text,
out ev_data text,
out ev_extra1 text,
out ev_extra2 text,
out ev_extra3 text,
out ev_extra4 text) returns setof record

Get all events in batch.

Parameters

x_batch_idID of active batch.

Returns

List of events.

pgq.get_batch_cursor(4)

pgq.get_batch_cursor(in i_batch_id bigint,
in i_cursor_name text,
in i_quick_limit int4,
in i_extra_where text,
out ev_id bigint,
out ev_time timestamptz,
out ev_txid bigint,
out ev_retry int4,
out ev_type text,
out ev_data text,
out ev_extra1 text,
out ev_extra2 text,
out ev_extra3 text,
out ev_extra4 text) returns setof record

Get events in batch using a cursor.

Parameters

i_batch_idID of active batch.
i_cursor_nameName for new cursor
i_quick_limitNumber of events to return immediately
i_extra_whereoptional where clause to filter events

Returns

List of events.  Calls: pgq.batch_event_sql(i_batch_id) - internal function which generates SQL optimised specially for getting events in this batch

pgq.get_batch_cursor(3)

pgq.get_batch_cursor(in i_batch_id bigint,
in i_cursor_name text,
in i_quick_limit int4,
out ev_id bigint,
out ev_time timestamptz,
out ev_txid bigint,
out ev_retry int4,
out ev_type text,
out ev_data text,
out ev_extra1 text,
out ev_extra2 text,
out ev_extra3 text,
out ev_extra4 text) returns setof record

Get events in batch using a cursor.

Parameters

i_batch_idID of active batch.
i_cursor_nameName for new cursor
i_quick_limitNumber of events to return immediately

Returns

List of events.  Calls: pgq.get_batch_cursor(4)

pgq.event_retry(3a)

pgq.event_retry(x_batch_id bigint,
x_event_id bigint,
x_retry_time timestamptz) returns integer

Put the event into retry queue, to be processed again later.

Parameters

x_batch_idID of active batch.
x_event_idevent id
x_retry_timeTime when the event should be put back into queue

Returns

1success
0event already in retry queue Calls: None Tables directly manipulated:
insertpgq.retry_queue

pgq.event_retry(3b)

pgq.event_retry(x_batch_id bigint,
x_event_id bigint,
x_retry_seconds integer) returns integer

Put the event into retry queue, to be processed later again.

Parameters

x_batch_idID of active batch.
x_event_idevent id
x_retry_secondsTime when the event should be put back into queue

Returns

1success
0event already in retry queue Calls: pgq.event_retry(3a) Tables directly manipulated: None

pgq.batch_retry(2)

pgq.batch_retry(i_batch_id bigint,
i_retry_seconds integer) returns integer

Put whole batch into retry queue, to be processed again later.

Parameters

i_batch_idID of active batch.
i_retry_timeTime when the event should be put back into queue

Returns

number of events inserted Calls: None Tables directly manipulated: pgq.retry_queue

pgq.finish_batch(1)

pgq.finish_batch(x_batch_id bigint) returns integer

Closes a batch.  No more operations can be done with events of this batch.

Parameters

x_batch_idid of batch.

Returns

1 if batch was found, 0 otherwise.  Calls: None Tables directly manipulated: update - pgq.subscription

General info functions

pgq.get_queue_info(0)

pgq.get_queue_info(out queue_name text,
out queue_ntables integer,
out queue_cur_table integer,
out queue_rotation_period interval,
out queue_switch_time timestamptz,
out queue_external_ticker boolean,
out queue_ticker_paused boolean,
out queue_ticker_max_count integer,
out queue_ticker_max_lag interval,
out queue_ticker_idle_period interval,
out ticker_lag interval,
out ev_per_sec float8,
out ev_new bigint,
out last_tick_id bigint) returns setof record

Get info about all queues.

Returns

List of pgq.ret_queue_info records. queue_name - queue name queue_ntables - number of tables in this queue queue_cur_table - ??? queue_rotation_period - how often the event_N_M tables in this queue are rotated queue_switch_time - ??? when was this queue last rotated queue_external_ticker - ??? queue_ticker_paused - ??? is ticker paused in this queue queue_ticker_max_count - max number of events before a tick is issued queue_ticker_max_lag - maks time without a tick queue_ticker_idle_period - how often the ticker should check this queue ticker_lag - time from last tick ev_per_sec - how many events per second this queue serves ev_new - ??? last_tick_id - last tick id for this queue

pgq.get_queue_info(1)

pgq.get_queue_info(in i_queue_name text,
out queue_name text,
out queue_ntables integer,
out queue_cur_table integer,
out queue_rotation_period interval,
out queue_switch_time timestamptz,
out queue_external_ticker boolean,
out queue_ticker_paused boolean,
out queue_ticker_max_count integer,
out queue_ticker_max_lag interval,
out queue_ticker_idle_period interval,
out ticker_lag interval,
out ev_per_sec float8,
out ev_new bigint,
out last_tick_id bigint) returns setof record

Get info about particular queue.

Returns

One pgq.ret_queue_info record. contente same as forpgq.get_queue_info()

pgq.get_consumer_info(0)

pgq.get_consumer_info(out queue_name text,
out consumer_name text,
out lag interval,
out last_seen interval,
out last_tick bigint,
out current_batch bigint,
out next_tick bigint,
out pending_events bigint) returns setof record

Returns info about all consumers on all queues.

Returns

See pgq.get_consumer_info(2)

pgq.get_consumer_info(1)

pgq.get_consumer_info(in i_queue_name text,
out queue_name text,
out consumer_name text,
out lag interval,
out last_seen interval,
out last_tick bigint,
out current_batch bigint,
out next_tick bigint,
out pending_events bigint) returns setof record

Returns info about all consumers on single queue.

Returns

See pgq.get_consumer_info(2)

pgq.get_consumer_info(2)

pgq.get_consumer_info(in i_queue_name text,
in i_consumer_name text,
out queue_name text,
out consumer_name text,
out lag interval,
out last_seen interval,
out last_tick bigint,
out current_batch bigint,
out next_tick bigint,
out pending_events bigint) returns setof record

Get info about particular consumer on particular queue.

Parameters

i_queue_namename of a queue.  (null = all)
i_consumer_namename of a consumer (null = all)

Returns

queue_nameQueue name
consumer_nameConsumer name
lagHow old are events the consumer is processing
last_seenWhen the consumer seen by pgq
last_tickTick ID of last processed tick
current_batchCurrent batch ID, if one is active or NULL
next_tickIf batch is active, then its final tick.

pgq.version(0)

pgq.version() returns text

Returns version string for pgq.  ATM it is based on SkyTools version and only bumped when database code changes.

pgq.get_batch_info(1)

pgq.get_batch_info(in x_batch_id bigint,
out queue_name text,
out consumer_name text,
out batch_start timestamptz,
out batch_end timestamptz,
out prev_tick_id bigint,
out tick_id bigint,
out lag interval,
out seq_start bigint,
out seq_end bigint) as

Returns detailed info about a batch.

Parameters

x_batch_idid of a active batch.

Returns: ??? pls check queue_name - which queue this batch came from consumer_name - batch processed by batch_start - start time of batch batch_end - end time of batch prev_tick_id - start tick for this batch tick_id - end tick for this batch lag - now() - tick_id.time seq_start - start event id for batch seq_end - end event id for batch

pgq.create_queue(i_queue_name text) returns integer
Creates new queue with given name.
pgq.drop_queue(x_queue_name text,
x_force bool) returns integer
Drop queue and all associated tables.
pgq.drop_queue(x_queue_name text) returns integer
Drop queue and all associated tables.
pgq.set_queue_config(x_queue_name text,
x_param_name text,
x_param_value text) returns integer
Set configuration for specified queue.
pgq.insert_event(queue_name text,
ev_type text,
ev_data text) returns bigint
Insert a event into queue.
pgq.insert_event(queue_name text,
ev_type text,
ev_data text,
ev_extra1 text,
ev_extra2 text,
ev_extra3 text,
ev_extra4 text) returns bigint
Insert a event into queue with all the extra fields.
pgq.current_event_table(x_queue_name text) returns text
Return active event table for particular queue.
pgq.register_consumer(x_queue_name text,
x_consumer_id text) returns integer
Subscribe consumer on a queue.
pgq.register_consumer_at(x_queue_name text,
x_consumer_name text,
x_tick_pos bigint) returns integer
Extended registration, allows to specify tick_id.
pgq.unregister_consumer(x_queue_name text,
x_consumer_name text) returns integer
Unsubscribe consumer from the queue.
pgq.next_batch_info(in i_queue_name text,
in i_consumer_name text,
out batch_id int8,
out cur_tick_id int8,
out prev_tick_id int8,
out cur_tick_time timestamptz,
out prev_tick_time timestamptz,
out cur_tick_event_seq int8,
out prev_tick_event_seq int8) as
Makes next block of events active.
pgq.next_batch(in i_queue_name text,
in i_consumer_name text) returns int8
Old function that returns just batch_id.
pgq.next_batch_custom(in i_queue_name text,
in i_consumer_name text,
in i_min_lag interval,
in i_min_count int4,
in i_min_interval interval,
out batch_id int8,
out cur_tick_id int8,
out prev_tick_id int8,
out cur_tick_time timestamptz,
out prev_tick_time timestamptz,
out cur_tick_event_seq int8,
out prev_tick_event_seq int8) as
Makes next block of events active.
pgq.get_batch_events(in x_batch_id bigint,
out ev_id bigint,
out ev_time timestamptz,
out ev_txid bigint,
out ev_retry int4,
out ev_type text,
out ev_data text,
out ev_extra1 text,
out ev_extra2 text,
out ev_extra3 text,
out ev_extra4 text) returns setof record
Get all events in batch.
pgq.get_batch_cursor(in i_batch_id bigint,
in i_cursor_name text,
in i_quick_limit int4,
in i_extra_where text,
out ev_id bigint,
out ev_time timestamptz,
out ev_txid bigint,
out ev_retry int4,
out ev_type text,
out ev_data text,
out ev_extra1 text,
out ev_extra2 text,
out ev_extra3 text,
out ev_extra4 text) returns setof record
Get events in batch using a cursor.
pgq.get_batch_cursor(in i_batch_id bigint,
in i_cursor_name text,
in i_quick_limit int4,
out ev_id bigint,
out ev_time timestamptz,
out ev_txid bigint,
out ev_retry int4,
out ev_type text,
out ev_data text,
out ev_extra1 text,
out ev_extra2 text,
out ev_extra3 text,
out ev_extra4 text) returns setof record
Get events in batch using a cursor.
pgq.event_retry(x_batch_id bigint,
x_event_id bigint,
x_retry_time timestamptz) returns integer
Put the event into retry queue, to be processed again later.
pgq.event_retry(x_batch_id bigint,
x_event_id bigint,
x_retry_seconds integer) returns integer
Put the event into retry queue, to be processed later again.
pgq.batch_retry(i_batch_id bigint,
i_retry_seconds integer) returns integer
Put whole batch into retry queue, to be processed again later.
pgq.finish_batch(x_batch_id bigint) returns integer
Closes a batch.
pgq.get_queue_info(out queue_name text,
out queue_ntables integer,
out queue_cur_table integer,
out queue_rotation_period interval,
out queue_switch_time timestamptz,
out queue_external_ticker boolean,
out queue_ticker_paused boolean,
out queue_ticker_max_count integer,
out queue_ticker_max_lag interval,
out queue_ticker_idle_period interval,
out ticker_lag interval,
out ev_per_sec float8,
out ev_new bigint,
out last_tick_id bigint) returns setof record
Get info about all queues.
pgq.get_queue_info(in i_queue_name text,
out queue_name text,
out queue_ntables integer,
out queue_cur_table integer,
out queue_rotation_period interval,
out queue_switch_time timestamptz,
out queue_external_ticker boolean,
out queue_ticker_paused boolean,
out queue_ticker_max_count integer,
out queue_ticker_max_lag interval,
out queue_ticker_idle_period interval,
out ticker_lag interval,
out ev_per_sec float8,
out ev_new bigint,
out last_tick_id bigint) returns setof record
Get info about particular queue.
pgq.get_consumer_info(out queue_name text,
out consumer_name text,
out lag interval,
out last_seen interval,
out last_tick bigint,
out current_batch bigint,
out next_tick bigint,
out pending_events bigint) returns setof record
Returns info about all consumers on all queues.
pgq.get_consumer_info(in i_queue_name text,
out queue_name text,
out consumer_name text,
out lag interval,
out last_seen interval,
out last_tick bigint,
out current_batch bigint,
out next_tick bigint,
out pending_events bigint) returns setof record
Returns info about all consumers on single queue.
pgq.get_consumer_info(in i_queue_name text,
in i_consumer_name text,
out queue_name text,
out consumer_name text,
out lag interval,
out last_seen interval,
out last_tick bigint,
out current_batch bigint,
out next_tick bigint,
out pending_events bigint) returns setof record
Get info about particular consumer on particular queue.
pgq.version() returns text
Returns version string for pgq.
pgq.get_batch_info(in x_batch_id bigint,
out queue_name text,
out consumer_name text,
out batch_start timestamptz,
out batch_end timestamptz,
out prev_tick_id bigint,
out tick_id bigint,
out lag interval,
out seq_start bigint,
out seq_end bigint) as
Returns detailed info about a batch.
Close