The queue is used by a client in the following steps
1. Register the client (a queue consumer)
pgq.register_consumer(queue_name, consumer_id)
2. run a loop createing, consuming and closing batches
2a. pgq.get_batch_events(batch_id int8) | returns an int8 batch handle |
2b. pgq.get_batch_events(batch_id int8) | returns a set of events for current batch |
the event structure is :(ev_id int8, ev_time timestamptz, ev_txid int8, ev_retry int4, ev_type text, ev_data text, ev_extra1, ev_extra2, ev_extra3, ev_extra4)
2c. if any of the events need to be tagged as failed, use a the function
pgq.event_failed(batch_id int8, event_id int8, reason text)
2d. if you want the event to be re-inserted in the main queue afrer N seconds, use
pgq.event_retry(batch_id int8, event_id int8, retry_seconds int4)
2e. To finish processing and release the batch, use
pgq.finish_batch(batch_id int8)
Until this is not done, the consumer will get same batch again.
After calling finish_batch consumer cannot do any operations with events of that batch. All operations must be done before.
-- ----------------------------------------------------------------------
Public Functions | The queue is used by a client in the following steps |
Queue creation | |
pgq. | Creates new queue with given name. |
pgq. | Drop queue and all associated tables. |
pgq. | Drop queue and all associated tables. |
pgq. | Set configuration for specified queue. |
Event publishing | |
pgq. | Insert a event into queue. |
pgq. | Insert a event into queue with all the extra fields. |
pgq. | Return active event table for particular queue. |
Subscribing to queue | |
pgq. | Subscribe consumer on a queue. |
pgq. | Extended registration, allows to specify tick_id. |
pgq. | Unsubscribe consumer from the queue. |
Batch processing | |
pgq. | Makes next block of events active. |
pgq. | Old function that returns just batch_id. |
pgq. | Makes next block of events active. |
pgq. | Get all events in batch. |
pgq. | Get events in batch using a cursor. |
pgq. | Get events in batch using a cursor. |
pgq. | Put the event into retry queue, to be processed again later. |
pgq. | Put the event into retry queue, to be processed later again. |
pgq. | Put whole batch into retry queue, to be processed again later. |
pgq. | Closes a batch. |
General info functions | |
pgq. | Get info about all queues. |
pgq. | Get info about particular queue. |
pgq. | Returns info about all consumers on all queues. |
pgq. | Returns info about all consumers on single queue. |
pgq. | Get info about particular consumer on particular queue. |
pgq. | Returns version string for pgq. |
pgq. | Returns detailed info about a batch. |
pgq.create_queue( i_queue_name text ) returns integer
Creates new queue with given name.
0 | queue already exists |
1 | queue created Calls: pgq.grant_perms(i_queue_name); pgq.ticker(i_queue_name); pgq.tune_storage(i_queue_name); Tables directly manipulated: |
insert | pgq.queue |
create | pgq.event_N () inherits (pgq.event_template) |
create | pgq.event_N_0 .. pgq.event_N_M () inherits (pgq.event_N) |
pgq.drop_queue( x_queue_name text, x_force bool ) returns integer
Drop queue and all associated tables.
x_queue_name | queue name |
x_force | ignore (drop) existing consumers Returns: |
1 | success Calls: pgq.unregister_consumer(queue_name, consumer_name) perform pgq.ticker(i_queue_name); perform pgq.tune_storage(i_queue_name); Tables directly manipulated: |
delete | pgq.queue |
drop | pgq.event_N (), pgq.event_N_0 .. pgq.event_N_M |
pgq.set_queue_config( x_queue_name text, x_param_name text, x_param_value text ) returns integer
Set configuration for specified queue.
x_queue_name | Name of the queue to configure. |
x_param_name | Configuration parameter name. |
x_param_value | Configuration parameter value. |
0 if event was already in queue, 1 otherwise. Calls: None Tables directly manipulated: update - pgq.queue
pgq.insert_event( queue_name text, ev_type text, ev_data text, ev_extra1 text, ev_extra2 text, ev_extra3 text, ev_extra4 text ) returns bigint
Insert a event into queue with all the extra fields.
queue_name | Name of the queue |
ev_type | User-specified type for the event |
ev_data | User data for the event |
ev_extra1 | Extra data field for the event |
ev_extra2 | Extra data field for the event |
ev_extra3 | Extra data field for the event |
ev_extra4 | Extra data field for the event |
Event ID Calls: pgq.insert_event_raw(11) Tables directly manipulated: insert - pgq.insert_event_raw(11), a C function, inserts into current event_N_M table
pgq.current_event_table( x_queue_name text ) returns text
Return active event table for particular queue. Event can be added to it without going via functions, e.g. by COPY.
If queue is disabled and GUC session_replication_role <> ‘replica’ then raises exception.
or expressed in a different way | an even table of a disabled queue is returned only on replica |
The result is valid only during current transaction.
Actual insertion requires superuser access.
x_queue_name | Queue name. |
pgq.register_consumer( x_queue_name text, x_consumer_id text ) returns integer
Subscribe consumer on a queue.
From this moment forward, consumer will see all events in the queue.
x_queue_name | Name of queue |
x_consumer_name | Name of consumer |
0 | if already registered |
1 | if new registration Calls: pgq.register_consumer_at(3) Tables directly manipulated: None |
pgq.register_consumer_at( x_queue_name text, x_consumer_name text, x_tick_pos bigint ) returns integer
Extended registration, allows to specify tick_id.
For usage in special situations.
x_queue_name | Name of a queue |
x_consumer_name | Name of consumer |
x_tick_pos | Tick ID |
0/1 whether consumer has already registered. Calls: None Tables directly manipulated: update/insert - pgq.subscription
pgq.unregister_consumer( x_queue_name text, x_consumer_name text ) returns integer
Unsubscribe consumer from the queue. Also consumer’s retry events are deleted.
x_queue_name | Name of the queue |
x_consumer_name | Name of the consumer |
number of (sub)consumers unregistered Calls: None Tables directly manipulated: delete - pgq.retry_queue delete - pgq.subscription
pgq.next_batch_info( in i_queue_name text, in i_consumer_name text, out batch_id int8, out cur_tick_id int8, out prev_tick_id int8, out cur_tick_time timestamptz, out prev_tick_time timestamptz, out cur_tick_event_seq int8, out prev_tick_event_seq int8 ) as
Makes next block of events active.
If it returns NULL, there is no events available in queue. Consumer should sleep then.
The values from event_id sequence may give hint how big the batch may be. But they are inexact, they do not give exact size. Client MUST NOT use them to detect whether the batch contains any events at all - the values are unfit for that purpose.
i_queue_name | Name of the queue |
i_consumer_name | Name of the consumer |
batch_id | Batch ID or NULL if there are no more events available. |
cur_tick_id | End tick id. |
cur_tick_time | End tick time. |
cur_tick_event_seq | Value from event id sequence at the time tick was issued. |
prev_tick_id | Start tick id. |
prev_tick_time | Start tick time. |
prev_tick_event_seq | value from event id sequence at the time tick was issued. Calls: pgq.next_batch_custom(5) Tables directly manipulated: None |
pgq.next_batch_custom( in i_queue_name text, in i_consumer_name text, in i_min_lag interval, in i_min_count int4, in i_min_interval interval, out batch_id int8, out cur_tick_id int8, out prev_tick_id int8, out cur_tick_time timestamptz, out prev_tick_time timestamptz, out cur_tick_event_seq int8, out prev_tick_event_seq int8 ) as
Makes next block of events active. Block size can be tuned with i_min_count, i_min_interval parameters. Events age can be tuned with i_min_lag.
If it returns NULL, there is no events available in queue. Consumer should sleep then.
The values from event_id sequence may give hint how big the batch may be. But they are inexact, they do not give exact size. Client MUST NOT use them to detect whether the batch contains any events at all - the values are unfit for that purpose.
i_min_lag together with i_min_interval/i_min_count is inefficient.
i_queue_name | Name of the queue |
i_consumer_name | Name of the consumer |
i_min_lag | Consumer wants events older than that |
i_min_count | Consumer wants batch to contain at least this many events |
i_min_interval | Consumer wants batch to cover at least this much time |
batch_id | Batch ID or NULL if there are no more events available. |
cur_tick_id | End tick id. |
cur_tick_time | End tick time. |
cur_tick_event_seq | Value from event id sequence at the time tick was issued. |
prev_tick_id | Start tick id. |
prev_tick_time | Start tick time. |
prev_tick_event_seq | value from event id sequence at the time tick was issued. Calls: pgq.insert_event_raw(11) Tables directly manipulated: |
update | pgq.subscription |
pgq.get_batch_events( in x_batch_id bigint, out ev_id bigint, out ev_time timestamptz, out ev_txid bigint, out ev_retry int4, out ev_type text, out ev_data text, out ev_extra1 text, out ev_extra2 text, out ev_extra3 text, out ev_extra4 text ) returns setof record
Get all events in batch.
x_batch_id | ID of active batch. |
List of events.
pgq.get_batch_cursor( in i_batch_id bigint, in i_cursor_name text, in i_quick_limit int4, in i_extra_where text, out ev_id bigint, out ev_time timestamptz, out ev_txid bigint, out ev_retry int4, out ev_type text, out ev_data text, out ev_extra1 text, out ev_extra2 text, out ev_extra3 text, out ev_extra4 text ) returns setof record
Get events in batch using a cursor.
i_batch_id | ID of active batch. |
i_cursor_name | Name for new cursor |
i_quick_limit | Number of events to return immediately |
i_extra_where | optional where clause to filter events |
List of events. Calls: pgq.batch_event_sql(i_batch_id) - internal function which generates SQL optimised specially for getting events in this batch
pgq.get_batch_cursor( in i_batch_id bigint, in i_cursor_name text, in i_quick_limit int4, out ev_id bigint, out ev_time timestamptz, out ev_txid bigint, out ev_retry int4, out ev_type text, out ev_data text, out ev_extra1 text, out ev_extra2 text, out ev_extra3 text, out ev_extra4 text ) returns setof record
Get events in batch using a cursor.
i_batch_id | ID of active batch. |
i_cursor_name | Name for new cursor |
i_quick_limit | Number of events to return immediately |
List of events. Calls: pgq.get_batch_cursor(4)
pgq.event_retry( x_batch_id bigint, x_event_id bigint, x_retry_time timestamptz ) returns integer
Put the event into retry queue, to be processed again later.
x_batch_id | ID of active batch. |
x_event_id | event id |
x_retry_time | Time when the event should be put back into queue |
1 | success |
0 | event already in retry queue Calls: None Tables directly manipulated: |
insert | pgq.retry_queue |
pgq.event_retry( x_batch_id bigint, x_event_id bigint, x_retry_seconds integer ) returns integer
Put the event into retry queue, to be processed later again.
x_batch_id | ID of active batch. |
x_event_id | event id |
x_retry_seconds | Time when the event should be put back into queue |
1 | success |
0 | event already in retry queue Calls: pgq.event_retry(3a) Tables directly manipulated: None |
pgq.batch_retry( i_batch_id bigint, i_retry_seconds integer ) returns integer
Put whole batch into retry queue, to be processed again later.
i_batch_id | ID of active batch. |
i_retry_time | Time when the event should be put back into queue |
number of events inserted Calls: None Tables directly manipulated: pgq.retry_queue
pgq.get_queue_info( out queue_name text, out queue_ntables integer, out queue_cur_table integer, out queue_rotation_period interval, out queue_switch_time timestamptz, out queue_external_ticker boolean, out queue_ticker_paused boolean, out queue_ticker_max_count integer, out queue_ticker_max_lag interval, out queue_ticker_idle_period interval, out ticker_lag interval, out ev_per_sec float8, out ev_new bigint, out last_tick_id bigint ) returns setof record
Get info about all queues.
List of pgq.ret_queue_info records. queue_name - queue name queue_ntables - number of tables in this queue queue_cur_table - ??? queue_rotation_period - how often the event_N_M tables in this queue are rotated queue_switch_time - ??? when was this queue last rotated queue_external_ticker - ??? queue_ticker_paused - ??? is ticker paused in this queue queue_ticker_max_count - max number of events before a tick is issued queue_ticker_max_lag - maks time without a tick queue_ticker_idle_period - how often the ticker should check this queue ticker_lag - time from last tick ev_per_sec - how many events per second this queue serves ev_new - ??? last_tick_id - last tick id for this queue
pgq.get_queue_info( in i_queue_name text, out queue_name text, out queue_ntables integer, out queue_cur_table integer, out queue_rotation_period interval, out queue_switch_time timestamptz, out queue_external_ticker boolean, out queue_ticker_paused boolean, out queue_ticker_max_count integer, out queue_ticker_max_lag interval, out queue_ticker_idle_period interval, out ticker_lag interval, out ev_per_sec float8, out ev_new bigint, out last_tick_id bigint ) returns setof record
Get info about particular queue.
One pgq.ret_queue_info record. contente same as forpgq.get_queue_info()
pgq.get_consumer_info( out queue_name text, out consumer_name text, out lag interval, out last_seen interval, out last_tick bigint, out current_batch bigint, out next_tick bigint, out pending_events bigint ) returns setof record
Returns info about all consumers on all queues.
See pgq.get_consumer_info(2)
pgq.get_consumer_info( in i_queue_name text, out queue_name text, out consumer_name text, out lag interval, out last_seen interval, out last_tick bigint, out current_batch bigint, out next_tick bigint, out pending_events bigint ) returns setof record
Returns info about all consumers on single queue.
See pgq.get_consumer_info(2)
pgq.get_consumer_info( in i_queue_name text, in i_consumer_name text, out queue_name text, out consumer_name text, out lag interval, out last_seen interval, out last_tick bigint, out current_batch bigint, out next_tick bigint, out pending_events bigint ) returns setof record
Get info about particular consumer on particular queue.
i_queue_name | name of a queue. (null = all) |
i_consumer_name | name of a consumer (null = all) |
queue_name | Queue name |
consumer_name | Consumer name |
lag | How old are events the consumer is processing |
last_seen | When the consumer seen by pgq |
last_tick | Tick ID of last processed tick |
current_batch | Current batch ID, if one is active or NULL |
next_tick | If batch is active, then its final tick. |
pgq.get_batch_info( in x_batch_id bigint, out queue_name text, out consumer_name text, out batch_start timestamptz, out batch_end timestamptz, out prev_tick_id bigint, out tick_id bigint, out lag interval, out seq_start bigint, out seq_end bigint ) as
Returns detailed info about a batch.
x_batch_id | id of a active batch. |
Returns: ??? pls check queue_name - which queue this batch came from consumer_name - batch processed by batch_start - start time of batch batch_end - end time of batch prev_tick_id - start tick for this batch tick_id - end tick for this batch lag - now() - tick_id.time seq_start - start event id for batch seq_end - end event id for batch
Creates new queue with given name.
pgq.create_queue( i_queue_name text ) returns integer
Drop queue and all associated tables.
pgq.drop_queue( x_queue_name text, x_force bool ) returns integer
Drop queue and all associated tables.
pgq.drop_queue( x_queue_name text ) returns integer
Set configuration for specified queue.
pgq.set_queue_config( x_queue_name text, x_param_name text, x_param_value text ) returns integer
Insert a event into queue.
pgq.insert_event( queue_name text, ev_type text, ev_data text ) returns bigint
Insert a event into queue with all the extra fields.
pgq.insert_event( queue_name text, ev_type text, ev_data text, ev_extra1 text, ev_extra2 text, ev_extra3 text, ev_extra4 text ) returns bigint
Return active event table for particular queue.
pgq.current_event_table( x_queue_name text ) returns text
Subscribe consumer on a queue.
pgq.register_consumer( x_queue_name text, x_consumer_id text ) returns integer
Extended registration, allows to specify tick_id.
pgq.register_consumer_at( x_queue_name text, x_consumer_name text, x_tick_pos bigint ) returns integer
Unsubscribe consumer from the queue.
pgq.unregister_consumer( x_queue_name text, x_consumer_name text ) returns integer
Makes next block of events active.
pgq.next_batch_info( in i_queue_name text, in i_consumer_name text, out batch_id int8, out cur_tick_id int8, out prev_tick_id int8, out cur_tick_time timestamptz, out prev_tick_time timestamptz, out cur_tick_event_seq int8, out prev_tick_event_seq int8 ) as
Old function that returns just batch_id.
pgq.next_batch( in i_queue_name text, in i_consumer_name text ) returns int8
Makes next block of events active.
pgq.next_batch_custom( in i_queue_name text, in i_consumer_name text, in i_min_lag interval, in i_min_count int4, in i_min_interval interval, out batch_id int8, out cur_tick_id int8, out prev_tick_id int8, out cur_tick_time timestamptz, out prev_tick_time timestamptz, out cur_tick_event_seq int8, out prev_tick_event_seq int8 ) as
Get all events in batch.
pgq.get_batch_events( in x_batch_id bigint, out ev_id bigint, out ev_time timestamptz, out ev_txid bigint, out ev_retry int4, out ev_type text, out ev_data text, out ev_extra1 text, out ev_extra2 text, out ev_extra3 text, out ev_extra4 text ) returns setof record
Get events in batch using a cursor.
pgq.get_batch_cursor( in i_batch_id bigint, in i_cursor_name text, in i_quick_limit int4, in i_extra_where text, out ev_id bigint, out ev_time timestamptz, out ev_txid bigint, out ev_retry int4, out ev_type text, out ev_data text, out ev_extra1 text, out ev_extra2 text, out ev_extra3 text, out ev_extra4 text ) returns setof record
Get events in batch using a cursor.
pgq.get_batch_cursor( in i_batch_id bigint, in i_cursor_name text, in i_quick_limit int4, out ev_id bigint, out ev_time timestamptz, out ev_txid bigint, out ev_retry int4, out ev_type text, out ev_data text, out ev_extra1 text, out ev_extra2 text, out ev_extra3 text, out ev_extra4 text ) returns setof record
Put the event into retry queue, to be processed again later.
pgq.event_retry( x_batch_id bigint, x_event_id bigint, x_retry_time timestamptz ) returns integer
Put the event into retry queue, to be processed later again.
pgq.event_retry( x_batch_id bigint, x_event_id bigint, x_retry_seconds integer ) returns integer
Put whole batch into retry queue, to be processed again later.
pgq.batch_retry( i_batch_id bigint, i_retry_seconds integer ) returns integer
Closes a batch.
pgq.finish_batch( x_batch_id bigint ) returns integer
Get info about all queues.
pgq.get_queue_info( out queue_name text, out queue_ntables integer, out queue_cur_table integer, out queue_rotation_period interval, out queue_switch_time timestamptz, out queue_external_ticker boolean, out queue_ticker_paused boolean, out queue_ticker_max_count integer, out queue_ticker_max_lag interval, out queue_ticker_idle_period interval, out ticker_lag interval, out ev_per_sec float8, out ev_new bigint, out last_tick_id bigint ) returns setof record
Get info about particular queue.
pgq.get_queue_info( in i_queue_name text, out queue_name text, out queue_ntables integer, out queue_cur_table integer, out queue_rotation_period interval, out queue_switch_time timestamptz, out queue_external_ticker boolean, out queue_ticker_paused boolean, out queue_ticker_max_count integer, out queue_ticker_max_lag interval, out queue_ticker_idle_period interval, out ticker_lag interval, out ev_per_sec float8, out ev_new bigint, out last_tick_id bigint ) returns setof record
Returns info about all consumers on all queues.
pgq.get_consumer_info( out queue_name text, out consumer_name text, out lag interval, out last_seen interval, out last_tick bigint, out current_batch bigint, out next_tick bigint, out pending_events bigint ) returns setof record
Returns info about all consumers on single queue.
pgq.get_consumer_info( in i_queue_name text, out queue_name text, out consumer_name text, out lag interval, out last_seen interval, out last_tick bigint, out current_batch bigint, out next_tick bigint, out pending_events bigint ) returns setof record
Get info about particular consumer on particular queue.
pgq.get_consumer_info( in i_queue_name text, in i_consumer_name text, out queue_name text, out consumer_name text, out lag interval, out last_seen interval, out last_tick bigint, out current_batch bigint, out next_tick bigint, out pending_events bigint ) returns setof record
Returns version string for pgq.
pgq.version() returns text
Returns detailed info about a batch.
pgq.get_batch_info( in x_batch_id bigint, out queue_name text, out consumer_name text, out batch_start timestamptz, out batch_end timestamptz, out prev_tick_id bigint, out tick_id bigint, out lag interval, out seq_start bigint, out seq_end bigint ) as