Functions

Database functions for cascaded pgq.

Cascaded consumer flow

  • (1) [target] call pgq_node.get_consumer_state()
  • (2) If .paused is true, sleep, go to (1).  This is allows to control consumer remotely.
  • (3) If .uptodate is false, call pgq_node.set_consumer_uptodate(true).  This allows remote controller to know that consumer has seen the changes.
  • (4) [source] call pgq.next_batch().  If returns NULL, sleep, goto (1)
  • (5) [source] if batch already done, call pgq.finish_batch(), go to (1)
  • (6) [source] read events
  • (7) [target] process events, call pgq_node.set_consumer_completed() in same tx.
  • (8) [source] call pgq.finish_batch()

Cascaded worker flow

Worker is consumer that also copies to queue contents to local node (branch), so it can act as provider to other nodes.  There can be only one worker per node.  Or zero if node is leaf.  In addition to cascaded consumer logic above, it must -

  • [branch] copy all events to local queue and create ticks
  • [merge-leaf] copy all events to combined-queue
  • [branch] publish local watermark upwards to provider so it reaches root.
  • [branch] apply global watermark event to local node
  • [merge-leaf] wait-behind on combined-branch (failover combined-root).  It’s last_tick_id is set by combined-branch worker, it must call pgq.next_batch()+pgq.finish_batch() without processing events when behind, but not move further.  When the combined-branch becomes root, it will be in right position to continue updating.
Summary
FunctionsDatabase functions for cascaded pgq.
Global Node Map
pgq_node.register_location(4)Add new node location.
pgq_node.unregister_location(2)Drop unreferenced node.
pgq_node.get_queue_locations(1)Get node list for the queue.
Node operations
pgq_node.create_node(7)Initialize node.
pgq_node.drop_node(2)Drop node.
pgq_node.get_node_info(1)Get local node info for cascaded queue.
pgq_node.is_root_node(1)Checs if node is root.
pgq_node.is_leaf_node(1)Checs if node is leaf.
pgq_node.get_subscriber_info(1)Get subscriber list for the local node.
pgq_node.get_consumer_info(1)Get consumer list that work on the local node.
pgq_node.demote_root(3)Multi-step root demotion to branch.
pgq_node.promote_branch(1)Promote branch node to root.
pgq_node.create_attrs(2)Set node attributes.
Provider side operations - worker
pgq_node.register_subscriber(4)Subscribe remote node to local node at custom position.
pgq_node.unregister_subscriber(2)Unsubscribe remote node from local node.
pgq_node.set_subscriber_watermark(3)Notify provider about subscribers lowest watermark.
Subscriber side operations - worker
pgq_node.get_worker_state(1)Get info for consumer that maintains local node.
pgq_node.set_global_watermark(2)Move global watermark on branch/leaf, publish on root.
pgq_node.set_partition_watermark(3)Move merge-leaf position on combined-branch.
Subscriber side operations - any consumer
pgq_node.register_consumer(4)Subscribe plain cascaded consumer to a target node.
pgq_node.unregister_consumer(2)Unregister cascaded consumer from local node.
pgq_node.get_consumer_state(2)Get info for cascaded consumer that targets local node.
pgq_node.change_consumer_provider(3)Change provider for this consumer.
pgq_node.set_consumer_uptodate(3)Set consumer uptodate flag.....
pgq_node.set_consumer_paused(3)Set consumer paused flag.
pgq_node.set_consumer_completed(3)Set last completed tick id for the cascaded consumer that it has committed to local node.
pgq_node.set_consumer_error(3)If batch processing fails, consumer can store it’s last error in db.
Maintenance operations
pgq_node.maint_watermark(1)Move global watermark on root node.
pgq_node.version(0)Returns version string for pgq_node.

Global Node Map

pgq_node.register_location(4)

pgq_node.register_location(in i_queue_name text,
in i_node_name text,
in i_node_location text,
in i_dead boolean,
out ret_code int4,
out ret_note text) returns record

Add new node location.

Parameters

i_queue_namequeue name
i_node_namenode name
i_node_locationnode connect string
i_deaddead flag for node

Returns

ret_codeerror code
ret_noteerror description

Return Codes

200Ok

pgq_node.unregister_location(2)

pgq_node.unregister_location(in i_queue_name text,
in i_node_name text,
out ret_code int4,
out ret_note text) returns record

Drop unreferenced node.

Parameters

i_queue_namequeue name
i_node_namenode to drop

Returns

ret_codeerror code
ret_noteerror description

Return Codes

