Database functions for cascaded pgq.
A worker is a consumer that also copies queue contents to the local node (branch), so that the node can act as a provider to other nodes. There can be only one worker per node, or none if the node is a leaf. In addition to cascaded consumer logic above, it must -
Functions | Database functions for cascaded pgq. |
Global Node Map | |
pgq_node. | Add new node location. |
pgq_node. | Drop unreferenced node. |
pgq_node. | Get node list for the queue. |
Node operations | |
pgq_node. | Initialize node. |
pgq_node. | Drop node. |
pgq_node. | Get local node info for cascaded queue. |
pgq_node. | Checks if node is root. |
pgq_node. | Checks if node is leaf. |
pgq_node. | Get subscriber list for the local node. |
pgq_node. | Get consumer list that work on the local node. |
pgq_node. | Multi-step root demotion to branch. |
pgq_node. | Promote branch node to root. |
pgq_node. | Set node attributes. |
Provider side operations - worker | |
pgq_node. | Subscribe remote node to local node at custom position. |
pgq_node. | Unsubscribe remote node from local node. |
pgq_node. | Notify provider about subscribers lowest watermark. |
Subscriber side operations - worker | |
pgq_node. | Get info for consumer that maintains local node. |
pgq_node. | Move global watermark on branch/leaf, publish on root. |
pgq_node. | Move merge-leaf position on combined-branch. |
Subscriber side operations - any consumer | |
pgq_node. | Subscribe plain cascaded consumer to a target node. |
pgq_node. | Unregister cascaded consumer from local node. |
pgq_node. | Get info for cascaded consumer that targets local node. |
pgq_node. | Change provider for this consumer. |
pgq_node. | Set consumer uptodate flag. |
pgq_node. | Set consumer paused flag. |
pgq_node. | Set last completed tick id for the cascaded consumer that it has committed to local node. |
pgq_node. | If batch processing fails, consumer can store its last error in db. |
Maintenance operations | |
pgq_node. | Move global watermark on root node. |
pgq_node. | Returns version string for pgq_node. |
pgq_node.register_location( in i_queue_name text, in i_node_name text, in i_node_location text, in i_dead boolean, out ret_code int4, out ret_note text ) returns record
Add new node location.
i_queue_name | queue name |
i_node_name | node name |
i_node_location | node connect string |
i_dead | dead flag for node |
ret_code | error code |
ret_note | error description |
200 | Ok |
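A minimal usage sketch. The queue name `myqueue`, the node name `node1`, and the connect string are hypothetical values chosen for illustration; only the function signature comes from the reference above.

```sql
-- Register a node location in the global node map.
-- All literal values below are hypothetical.
select ret_code, ret_note
  from pgq_node.register_location(
         'myqueue',                       -- i_queue_name
         'node1',                         -- i_node_name
         'dbname=node1db host=10.0.0.1',  -- i_node_location (connect string)
         false);                          -- i_dead
-- A ret_code of 200 means the call succeeded.
```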
pgq_node.unregister_location( in i_queue_name text, in i_node_name text, out ret_code int4, out ret_note text ) returns record
Drop unreferenced node.
i_queue_name | queue name |
i_node_name | node to drop |
ret_code | error code |
ret_note | error description |
200 | Ok |
301 | Location not found |
403 | Cannot drop node’s own or parent location |
pgq_node.get_queue_locations( in i_queue_name text, out node_name text, out node_location text, out dead boolean ) returns setof record
Get node list for the queue.
i_queue_name | queue name |
node_name | node name |
node_location | libpq connect string for the node |
dead | whether the node should be considered dead |
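As an illustration, the node map can be filtered like any other set-returning function. The queue name `myqueue` is a hypothetical value.

```sql
-- List connect strings of all nodes not marked dead.
select node_name, node_location
  from pgq_node.get_queue_locations('myqueue')
 where not dead
 order by node_name;
```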
pgq_node.create_node( in i_queue_name text, in i_node_type text, in i_node_name text, in i_worker_name text, in i_provider_name text, in i_global_watermark bigint, in i_combined_queue text, out ret_code int4, out ret_note text ) returns record
Initialize node.
i_queue_name | cascaded queue name |
i_node_type | node type |
i_node_name | node name |
i_worker_name | worker consumer name |
i_provider_name | provider node name for non-root nodes |
i_global_watermark | global lowest tick_id |
i_combined_queue | merge-leaf: target queue |
200 | Ok |
401 | node already initialized |
???? | maybe we could use more error codes? |
root | master node |
branch | subscriber node that can be provider to others |
leaf | subscriber node that cannot be provider to others |
Calls: None
Tables directly manipulated: None
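A sketch of initializing a root node and a branch node. All names are hypothetical. Passing NULL for the provider, watermark, and combined queue on the root is an assumption, based on the parameter notes saying the provider applies to non-root nodes and the combined queue to merge-leaf nodes. The watermark value for the branch is likewise a placeholder; in practice it would need to come from the provider.

```sql
-- On the root database (hypothetical names throughout).
select ret_code, ret_note
  from pgq_node.create_node(
         'myqueue',        -- i_queue_name
         'root',           -- i_node_type
         'node1',          -- i_node_name
         'node1_worker',   -- i_worker_name
         null,             -- i_provider_name: assumed unused on root
         null,             -- i_global_watermark: assumed unused on root
         null);            -- i_combined_queue: only for merge-leaf

-- On the branch database, with node1 as provider.
select ret_code, ret_note
  from pgq_node.create_node(
         'myqueue', 'branch', 'node2', 'node2_worker',
         'node1',          -- i_provider_name
         1,                -- i_global_watermark: placeholder tick_id
         null);
-- ret_code 401 means the node was already initialized.
```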
pgq_node.drop_node( in i_queue_name text, in i_node_name text, out ret_code int4, out ret_note text ) returns record
Drop node. This needs to be run on all the members of a set to properly get rid of the node.
i_queue_name | queue name |
i_node_name | node_name |
ret_code | error code |
ret_note | error description |
200 | Ok |
304 | No such queue |
406 | That is a provider |
Calls: None
Tables directly manipulated: None
pgq_node.get_node_info( in i_queue_name text, out ret_code int4, out ret_note text, out node_type text, out node_name text, out global_watermark bigint, out local_watermark bigint, out provider_node text, out provider_location text, out combined_queue text, out combined_type text, out worker_name text, out worker_paused bool, out worker_uptodate bool, out worker_last_tick bigint, out node_attrs text ) returns record
Get local node info for cascaded queue.
i_queue_name | cascaded queue name |
node_type | local node type |
node_name | local node name |
global_watermark | queue’s global watermark |
local_watermark | queue’s local watermark, for this and below nodes |
provider_node | provider node name |
provider_location | provider connect string |
combined_queue | queue name for target set |
combined_type | node type of target set |
worker_name | consumer name that maintains this node |
worker_paused | is worker paused |
worker_uptodate | has worker seen the changes |
worker_last_tick | last committed tick_id by worker |
node_attrs | urlencoded dict of random attrs for worker (eg. sync_watermark) |
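For illustration, a monitoring query might pick only a few of the output columns. The queue name is hypothetical.

```sql
-- Inspect the local node's role, provider and watermarks.
select node_type, node_name, provider_node,
       global_watermark, local_watermark,
       worker_name, worker_paused, worker_uptodate
  from pgq_node.get_node_info('myqueue');
```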
pgq_node.get_subscriber_info( in i_queue_name text, out node_name text, out worker_name text, out node_watermark int8 ) returns setof record
Get subscriber list for the local node.
It may be out-of-date, due to in-progress administrative change. Node’s local provider info ( pgq_node.get_node_info() or pgq_node.get_worker_state(1) ) is the authoritative source.
i_queue_name | cascaded queue name |
node_name | node name that uses current node as provider |
worker_name | consumer that maintains remote node |
node_watermark | lowest tick_id on subscriber |
pgq_node.get_consumer_info( in i_queue_name text, out consumer_name text, out provider_node text, out last_tick_id int8, out paused boolean, out uptodate boolean, out cur_error text ) returns setof record
Get consumer list that work on the local node.
i_queue_name | cascaded queue name |
consumer_name | cascaded consumer name |
provider_node | node from where the consumer reads from |
last_tick_id | last committed tick |
paused | if consumer is paused |
uptodate | if consumer is uptodate |
cur_error | failure reason |
pgq_node.demote_root( in i_queue_name text, in i_step int4, in i_new_provider text, out ret_code int4, out ret_note text, out last_tick int8 ) as
Multi-step root demotion to branch.
Step 1 | disable writing to queue. |
Step 2 | wait until writers go away, do tick. |
Step 3 | change type, register. |
i_queue_name | queue name |
i_step | step number |
i_new_provider | new provider node |
Returns:
200 | success |
404 | node not initialized for queue |
301 | node is not root |
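The three steps above could be driven as in the following sketch, run on the current root. The queue and node names are hypothetical. The reference does not say whether `i_new_provider` is needed before step 3, so it is passed on every call here as an assumption. The caller is expected to check `ret_code` after each step before continuing.

```sql
-- Step 1: disable writing to the queue.
select ret_code, ret_note
  from pgq_node.demote_root('myqueue', 1, 'node2');

-- Step 2: wait until writers go away, then tick.
-- May need repeating until ret_code reports success.
select ret_code, ret_note, last_tick
  from pgq_node.demote_root('myqueue', 2, 'node2');

-- Step 3: change node type and register with the new provider.
select ret_code, ret_note, last_tick
  from pgq_node.demote_root('myqueue', 3, 'node2');
```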
pgq_node.register_subscriber( in i_queue_name text, in i_remote_node_name text, in i_remote_worker_name text, in i_custom_tick_id int8, out ret_code int4, out ret_note text, out global_watermark bigint ) returns record
Subscribe remote node to local node at custom position. Should be used when changing provider for existing node.
i_queue_name | set name |
i_remote_node_name | node name |
i_remote_worker_name | consumer name |
i_custom_tick_id | tick id [optional] |
ret_code | error code |
ret_note | description |
global_watermark | minimal watermark |
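A usage sketch, run on the provider node. Names are hypothetical. Passing NULL as `i_custom_tick_id` to mean "no custom position" is an assumption drawn from the "[optional]" note above.

```sql
-- Subscribe remote node 'node2' (maintained by worker 'node2_worker')
-- to the local node.
select ret_code, ret_note, global_watermark
  from pgq_node.register_subscriber(
         'myqueue',        -- i_queue_name
         'node2',          -- i_remote_node_name
         'node2_worker',   -- i_remote_worker_name
         null);            -- i_custom_tick_id: assumed to mean default position
```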
pgq_node.unregister_subscriber( in i_queue_name text, in i_remote_node_name text, out ret_code int4, out ret_note text ) returns record
Unsubscribe remote node from local node.
i_queue_name | set name |
i_remote_node_name | node name |
ret_code | error code |
ret_note | description |
pgq_node.set_subscriber_watermark( in i_queue_name text, in i_node_name text, in i_watermark bigint, out ret_code int4, out ret_note text ) returns record
Notify provider about subscribers lowest watermark.
Called on the provider at intervals by each worker.
i_queue_name | cascaded queue name |
i_node_name | subscriber node name |
i_watermark | tick_id |
ret_code | error code |
ret_note | description |
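As a sketch, a worker for subscriber node `node2` might report its lowest needed tick to the provider like this. The names and the tick value are hypothetical.

```sql
-- Executed on the provider database by the subscriber's worker.
select ret_code, ret_note
  from pgq_node.set_subscriber_watermark(
         'myqueue',   -- i_queue_name
         'node2',     -- i_node_name (the subscriber)
         1234);       -- i_watermark: placeholder tick_id
```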
pgq_node.get_worker_state( in i_queue_name text, out ret_code int4, out ret_note text, out node_type text, out node_name text, out completed_tick bigint, out provider_node text, out provider_location text, out paused boolean, out uptodate boolean, out cur_error text, out worker_name text, out global_watermark bigint, out local_watermark bigint, out local_queue_top bigint, out combined_queue text, out combined_type text ) returns record
Get info for consumer that maintains local node.
i_queue_name | cascaded queue name |
node_type | local node type |
node_name | local node name |
completed_tick | last committed tick |
provider_node | provider node name |
provider_location | connect string to provider node |
paused | this node should not do any work |
uptodate | if consumer has loaded last changes |
cur_error | failure reason |
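For illustration, a worker could read its state before starting a batch. The queue name is hypothetical.

```sql
-- Check whether the worker should run and where it reads from.
select ret_code, ret_note,
       node_type, provider_node, provider_location,
       completed_tick, paused, uptodate, cur_error
  from pgq_node.get_worker_state('myqueue');
```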
pgq_node.set_global_watermark( in i_queue_name text, in i_watermark bigint, out ret_code int4, out ret_note text ) returns record
Move global watermark on branch/leaf, publish on root.
i_queue_name | queue name |
i_watermark | global tick_id that is processed everywhere. NULL on root, then local wm is published. |
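A sketch of the two call styles implied by the parameter note. The queue name and tick value are hypothetical.

```sql
-- On root: pass NULL so the local watermark is published.
select ret_code, ret_note
  from pgq_node.set_global_watermark('myqueue', null);

-- On branch or leaf: apply the global tick_id received from upstream.
select ret_code, ret_note
  from pgq_node.set_global_watermark('myqueue', 1234);  -- placeholder tick_id
```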
pgq_node.set_partition_watermark( in i_combined_queue_name text, in i_part_queue_name text, in i_watermark bigint, out ret_code int4, out ret_note text ) returns record
Move merge-leaf position on combined-branch.
i_combined_queue_name | local combined queue name |
i_part_queue_name | local part queue name (merge-leaf) |
i_watermark | partition tick_id that came inside combined-root batch |
200 | success |
201 | no partition queue |
401 | worker registration not found |
pgq_node.register_consumer( in i_queue_name text, in i_consumer_name text, in i_provider_node text, in i_custom_tick_id int8, out ret_code int4, out ret_note text ) returns record
Subscribe plain cascaded consumer to a target node. That means it’s planning to read from remote node and write to local node.
i_queue_name | set name |
i_consumer_name | cascaded consumer name |
i_provider_node | node name |
i_custom_tick_id | tick id |
ret_code | error code |
200 | ok |
201 | already registered |
401 | no such queue |
ret_note | description |
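A usage sketch, run on the local (target) node. Names are hypothetical. Passing NULL as `i_custom_tick_id` to mean "no custom position" is an assumption; the reference does not state how NULL is handled.

```sql
-- Register consumer 'my_consumer', which reads from 'node1'
-- and writes to the local node.
select ret_code, ret_note
  from pgq_node.register_consumer(
         'myqueue',       -- i_queue_name
         'my_consumer',   -- i_consumer_name
         'node1',         -- i_provider_node
         null);           -- i_custom_tick_id: assumed optional
-- 200 = ok, 201 = already registered, 401 = no such queue.
```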
pgq_node.unregister_consumer( in i_queue_name text, in i_consumer_name text, out ret_code int4, out ret_note text ) returns record
Unregister cascaded consumer from local node.
i_queue_name | cascaded queue name |
i_consumer_name | cascaded consumer name |
ret_code | error code |
200 | ok |
404 | no such queue |
ret_note | description |
pgq_node.get_consumer_state( in i_queue_name text, in i_consumer_name text, out ret_code int4, out ret_note text, out node_type text, out node_name text, out completed_tick bigint, out provider_node text, out provider_location text, out paused boolean, out uptodate boolean, out cur_error text ) returns record
Get info for cascaded consumer that targets local node.
i_queue_name | cascaded queue name |
i_consumer_name | cascaded consumer name |
node_type | local node type |
node_name | local node name |
completed_tick | last committed tick |
provider_node | provider node name |
provider_location | connect string to provider node |
paused | this node should not do any work |
uptodate | if consumer has loaded last changes |
cur_error | failure reason |
pgq_node.change_consumer_provider( in i_queue_name text, in i_consumer_name text, in i_new_provider text, out ret_code int4, out ret_note text ) as
Change provider for this consumer.
i_queue_name | queue name |
i_consumer_name | consumer name |
i_new_provider | node name for new provider |
Returns:
ret_code | error code |
200 | ok |
404 | no such consumer or new node |
ret_note | description |
pgq_node.set_consumer_uptodate( in i_queue_name text, in i_consumer_name text, in i_uptodate boolean, out ret_code int4, out ret_note text ) returns record
Set consumer uptodate flag.
i_queue_name | queue name |
i_consumer_name | consumer name |
i_uptodate | new flag state |
200 | ok |
404 | consumer not known |
pgq_node.set_consumer_paused( in i_queue_name text, in i_consumer_name text, in i_paused boolean, out ret_code int4, out ret_note text ) as
Set consumer paused flag.
i_queue_name | cascaded queue name |
i_consumer_name | cascaded consumer name |
i_paused | new flag state |
Returns:
200 | ok |
201 | already paused |
404 | consumer not found |
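A sketch of pausing a consumer and then checking whether it has noticed. Names are hypothetical. That the consumer process acknowledges a pause by setting its uptodate flag is an assumption, inferred from the `paused` and `uptodate` outputs of `pgq_node.get_consumer_state()`.

```sql
-- Ask the consumer to pause.
select ret_code, ret_note
  from pgq_node.set_consumer_paused('myqueue', 'my_consumer', true);

-- Poll until the consumer reports that it has seen the change.
select paused, uptodate
  from pgq_node.get_consumer_state('myqueue', 'my_consumer');
```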
pgq_node.set_consumer_completed( in i_queue_name text, in i_consumer_name text, in i_tick_id int8, out ret_code int4, out ret_note text ) as
Set last completed tick id for the cascaded consumer that it has committed to local node.
i_queue_name | cascaded queue name |
i_consumer_name | cascaded consumer name |
i_tick_id | tick id |
Returns:
200 | ok |
404 | consumer not known |
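A sketch of recording progress after a batch has been applied. Names and the tick value are hypothetical. Doing this in the same transaction as the batch's local writes, so that position and data commit together, is an assumption about usage and is not stated in the reference.

```sql
begin;
-- ... apply the batch's events to local tables here ...
select ret_code, ret_note
  from pgq_node.set_consumer_completed(
         'myqueue',       -- i_queue_name
         'my_consumer',   -- i_consumer_name
         1234);           -- i_tick_id: placeholder for the batch's tick
commit;
```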
Checks if node is root.
pgq_node.is_root_node( i_queue_name text ) returns bool
Checks if node is leaf.
pgq_node.is_leaf_node( i_queue_name text ) returns bool
Promote branch node to root.
pgq_node.promote_branch( in i_queue_name text, out ret_code int4, out ret_note text ) as
If batch processing fails, consumer can store its last error in db.
pgq_node.set_consumer_error( in i_queue_name text, in i_consumer_name text, in i_error_msg text, out ret_code int4, out ret_note text ) as
Move global watermark on root node.
pgq_node.maint_watermark( i_queue_name text ) returns int4
Returns version string for pgq_node.
pgq_node.version() returns text