diff --git a/docs/configuring.md b/docs/configuring.md index 18f36c7d..5c8579fb 100644 --- a/docs/configuring.md +++ b/docs/configuring.md @@ -202,6 +202,28 @@ adjustment to which replication set the table is part of. Setting a table to unlogged will remove it from replication. Detaching a partition will not remove it from replication. +### `spock.log_origin_change` + +`spock.log_origin_change` controls whether changes to a row's +origin are written to the PostgreSQL log. Rows may be updated +locally by regular SQL operations, or by replication from apply workers. +Note that rows that are changed locally (not from replication) have an +origin value of 0. + +The default of `none` is recommended because the other settings can produce +a large number of log entries. The other options allow for monitoring when updates +occur outside expected patterns. + +The following configuration values are possible: + +* `none` (the default) - do not log any origin change information +* `remote_only_differs` - only log origin changes when the existing row + was from one remote publisher and was changed by another + remote publisher +* `since_sub_creation` - log origin changes whether a publisher changed + a row that was previously from another publisher or updated it locally, + but only since the time when the subscription was created. 
+ ### `spock.save_resolutions` `spock.save_resolutions` is a boolean value (the default is `false`) that diff --git a/include/spock.h b/include/spock.h index f57aabe4..156619e3 100644 --- a/include/spock.h +++ b/include/spock.h @@ -53,6 +53,7 @@ extern int restart_delay_on_exception; extern int spock_replay_queue_size; /* Deprecated - no longer used */ extern bool check_all_uc_indexes; extern bool spock_enable_quiet_mode; +extern int log_origin_change; extern char *shorten_hash(const char *str, int maxlen); extern void gen_slot_name(Name slot_name, char *dbname, diff --git a/include/spock_conflict.h b/include/spock_conflict.h index e805baa9..061fdef8 100644 --- a/include/spock_conflict.h +++ b/include/spock_conflict.h @@ -80,6 +80,26 @@ extern int spock_conflict_resolver; extern int spock_conflict_log_level; extern bool spock_save_resolutions; +typedef enum +{ + /* do not log */ + SPOCK_ORIGIN_NONE, + /* log if remote change (ignore local origin, independent of time) */ + SPOCK_ORIGIN_REMOTE_ONLY_DIFFERS, + /* log only if origin changed since start of subscription */ + SPOCK_ORIGIN_DIFFERS_SINCE_SUB +} SpockSaveOriginConflictOption; + + +extern bool spock_tuple_find_replidx(ResultRelInfo *relinfo, + SpockTupleData *tuple, + TupleTableSlot *oldslot, + Oid *idxrelid); + +extern Oid spock_tuple_find_conflict(ResultRelInfo *relinfo, + SpockTupleData *tuple, + TupleTableSlot *oldslot); + extern bool get_tuple_origin(SpockRelation *rel, HeapTuple local_tuple, ItemPointer tid, TransactionId *xmin, RepOriginId *local_origin, TimestampTz *local_ts); diff --git a/include/spock_node.h b/include/spock_node.h index 591fae74..0df5096a 100644 --- a/include/spock_node.h +++ b/include/spock_node.h @@ -57,6 +57,7 @@ typedef struct SpockSubscription XLogRecPtr skiplsn; /* All changes finished at this LSN are * skipped */ List *skip_schema; /* Array of schema names to skip */ + TimestampTz created_at; /* When this subscription was created */ } SpockSubscription; /* NULL-terminated 
arrays */ diff --git a/sql/spock--4.0.0--4.0.1.sql b/sql/spock--4.0.0--4.0.1.sql deleted file mode 100644 index 1206280a..00000000 --- a/sql/spock--4.0.0--4.0.1.sql +++ /dev/null @@ -1,27 +0,0 @@ -/* spock--4.0.0--4.0.1.sql */ - --- complain if script is sourced in psql, rather than via ALTER EXTENSION -\echo Use "ALTER EXTENSION spock UPDATE TO '4.0.1'" to load this file. \quit - -CREATE TABLE spock.exception_status ( - remote_origin oid NOT NULL, - remote_commit_ts timestamptz NOT NULL, - remote_xid bigint NOT NULL, - status text NOT NULL, - resolved_at timestamptz, - resolution_details jsonb, - PRIMARY KEY(remote_origin, remote_commit_ts) -) WITH (user_catalog_table=true); - -CREATE TABLE spock.exception_status_detail ( - remote_origin oid NOT NULL, - remote_commit_ts timestamptz NOT NULL, - command_counter integer NOT NULL, - remote_xid bigint NOT NULL, - status text NOT NULL, - resolved_at timestamptz, - resolution_details jsonb, - PRIMARY KEY(remote_origin, remote_commit_ts, command_counter), - FOREIGN KEY(remote_origin, remote_commit_ts) - REFERENCES spock.exception_status -) WITH (user_catalog_table=true); diff --git a/sql/spock--4.0.1--4.0.2.sql b/sql/spock--4.0.1--4.0.2.sql deleted file mode 100644 index 38ef6df6..00000000 --- a/sql/spock--4.0.1--4.0.2.sql +++ /dev/null @@ -1,106 +0,0 @@ - -/* spock--4.0.1--4.0.2.sql */ - --- complain if script is sourced in psql, rather than via ALTER EXTENSION -\echo Use "ALTER EXTENSION spock UPDATE TO '4.0.2'" to load this file. 
\quit - --- ---- --- Change the layout and primary key of the exception_log* tables --- ---- -CREATE TABLE spock.exception_log_temp ( - remote_origin oid NOT NULL, - remote_commit_ts timestamptz NOT NULL, - command_counter integer NOT NULL, - retry_errored_at timestamptz NOT NULL, - remote_xid bigint NOT NULL, - local_origin oid, - local_commit_ts timestamptz, - table_schema text, - table_name text, - operation text, - local_tup jsonb, - remote_old_tup jsonb, - remote_new_tup jsonb, - ddl_statement text, - ddl_user text, - error_message text NOT NULL, - PRIMARY KEY(remote_origin, remote_commit_ts, - command_counter, retry_errored_at) -) WITH (user_catalog_table=true); - -INSERT INTO spock.exception_log_temp - SELECT remote_origin, - remote_commit_ts, - command_counter, - retry_errored_at, - remote_xid, - local_origin, - local_commit_ts, - table_schema, - table_name, - operation, - local_tup, - remote_old_tup, - remote_new_tup, - ddl_statement, - ddl_user, - error_message - FROM spock.exception_log; - -DROP TABLE spock.exception_log; -ALTER TABLE spock.exception_log_temp RENAME TO exception_log; - -CREATE TABLE spock.exception_status_temp ( - remote_origin oid NOT NULL, - remote_commit_ts timestamptz NOT NULL, - retry_errored_at timestamptz NOT NULL, - remote_xid bigint NOT NULL, - status text NOT NULL, - resolved_at timestamptz, - resolution_details jsonb, - PRIMARY KEY(remote_origin, remote_commit_ts, retry_errored_at) -) WITH (user_catalog_table=true); - -INSERT INTO spock.exception_status_temp - SELECT remote_origin, - remote_commit_ts, - 'epoch'::timestamptz, - remote_xid, - status, - resolved_at, - resolution_details - FROM spock.exception_status; - -CREATE TABLE spock.exception_status_detail_temp ( - remote_origin oid NOT NULL, - remote_commit_ts timestamptz NOT NULL, - command_counter integer NOT NULL, - retry_errored_at timestamptz NOT NULL, - remote_xid bigint NOT NULL, - status text NOT NULL, - resolved_at timestamptz, - resolution_details jsonb, - 
PRIMARY KEY(remote_origin, remote_commit_ts, - command_counter, retry_errored_at), - FOREIGN KEY(remote_origin, remote_commit_ts, retry_errored_at) - REFERENCES spock.exception_status_temp -) WITH (user_catalog_table=true); - -INSERT INTO spock.exception_status_detail_temp - SELECT remote_origin, - remote_commit_ts, - command_counter, - 'epoch'::timestamptz, - remote_xid, - status, - resolved_at, - resolution_details - FROM spock.exception_status_detail; - -DROP TABLE spock.exception_status_detail; -ALTER TABLE spock.exception_status_detail_temp - RENAME TO exception_status_detail; - -DROP TABLE spock.exception_status; -ALTER TABLE spock.exception_status_temp - RENAME TO exception_status; diff --git a/sql/spock--4.0.10--5.0.0.sql b/sql/spock--4.0.10--5.0.0.sql deleted file mode 100644 index 9caf51ec..00000000 --- a/sql/spock--4.0.10--5.0.0.sql +++ /dev/null @@ -1,175 +0,0 @@ - -/* spock--4.0.10--5.0.sql */ - --- complain if script is sourced in psql, rather than via ALTER EXTENSION -\echo Use "ALTER EXTENSION spock UPDATE TO '5.0'" to load this file. 
\quit - -CREATE TABLE spock.progress ( - node_id oid NOT NULL, - remote_node_id oid NOT NULL, - remote_commit_ts timestamptz NOT NULL, - remote_lsn pg_lsn NOT NULL, - remote_insert_lsn pg_lsn NOT NULL, - last_updated_ts timestamptz NOT NULL, - updated_by_decode bool NOT NULL, - PRIMARY KEY(node_id, remote_node_id) -) WITH (fillfactor=50); - -INSERT INTO spock.progress (node_id, remote_node_id, remote_commit_ts, remote_lsn, remote_insert_lsn, last_updated_ts, updated_by_decode) - SELECT sub_target, sub_origin, 'epoch'::timestamptz, '0/0', '0/0', 'epoch'::timestamptz, 'f' - FROM spock.subscription; - -DROP VIEW IF EXISTS spock.lag_tracker; -DROP FUNCTION spock.lag_tracker(); -CREATE VIEW spock.lag_tracker AS - SELECT - origin.node_name AS origin_name, - n.node_name AS receiver_name, - MAX(p.remote_commit_ts) AS commit_timestamp, - MAX(p.remote_lsn) AS last_received_lsn, - MAX(p.remote_insert_lsn) AS remote_insert_lsn, - CASE - WHEN CAST(MAX(CAST(p.updated_by_decode as int)) as bool) THEN pg_wal_lsn_diff(MAX(p.remote_insert_lsn), MAX(p.remote_lsn)) - ELSE 0 - END AS replication_lag_bytes, - CASE - WHEN CAST(MAX(CAST(p.updated_by_decode as int)) as bool) THEN now() - MAX(p.remote_commit_ts) - ELSE now() - MAX(p.last_updated_ts) - END AS replication_lag - FROM spock.progress p - LEFT JOIN spock.subscription sub ON (p.node_id = sub.sub_target and p.remote_node_id = sub.sub_origin) - LEFT JOIN spock.node origin ON sub.sub_origin = origin.node_id - LEFT JOIN spock.node n ON n.node_id = p.node_id - GROUP BY origin.node_name, n.node_name; - -CREATE FUNCTION spock.sync_event() -RETURNS pg_lsn RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_create_sync_event'; - -CREATE PROCEDURE spock.wait_for_sync_event(OUT result bool, origin_id oid, lsn pg_lsn, timeout int DEFAULT 0) -AS $$ -DECLARE - target_id oid; - elapsed_time numeric := 0; - progress_lsn pg_lsn; -BEGIN - IF origin_id IS NULL THEN - RAISE EXCEPTION 'Origin node ''%'' not found', origin; - 
END IF; - target_id := node_id FROM spock.node_info(); - - WHILE true LOOP - SELECT INTO progress_lsn remote_lsn - FROM spock.progress - WHERE node_id = target_id AND remote_node_id = origin_id; - IF progress_lsn >= lsn THEN - result = true; - RETURN; - END IF; - elapsed_time := elapsed_time + .2; - IF timeout <> 0 AND elapsed_time >= timeout THEN - result := false; - RETURN; - END IF; - - ROLLBACK; - PERFORM pg_sleep(0.2); - END LOOP; -END; -$$ LANGUAGE plpgsql; - -CREATE PROCEDURE spock.wait_for_sync_event(OUT result bool, origin name, lsn pg_lsn, timeout int DEFAULT 0) -AS $$ -DECLARE - origin_id oid; - target_id oid; - elapsed_time numeric := 0; - progress_lsn pg_lsn; -BEGIN - origin_id := node_id FROM spock.node WHERE node_name = origin; - IF origin_id IS NULL THEN - RAISE EXCEPTION 'Origin node ''%'' not found', origin; - END IF; - target_id := node_id FROM spock.node_info(); - - WHILE true LOOP - SELECT INTO progress_lsn remote_lsn - FROM spock.progress - WHERE node_id = target_id AND remote_node_id = origin_id; - IF progress_lsn >= lsn THEN - result = true; - RETURN; - END IF; - elapsed_time := elapsed_time + .2; - IF timeout <> 0 AND elapsed_time >= timeout THEN - result := false; - RETURN; - END IF; - - ROLLBACK; - PERFORM pg_sleep(0.2); - END LOOP; -END; -$$ LANGUAGE plpgsql; - --- --- Function definition updated to include new 'enabled' parameter. 
--- -DROP FUNCTION spock.sub_create(name, text, text[], boolean, boolean, text[], interval, boolean); -CREATE FUNCTION spock.sub_create(subscription_name name, provider_dsn text, - replication_sets text[] = '{default,default_insert_only,ddl_sql}', synchronize_structure boolean = false, - synchronize_data boolean = false, forward_origins text[] = '{}', apply_delay interval DEFAULT '0', - force_text_transfer boolean = false, - enabled boolean = true) -RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_create_subscription'; - --- ---- --- Function to determine LSN from commit timestamp --- ---- -CREATE FUNCTION spock.get_lsn_from_commit_ts(slot_name name, commit_ts timestamptz) -RETURNS pg_lsn STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_get_lsn_from_commit_ts'; - -CREATE OR REPLACE FUNCTION spock.get_apply_worker_status( - OUT worker_pid bigint, -- Changed from int to bigint - OUT worker_dboid int, - OUT worker_subid bigint, - OUT worker_status text -) -RETURNS SETOF record STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'get_apply_worker_status'; - -CREATE FUNCTION spock.wait_for_apply_worker(p_subbid bigint, timeout int DEFAULT 0) -RETURNS boolean -AS $$ -DECLARE - start_time timestamptz := clock_timestamp(); - elapsed_time int := 0; - current_status text; -BEGIN - -- Loop until the timeout is reached or the worker is no longer running - WHILE true LOOP - -- Call spock.get_apply_worker_status to check the worker's status - SELECT worker_status - INTO current_status - FROM spock.get_apply_worker_status() - WHERE worker_subid = p_subbid; - - -- If no row is found, return -1 - IF NOT FOUND THEN - RETURN false; - END IF; - - -- If the worker is no longer running, return 0 - IF current_status IS DISTINCT FROM 'running' THEN - RETURN false; - END IF; - - -- Check if the timeout has been reached - elapsed_time := EXTRACT(EPOCH FROM clock_timestamp() - start_time) * 1000; - IF timeout > 0 AND elapsed_time >= timeout THEN - RETURN true; - END IF; - - -- 
Sleep for a short interval before checking again - PERFORM pg_sleep(0.2); - END LOOP; -END; -$$ LANGUAGE plpgsql; diff --git a/sql/spock--4.0.2--4.0.3.sql b/sql/spock--4.0.2--4.0.3.sql deleted file mode 100644 index fcb3d105..00000000 --- a/sql/spock--4.0.2--4.0.3.sql +++ /dev/null @@ -1,5 +0,0 @@ - -/* spock--4.0.2--4.0.3.sql */ - --- complain if script is sourced in psql, rather than via ALTER EXTENSION -\echo Use "ALTER EXTENSION spock UPDATE TO '4.0.3'" to load this file. \quit diff --git a/sql/spock--4.0.3--4.0.4.sql b/sql/spock--4.0.3--4.0.4.sql deleted file mode 100644 index fcb3d105..00000000 --- a/sql/spock--4.0.3--4.0.4.sql +++ /dev/null @@ -1,5 +0,0 @@ - -/* spock--4.0.2--4.0.3.sql */ - --- complain if script is sourced in psql, rather than via ALTER EXTENSION -\echo Use "ALTER EXTENSION spock UPDATE TO '4.0.3'" to load this file. \quit diff --git a/sql/spock--4.0.4--4.0.5.sql b/sql/spock--4.0.4--4.0.5.sql deleted file mode 100644 index a3548d94..00000000 --- a/sql/spock--4.0.4--4.0.5.sql +++ /dev/null @@ -1,5 +0,0 @@ - -/* spock--4.0.4--4.0.5.sql */ - --- complain if script is sourced in psql, rather than via ALTER EXTENSION -\echo Use "ALTER EXTENSION spock UPDATE TO '4.0.5'" to load this file. \quit diff --git a/sql/spock--4.0.5--4.0.6.sql b/sql/spock--4.0.5--4.0.6.sql deleted file mode 100644 index aad39f5c..00000000 --- a/sql/spock--4.0.5--4.0.6.sql +++ /dev/null @@ -1,5 +0,0 @@ - -/* spock--4.0.5--4.0.6.sql */ - --- complain if script is sourced in psql, rather than via ALTER EXTENSION -\echo Use "ALTER EXTENSION spock UPDATE TO '4.0.6'" to load this file. \quit diff --git a/sql/spock--4.0.6--4.0.7.sql b/sql/spock--4.0.6--4.0.7.sql deleted file mode 100644 index 5668aeb8..00000000 --- a/sql/spock--4.0.6--4.0.7.sql +++ /dev/null @@ -1,4 +0,0 @@ -/* spock--4.0.6--4.0.7.sql */ - --- complain if script is sourced in psql, rather than via ALTER EXTENSION -\echo Use "ALTER EXTENSION spock UPDATE TO '4.0.7'" to load this file. 
\quit diff --git a/sql/spock--4.0.7--4.0.8.sql b/sql/spock--4.0.7--4.0.8.sql deleted file mode 100644 index 7dd0394d..00000000 --- a/sql/spock--4.0.7--4.0.8.sql +++ /dev/null @@ -1,4 +0,0 @@ -/* spock--4.0.7--4.0.8.sql */ - --- complain if script is sourced in psql, rather than via ALTER EXTENSION -\echo Use "ALTER EXTENSION spock UPDATE TO '4.0.8'" to load this file. \quit diff --git a/sql/spock--4.0.8--4.0.9.sql b/sql/spock--4.0.8--4.0.9.sql deleted file mode 100644 index 744f82c0..00000000 --- a/sql/spock--4.0.8--4.0.9.sql +++ /dev/null @@ -1,4 +0,0 @@ -/* spock--4.0.8--4.0.9.sql */ - --- complain if script is sourced in psql, rather than via ALTER EXTENSION -\echo Use "ALTER EXTENSION spock UPDATE TO '4.0.9'" to load this file. \quit diff --git a/sql/spock--4.0.9--4.0.10.sql b/sql/spock--4.0.9--4.0.10.sql deleted file mode 100644 index c506ad90..00000000 --- a/sql/spock--4.0.9--4.0.10.sql +++ /dev/null @@ -1,4 +0,0 @@ -/* spock--4.0.9--4.0.10.sql */ - --- complain if script is sourced in psql, rather than via ALTER EXTENSION -\echo Use "ALTER EXTENSION spock UPDATE TO '4.0.10'" to load this file. \quit diff --git a/sql/spock--5.0.0--5.0.1.sql b/sql/spock--5.0.0--5.0.1.sql index 1991da96..f376dc34 100644 --- a/sql/spock--5.0.0--5.0.1.sql +++ b/sql/spock--5.0.0--5.0.1.sql @@ -3,6 +3,14 @@ -- complain if script is sourced in psql, rather than via ALTER EXTENSION \echo Use "ALTER EXTENSION spock UPDATE TO '5.0.1'" to load this file. \quit +DO $$ +BEGIN + IF spock.spock_version_num() < 50100 THEN + RAISE EXCEPTION 'This upgrade step requires the spock 5.1 binary. 
' + 'Please install the spock 5.1 package before running ' + 'ALTER EXTENSION spock UPDATE.'; + END IF; +END $$; CREATE OR REPLACE FUNCTION spock.replicate_ddl(command text, replication_sets text[] DEFAULT '{ddl_sql}', diff --git a/sql/spock--5.0.0.sql b/sql/spock--5.0.0.sql deleted file mode 100644 index 9ccad9ca..00000000 --- a/sql/spock--5.0.0.sql +++ /dev/null @@ -1,790 +0,0 @@ -\echo Use "CREATE EXTENSION spock" to load this file. \quit - -CREATE TABLE spock.node ( - node_id oid NOT NULL PRIMARY KEY, - node_name name NOT NULL UNIQUE, - location text, - country text, - info jsonb -) WITH (user_catalog_table=true); - -CREATE TABLE spock.node_interface ( - if_id oid NOT NULL PRIMARY KEY, - if_name name NOT NULL, -- default same as node name - if_nodeid oid REFERENCES node(node_id) ON UPDATE CASCADE, - if_dsn text NOT NULL, - UNIQUE (if_nodeid, if_name) -); - -CREATE TABLE spock.local_node ( - node_id oid PRIMARY KEY REFERENCES node(node_id), - node_local_interface oid NOT NULL REFERENCES node_interface(if_id) -); - -CREATE TABLE spock.subscription ( - sub_id oid NOT NULL PRIMARY KEY, - sub_name name NOT NULL UNIQUE, - sub_origin oid NOT NULL REFERENCES node(node_id) ON UPDATE CASCADE, - sub_target oid NOT NULL REFERENCES node(node_id) ON UPDATE CASCADE, - sub_origin_if oid NOT NULL REFERENCES node_interface(if_id), - sub_target_if oid NOT NULL REFERENCES node_interface(if_id), - sub_enabled boolean NOT NULL DEFAULT true, - sub_slot_name name NOT NULL, - sub_replication_sets text[], - sub_forward_origins text[], - sub_apply_delay interval NOT NULL DEFAULT '0', - sub_force_text_transfer boolean NOT NULL DEFAULT 'f', - sub_skip_lsn pg_lsn NOT NULL DEFAULT '0/0' -); - -CREATE TABLE spock.local_sync_status ( - sync_kind "char" NOT NULL CHECK (sync_kind IN ('i', 's', 'd', 'f')), - sync_subid oid NOT NULL REFERENCES spock.subscription(sub_id), - sync_nspname name, - sync_relname name, - sync_status "char" NOT NULL, - sync_statuslsn pg_lsn NOT NULL, - UNIQUE (sync_subid, 
sync_nspname, sync_relname) -); - -CREATE TABLE spock.exception_log ( - remote_origin oid NOT NULL, - remote_commit_ts timestamptz NOT NULL, - command_counter integer NOT NULL, - retry_errored_at timestamptz NOT NULL, - remote_xid bigint NOT NULL, - local_origin oid, - local_commit_ts timestamptz, - table_schema text, - table_name text, - operation text, - local_tup jsonb, - remote_old_tup jsonb, - remote_new_tup jsonb, - ddl_statement text, - ddl_user text, - error_message text NOT NULL, - PRIMARY KEY(remote_origin, remote_commit_ts, - command_counter, retry_errored_at) -) WITH (user_catalog_table=true); - -CREATE TABLE spock.exception_status ( - remote_origin oid NOT NULL, - remote_commit_ts timestamptz NOT NULL, - retry_errored_at timestamptz NOT NULL, - remote_xid bigint NOT NULL, - status text NOT NULL, - resolved_at timestamptz, - resolution_details jsonb, - PRIMARY KEY(remote_origin, remote_commit_ts, retry_errored_at) -) WITH (user_catalog_table=true); - -CREATE TABLE spock.exception_status_detail ( - remote_origin oid NOT NULL, - remote_commit_ts timestamptz NOT NULL, - command_counter integer NOT NULL, - retry_errored_at timestamptz NOT NULL, - remote_xid bigint NOT NULL, - status text NOT NULL, - resolved_at timestamptz, - resolution_details jsonb, - PRIMARY KEY(remote_origin, remote_commit_ts, - command_counter, retry_errored_at), - FOREIGN KEY(remote_origin, remote_commit_ts, retry_errored_at) - REFERENCES spock.exception_status -) WITH (user_catalog_table=true); - -CREATE TABLE spock.progress ( - node_id oid NOT NULL, - remote_node_id oid NOT NULL, - remote_commit_ts timestamptz NOT NULL, - remote_lsn pg_lsn NOT NULL, - remote_insert_lsn pg_lsn NOT NULL, - last_updated_ts timestamptz NOT NULL, - updated_by_decode bool NOT NULL, - PRIMARY KEY(node_id, remote_node_id) -) WITH (fillfactor=50); - -CREATE FUNCTION spock.node_create(node_name name, dsn text, - location text DEFAULT NULL, country text DEFAULT NULL, - info jsonb DEFAULT NULL) -RETURNS oid 
CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_create_node'; -CREATE FUNCTION spock.node_drop(node_name name, ifexists boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_drop_node'; - -CREATE FUNCTION spock.node_add_interface(node_name name, interface_name name, dsn text) -RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_node_add_interface'; -CREATE FUNCTION spock.node_drop_interface(node_name name, interface_name name) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_node_drop_interface'; - -CREATE FUNCTION spock.sub_create(subscription_name name, provider_dsn text, - replication_sets text[] = '{default,default_insert_only,ddl_sql}', synchronize_structure boolean = false, - synchronize_data boolean = false, forward_origins text[] = '{}', apply_delay interval DEFAULT '0', - force_text_transfer boolean = false, - enabled boolean = true) -RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_create_subscription'; -CREATE FUNCTION spock.sub_drop(subscription_name name, ifexists boolean DEFAULT false) -RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_drop_subscription'; - -CREATE FUNCTION spock.sub_alter_interface(subscription_name name, interface_name name) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_interface'; - -CREATE FUNCTION spock.sub_disable(subscription_name name, immediate boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_disable'; -CREATE FUNCTION spock.sub_enable(subscription_name name, immediate boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_enable'; - -CREATE FUNCTION spock.sub_add_repset(subscription_name name, replication_set name) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 
'spock_alter_subscription_add_replication_set'; -CREATE FUNCTION spock.sub_remove_repset(subscription_name name, replication_set name) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_remove_replication_set'; -CREATE FUNCTION spock.sub_alter_skiplsn(subscription_name name, lsn pg_lsn) - RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_skip_lsn'; - -CREATE FUNCTION spock.sub_show_status(subscription_name name DEFAULT NULL, - OUT subscription_name text, OUT status text, OUT provider_node text, - OUT provider_dsn text, OUT slot_name text, OUT replication_sets text[], - OUT forward_origins text[]) -RETURNS SETOF record STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_show_subscription_status'; - -CREATE TABLE spock.replication_set ( - set_id oid NOT NULL PRIMARY KEY, - set_nodeid oid NOT NULL REFERENCES node(node_id) ON UPDATE CASCADE, - set_name name NOT NULL, - replicate_insert boolean NOT NULL DEFAULT true, - replicate_update boolean NOT NULL DEFAULT true, - replicate_delete boolean NOT NULL DEFAULT true, - replicate_truncate boolean NOT NULL DEFAULT true, - UNIQUE (set_nodeid, set_name) -) WITH (user_catalog_table=true); - -CREATE TABLE spock.replication_set_table ( - set_id oid NOT NULL, - set_reloid regclass NOT NULL, - set_att_list text[], - set_row_filter pg_node_tree, - PRIMARY KEY(set_id, set_reloid) -) WITH (user_catalog_table=true); - -CREATE TABLE spock.replication_set_seq ( - set_id oid NOT NULL, - set_seqoid regclass NOT NULL, - PRIMARY KEY(set_id, set_seqoid) -) WITH (user_catalog_table=true); - -CREATE TABLE spock.sequence_state ( - seqoid oid NOT NULL PRIMARY KEY, - cache_size integer NOT NULL, - last_value bigint NOT NULL -) WITH (user_catalog_table=true); - -CREATE TABLE spock.depend ( - classid oid NOT NULL, - objid oid NOT NULL, - objsubid integer NOT NULL, - - refclassid oid NOT NULL, - refobjid oid NOT NULL, - refobjsubid integer NOT NULL, - - deptype "char" NOT 
NULL -) WITH (user_catalog_table=true); - -CREATE TABLE spock.pii ( - id int generated always as identity, - pii_schema text NOT NULL, - pii_table text NOT NULL, - pii_column text NOT NULL, - PRIMARY KEY(id) -) WITH (user_catalog_table=true); - -CREATE TABLE spock.resolutions ( - id int generated always as identity, - node_name name NOT NULL, - log_time timestamptz NOT NULL, - relname text, - idxname text, - conflict_type text, - conflict_resolution text, - - -- columns for local changes - local_origin int, - local_tuple text, - local_xid xid, - local_timestamp timestamptz, - - -- columns for remote changes - remote_origin int, - remote_tuple text, - remote_xid xid, - remote_timestamp timestamptz, - remote_lsn pg_lsn, - - PRIMARY KEY(id, node_name) -) WITH (user_catalog_table=true); - -CREATE VIEW spock.TABLES AS - WITH set_relations AS ( - SELECT s.set_name, r.set_reloid - FROM spock.replication_set_table r, - spock.replication_set s, - spock.local_node n - WHERE s.set_nodeid = n.node_id - AND s.set_id = r.set_id - ), - user_tables AS ( - SELECT r.oid, n.nspname, r.relname, r.relreplident - FROM pg_catalog.pg_class r, - pg_catalog.pg_namespace n - WHERE r.relkind IN ('r', 'p') - AND r.relpersistence = 'p' - AND n.oid = r.relnamespace - AND n.nspname !~ '^pg_' - AND n.nspname != 'information_schema' - AND n.nspname != 'spock' - ) - SELECT r.oid AS relid, n.nspname, r.relname, s.set_name - FROM pg_catalog.pg_namespace n, - pg_catalog.pg_class r, - set_relations s - WHERE r.relkind IN ('r', 'p') - AND n.oid = r.relnamespace - AND r.oid = s.set_reloid - UNION - SELECT t.oid AS relid, t.nspname, t.relname, NULL - FROM user_tables t - WHERE t.oid NOT IN (SELECT set_reloid FROM set_relations); - -CREATE FUNCTION spock.repset_create(set_name name, - replicate_insert boolean = true, replicate_update boolean = true, - replicate_delete boolean = true, replicate_truncate boolean = true) -RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 
'spock_create_replication_set'; -CREATE FUNCTION spock.repset_alter(set_name name, - replicate_insert boolean DEFAULT NULL, replicate_update boolean DEFAULT NULL, - replicate_delete boolean DEFAULT NULL, replicate_truncate boolean DEFAULT NULL) -RETURNS oid CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_replication_set'; -CREATE FUNCTION spock.repset_drop(set_name name, ifexists boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_drop_replication_set'; - -CREATE FUNCTION spock.repset_add_table(set_name name, relation regclass, synchronize_data boolean DEFAULT false, - columns text[] DEFAULT NULL, row_filter text DEFAULT NULL, include_partitions boolean default true) -RETURNS boolean CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_add_table'; -CREATE FUNCTION spock.repset_add_all_tables(set_name name, schema_names text[], synchronize_data boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_add_all_tables'; -CREATE FUNCTION spock.repset_remove_table(set_name name, relation regclass, include_partitions boolean default true) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_remove_table'; - -CREATE FUNCTION spock.repset_add_seq(set_name name, relation regclass, synchronize_data boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_add_sequence'; -CREATE FUNCTION spock.repset_add_all_seqs(set_name name, schema_names text[], synchronize_data boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_add_all_sequences'; -CREATE FUNCTION spock.repset_remove_seq(set_name name, relation regclass) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_remove_sequence'; - -CREATE FUNCTION spock.repset_add_partition(parent 
regclass, partition regclass default NULL, - row_filter text default NULL) -RETURNS int CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_add_partition'; - -CREATE FUNCTION spock.repset_remove_partition(parent regclass, partition regclass default NULL) -RETURNS int CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_remove_partition'; - -CREATE FUNCTION spock.sub_alter_sync(subscription_name name, truncate boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_synchronize'; - -CREATE FUNCTION spock.sub_resync_table(subscription_name name, relation regclass, - truncate boolean DEFAULT true) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_resynchronize_table'; - -CREATE FUNCTION spock.sync_seq(relation regclass) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_synchronize_sequence'; - -CREATE FUNCTION spock.table_data_filtered(reltyp anyelement, relation regclass, repsets text[]) -RETURNS SETOF anyelement CALLED ON NULL INPUT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_table_data_filtered'; - -CREATE FUNCTION spock.repset_show_table(relation regclass, repsets text[], OUT relid oid, OUT nspname text, - OUT relname text, OUT att_list text[], OUT has_row_filter boolean, OUT relkind "char", OUT relispartition boolean) -RETURNS record STRICT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_show_repset_table_info'; - -CREATE FUNCTION spock.sub_show_table(subscription_name name, relation regclass, OUT nspname text, OUT relname text, OUT status text) -RETURNS record STRICT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_show_subscription_table'; - -CREATE TABLE spock.queue ( - queued_at timestamp with time zone NOT NULL, - role name NOT NULL, - replication_sets text[], - message_type "char" NOT NULL, - message json NOT NULL -); - -CREATE FUNCTION spock.replicate_ddl(command text, 
- replication_sets text[] DEFAULT '{ddl_sql}', - search_path text DEFAULT '', - role text DEFAULT CURRENT_USER) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replicate_ddl_command'; - -CREATE FUNCTION spock.replicate_ddl(command text[], - replication_sets text[] DEFAULT '{ddl_sql}', - search_path text DEFAULT current_setting('search_path'), - role text DEFAULT CURRENT_USER) -RETURNS SETOF boolean STRICT VOLATILE LANGUAGE sql AS - 'SELECT spock.replicate_ddl(cmd, $2, $3, $4) FROM (SELECT unnest(command) cmd)'; - -CREATE FUNCTION spock.node_info(OUT node_id oid, OUT node_name text, - OUT sysid text, OUT dbname text, OUT replication_sets text, - OUT location text, OUT country text, OUT info jsonb) -RETURNS record -STABLE STRICT LANGUAGE c AS 'MODULE_PATHNAME', 'spock_node_info'; - -CREATE FUNCTION spock.spock_gen_slot_name(name, name, name) -RETURNS name -IMMUTABLE STRICT LANGUAGE c AS 'MODULE_PATHNAME'; - -CREATE FUNCTION spock_version() RETURNS text -LANGUAGE c AS 'MODULE_PATHNAME'; - -CREATE FUNCTION spock_version_num() RETURNS integer -LANGUAGE c AS 'MODULE_PATHNAME'; - -CREATE FUNCTION spock_max_proto_version() RETURNS integer -LANGUAGE c AS 'MODULE_PATHNAME'; - -CREATE FUNCTION spock_min_proto_version() RETURNS integer -LANGUAGE c AS 'MODULE_PATHNAME'; - -CREATE FUNCTION spock.get_country() RETURNS text -LANGUAGE sql AS -$$ SELECT current_setting('spock.country') $$; - -CREATE FUNCTION -spock.wait_slot_confirm_lsn(slotname name, target pg_lsn) -RETURNS void LANGUAGE c AS 'spock','spock_wait_slot_confirm_lsn'; - -CREATE FUNCTION spock.sub_wait_for_sync(subscription_name name) -RETURNS void RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_wait_for_subscription_sync_complete'; - -CREATE FUNCTION spock.table_wait_for_sync(subscription_name name, relation regclass) -RETURNS void RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_wait_for_table_sync_complete'; - -CREATE FUNCTION 
spock.sync_event() -RETURNS pg_lsn RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_create_sync_event'; - -CREATE PROCEDURE spock.wait_for_sync_event(OUT result bool, origin_id oid, lsn pg_lsn, timeout int DEFAULT 0) -AS $$ -DECLARE - target_id oid; - elapsed_time numeric := 0; - progress_lsn pg_lsn; -BEGIN - IF origin_id IS NULL THEN - RAISE EXCEPTION 'Origin node ''%'' not found', origin; - END IF; - target_id := node_id FROM spock.node_info(); - - WHILE true LOOP - SELECT INTO progress_lsn remote_lsn - FROM spock.progress - WHERE node_id = target_id AND remote_node_id = origin_id; - IF progress_lsn >= lsn THEN - result = true; - RETURN; - END IF; - elapsed_time := elapsed_time + .2; - IF timeout <> 0 AND elapsed_time >= timeout THEN - result := false; - RETURN; - END IF; - - ROLLBACK; - PERFORM pg_sleep(0.2); - END LOOP; -END; -$$ LANGUAGE plpgsql; - -CREATE PROCEDURE spock.wait_for_sync_event(OUT result bool, origin name, lsn pg_lsn, timeout int DEFAULT 0) -AS $$ -DECLARE - origin_id oid; - target_id oid; - elapsed_time numeric := 0; - progress_lsn pg_lsn; -BEGIN - origin_id := node_id FROM spock.node WHERE node_name = origin; - IF origin_id IS NULL THEN - RAISE EXCEPTION 'Origin node ''%'' not found', origin; - END IF; - target_id := node_id FROM spock.node_info(); - - WHILE true LOOP - SELECT INTO progress_lsn remote_lsn - FROM spock.progress - WHERE node_id = target_id AND remote_node_id = origin_id; - IF progress_lsn >= lsn THEN - result = true; - RETURN; - END IF; - elapsed_time := elapsed_time + .2; - IF timeout <> 0 AND elapsed_time >= timeout THEN - result := false; - RETURN; - END IF; - - ROLLBACK; - PERFORM pg_sleep(0.2); - END LOOP; -END; -$$ LANGUAGE plpgsql; - -CREATE FUNCTION spock.xact_commit_timestamp_origin("xid" xid, OUT "timestamp" timestamptz, OUT "roident" oid) -RETURNS record RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_xact_commit_timestamp_origin'; - -CREATE FUNCTION 
spock.get_channel_stats( - OUT subid oid, - OUT relid oid, - OUT n_tup_ins bigint, - OUT n_tup_upd bigint, - OUT n_tup_del bigint, - OUT n_conflict bigint, - OUT n_dca bigint) -RETURNS SETOF record -LANGUAGE c AS 'MODULE_PATHNAME', 'get_channel_stats'; - -CREATE FUNCTION spock.reset_channel_stats() RETURNS void -LANGUAGE c AS 'MODULE_PATHNAME', 'reset_channel_stats'; - -CREATE VIEW spock.channel_table_stats AS - SELECT H.subid, H.relid, - CASE H.subid - WHEN 0 THEN '' - ELSE S.sub_name - END AS sub_name, - pg_catalog.quote_ident(N.nspname) || '.' || pg_catalog.quote_ident(C.relname) AS table_name, - H.n_tup_ins, H.n_tup_upd, H.n_tup_del, - H.n_conflict, H.n_dca - FROM spock.get_channel_stats() AS H - LEFT JOIN spock.subscription AS S ON S.sub_id = H.subid - LEFT JOIN pg_catalog.pg_class AS C ON C.oid = H.relid - LEFT JOIN pg_catalog.pg_namespace AS N ON N.oid = C.relnamespace; - -CREATE VIEW spock.channel_summary_stats AS - SELECT subid, sub_name, - sum(n_tup_ins) AS n_tup_ins, - sum(n_tup_upd) AS n_tup_upd, - sum(n_tup_del) AS n_tup_del, - sum(n_conflict) AS n_conflict, - sum(n_dca) AS n_dca - FROM spock.channel_table_stats - GROUP BY subid, sub_name; - -CREATE VIEW spock.lag_tracker AS - SELECT - origin.node_name AS origin_name, - n.node_name AS receiver_name, - MAX(p.remote_commit_ts) AS commit_timestamp, - MAX(p.remote_lsn) AS last_received_lsn, - MAX(p.remote_insert_lsn) AS remote_insert_lsn, - CASE - WHEN CAST(MAX(CAST(p.updated_by_decode as int)) as bool) THEN pg_wal_lsn_diff(MAX(p.remote_insert_lsn), MAX(p.remote_lsn)) - ELSE 0 - END AS replication_lag_bytes, - CASE - WHEN CAST(MAX(CAST(p.updated_by_decode as int)) as bool) THEN now() - MAX(p.remote_commit_ts) - ELSE now() - MAX(p.last_updated_ts) - END AS replication_lag - FROM spock.progress p - LEFT JOIN spock.subscription sub ON (p.node_id = sub.sub_target and p.remote_node_id = sub.sub_origin) - LEFT JOIN spock.node origin ON sub.sub_origin = origin.node_id - LEFT JOIN spock.node n ON n.node_id = 
p.node_id - GROUP BY origin.node_name, n.node_name; - -CREATE FUNCTION spock.md5_agg_sfunc(text, anyelement) - RETURNS text - LANGUAGE sql -AS -$$ - SELECT md5($1 || $2::text) -$$; -CREATE AGGREGATE spock.md5_agg (ORDER BY anyelement) -( - STYPE = text, - SFUNC = spock.md5_agg_sfunc, - INITCOND = '' -); - --- ---------------------------------------------------------------------- --- Spock Read Only --- ---------------------------------------------------------------------- -CREATE FUNCTION spock.terminate_active_transactions() RETURNS bool - AS 'MODULE_PATHNAME', 'spockro_terminate_active_transactions' - LANGUAGE C STRICT; - --- ---------------------------------------------------------------------- --- We check the PostgreSQL major version number in case a future --- catalog change forces us to provide different functions for --- different versions. --- ---------------------------------------------------------------------- -DO $version_dependent$ -DECLARE - pgmajor integer; -BEGIN - pgmajor = regexp_replace(regexp_replace(version(), '^PostgreSQL ', ''), '[^0-9].*', '')::integer; - - CASE - WHEN pgmajor IN (15, 16, 17, 18) THEN - --- ---------------------------------------------------------------------- --- convert_column_to_int8() --- --- Change the data type of a column to int8 and recursively alter --- all columns that reference this one through foreign key constraints. 
--- ---------------------------------------------------------------------- -CREATE OR REPLACE FUNCTION spock.convert_column_to_int8(p_rel regclass, p_attnum smallint) -RETURNS integer -SET search_path = pg_catalog -AS $$ -DECLARE - v_attr record; - v_fk record; - v_attidx integer; - v_cmd text; - v_num_altered integer := 0; -BEGIN - -- ---- - -- Get the attribute definition - -- ---- - SELECT * INTO v_attr - FROM pg_namespace N - JOIN pg_class C - ON N.oid = C.relnamespace - JOIN pg_attribute A - ON C.oid = A.attrelid - WHERE A.attrelid = p_rel - AND A.attnum = p_attnum; - - IF NOT FOUND THEN - RAISE EXCEPTION 'Attribute % of reation % not found', p_attnum, p_rel; - END IF; - - -- ---- - -- If the attribute type is not bigint, we change it - -- ---- - IF v_attr.atttypid <> 'int8'::regtype THEN - v_cmd = 'ALTER TABLE ' || - quote_ident(v_attr.nspname) || '.' || - quote_ident(v_attr.relname) || - ' ALTER COLUMN ' || - quote_ident(v_attr.attname) || - ' SET DATA TYPE int8'; - RAISE NOTICE 'EXECUTE %', v_cmd; - EXECUTE v_cmd; - - v_num_altered = v_num_altered + 1; - END IF; - - -- ---- - -- Convert foreign keys referencing this column as well - -- ---- - FOR v_fk IN - SELECT * FROM pg_constraint F - JOIN pg_class C - ON C.oid = F.conrelid - JOIN pg_namespace N - ON N.oid = C.relnamespace - WHERE F.contype = 'f' - AND F.confrelid = v_attr.attrelid - LOOP - -- ---- - -- Lookup the attribute index in the possibly compount FK - -- ---- - v_attidx = array_position(v_fk.confkey, v_attr.attnum); - IF v_attidx IS NULL THEN - CONTINUE; - END IF; - - -- ---- - -- Recurse for the referencing column - -- ---- - v_num_altered = v_num_altered + - spock.convert_column_to_int8(v_fk.conrelid, - v_fk.conkey[v_attidx]); - END LOOP; - RETURN v_num_altered; -END; -$$ LANGUAGE plpgsql; - --- ---------------------------------------------------------------------- --- convert_sequence_to_snowflake() --- --- Convert the DEFAULT expression for a sequence to snowflake's nextval() --- function. 
Eventually change the data type of columns using it --- to bigint. --- ---------------------------------------------------------------------- -CREATE OR REPLACE FUNCTION spock.convert_sequence_to_snowflake(p_seqid regclass) -RETURNS integer -SET search_path = pg_catalog -AS $$ -DECLARE - v_attrdef record; - v_attr record; - v_seq record; - v_cmd text; - v_num_altered integer := 0; -BEGIN - -- ---- - -- We are looking for column defaults that use the requested - -- sequence and the function nextval(). - -- ---- - FOR v_attrdef IN - SELECT AD.*, - pg_get_expr(AD.adbin, AD.adrelid, true) adstr - FROM pg_depend D - JOIN pg_attrdef AD - ON D.refclassid = 'pg_class'::regclass - AND AD.adrelid = D.refobjid - AND AD.adnum = D.refobjsubid - WHERE D.classid = 'pg_class'::regclass - AND D.objid = p_seqid - LOOP - IF v_attrdef.adstr NOT LIKE 'nextval(%' THEN - CONTINUE; - END IF; - - -- ---- - -- Get the attribute definition - -- ---- - SELECT * INTO v_attr - FROM pg_namespace N - JOIN pg_class C - ON N.oid = C.relnamespace - JOIN pg_attribute A - ON C.oid = A.attrelid - WHERE A.attrelid = v_attrdef.adrelid - AND A.attnum = v_attrdef.adnum; - - IF NOT FOUND THEN - RAISE EXCEPTION 'Attribute for % not found', v_attrdef.adstr; - END IF; - - -- ---- - -- Get the sequence definition - -- ---- - SELECT * INTO v_seq - FROM pg_namespace N - JOIN pg_class C - ON N.oid = C.relnamespace - WHERE C.oid = p_seqid; - - IF NOT FOUND THEN - RAISE EXCEPTION 'Sequence with Oid % not found', p_seqid; - END IF; - - -- ---- - -- If the attribute type is not bigint, we change it - -- ---- - v_num_altered = v_num_altered + - spock.convert_column_to_int8(v_attr.attrelid, v_attr.attnum); - - -- ---- - -- Now we can change the default to snowflake.nextval() - -- ---- - v_cmd = 'ALTER TABLE ' || - quote_ident(v_attr.nspname) || '.' || - quote_ident(v_attr.relname) || - ' ALTER COLUMN ' || - quote_ident(v_attr.attname) || - ' SET DEFAULT snowflake.nextval(''' || - quote_ident(v_seq.nspname) || '.' 
|| - quote_ident(v_seq.relname) || - '''::regclass)'; - RAISE NOTICE 'EXECUTE %', v_cmd; - EXECUTE v_cmd; - - v_num_altered = v_num_altered + 1; - END LOOP; - RETURN v_num_altered; -END; -$$ LANGUAGE plpgsql; - - -- END pgmajor in (15, 16, 17, 18) - ELSE - RAISE EXCEPTION 'Unsupported PostgreSQL major version %', pgmajor; - END CASE; --- End of PG major version dependent PL/pgSQL definitions -END; -$version_dependent$ LANGUAGE plpgsql; - --- ---- --- Generic delta apply functions for all numeric data types --- ---- -CREATE FUNCTION spock.delta_apply(int2, int2, int2) -RETURNS int2 LANGUAGE c AS 'MODULE_PATHNAME', 'delta_apply_int2'; -CREATE FUNCTION spock.delta_apply(int4, int4, int4) -RETURNS int4 LANGUAGE c AS 'MODULE_PATHNAME', 'delta_apply_int4'; -CREATE FUNCTION spock.delta_apply(int8, int8, int8) -RETURNS int8 LANGUAGE c AS 'MODULE_PATHNAME', 'delta_apply_int8'; -CREATE FUNCTION spock.delta_apply(float4, float4, float4) -RETURNS float4 LANGUAGE c AS 'MODULE_PATHNAME', 'delta_apply_float4'; -CREATE FUNCTION spock.delta_apply(float8, float8, float8) -RETURNS float8 LANGUAGE c AS 'MODULE_PATHNAME', 'delta_apply_float8'; -CREATE FUNCTION spock.delta_apply(numeric, numeric, numeric) -RETURNS numeric LANGUAGE c AS 'MODULE_PATHNAME', 'delta_apply_numeric'; -CREATE FUNCTION spock.delta_apply(money, money, money) -RETURNS money LANGUAGE c AS 'MODULE_PATHNAME', 'delta_apply_money'; - --- ---- --- Function to control REPAIR mode --- ---- -CREATE FUNCTION spock.repair_mode(enabled bool) -RETURNS pg_catalog.pg_lsn LANGUAGE c -AS 'MODULE_PATHNAME', 'spock_repair_mode'; - --- ---- --- Function to determine LSN from commit timestamp --- ---- -CREATE FUNCTION spock.get_lsn_from_commit_ts(slot_name name, commit_ts timestamptz) -RETURNS pg_lsn STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_get_lsn_from_commit_ts'; - -CREATE OR REPLACE FUNCTION spock.get_apply_worker_status( - OUT worker_pid bigint, -- Changed from int to bigint - OUT worker_dboid int, - OUT 
worker_subid bigint, - OUT worker_status text -) -RETURNS SETOF record STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'get_apply_worker_status'; - -CREATE FUNCTION spock.wait_for_apply_worker(p_subbid bigint, timeout int DEFAULT 0) -RETURNS boolean -AS $$ -DECLARE - start_time timestamptz := clock_timestamp(); - elapsed_time int := 0; - current_status text; -BEGIN - -- Loop until the timeout is reached or the worker is no longer running - WHILE true LOOP - -- Call spock.get_apply_worker_status to check the worker's status - SELECT worker_status - INTO current_status - FROM spock.get_apply_worker_status() - WHERE worker_subid = p_subbid; - - -- If no row is found, return -1 - IF NOT FOUND THEN - RETURN false; - END IF; - - -- If the worker is no longer running, return 0 - IF current_status IS DISTINCT FROM 'running' THEN - RETURN false; - END IF; - - -- Check if the timeout has been reached - elapsed_time := EXTRACT(EPOCH FROM clock_timestamp() - start_time) * 1000; - IF timeout > 0 AND elapsed_time >= timeout THEN - RETURN true; - END IF; - - -- Sleep for a short interval before checking again - PERFORM pg_sleep(0.2); - END LOOP; -END; -$$ LANGUAGE plpgsql; diff --git a/sql/spock--5.0.1--5.0.2.sql b/sql/spock--5.0.1--5.0.2.sql index 015b4c81..6f67af2c 100644 --- a/sql/spock--5.0.1--5.0.2.sql +++ b/sql/spock--5.0.1--5.0.2.sql @@ -3,6 +3,15 @@ -- complain if script is sourced in psql, rather than via ALTER EXTENSION \echo Use "ALTER EXTENSION spock UPDATE TO '5.0.2'" to load this file. \quit +DO $$ +BEGIN + IF spock.spock_version_num() < 50100 THEN + RAISE EXCEPTION 'This upgrade step requires the spock 5.1 binary. 
' + 'Please install the spock 5.1 package before running ' + 'ALTER EXTENSION spock UPDATE.'; + END IF; +END $$; + ALTER TABLE spock.subscription ADD sub_skip_schema text; diff --git a/sql/spock--5.0.2--5.0.3.sql b/sql/spock--5.0.2--5.0.3.sql index 5c3f1f5e..985d6ccb 100644 --- a/sql/spock--5.0.2--5.0.3.sql +++ b/sql/spock--5.0.2--5.0.3.sql @@ -3,3 +3,12 @@ -- complain if script is sourced in psql, rather than via ALTER EXTENSION \echo Use "ALTER EXTENSION spock UPDATE TO '5.0.3'" to load this file. \quit + +DO $$ +BEGIN + IF spock.spock_version_num() < 50100 THEN + RAISE EXCEPTION 'This upgrade step requires the spock 5.1 binary. ' + 'Please install the spock 5.1 package before running ' + 'ALTER EXTENSION spock UPDATE.'; + END IF; +END $$; diff --git a/sql/spock--5.0.3--5.0.4.sql b/sql/spock--5.0.3--5.0.4.sql index 9c465b81..6aa6739f 100644 --- a/sql/spock--5.0.3--5.0.4.sql +++ b/sql/spock--5.0.3--5.0.4.sql @@ -3,3 +3,12 @@ -- complain if script is sourced in psql, rather than via ALTER EXTENSION \echo Use "ALTER EXTENSION spock UPDATE TO '5.0.4'" to load this file. \quit + +DO $$ +BEGIN + IF spock.spock_version_num() < 50100 THEN + RAISE EXCEPTION 'This upgrade step requires the spock 5.1 binary. ' + 'Please install the spock 5.1 package before running ' + 'ALTER EXTENSION spock UPDATE.'; + END IF; +END $$; diff --git a/sql/spock--5.0.4--5.0.5.sql b/sql/spock--5.0.4--5.0.5.sql new file mode 100644 index 00000000..87b01e27 --- /dev/null +++ b/sql/spock--5.0.4--5.0.5.sql @@ -0,0 +1,105 @@ +/* spock--5.0.4--5.0.5.sql */ + +-- complain if script is sourced in psql, rather than via ALTER EXTENSION +\echo Use "ALTER EXTENSION spock UPDATE TO '5.0.5'" to load this file. \quit + +DO $$ +BEGIN + IF spock.spock_version_num() < 50100 THEN + RAISE EXCEPTION 'This upgrade step requires the spock 5.1 binary. 
' + 'Please install the spock 5.1 package before running ' + 'ALTER EXTENSION spock UPDATE.'; + END IF; +END $$; + +DROP PROCEDURE IF EXISTS spock.wait_for_sync_event(oid, pg_lsn, int); + +CREATE PROCEDURE spock.wait_for_sync_event(OUT result bool, origin_id oid, lsn pg_lsn, timeout int DEFAULT 0) +AS $$ +DECLARE + target_id oid; + elapsed_time numeric := 0; + progress_lsn pg_lsn; +BEGIN + IF origin_id IS NULL THEN + RAISE EXCEPTION 'Origin node ''%'' not found', origin_id; + END IF; + target_id := node_id FROM spock.node_info(); + + WHILE true LOOP + -- If an unresolvable issue occurs with the apply worker, the LR + -- progress gets stuck, and we need to check the subscription's state + -- carefully. + IF NOT EXISTS (SELECT * FROM spock.subscription + WHERE sub_origin = origin_id AND + sub_target = target_id AND + sub_enabled = true) THEN + RAISE EXCEPTION 'Replication % => % does not have any enabled subscription yet', + origin_id, target_id; + END IF; + + SELECT INTO progress_lsn remote_lsn + FROM spock.progress + WHERE node_id = target_id AND remote_node_id = origin_id; + IF progress_lsn >= lsn THEN + result = true; + RETURN; + END IF; + elapsed_time := elapsed_time + .2; + IF timeout <> 0 AND elapsed_time >= timeout THEN + result := false; + RETURN; + END IF; + + ROLLBACK; + PERFORM pg_sleep(0.2); + END LOOP; +END; +$$ LANGUAGE plpgsql; + +DROP PROCEDURE IF EXISTS spock.wait_for_sync_event(name, pg_lsn, int); + +CREATE PROCEDURE spock.wait_for_sync_event(OUT result bool, origin name, lsn pg_lsn, timeout int DEFAULT 0) +AS $$ +DECLARE + origin_id oid; + target_id oid; + elapsed_time numeric := 0; + progress_lsn pg_lsn; +BEGIN + origin_id := node_id FROM spock.node WHERE node_name = origin; + IF origin_id IS NULL THEN + RAISE EXCEPTION 'Origin node ''%'' not found', origin; + END IF; + target_id := node_id FROM spock.node_info(); + + WHILE true LOOP + -- If an unresolvable issue occurs with the apply worker, the LR + -- progress gets stuck, and we need to check 
the subscription's state + -- carefully. + IF NOT EXISTS (SELECT * FROM spock.subscription + WHERE sub_origin = origin_id AND + sub_target = target_id AND + sub_enabled = true) THEN + RAISE EXCEPTION 'Replication % => % does not have any enabled subscription yet', + origin_id, target_id; + END IF; + + SELECT INTO progress_lsn remote_lsn + FROM spock.progress + WHERE node_id = target_id AND remote_node_id = origin_id; + IF progress_lsn >= lsn THEN + result = true; + RETURN; + END IF; + elapsed_time := elapsed_time + .2; + IF timeout <> 0 AND elapsed_time >= timeout THEN + result := false; + RETURN; + END IF; + + ROLLBACK; + PERFORM pg_sleep(0.2); + END LOOP; +END; +$$ LANGUAGE plpgsql; diff --git a/sql/spock--5.0.5--5.0.6.sql b/sql/spock--5.0.5--5.0.6.sql new file mode 100644 index 00000000..006c2fac --- /dev/null +++ b/sql/spock--5.0.5--5.0.6.sql @@ -0,0 +1,16 @@ +/* spock--5.0.5--5.0.6.sql */ + +-- complain if script is sourced in psql, rather than via ALTER EXTENSION +\echo Use "ALTER EXTENSION spock UPDATE TO '5.0.6'" to load this file. \quit + +DO $$ +BEGIN + IF spock.spock_version_num() < 50100 THEN + RAISE EXCEPTION 'This upgrade step requires the spock 5.1 binary. ' + 'Please install the spock 5.1 package before running ' + 'ALTER EXTENSION spock UPDATE.'; + END IF; +END $$; + +ALTER TABLE spock.subscription + ADD COLUMN sub_created_at timestamptz; diff --git a/sql/spock--5.0.4--6.0.0-devel.sql b/sql/spock--5.0.6--6.0.0-devel.sql similarity index 99% rename from sql/spock--5.0.4--6.0.0-devel.sql rename to sql/spock--5.0.6--6.0.0-devel.sql index c5a2b1db..909d96ae 100644 --- a/sql/spock--5.0.4--6.0.0-devel.sql +++ b/sql/spock--5.0.6--6.0.0-devel.sql @@ -1,4 +1,4 @@ -/* spock--5.0.4--6.0.0.devel.sql */ +/* spock--5.0.6--6.0.0-devel.sql */ -- complain if script is sourced in psql, rather than via ALTER EXTENSION \echo Use "ALTER EXTENSION spock UPDATE TO '6.0.0-devel'" to load this file. 
\quit diff --git a/sql/spock--6.0.0-devel.sql b/sql/spock--6.0.0-devel.sql index d11bbeda..9cf0088d 100644 --- a/sql/spock--6.0.0-devel.sql +++ b/sql/spock--6.0.0-devel.sql @@ -35,7 +35,8 @@ CREATE TABLE spock.subscription ( sub_apply_delay interval NOT NULL DEFAULT '0', sub_force_text_transfer boolean NOT NULL DEFAULT 'f', sub_skip_lsn pg_lsn NOT NULL DEFAULT '0/0', - sub_skip_schema text[] + sub_skip_schema text[], + sub_created_at timestamptz ); -- Source for sub_id values. CREATE SEQUENCE spock.sub_id_generator AS integer MINVALUE 1 CYCLE START WITH 1 OWNED BY spock.subscription.sub_id; diff --git a/src/spock.c b/src/spock.c index 787ad115..79f0634c 100644 --- a/src/spock.c +++ b/src/spock.c @@ -81,6 +81,13 @@ static const struct config_enum_entry SpockConflictResolvers[] = { {NULL, 0, false} }; +static const struct config_enum_entry SpockOriginConflicts[] = { + {"none", SPOCK_ORIGIN_NONE, false}, + {"remote_only_differs", SPOCK_ORIGIN_REMOTE_ONLY_DIFFERS, false}, + {"since_sub_creation", SPOCK_ORIGIN_DIFFERS_SINCE_SUB, false}, + {NULL, 0, false} +}; + /* copied fom guc.c */ static const struct config_enum_entry server_message_level_options[] = { {"debug", DEBUG2, true}, @@ -135,6 +142,7 @@ int restart_delay_on_exception; int spock_replay_queue_size; /* Deprecated - no longer used */ bool check_all_uc_indexes = false; bool spock_enable_quiet_mode = false; +int log_origin_change = SPOCK_ORIGIN_NONE; static emit_log_hook_type prev_emit_log_hook = NULL; static Checkpoint_hook_type prev_Checkpoint_hook = NULL; @@ -1185,6 +1193,15 @@ _PG_init(void) NULL, NULL); + DefineCustomEnumVariable("spock.log_origin_change", + gettext_noop("If set, log when the origin of a tuple changes."), + NULL, + &log_origin_change, + SPOCK_ORIGIN_NONE, + SpockOriginConflicts, + PGC_SUSET, 0, + NULL, NULL, NULL); + if (IsBinaryUpgrade) return; diff --git a/src/spock_conflict.c b/src/spock_conflict.c index e79a519f..8debc1f2 100644 --- a/src/spock_conflict.c +++ b/src/spock_conflict.c @@ 
-343,47 +343,98 @@ spock_report_conflict(SpockConflictType conflict_type, Oid conflict_idx_oid) { char local_tup_ts_str[MAXDATELEN] = "(unset)"; + char local_origin_str[32]; StringInfoData localtup, remotetup; TupleDesc desc = RelationGetDescr(rel->rel); const char *idxname = "(unknown)"; const char *qualrelname; + bool save_in_resolutions = true; /* Ignore update-update conflict for same origin */ if (conflict_type == SPOCK_CT_UPDATE_EXISTS) { /* - * If updating a row that came from the same origin, do not report it - * as a conflict + * If updating a row that came from the same origin, + * do not report it as a conflict nor log */ if (local_tuple_origin == replorigin_session_origin) return; - /* If updated in the same transaction, do not report it as a conflict */ + /* If updated in the same transaction, do not report it as a conflict nor log */ if (local_tuple_origin == InvalidRepOriginId && TransactionIdEquals(local_tuple_xid, GetTopTransactionId())) return; /* Differing origin */ conflict_type = SPOCK_CT_UPDATE_ORIGIN_DIFFERS; + + if (resolution == SpockResolution_ApplyRemote) + { + /* + * Remote tuple wins — this is normal replication flow, not a true + * conflict. Do not write to spock.resolutions, but optionally + * log to the PostgreSQL log based on the GUC setting. + */ + save_in_resolutions = false; + + if (!found_local_origin) + return; + + if (log_origin_change == SPOCK_ORIGIN_NONE) + return; + + if (log_origin_change == SPOCK_ORIGIN_REMOTE_ONLY_DIFFERS && + local_tuple_origin == InvalidRepOriginId) + return; + + if (log_origin_change == SPOCK_ORIGIN_DIFFERS_SINCE_SUB) + { + /* Do not log if we do not have a sub creation timestamp */ + if (!MySubscription || MySubscription->created_at == 0) + return; + + /* If we could not determine the tuple timestamp, do not log */ + if (local_tuple_commit_ts == 0) + return; + + /* + * If the local tuple predates the subscription, it was + * loaded before replication was set up (e.g. pg_restore). 
+ * Do not treat this as an origin-change conflict. + */ + if (local_tuple_commit_ts < MySubscription->created_at) + return; + } + } } - /* Count statistics */ - handle_stats_counter(rel->rel, MyApplyWorker->subid, - SPOCK_STATS_CONFLICT_COUNT, 1); - /* If configured log resolution to spock.resolutions table */ - spock_conflict_log_table(conflict_type, rel, localtuple, oldkey, + if (save_in_resolutions) + { + /* Count statistics */ + handle_stats_counter(rel->rel, MyApplyWorker->subid, + SPOCK_STATS_CONFLICT_COUNT, 1); + + /* If configured log resolution to table */ + spock_conflict_log_table(conflict_type, rel, localtuple, oldkey, remotetuple, applytuple, resolution, local_tuple_xid, found_local_origin, local_tuple_origin, local_tuple_commit_ts, conflict_idx_oid); + } memset(local_tup_ts_str, 0, MAXDATELEN); + strlcpy(local_origin_str, "unknown", sizeof(local_origin_str)); if (found_local_origin) + { strlcpy(local_tup_ts_str, timestamptz_to_str(local_tuple_commit_ts), MAXDATELEN); + if (local_tuple_origin != InvalidRepOriginId) + snprintf(local_origin_str, sizeof(local_origin_str), "%u", + (unsigned int) local_tuple_origin); + } initStringInfo(&remotetup); @@ -428,9 +479,9 @@ spock_report_conflict(SpockConflictType conflict_type, SpockConflictTypeNames[conflict_type], qualrelname, idxname, conflict_resolution_to_string(resolution)), - errdetail("existing local tuple {%s} xid=%u,origin=%d,timestamp=%s; remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X", + errdetail("existing local tuple {%s} xid=%u,origin=%s,timestamp=%s; remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X", localtup.data, local_tuple_xid, - found_local_origin ? 
(int) local_tuple_origin : -1, + local_origin_str, local_tup_ts_str, remotetup.data, replorigin_session_origin, @@ -572,7 +623,10 @@ spock_conflict_log_table(SpockConflictType conflict_type, /* conflict_resolution */ values[6] = CStringGetTextDatum(conflict_resolution_to_string(resolution)); /* local_origin */ - values[7] = Int32GetDatum(found_local_origin ? (int) local_tuple_origin : -1); + if (found_local_origin && local_tuple_origin != InvalidRepOriginId) + values[7] = Int32GetDatum((int) local_tuple_origin); + else + nulls[7] = true; /* local_tuple */ if (localtuple != NULL) @@ -580,7 +634,7 @@ spock_conflict_log_table(SpockConflictType conflict_type, Datum datum; datum = heap_copy_tuple_as_datum(localtuple, desc); - values[8] = spock_conflict_row_to_json(datum, false, &nulls[7]); + values[8] = spock_conflict_row_to_json(datum, false, &nulls[8]); } else nulls[8] = true; @@ -607,7 +661,7 @@ spock_conflict_log_table(SpockConflictType conflict_type, Datum datum; datum = heap_copy_tuple_as_datum(remotetuple, desc); - values[12] = spock_conflict_row_to_json(datum, false, &nulls[11]); + values[12] = spock_conflict_row_to_json(datum, false, &nulls[12]); } else nulls[12] = true; diff --git a/src/spock_node.c b/src/spock_node.c index 3c6df9e5..87112f9e 100644 --- a/src/spock_node.c +++ b/src/spock_node.c @@ -95,7 +95,7 @@ typedef struct SubscriptionTuple NameData sub_slot_name; } SubscriptionTuple; -#define Natts_subscription 14 +#define Natts_subscription 15 #define Anum_sub_id 1 #define Anum_sub_name 2 #define Anum_sub_origin 3 @@ -110,6 +110,7 @@ typedef struct SubscriptionTuple #define Anum_sub_force_text_transfer 12 #define Anum_sub_skip_lsn 13 #define Anum_sub_skip_schema 14 +#define Anum_sub_created_at 15 /* * List of extensions and schemas that we should skip globally. 
@@ -905,6 +906,8 @@ create_subscription(SpockSubscription *sub) else nulls[Anum_sub_skip_schema - 1] = true; + values[Anum_sub_created_at - 1] = TimestampTzGetDatum(GetCurrentTimestamp()); + tup = heap_form_tuple(tupDesc, values, nulls); /* Insert the tuple to the catalog. */ @@ -968,6 +971,7 @@ alter_subscription(SpockSubscription *sub) replaces[Anum_sub_id - 1] = false; replaces[Anum_sub_name - 1] = false; + replaces[Anum_sub_created_at - 1] = false; values[Anum_sub_origin - 1] = ObjectIdGetDatum(sub->origin_if->nodeid); values[Anum_sub_target - 1] = ObjectIdGetDatum(sub->target_if->nodeid); @@ -1133,6 +1137,13 @@ subscription_fromtuple(HeapTuple tuple, TupleDesc desc) sub->skip_schema = skip_schema_names; } + /* Get created_at. */ + d = heap_getattr(tuple, Anum_sub_created_at, desc, &isnull); + if (isnull) + sub->created_at = 0; + else + sub->created_at = DatumGetTimestampTz(d); + return sub; } diff --git a/tests/regress/expected/tuple_origin.out b/tests/regress/expected/tuple_origin.out index 7a7cbd2b..adf54e2c 100644 --- a/tests/regress/expected/tuple_origin.out +++ b/tests/regress/expected/tuple_origin.out @@ -257,12 +257,11 @@ SELECT * FROM basic_conflict ORDER BY id; 2 | B (2 rows) --- We should now see a conflict +-- Origin changes are no longer saved to resolutions SELECT relname, conflict_type FROM spock.resolutions WHERE relname = 'public.basic_conflict'; - relname | conflict_type ------------------------+----------------------- - public.basic_conflict | update_origin_differs -(1 row) + relname | conflict_type +---------+--------------- +(0 rows) -- Clean TRUNCATE spock.resolutions; diff --git a/tests/regress/sql/tuple_origin.sql b/tests/regress/sql/tuple_origin.sql index e925e7c0..ccd30fe6 100644 --- a/tests/regress/sql/tuple_origin.sql +++ b/tests/regress/sql/tuple_origin.sql @@ -150,7 +150,7 @@ SELECT spock.wait_slot_confirm_lsn(NULL, NULL); \c :subscriber_dsn SELECT * FROM basic_conflict ORDER BY id; --- We should now see a conflict +-- Origin 
changes are no longer saved to resolutions SELECT relname, conflict_type FROM spock.resolutions WHERE relname = 'public.basic_conflict'; -- Clean diff --git a/tests/tap/schedule b/tests/tap/schedule index 2ed763bf..fa272635 100644 --- a/tests/tap/schedule +++ b/tests/tap/schedule @@ -17,6 +17,9 @@ test: 009_zodan_add_remove_nodes test: 010_zodan_add_remove_python test: 012_zodan_basics +test: 013_origin_change_restore +test: 014_pgdump_restore_conflict + # Tests, consuming too much time to be launched on each check: #test: 011_zodan_sync_third # diff --git a/tests/tap/t/013_origin_change_restore.pl b/tests/tap/t/013_origin_change_restore.pl new file mode 100644 index 00000000..12725b28 --- /dev/null +++ b/tests/tap/t/013_origin_change_restore.pl @@ -0,0 +1,226 @@ +use strict; +use warnings; +use Test::More; +use lib '.'; +use SpockTest qw(create_cluster destroy_cluster system_or_bail command_ok + get_test_config scalar_query psql_or_bail); +use Cwd; + +# ============================================================================= +# Test: 013_origin_change_restore.pl - Origin-change logging after pg_restore +# ============================================================================= +# Verify that: +# 1. pg_restored data (predating subscription) does NOT produce origin-change +# conflict log entries when log_origin_change = 'since_sub_creation' +# 2. Post-subscription origin changes ARE logged (since_sub_creation) +# 3. Mode 'none' suppresses all origin-change logging +# 4. Mode 'remote_only_differs' suppresses logging for locally-written tuples +# +# Origin-change conflicts are written to the PostgreSQL log (not spock.resolutions), +# so all assertions check the log file content. 
+
+# ---- helpers ----------------------------------------------------------------
+# get_log_size($logfile): size of $logfile in bytes, or 0 if missing or empty.
+sub get_log_size {
+    my ($logfile) = @_;
+    return -s $logfile || 0;
+}
+# get_log_content_since($logfile, $offset): text found after byte $offset.
+sub get_log_content_since {
+    my ($logfile, $offset) = @_;
+    open my $fh, '<', $logfile or die "Cannot open $logfile: $!";
+    $offset = 0 if $offset > ((-s $fh) || 0);    # file is shorter than $offset: read all of it
+    seek($fh, $offset, 0) or die "Cannot seek to $offset in $logfile: $!";
+    my $content = do { local $/; <$fh> };        # slurp from $offset to EOF
+    close $fh;
+    return $content // '';
+}
+# wait_for_value(node, query, expected[, timeout=30]): poll until query returns expected; returns the last value read.
+sub wait_for_value {
+    my ($node, $query, $expected, $timeout) = @_;
+    $timeout //= 30;
+    for my $i (1..($timeout * 10)) {
+        my $result = scalar_query($node, $query);
+        return $result if defined $result && $result eq $expected;
+        select(undef, undef, undef, 0.1) if $i < $timeout * 10;    # 0.1s pause without spawning sleep(1)
+    }
+    return scalar_query($node, $query);
+}
+
+# ---- cluster setup (5 TAP tests emitted by create_cluster) -----------------
+create_cluster(2, 'Create 2-node cluster for origin-change restore test');
+
+my $config = get_test_config();
+my $node_ports = $config->{node_ports};
+my $host = $config->{host};
+my $dbname = $config->{db_name};
+my $db_user = $config->{db_user};
+my $db_password = $config->{db_password};
+my $pg_bin = $config->{pg_bin};
+
+# Compute path to node 2's PostgreSQL log
+my $parent_dir = Cwd::abs_path(getcwd() . "/..");
+my $n2_logfile = "$parent_dir/tap/logs/00$node_ports->[1].log";
+
+# Verify both nodes are present
+my $n1 = scalar_query(1, "SELECT EXISTS (SELECT 1 FROM spock.node WHERE node_name = 'n1')");
+is($n1, 't', 'Node n1 exists');
+
+my $n2 = scalar_query(2, "SELECT EXISTS (SELECT 1 FROM spock.node WHERE node_name = 'n2')");
+is($n2, 't', 'Node n2 exists');
+
+# ---- Step 1: create table + seed data on node 1 ----------------------------
+psql_or_bail(1, "CREATE TABLE test_origin (id INTEGER PRIMARY KEY, data TEXT)");
+psql_or_bail(1, "INSERT INTO test_origin VALUES (1, 'row-A'), (2, 'row-B'), (3, 'row-C')");
+
+my $count_n1 = scalar_query(1, "SELECT COUNT(*) FROM test_origin");
+is($count_n1, '3', 'Node 1 has 3 rows after INSERT');
+
+# ---- Step 2: pg_dump node 1 ------------------------------------------------
+my $dump_file = "/tmp/test_origin_dump_$$.dump";
+system_or_bail("$pg_bin/pg_dump", '-p', $node_ports->[0],
+    '-d', $dbname, '-t', 'test_origin', '-Fc', '-f', $dump_file);
+pass('pg_dump from node 1 succeeded');
+
+# ---- Step 3: pg_restore to node 2 ------------------------------------------
+system_or_bail("$pg_bin/pg_restore", '-p', $node_ports->[1],
+    '-d', $dbname, $dump_file);
+pass('pg_restore to node 2 succeeded');
+unlink $dump_file;    # restore succeeded; the temporary dump is not used again
+my $count_n2 = scalar_query(2, "SELECT COUNT(*) FROM test_origin");
+is($count_n2, '3', 'Node 2 has 3 rows after pg_restore');
+
+# ---- Step 4: enable GUC on node 2 ------------------------------------------
+psql_or_bail(2, "ALTER SYSTEM SET spock.log_origin_change = 'since_sub_creation'");
+psql_or_bail(2, "SELECT pg_reload_conf()");
+pass('Enabled log_origin_change = since_sub_creation on node 2');
+
+# ---- Step 5: create repset + subscription -----------------------------------
+psql_or_bail(1, "SELECT spock.repset_create('origin_test_repset')");
+psql_or_bail(1, "SELECT spock.repset_add_table('origin_test_repset', 'test_origin')");
+pass('Created repset and added table on node 1');
+
+my $conn_string = "host=$host dbname=$dbname port=$node_ports->[0] "
+    . "user=$db_user password=$db_password";
+psql_or_bail(2,
+    "SELECT spock.sub_create('sub_origin_test', "
+    . "'$conn_string', "
+    . "ARRAY['origin_test_repset'], false, false)");
+
+my $sub_exists = scalar_query(2,
+    "SELECT EXISTS (SELECT 1 FROM spock.subscription "
+    . "WHERE sub_name = 'sub_origin_test')");
+is($sub_exists, 't', 'Subscription sub_origin_test created');
+
+# Wait for the subscription to become active (poll up to 30s)
+psql_or_bail(2, "DO \$\$
+BEGIN
+  FOR i IN 1..300 LOOP
+    IF EXISTS (SELECT 1 FROM spock.sub_show_status()
+               WHERE subscription_name = 'sub_origin_test'
+                 AND status = 'replicating') THEN
+      RETURN;
+    END IF;
+    PERFORM pg_sleep(0.1);
+  END LOOP;
+  RAISE EXCEPTION 'subscription did not reach replicating state';
+END;
+\$\$");
+
+# =============================================================================
+# Test 1: since_sub_creation — pg_restored data should NOT produce log entry
+# =============================================================================
+my $log_pos = get_log_size($n2_logfile);
+
+psql_or_bail(1, "UPDATE test_origin SET data = 'row-A-updated' WHERE id = 1");
+
+my $val = wait_for_value(2,
+    "SELECT data FROM test_origin WHERE id = 1",
+    'row-A-updated');
+is($val, 'row-A-updated', 'UPDATE replicated to node 2');
+
+# Allow time for any log flushing
+sleep 2;
+my $log_content = get_log_content_since($n2_logfile, $log_pos);
+unlike($log_content, qr/CONFLICT:.*remote update_origin_differs on relation public\.test_origin/,
+    'since_sub_creation: no conflict logged for pg_restored data');
+
+# =============================================================================
+# Test 2: since_sub_creation — post-subscription data SHOULD produce log entry
+# =============================================================================
+# Update locally on node 2 to change origin (becomes locally-written,
+# with a commit timestamp after subscription creation)
+psql_or_bail(2, "UPDATE test_origin SET data = 'local-1' WHERE id = 1");
+my $origin_conflict_re = qr/CONFLICT:.*remote update_origin_differs on relation public\.test_origin/;
+$log_pos = get_log_size($n2_logfile);    # only log output written after this point is inspected
+
+psql_or_bail(1, "UPDATE test_origin SET data = 'from-n1-v2' WHERE id = 1");
+$val = wait_for_value(
+    2, "SELECT data FROM test_origin WHERE id = 1", 'from-n1-v2'
+);
+$val eq 'from-n1-v2' or BAIL_OUT("UPDATE did not replicate");
+
+system_or_bail('sleep', '2');    # allow time for any log flushing
+$log_content = get_log_content_since($n2_logfile, $log_pos);
+like($log_content, $origin_conflict_re,
+    'since_sub_creation: conflict logged for post-subscription data');
+
+# =============================================================================
+# Test 3: with mode 'none' no origin-change message may reach the log
+# =============================================================================
+psql_or_bail(2, "ALTER SYSTEM SET spock.log_origin_change = 'none'");
+psql_or_bail(2, "SELECT pg_reload_conf()");
+
+# Write row 2 locally on node 2 so that its origin changes
+psql_or_bail(2, "UPDATE test_origin SET data = 'local-2' WHERE id = 2");
+
+$log_pos = get_log_size($n2_logfile);
+
+psql_or_bail(1, "UPDATE test_origin SET data = 'from-n1-v2b' WHERE id = 2");
+$val = wait_for_value(
+    2, "SELECT data FROM test_origin WHERE id = 2", 'from-n1-v2b'
+);
+$val eq 'from-n1-v2b' or BAIL_OUT("UPDATE did not replicate");
+
+system_or_bail('sleep', '2');    # allow time for any log flushing
+$log_content = get_log_content_since($n2_logfile, $log_pos);
+unlike($log_content, $origin_conflict_re,
+    'none: no conflict logged');
+
+# =============================================================================
+# Test 4: remote_only_differs — a locally-written tuple must NOT be logged
+# =============================================================================
+psql_or_bail(2, "ALTER SYSTEM SET spock.log_origin_change = 'remote_only_differs'");
+psql_or_bail(2, "SELECT pg_reload_conf()");
+
+# Update locally so local tuple has origin = InvalidRepOriginId
+psql_or_bail(2, "UPDATE test_origin SET data = 'local-3' WHERE id = 3");
+
+$log_pos = get_log_size($n2_logfile);
+
+psql_or_bail(1, "UPDATE test_origin SET data = 'from-n1-v3' WHERE id = 3");
+$val = wait_for_value(2,
+    "SELECT data FROM test_origin WHERE id = 3",
+    'from-n1-v3');
+BAIL_OUT("UPDATE did not replicate") unless $val eq 'from-n1-v3';
+
+system_or_bail 'sleep', '2';
+$log_content = get_log_content_since($n2_logfile, $log_pos);
+unlike($log_content, qr/CONFLICT:.*remote update_origin_differs on relation public\.test_origin/,
+    'remote_only_differs: no conflict logged for locally-written tuple');
+
+# ---- teardown ---------------------------------------------------------------
+psql_or_bail(2, "SELECT spock.sub_drop('sub_origin_test')");
+
+# Wait for subscription to be fully removed (poll up to 10s)
+for my $i (1..100) {
+    my $still_exists = scalar_query(2,
+        "SELECT EXISTS (SELECT 1 FROM spock.subscription "
+        . "WHERE sub_name = 'sub_origin_test')");
+    last if $still_exists eq 'f';
+    select(undef, undef, undef, 0.1) if $i < 100;    # 0.1s pause without spawning sleep(1)
+}
+
+destroy_cluster('Destroy 2-node origin-change restore test cluster');
+
+done_testing();
diff --git a/tests/tap/t/014_pgdump_restore_conflict.pl b/tests/tap/t/014_pgdump_restore_conflict.pl
new file mode 100755
index 00000000..7c11a6da
--- /dev/null
+++ b/tests/tap/t/014_pgdump_restore_conflict.pl
@@ -0,0 +1,354 @@
+use strict;
+use warnings;
+use Test::More;    # no fixed plan: the test count is declared by done_testing() at the end
+use lib '.';
+use SpockTest qw(create_cluster destroy_cluster system_or_bail command_ok get_test_config scalar_query psql_or_bail);
+
+
+# =============================================================================
+# Test: 014_pgdump_restore_conflict.pl - pg_dump/restore Conflict Scenario
+# =============================================================================
+# This test reproduces a common customer setup pattern:
+#
+# 1. Customer has existing PostgreSQL database
+# 2. They want to set up multi-master replication
+# 3. They use pg_dump/pg_restore to initialize the subscriber
+# 4. They create subscription after restore
+# 5. They observe conflicts even though data is identical
+#
+# Root Cause:
+# - Rows loaded via pg_dump have no replication origin tracking (origin = NULL)
+# - When updates arrive via replication, spock detects "conflict" because
+#   the local row exists but has no origin
+# - This causes UPDATE_UPDATE conflicts with identical data
+#
+# This test verifies:
+# - pg_restored rows predate the subscription
+# - Origin-change conflicts are suppressed for pre-subscription data
+# - Data replicates correctly without conflicts
+# - Subsequent updates also work without conflicts
+# =============================================================================
+
+# Create 2-node cluster
+create_cluster(2, 'Create 2-node cluster for pg_dump/restore test');
+
+my $config = get_test_config();
+my $node_ports = $config->{node_ports};
+my $node_datadirs = $config->{node_datadirs};
+my $host = $config->{host};
+my $dbname = $config->{db_name};
+my $db_user = $config->{db_user};
+my $db_password = $config->{db_password};
+my $pg_bin = $config->{pg_bin};
+
+# ---- log helpers -------------------------------------------------------------
+
+# Size of $logfile in bytes, or 0 when it is missing or empty.
+sub get_log_size {
+    my ($logfile) = @_;
+    return -s $logfile || 0;
+}
+
+# Return everything found in $logfile after byte $offset. If the file is now
+# shorter than $offset, the whole file is returned instead of nothing.
+sub get_log_content_since {
+    my ($logfile, $offset) = @_;
+    open my $fh, '<', $logfile or die "Cannot open $logfile: $!";
+    $offset = 0 if $offset > ((-s $fh) || 0);
+    seek($fh, $offset, 0) or die "Cannot seek to $offset in $logfile: $!";
+    my $content = do { local $/; <$fh> };    # slurp from $offset to EOF
+    close $fh;
+    return $content // '';
+}
+
+# Run $query on $node repeatedly, pausing 0.1s between attempts, until it
+# returns $expected or $timeout * 10 attempts were made ($timeout defaults
+# to 30). Returns the last value read, whether or not it matched.
+sub wait_for_value {
+    my ($node, $query, $expected, $timeout) = @_;
+    $timeout //= 30;
+    for my $i (1..($timeout * 10)) {
+        my $result = scalar_query($node, $query);
+        return $result if defined $result && $result eq $expected;
+        select(undef, undef, undef, 0.1) if $i < $timeout * 10;    # 0.1s pause without spawning sleep(1)
+    }
+    return scalar_query($node, $query);
+}
+
+# Compute path to node 2's PostgreSQL log (logging_collector writes to /logs/)
+my $n2_logfile = "$node_datadirs->[1]/logs/00$node_ports->[1].log";
+
+# =============================================================================
+# PART 1: Setup - Create data on node1 (publisher/writer)
+# =============================================================================
+
+diag("=== Part 1: Create initial data on publisher (node1) ===");
+
+# Create test table on node1 only (no DDL replication yet)
+psql_or_bail(1, "
+    CREATE TABLE customer_data (
+        id SERIAL PRIMARY KEY,
+        name VARCHAR(100) NOT NULL,
+        email VARCHAR(100) UNIQUE,
+        balance DECIMAL(10,2) DEFAULT 0,
+        updated_at TIMESTAMP DEFAULT now()
+    )
+");
+pass('Test table created on node1');
+
+# Insert initial data
+psql_or_bail(1, "
+    INSERT INTO customer_data (name, email, balance) VALUES
+    ('Alice', 'alice\@example.com', 1000.00),
+    ('Bob', 'bob\@example.com', 2500.50),
+    ('Charlie', 'charlie\@example.com', 750.25),
+    ('Diana', 'diana\@example.com', 3200.00),
+    ('Eve', 'eve\@example.com', 150.75)
+");
+pass('Initial data inserted on node1');
+
+my $node1_count = scalar_query(1, "SELECT COUNT(*) FROM customer_data");
+is($node1_count, '5', 'Node1 has 5 rows');
+
+# =============================================================================
+# PART 2: pg_dump from node1, pg_restore to node2
+# =============================================================================
+
+diag("=== Part 2: pg_dump/pg_restore to subscriber (node2) ===");
+
+# pg_dump the table from node1 (PID in the name keeps concurrent runs apart)
+my $dump_file = "/tmp/customer_data_dump_$$.sql";
+system_or_bail("$pg_bin/pg_dump",
+    "-h", $host,
+    "-p", $node_ports->[0],
+    "-U", $db_user,
+    "-d", $dbname,
+    "-t", "customer_data",
+    "--no-owner",
+    "--no-acl",
+    "-f", $dump_file
+);
+pass('pg_dump completed from node1');
+
+# pg_restore (actually psql since it's plain SQL) to node2
+system_or_bail("$pg_bin/psql",
+    "-h", $host,
+    "-p", $node_ports->[1],
+    "-U", $db_user,
+    "-d", $dbname,
+    "-f", $dump_file
+);
+pass('pg_restore completed to node2');
+
+# Verify data exists on node2
+my $node2_count = scalar_query(2, "SELECT COUNT(*) FROM customer_data");
+is($node2_count, '5', 'Node2 has 5 rows after pg_restore');
+
+# Verify data is identical
+my $node1_checksum = scalar_query(1,
+    "SELECT md5(string_agg(id::text || name || email || balance::text, ',' ORDER BY id)) FROM customer_data"
+);
+my $node2_checksum = scalar_query(2,
+    "SELECT md5(string_agg(id::text || name || email || balance::text, ',' ORDER BY id)) FROM customer_data"
+);
+is($node1_checksum, $node2_checksum, 'Data checksum matches between nodes');
+
+# =============================================================================
+# PART 3: Verify pg_restore rows have no origin tracking
+# =============================================================================
+
+diag("=== Part 3: Verify rows have no origin tracking ===");
+
+# Check commit timestamps exist (track_commit_timestamp should be on)
+my $has_commit_ts = scalar_query(2, "SHOW track_commit_timestamp");
+is($has_commit_ts, 'on', 'track_commit_timestamp is enabled');
+
+# Get commit timestamps of rows on node2
+# Rows from pg_restore will have timestamps clustered around restore time
+my $row_timestamps = scalar_query(2, "
+    SELECT COUNT(DISTINCT date_trunc('second', pg_xact_commit_timestamp(xmin)))
+    FROM customer_data
+");
+diag("Distinct commit timestamp seconds for pg_restore rows: $row_timestamps");
+ok($row_timestamps <= 2, 'pg_restore rows have clustered commit timestamps (same transaction)');
+
+# =============================================================================
+# PART 4: Create subscription AFTER pg_restore
+# =============================================================================
+
+diag("=== Part 4: Create subscription after pg_restore ===");
+
+# Check if table is already in replication set (DDL replication may have added it)
+my $in_repset = scalar_query(1,
+    "SELECT EXISTS (SELECT 1 FROM spock.replication_set_table WHERE set_reloid = 'customer_data'::regclass)"
+);
+if ($in_repset eq 't') {
+    pass('Table already in replication set (auto-added by DDL replication)');
+} else {
+    psql_or_bail(1, "SELECT spock.repset_add_table('default', 'customer_data')");
+    pass('Table added to replication set on node1');
+}
+
+# Create subscription on node2 -> node1
+my $conn_string = "host=$host dbname=$dbname port=$node_ports->[0] user=$db_user password=$db_password";
+psql_or_bail(2, "
+    SELECT spock.sub_create(
+        'pgdump_test_sub',
+        '$conn_string',
+        ARRAY['default'],
+        synchronize_structure := false,
+        synchronize_data := false
+    )
+");
+pass('Subscription created on node2 (no sync - data already exists from pg_restore)');
+
+# Wait for the subscription to report 'replicating' (poll instead of a fixed sleep)
+wait_for_value(2,
+    "SELECT status FROM spock.sub_show_status() WHERE subscription_name = 'pgdump_test_sub'",
+    'replicating');
+
+my $sub_status = scalar_query(2, "SELECT sub_enabled FROM spock.subscription WHERE sub_name = 'pgdump_test_sub'");
+is($sub_status, 't', 'Subscription is enabled');
+
+# =============================================================================
+# PART 5: Make updates on node1 and verify they replicate
+# =============================================================================
+
+diag("=== Part 5: Make updates on publisher, verify conflicts ===");
+
+# Configure conflict logging and origin-change detection
+psql_or_bail(2, "ALTER SYSTEM SET spock.conflict_log_level = 'LOG'");
+psql_or_bail(2, "ALTER SYSTEM SET spock.conflict_resolution = 'last_update_wins'");
+psql_or_bail(2, "ALTER SYSTEM SET spock.log_origin_change = 'since_sub_creation'");
+psql_or_bail(2, "SELECT pg_reload_conf()");
+
+# Need to restart apply worker for GUC changes to take effect
+psql_or_bail(2, "SELECT spock.sub_disable('pgdump_test_sub')");
+sleep 2;
+psql_or_bail(2, "SELECT spock.sub_enable('pgdump_test_sub', true)");
+sleep 3;
+
+# Record log position before updates
+my $log_pos = get_log_size($n2_logfile);
+
+# Make updates on node1 (publisher)
+psql_or_bail(1, "UPDATE customer_data SET balance = balance + 100 WHERE id = 1");
+psql_or_bail(1, "UPDATE customer_data SET balance = balance + 200 WHERE id = 2");
+psql_or_bail(1, "UPDATE customer_data SET balance = balance + 50 WHERE id = 3");
+pass('Updates executed on node1');
+
+# Wait for replication by polling for data
+my $alice_balance_n1 = scalar_query(1, "SELECT balance FROM customer_data WHERE id = 1");
+my $alice_balance_n2 = wait_for_value(2,
+    "SELECT balance FROM customer_data WHERE id = 1",
+    $alice_balance_n1);
+is($alice_balance_n1, $alice_balance_n2, 'Balance replicated correctly (data matches)');
+
+# =============================================================================
+# PART 6: Verify NO origin-change conflicts were logged for pg_restored rows
+# =============================================================================
+
+diag("=== Part 6: Verify NO conflicts for pg_restored (pre-subscription) data ===");
+
+# Allow time for log flushing
+sleep 2;
+my $log_content = get_log_content_since($n2_logfile, $log_pos);
+
+# pg_restored rows predate the subscription, so origin-change conflicts should
+# be suppressed (skip pre-subscription origin-change conflicts)
+unlike($log_content, qr/CONFLICT:.*update_origin_differs on relation public\.customer_data/,
+    'No origin-change conflicts logged for pg_restored (pre-subscription) data');
+
+# Verify data was applied without conflict (rows updated correctly)
+my $bob_balance_n1 = scalar_query(1, "SELECT balance FROM customer_data WHERE id = 2");
+my $bob_balance_n2 = wait_for_value(2,
+    "SELECT balance FROM customer_data WHERE id = 2",
+    $bob_balance_n1);
+is($bob_balance_n1, $bob_balance_n2, 'Bob balance replicated correctly');
+
+my $charlie_balance_n1 = scalar_query(1, "SELECT balance FROM customer_data WHERE id = 3");
+my $charlie_balance_n2 = wait_for_value(2,
+    "SELECT balance FROM customer_data WHERE id = 3",
+    $charlie_balance_n1);
+is($charlie_balance_n1, $charlie_balance_n2, 'Charlie balance replicated correctly');
+
+pass('Pre-subscription data updates applied without conflicts');
+
+
+# =============================================================================
+# PART 7: Verify post-subscription origin change IS logged
+# =============================================================================
+
+diag("=== Part 7: Verify post-subscription origin change IS logged ===");
+
+# Update locally on node 2 to change the origin (becomes locally-written,
+# with a commit timestamp after subscription creation)
+psql_or_bail(2, "UPDATE customer_data SET balance = balance + 1 WHERE id = 4");
+
+my $log_pos_part7 = get_log_size($n2_logfile);
+
+# Now update the same row from node 1 — this should trigger an origin-change conflict
+psql_or_bail(1, "UPDATE customer_data SET balance = balance + 500 WHERE id = 4");
+my $diana_balance_n1 = scalar_query(1, "SELECT balance FROM customer_data WHERE id = 4");
+my $diana_balance_n2 = wait_for_value(2,
+    "SELECT balance FROM customer_data WHERE id = 4",
+    $diana_balance_n1);
+is($diana_balance_n1, $diana_balance_n2, 'Post-subscription update replicated correctly');
+
+sleep 2;
+my $log_part7 = get_log_content_since($n2_logfile, $log_pos_part7);
+like($log_part7, qr/CONFLICT:.*update_origin_differs on relation public\.customer_data/,
+    'Origin-change conflict IS logged for post-subscription local update');
+
+# =============================================================================
+# PART 8: Verify data is identical on both nodes after all updates
+# =============================================================================
+
+diag("=== Part 8: Verify data is identical after all updates ===");
+
+# Final data verification
+my $final_checksum_n1 = scalar_query(1,
+    "SELECT md5(string_agg(id::text || name || email || balance::text, ',' ORDER BY id)) FROM customer_data"
+);
+my $final_checksum_n2 = scalar_query(2,
+    "SELECT md5(string_agg(id::text || name || email || balance::text, ',' ORDER BY id)) FROM customer_data"
+);
+is($final_checksum_n1, $final_checksum_n2, 'Final data checksum matches - replication worked correctly');
+
+# =============================================================================
+# PART 9: Verify subsequent updates don't cause conflicts
+# =============================================================================
+
+diag("=== Part 9: Verify subsequent replicated updates don't cause new conflicts ===");
+
+# Record log position before the subsequent update
+my $log_pos_part9 = get_log_size($n2_logfile);
+
+# Make another update on node1
+psql_or_bail(1, "UPDATE customer_data SET balance = balance + 10 WHERE id = 1");
+
+# Wait for replication and assert it happened, so the log check below cannot pass vacuously
+my $expected_bal = scalar_query(1, "SELECT balance FROM customer_data WHERE id = 1");
+my $replicated_bal = wait_for_value(2, "SELECT balance FROM customer_data WHERE id = 1", $expected_bal);
+is($replicated_bal, $expected_bal, 'Subsequent update replicated to node2');
+
+# Check if new conflict occurred in the log
+# After the first update, the row should have proper origin, so no conflict
+sleep 2;
+my $log_part9 = get_log_content_since($n2_logfile, $log_pos_part9);
+unlike($log_part9, qr/CONFLICT:.*update_origin_differs on relation public\.customer_data/,
+    'No new conflicts after row was updated via replication (origin now tracked)');
+
+# =============================================================================
+# CLEANUP
+# =============================================================================
+
+# Drop the subscription we created (destroy_cluster expects standard naming)
+psql_or_bail(2, "SELECT spock.sub_drop('pgdump_test_sub')");
+
+# Clean up dump file
+unlink $dump_file if -e $dump_file;
+
+destroy_cluster('Cleanup pg_dump/restore conflict test cluster');
+
+done_testing();
diff --git a/tests/tap/t/100_progress_period.pl b/tests/tap/t/100_progress_period.pl
new file mode 100755
index 00000000..7d2a8789
--- /dev/null
+++ b/tests/tap/t/100_progress_period.pl
@@ -0,0 +1,200 @@
+#!/usr/bin/env perl
+#
+# 100_progress_period.pl - Measure replication catch-up time and progress
+# table update frequency under pgbench load.
+#
+# This test builds up a WAL backlog by disabling the subscription during
+# a pgbench run, then re-enables it and measures pure catch-up time.
+# With deferred spock.progress writes, catch-up should be faster because
+# the apply worker spends less time on catalog I/O.
+#
+# Run manually:
+#   cd tests/tap/t && prove -v 100_progress_period.pl
+#
+# Tunable via environment variables:
+#   PGBENCH_SCALE   - pgbench scale factor (default: 10)
+#   PGBENCH_CLIENTS - number of pgbench clients (default: 32)
+#   PGBENCH_JOBS    - number of pgbench threads (default: 32)
+#   PGBENCH_TIME    - pgbench duration in seconds (default: 30)
+#
+use strict;
+use warnings;
+use Test::More tests => 14;
+use Time::HiRes qw(gettimeofday tv_interval);
+use Carp;
+use lib '.';
+use SpockTest qw(create_cluster destroy_cluster system_or_bail
+    command_ok get_test_config scalar_query psql_or_bail);
+
+$SIG{__DIE__} = sub { Carp::confess @_ };
+$SIG{INT} = sub { die("interrupted by SIGINT") };
+
+my $PGBENCH_SCALE = $ENV{PGBENCH_SCALE} // 10;
+my $PGBENCH_CLIENTS = $ENV{PGBENCH_CLIENTS} // 32;
+my $PGBENCH_JOBS = $ENV{PGBENCH_JOBS} // 32;
+my $PGBENCH_TIME = $ENV{PGBENCH_TIME} // 30;
+
+# ── 1. Create a 2-node cluster ──────────────────────────────────────────
+
+create_cluster(2, 'Create 2-node progress-period test cluster');
+
+my $config = get_test_config();
+my $ports = $config->{node_ports};
+my $host = $config->{host};
+my $dbname = $config->{db_name};
+my $db_user = $config->{db_user};
+my $db_pass = $config->{db_password};
+my $pg_bin = $config->{pg_bin};
+
+my $provider_port = $ports->[0];
+my $subscriber_port = $ports->[1];
+my $provider_connstr =
+    "host=$host dbname=$dbname port=$provider_port user=$db_user password=$db_pass";
+
+# ── 2. Initialise pgbench on provider only ──────────────────────────────
+
+note "initializing pgbench with scale=$PGBENCH_SCALE ...";
+system_or_bail "$pg_bin/pgbench", '-i', '-s', $PGBENCH_SCALE,
+    '-h', $host, '-p', $provider_port, '-U', $db_user, $dbname;
+pass('pgbench initialized on provider');
+
+# ── 3. Create subscription and wait for initial sync ────────────────────
+
+system_or_bail "$pg_bin/psql", '-p', $subscriber_port, '-d', $dbname, '-c',
+    "SELECT spock.sub_create(
+        'test_sub',
+        '$provider_connstr',
+        ARRAY['default', 'default_insert_only'],
+        true,
+        true
+    )";
+
+my $max_wait = 120;
+my $replicating = 0;
+# Status probe shared by every wait loop in this script; run on node 2 via scalar_query
+my $status_sql = "SELECT status FROM spock.sub_show_status() WHERE subscription_name = 'test_sub'";
+for (1 .. $max_wait) {
+    (my $status = scalar_query(2, $status_sql) // '') =~ s/\s+//g;
+    if ($status eq 'replicating') {
+        $replicating = 1;
+        last;
+    }
+    sleep 1;
+}
+BAIL_OUT('subscription never reached replicating state') unless $replicating;
+pass('subscription is replicating');
+
+# Let initial sync settle and confirm data matches before we start
+system_or_bail "$pg_bin/psql", '-p', $provider_port, '-d', $dbname,
+    '-c', "SET statement_timeout = '60s'; SELECT spock.wait_slot_confirm_lsn(NULL, NULL)";
+
+# ── 4. Disable subscription to build up a backlog ───────────────────────
+
+system_or_bail "$pg_bin/psql", '-p', $subscriber_port, '-d', $dbname, '-c',
+    "SELECT spock.sub_disable('test_sub')";
+
+# Wait for subscription to go down (up to 30s); report the last status seen if it never does
+my $sub_state = '';
+for (1 .. 30) {
+    ($sub_state = scalar_query(2, $status_sql) // '') =~ s/\s+//g;
+    last if $sub_state eq 'down';
+    sleep 1;
+}
+diag("subscription status is '$sub_state' after sub_disable (expected 'down')") unless $sub_state eq 'down';
+note "subscription disabled, building WAL backlog ...";
+
+# ── 5. Run pgbench while subscription is disabled ───────────────────────
+
+note "running pgbench: scale=$PGBENCH_SCALE clients=$PGBENCH_CLIENTS "
+    . "jobs=$PGBENCH_JOBS time=${PGBENCH_TIME}s";
+
+system_or_bail "$pg_bin/pgbench",
+    '-T', $PGBENCH_TIME,
+    '-c', $PGBENCH_CLIENTS,
+    '-j', $PGBENCH_JOBS,
+    '-s', $PGBENCH_SCALE,
+    '-h', $host, '-p', $provider_port, '-U', $db_user, $dbname;
+pass('pgbench completed (backlog built)');
+
+# Snapshot provider row counts now (pgbench is done, no more changes)
+my %provider_counts;
+for my $tbl (qw(pgbench_accounts pgbench_tellers pgbench_history)) {
+    $provider_counts{$tbl} = scalar_query(1, "SELECT count(*) FROM $tbl");
+}
+note "provider pgbench_history rows: $provider_counts{pgbench_history}";
+
+# ── 6. Reset stats, re-enable subscription, and time the catch-up ───────
+
+system_or_bail "$pg_bin/psql", '-p', $subscriber_port, '-d', $dbname, '-c',
+    "SELECT pg_stat_reset()";
+sleep 2;
+
+# Update counter of spock.progress as reported by the statistics views
+my $progress_upd_sql = "SELECT COALESCE(n_tup_upd, 0) FROM pg_stat_user_tables WHERE schemaname = 'spock' AND relname = 'progress'";
+my $upd_before = scalar_query(2, $progress_upd_sql);
+$upd_before = int($upd_before || 0);
+
+# Re-enable subscription -- this is where catch-up begins
+my $t_catchup = [gettimeofday()];
+
+system_or_bail "$pg_bin/psql", '-p', $subscriber_port, '-d', $dbname, '-c',
+    "SELECT spock.sub_enable('test_sub')";
+
+# Wait for it to come back up (up to 60s); report the last status seen if it never does
+$sub_state = '';
+for (1 .. 60) {
+    ($sub_state = scalar_query(2, $status_sql) // '') =~ s/\s+//g;
+    last if $sub_state eq 'replicating';
+    sleep 1;
+}
+diag("subscription status is '$sub_state' after sub_enable (expected 'replicating')") unless $sub_state eq 'replicating';
+
+# Now wait for the slot to confirm it has caught up to the provider's
+# current WAL position
+system_or_bail "$pg_bin/psql", '-p', $provider_port, '-d', $dbname,
+    '-c', "SET statement_timeout = '300s'; SELECT spock.wait_slot_confirm_lsn(NULL, NULL)";
+
+my $catchup_secs = tv_interval($t_catchup);
+note sprintf("catch-up time: %.2f seconds", $catchup_secs);
+pass('subscriber caught up');
+
+# ── 7. Verify row counts match ──────────────────────────────────────────
+
+for my $tbl (qw(pgbench_accounts pgbench_tellers pgbench_history)) {
+    my $cnt_sub = scalar_query(2, "SELECT count(*) FROM $tbl");
+    is($cnt_sub, $provider_counts{$tbl},
+        "$tbl row counts match ($provider_counts{$tbl} rows)");
+}
+
+# ── 8. Check progress table update frequency ────────────────────────────
+
+sleep 2;
+
+# Read the same counter again; its difference from $upd_before is the number
+# of spock.progress row updates recorded between the two reads.
+my $upd_after = scalar_query(2, $progress_upd_sql);
+$upd_after = int($upd_after || 0);
+
+my $progress_updates = $upd_after - $upd_before;
+
+note "============================================================";
+note sprintf(" catch-up time: %.2f s", $catchup_secs);
+note sprintf(" spock.progress updates: %d", $progress_updates);
+if ($catchup_secs > 0) {
+    note sprintf(" updates per second: %.1f", $progress_updates / $catchup_secs);
+}
+note sprintf(" transactions replayed: %s", $provider_counts{pgbench_history});
+note "============================================================";
+
+# With deferred writes we expect roughly 1 update/sec, not 1 per xact.
+my $max_expected = $catchup_secs * 3 + 10;
+ok($progress_updates <= $max_expected,
+    "progress table updates ($progress_updates) within expected range (<= $max_expected)");
+
+# ── 9. Cleanup ──────────────────────────────────────────────────────────
+
+system_or_bail "$pg_bin/psql", '-p', $subscriber_port, '-d', $dbname, '-c',
+    "SELECT spock.sub_drop('test_sub')";
+
+destroy_cluster('Destroy 2-node progress-period test cluster');
+#done_testing();