200Ok
301Location not found
403Cannot drop node’s own or parent location

pgq_node.get_queue_locations(1)

pgq_node.get_queue_locations(in i_queue_name text,
out node_name text,
out node_location text,
out dead boolean) returns setof record

Get node list for the queue.

Parameters

i_queue_namequeue name

Returns

node_namenode name
node_locationlibpq connect string for the node
deadwhether the node should be considered dead

Node operations

pgq_node.create_node(7)

pgq_node.create_node(in i_queue_name text,
in i_node_type text,
in i_node_name text,
in i_worker_name text,
in i_provider_name text,
in i_global_watermark bigint,
in i_combined_queue text,
out ret_code int4,
out ret_note text) returns record

Initialize node.

Parameters

i_node_namecascaded queue name
i_node_typenode type
i_node_namenode name
i_worker_nameworker consumer name
i_provider_nameprovider node name for non-root nodes
i_global_watermarkglobal lowest tick_id
i_combined_queuemerge-leaf: target queue

Returns

200Ok
401node already initialized
????maybe we coud use more error codes ?

Node Types

rootmaster node
branchsubscriber node that can be provider to others
leafsubscriber node that cannot be provider to others Calls: None Tables directly manipulated: None

pgq_node.drop_node(2)

pgq_node.drop_node(in i_queue_name text,
in i_node_name text,
out ret_code int4,
out ret_note text) returns record

Drop node.  This needs to be run on all the members of a set to properly get rid of the node.

Parameters

i_queue_namequeue name
i_node_namenode_name

Returns

ret_codeerror code
ret_noteerror description

Return Codes

200Ok
304No such queue
406That is a provider Calls: None Tables directly manipulated: None

pgq_node.get_node_info(1)

pgq_node.get_node_info(in i_queue_name text,
out ret_code int4,
out ret_note text,
out node_type text,
out node_name text,
out global_watermark bigint,
out local_watermark bigint,
out provider_node text,
out provider_location text,
out combined_queue text,
out combined_type text,
out worker_name text,
out worker_paused bool,
out worker_uptodate bool,
out worker_last_tick bigint,
out node_attrs text) returns record

Get local node info for cascaded queue.

Parameters

i_queue_namecascaded queue name

Returns

node_typelocal node type
node_namelocal node name
global_watermarkqueue’s global watermark
local_watermarkqueue’s local watermark, for this and below nodes
provider_nodeprovider node name
provider_locationprovider connect string
combined_queuequeue name for target set
combined_typenode type of target set
worker_nameconsumer name that maintains this node
worker_pausedis worker paused
worker_uptodateis worker seen the changes
worker_last_ticklast committed tick_id by worker
node_attrsurlencoded dict of random attrs for worker (eg. sync_watermark)

pgq_node.is_root_node(1)

pgq_node.is_root_node(i_queue_name text) returns bool

Checs if node is root.

Parameters

i_queue_namequeue name Returns:
trueif this this the root node for queue

pgq_node.is_leaf_node(1)

pgq_node.is_leaf_node(i_queue_name text) returns bool

Checs if node is leaf.

Parameters

i_queue_namequeue name Returns:
trueif this this the leaf node for queue

pgq_node.get_subscriber_info(1)

pgq_node.get_subscriber_info(in i_queue_name text,
out node_name text,
out worker_name text,
out node_watermark int8) returns setof record

Get subscriber list for the local node.

It may be out-of-date, due to in-progress administrative change.  Node’s local provider info ( pgq_node.get_node_info() or pgq_node.get_worker_state(1) ) is the authoritative source.

Parameters

i_queue_namecascaded queue name

Returns

node_namenode name that uses current node as provider
worker_nameconsumer that maintains remote node
local_watermarklowest tick_id on subscriber

pgq_node.get_consumer_info(1)

pgq_node.get_consumer_info(in i_queue_name text,
out consumer_name text,
out provider_node text,
out last_tick_id int8,
out paused boolean,
out uptodate boolean,
out cur_error text) returns setof record

Get consumer list that work on the local node.

Parameters

i_queue_namecascaded queue name

Returns

consumer_namecascaded consumer name
provider_nodenode from where the consumer reads from
last_tick_idlast committed tick
pausedif consumer is paused
uptodateif consumer is uptodate
cur_errorfailure reason

pgq_node.demote_root(3)

pgq_node.demote_root(in i_queue_name text,
in i_step int4,
in i_new_provider text,
out ret_code int4,
out ret_note text,
out last_tick int8) as

Multi-step root demotion to branch.

Must be be called for each step in sequence

Step 1disable writing to queue.
Step 2wait until writers go away, do tick.
Step 3change type, register.

Parameters

i_queue_namequeue name
i_stepstep number
i_new_providernew provider node Returns:
200success
404node not initialized for queue
301node is not root

pgq_node.promote_branch(1)

pgq_node.promote_branch(in i_queue_name text,
out ret_code int4,
out ret_note text) as

Promote branch node to root.

Parameters

i_queue_namequeue name

Returns

200success
404node not initialized for queue
301node is not branch

pgq_node.create_attrs(2)

Set node attributes.

Parameters

i_node_namecascaded queue name
i_node_attrsurlencoded node attrs

Returns

200ok
404node not found

Provider side operations - worker

pgq_node.register_subscriber(4)

pgq_node.register_subscriber(in i_queue_name text,
in i_remote_node_name text,
in i_remote_worker_name text,
in i_custom_tick_id int8,
out ret_code int4,
out ret_note text,
out global_watermark bigint) returns record

Subscribe remote node to local node at custom position.  Should be used when changing provider for existing node.

Parameters

i_node_nameset name
i_remote_node_namenode name
i_remote_worker_nameconsumer name
i_custom_tick_idtick id [optional]

Returns

ret_codeerror code
ret_notedescription
global_watermarkminimal watermark

pgq_node.unregister_subscriber(2)

pgq_node.unregister_subscriber(in i_queue_name text,
in i_remote_node_name text,
out ret_code int4,
out ret_note text) returns record

Unsubscribe remote node from local node.

Parameters

i_queue_nameset name
i_remote_node_namenode name

Returns

ret_codeerror code
ret_notedescription

pgq_node.set_subscriber_watermark(3)

pgq_node.set_subscriber_watermark(in i_queue_name text,
in i_node_name text,
in i_watermark bigint,
out ret_code int4,
out ret_note text) returns record

Notify provider about subscribers lowest watermark.

Called on provider at interval by each worker

Parameters

i_queue_namecascaded queue name
i_node_namesubscriber node name
i_watermarktick_id

Returns

ret_codeerror code
ret_notedescription

Subscriber side operations - worker

pgq_node.get_worker_state(1)

pgq_node.get_worker_state(in i_queue_name text,
out ret_code int4,
out ret_note text,
out node_type text,
out node_name text,
out completed_tick bigint,
out provider_node text,
out provider_location text,
out paused boolean,
out uptodate boolean,
out cur_error text,
out worker_name text,
out global_watermark bigint,
out local_watermark bigint,
out local_queue_top bigint,
out combined_queue text,
out combined_type text) returns record

Get info for consumer that maintains local node.

Parameters

i_queue_namecascaded queue name

Returns

node_typelocal node type
node_namelocal node name
completed_ticklast committed tick
provider_nodeprovider node name
provider_locationconnect string to provider node
pausedthis node should not do any work
uptodateif consumer has loaded last changes
cur_errorfailure reason

pgq_node.set_global_watermark(2)

pgq_node.set_global_watermark(in i_queue_name text,
in i_watermark bigint,
out ret_code int4,
out ret_note text) returns record

Move global watermark on branch/leaf, publish on root.

Parameters

i_queue_namequeue name
i_watermarkglobal tick_id that is processed everywhere.  NULL on root, then local wm is published.

pgq_node.set_partition_watermark(3)

pgq_node.set_partition_watermark(in i_combined_queue_name text,
in i_part_queue_name text,
in i_watermark bigint,
out ret_code int4,
out ret_note text) returns record

Move merge-leaf position on combined-branch.

Parameters

i_combined_queue_namelocal combined queue name
i_part_queue_namelocal part queue name (merge-leaf)
i_watermarkpartition tick_id that came inside combined-root batch

Returns

200success
201no partition queue
401worker registration not found

Subscriber side operations - any consumer

pgq_node.register_consumer(4)

pgq_node.register_consumer(in i_queue_name text,
in i_consumer_name text,
in i_provider_node text,
in i_custom_tick_id int8,
out ret_code int4,
out ret_note text) returns record

Subscribe plain cascaded consumer to a target node.  That means it’s planning to read from remote node and write to local node.

Parameters

i_queue_nameset name
i_consumer_namecascaded consumer name
i_provider_nodenode name
i_custom_tick_idtick id

Returns

ret_codeerror code
200ok
201already registered
401no such queue
ret_notedescription

pgq_node.unregister_consumer(2)

pgq_node.unregister_consumer(in i_queue_name text,
in i_consumer_name text,
out ret_code int4,
out ret_note text) returns record

Unregister cascaded consumer from local node.

Parameters

i_queue_namecascaded queue name
i_consumer_namecascaded consumer name

Returns

ret_codeerror code
200ok
404no such queue
ret_notedescription

pgq_node.get_consumer_state(2)

pgq_node.get_consumer_state(in i_queue_name text,
in i_consumer_name text,
out ret_code int4,
out ret_note text,
out node_type text,
out node_name text,
out completed_tick bigint,
out provider_node text,
out provider_location text,
out paused boolean,
out uptodate boolean,
out cur_error text) returns record

Get info for cascaded consumer that targets local node.

Parameters

i_node_namecascaded queue name
i_consumer_namecascaded consumer name

Returns

node_typelocal node type
node_namelocal node name
completed_ticklast committed tick
provider_nodeprovider node name
provider_locationconnect string to provider node
pausedthis node should not do any work
uptodateif consumer has loaded last changes
cur_errorfailure reason

pgq_node.change_consumer_provider(3)

pgq_node.change_consumer_provider(in i_queue_name text,
in i_consumer_name text,
in i_new_provider text,
out ret_code int4,
out ret_note text) as

Change provider for this consumer.

Parameters

i_queue_namequeue name
i_consumer_nameconsumer name
i_new_providernode name for new provider Returns:
ret_codeerror code
200ok
404no such consumer or new node
ret_notedescription

pgq_node.set_consumer_uptodate(3)

pgq_node.set_consumer_uptodate(in i_queue_name text,
in i_consumer_name text,
in i_uptodate boolean,
out ret_code int4,
out ret_note text) returns record

Set consumer uptodate flag.....

Parameters

i_queue_namequeue name
i_consumer_nameconsumer name
i_uptodatenew flag state

Returns

200ok
404consumer not known

pgq_node.set_consumer_paused(3)

pgq_node.set_consumer_paused(in i_queue_name text,
in i_consumer_name text,
in i_paused boolean,
out ret_code int4,
out ret_note text) as

Set consumer paused flag.

Parameters

i_queue_namecascaded queue name
i_consumer_namecascaded consumer name
i_pausednew flag state Returns:
200ok
201already paused
404consumer not found

pgq_node.set_consumer_completed(3)

pgq_node.set_consumer_completed(in i_queue_name text,
in i_consumer_name text,
in i_tick_id int8,
out ret_code int4,
out ret_note text) as

Set last completed tick id for the cascaded consumer that it has committed to local node.

Parameters

i_queue_namecascaded queue name
i_consumer_namecascaded consumer name
i_tick_idtick id Returns:
200ok
404consumer not known

pgq_node.set_consumer_error(3)

pgq_node.set_consumer_error(in i_queue_name text,
in i_consumer_name text,
in i_error_msg text,
out ret_code int4,
out ret_note text) as

If batch processing fails, consumer can store it’s last error in db.  Returns: 100 - ok 101 - consumer not known

Maintenance operations

pgq_node.maint_watermark(1)

pgq_node.maint_watermark(i_queue_name text) returns int4

Move global watermark on root node.

Returns

0tells pgqd to call just once

pgq_node.version(0)

pgq_node.version() returns text

Returns version string for pgq_node.  ATM it is based on SkyTools version and only bumped when database code changes.

pgq_node.register_location(in i_queue_name text,
in i_node_name text,
in i_node_location text,
in i_dead boolean,
out ret_code int4,
out ret_note text) returns record
Add new node location.
pgq_node.unregister_location(in i_queue_name text,
in i_node_name text,
out ret_code int4,
out ret_note text) returns record
Drop unreferenced node.
pgq_node.get_queue_locations(in i_queue_name text,
out node_name text,
out node_location text,
out dead boolean) returns setof record
Get node list for the queue.
pgq_node.create_node(in i_queue_name text,
in i_node_type text,
in i_node_name text,
in i_worker_name text,
in i_provider_name text,
in i_global_watermark bigint,
in i_combined_queue text,
out ret_code int4,
out ret_note text) returns record
Initialize node.
pgq_node.drop_node(in i_queue_name text,
in i_node_name text,
out ret_code int4,
out ret_note text) returns record
Drop node.
pgq_node.get_node_info(in i_queue_name text,
out ret_code int4,
out ret_note text,
out node_type text,
out node_name text,
out global_watermark bigint,
out local_watermark bigint,
out provider_node text,
out provider_location text,
out combined_queue text,
out combined_type text,
out worker_name text,
out worker_paused bool,
out worker_uptodate bool,
out worker_last_tick bigint,
out node_attrs text) returns record
Get local node info for cascaded queue.
pgq_node.is_root_node(i_queue_name text) returns bool
Checs if node is root.
pgq_node.is_leaf_node(i_queue_name text) returns bool
Checs if node is leaf.
pgq_node.get_subscriber_info(in i_queue_name text,
out node_name text,
out worker_name text,
out node_watermark int8) returns setof record
Get subscriber list for the local node.
pgq_node.get_consumer_info(in i_queue_name text,
out consumer_name text,
out provider_node text,
out last_tick_id int8,
out paused boolean,
out uptodate boolean,
out cur_error text) returns setof record
Get consumer list that work on the local node.
pgq_node.demote_root(in i_queue_name text,
in i_step int4,
in i_new_provider text,
out ret_code int4,
out ret_note text,
out last_tick int8) as
Multi-step root demotion to branch.
pgq_node.promote_branch(in i_queue_name text,
out ret_code int4,
out ret_note text) as
Promote branch node to root.
pgq_node.register_subscriber(in i_queue_name text,
in i_remote_node_name text,
in i_remote_worker_name text,
in i_custom_tick_id int8,
out ret_code int4,
out ret_note text,
out global_watermark bigint) returns record
Subscribe remote node to local node at custom position.
pgq_node.unregister_subscriber(in i_queue_name text,
in i_remote_node_name text,
out ret_code int4,
out ret_note text) returns record
Unsubscribe remote node from local node.
pgq_node.set_subscriber_watermark(in i_queue_name text,
in i_node_name text,
in i_watermark bigint,
out ret_code int4,
out ret_note text) returns record
Notify provider about subscribers lowest watermark.
pgq_node.get_worker_state(in i_queue_name text,
out ret_code int4,
out ret_note text,
out node_type text,
out node_name text,
out completed_tick bigint,
out provider_node text,
out provider_location text,
out paused boolean,
out uptodate boolean,
out cur_error text,
out worker_name text,
out global_watermark bigint,
out local_watermark bigint,
out local_queue_top bigint,
out combined_queue text,
out combined_type text) returns record
Get info for consumer that maintains local node.
pgq_node.set_global_watermark(in i_queue_name text,
in i_watermark bigint,
out ret_code int4,
out ret_note text) returns record
Move global watermark on branch/leaf, publish on root.
pgq_node.set_partition_watermark(in i_combined_queue_name text,
in i_part_queue_name text,
in i_watermark bigint,
out ret_code int4,
out ret_note text) returns record
Move merge-leaf position on combined-branch.
pgq_node.register_consumer(in i_queue_name text,
in i_consumer_name text,
in i_provider_node text,
in i_custom_tick_id int8,
out ret_code int4,
out ret_note text) returns record
Subscribe plain cascaded consumer to a target node.
pgq_node.unregister_consumer(in i_queue_name text,
in i_consumer_name text,
out ret_code int4,
out ret_note text) returns record
Unregister cascaded consumer from local node.
pgq_node.get_consumer_state(in i_queue_name text,
in i_consumer_name text,
out ret_code int4,
out ret_note text,
out node_type text,
out node_name text,
out completed_tick bigint,
out provider_node text,
out provider_location text,
out paused boolean,
out uptodate boolean,
out cur_error text) returns record
Get info for cascaded consumer that targets local node.
pgq_node.change_consumer_provider(in i_queue_name text,
in i_consumer_name text,
in i_new_provider text,
out ret_code int4,
out ret_note text) as
Change provider for this consumer.
pgq_node.set_consumer_uptodate(in i_queue_name text,
in i_consumer_name text,
in i_uptodate boolean,
out ret_code int4,
out ret_note text) returns record
Set consumer uptodate flag.....
pgq_node.set_consumer_paused(in i_queue_name text,
in i_consumer_name text,
in i_paused boolean,
out ret_code int4,
out ret_note text) as
Set consumer paused flag.
pgq_node.set_consumer_completed(in i_queue_name text,
in i_consumer_name text,
in i_tick_id int8,
out ret_code int4,
out ret_note text) as
Set last completed tick id for the cascaded consumer that it has committed to local node.
pgq_node.set_consumer_error(in i_queue_name text,
in i_consumer_name text,
in i_error_msg text,
out ret_code int4,
out ret_note text) as
If batch processing fails, consumer can store it’s last error in db.
pgq_node.maint_watermark(i_queue_name text) returns int4
Move global watermark on root node.
pgq_node.version() returns text
Returns version string for pgq_node.
Close