Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/configuring.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,28 @@ adjustment to which replication set the table is part of. Setting a table to
unlogged will remove it from replication. Detaching a partition will not
remove it from replication.

#### `spock.log_origin_change`

`spock.log_origin_change` indicates whether changes to a row's
origin should be logged to the PostgreSQL log. Rows may be being updated
locally by regular SQL operations, or by replication from apply workers.
Note that rows that are changed locally (not from replication) have the
origin value of 0.

The default of `none` is recommended because otherwise the amount of entries
may become numerous. The other options allow for monitoring when updates
occur outside expected patterns.

The following configuration values are possible:

* `none` (the default)- do not log any origin change information
* `remote_only_differs`- only log origin changes when the existing row
was from one remote publisher and was changed by another
remote publisher
* `since_sub_creation`- log origin changes whether a publisher changed
a row that was previously from another publisher or updated it locally,
but only since the time when the subscription was created.

### `spock.save_resolutions`

`spock.save_resolutions` is a boolean value (the default is `false`) that
Expand Down
1 change: 1 addition & 0 deletions include/spock.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ extern int restart_delay_on_exception;
extern int spock_replay_queue_size; /* Deprecated - no longer used */
extern bool check_all_uc_indexes;
extern bool spock_enable_quiet_mode;
extern int log_origin_change;

extern char *shorten_hash(const char *str, int maxlen);
extern void gen_slot_name(Name slot_name, char *dbname,
Expand Down
20 changes: 20 additions & 0 deletions include/spock_conflict.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,26 @@ extern int spock_conflict_resolver;
extern int spock_conflict_log_level;
extern bool spock_save_resolutions;

typedef enum
{
/* do not log */
SPOCK_ORIGIN_NONE,
/* log if remote change (ignore local origin, independent of time) */
SPOCK_ORIGIN_REMOTE_ONLY_DIFFERS,
/* log only if origin changed since start of subscription */
SPOCK_ORIGIN_DIFFERS_SINCE_SUB
} SpockSaveOriginConflictOption;


extern bool spock_tuple_find_replidx(ResultRelInfo *relinfo,
SpockTupleData *tuple,
TupleTableSlot *oldslot,
Oid *idxrelid);

extern Oid spock_tuple_find_conflict(ResultRelInfo *relinfo,
SpockTupleData *tuple,
TupleTableSlot *oldslot);

extern bool get_tuple_origin(SpockRelation *rel, HeapTuple local_tuple,
ItemPointer tid, TransactionId *xmin,
RepOriginId *local_origin, TimestampTz *local_ts);
Expand Down
1 change: 1 addition & 0 deletions include/spock_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ typedef struct SpockSubscription
XLogRecPtr skiplsn; /* All changes finished at this LSN are
* skipped */
List *skip_schema; /* Array of schema names to skip */
TimestampTz created_at; /* When this subscription was created */
} SpockSubscription;

/* NULL-terminated arrays */
Expand Down
2 changes: 2 additions & 0 deletions sql/spock--5.0.5--5.0.6.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE spock.subscription
ADD COLUMN sub_created_at timestamptz;
3 changes: 2 additions & 1 deletion sql/spock--6.0.0-devel.sql
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ CREATE TABLE spock.subscription (
sub_apply_delay interval NOT NULL DEFAULT '0',
sub_force_text_transfer boolean NOT NULL DEFAULT 'f',
sub_skip_lsn pg_lsn NOT NULL DEFAULT '0/0',
sub_skip_schema text[]
sub_skip_schema text[],
sub_created_at timestamptz
);
-- Source for sub_id values.
CREATE SEQUENCE spock.sub_id_generator AS integer MINVALUE 1 CYCLE START WITH 1 OWNED BY spock.subscription.sub_id;
Expand Down
17 changes: 17 additions & 0 deletions src/spock.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ static const struct config_enum_entry SpockConflictResolvers[] = {
{NULL, 0, false}
};

static const struct config_enum_entry SpockOriginConflicts[] = {
{"none", SPOCK_ORIGIN_NONE, false},
{"remote_only_differs", SPOCK_ORIGIN_REMOTE_ONLY_DIFFERS, false},
{"since_sub_creation", SPOCK_ORIGIN_DIFFERS_SINCE_SUB, false},
{NULL, 0, false}
};

/* copied fom guc.c */
static const struct config_enum_entry server_message_level_options[] = {
{"debug", DEBUG2, true},
Expand Down Expand Up @@ -135,6 +142,7 @@ int restart_delay_on_exception;
int spock_replay_queue_size; /* Deprecated - no longer used */
bool check_all_uc_indexes = false;
bool spock_enable_quiet_mode = false;
int log_origin_change = SPOCK_ORIGIN_NONE;

static emit_log_hook_type prev_emit_log_hook = NULL;
static Checkpoint_hook_type prev_Checkpoint_hook = NULL;
Expand Down Expand Up @@ -1186,6 +1194,15 @@ _PG_init(void)
NULL,
NULL);

DefineCustomEnumVariable("spock.log_origin_change",
gettext_noop("If set, log when the origin of a tuple changes."),
NULL,
&log_origin_change,
SPOCK_ORIGIN_NONE,
SpockOriginConflicts,
PGC_SUSET, 0,
NULL, NULL, NULL);

if (IsBinaryUpgrade)
return;

Expand Down
80 changes: 67 additions & 13 deletions src/spock_conflict.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,47 +343,98 @@ spock_report_conflict(SpockConflictType conflict_type,
Oid conflict_idx_oid)
{
char local_tup_ts_str[MAXDATELEN] = "(unset)";
char local_origin_str[32];
StringInfoData localtup,
remotetup;
TupleDesc desc = RelationGetDescr(rel->rel);
const char *idxname = "(unknown)";
const char *qualrelname;
bool save_in_resolutions = true;

/* Ignore update-update conflict for same origin */
if (conflict_type == SPOCK_CT_UPDATE_EXISTS)
{
/*
* If updating a row that came from the same origin, do not report it
* as a conflict
* If updating a row that came from the same origin,
* do not report it as a conflict nor log
*/
if (local_tuple_origin == replorigin_session_origin)
return;

/* If updated in the same transaction, do not report it as a conflict */
/* If updated in the same transaction, do not report it as a conflict nor log */
if (local_tuple_origin == InvalidRepOriginId &&
TransactionIdEquals(local_tuple_xid, GetTopTransactionId()))
return;

/* Differing origin */
conflict_type = SPOCK_CT_UPDATE_ORIGIN_DIFFERS;

if (resolution == SpockResolution_ApplyRemote)
{
/*
* Remote tuple wins — this is normal replication flow, not a true
* conflict. Do not write to spock.resolutions, but optionally
* log to the PostgreSQL log based on the GUC setting.
*/
save_in_resolutions = false;

if (!found_local_origin)
return;

if (log_origin_change == SPOCK_ORIGIN_NONE)
return;

if (log_origin_change == SPOCK_ORIGIN_REMOTE_ONLY_DIFFERS &&
local_tuple_origin == InvalidRepOriginId)
return;

if (log_origin_change == SPOCK_ORIGIN_DIFFERS_SINCE_SUB)
{
/* Do not log if we do not have a sub creation timestamp */
if (!MySubscription || MySubscription->created_at == 0)
return;

/* If we could not determine the tuple timestamp, do not log */
if (local_tuple_commit_ts == 0)
return;

/*
* If the local tuple predates the subscription, it was
* loaded before replication was set up (e.g. pg_restore).
* Do not treat this as an origin-change conflict.
*/
if (local_tuple_commit_ts < MySubscription->created_at)
return;
}
}
}

/* Count statistics */
handle_stats_counter(rel->rel, MyApplyWorker->subid,
SPOCK_STATS_CONFLICT_COUNT, 1);

/* If configured log resolution to spock.resolutions table */
spock_conflict_log_table(conflict_type, rel, localtuple, oldkey,
if (save_in_resolutions)
{
/* Count statistics */
handle_stats_counter(rel->rel, MyApplyWorker->subid,
SPOCK_STATS_CONFLICT_COUNT, 1);

/* If configured log resolution to table */
spock_conflict_log_table(conflict_type, rel, localtuple, oldkey,
remotetuple, applytuple, resolution,
local_tuple_xid, found_local_origin,
local_tuple_origin, local_tuple_commit_ts,
conflict_idx_oid);
}

memset(local_tup_ts_str, 0, MAXDATELEN);
strlcpy(local_origin_str, "unknown", sizeof(local_origin_str));
if (found_local_origin)
{
strlcpy(local_tup_ts_str,
timestamptz_to_str(local_tuple_commit_ts),
MAXDATELEN);
if (local_tuple_origin != InvalidRepOriginId)
snprintf(local_origin_str, sizeof(local_origin_str), "%u",
(unsigned int) local_tuple_origin);
}

initStringInfo(&remotetup);

Expand Down Expand Up @@ -428,9 +479,9 @@ spock_report_conflict(SpockConflictType conflict_type,
SpockConflictTypeNames[conflict_type],
qualrelname, idxname,
conflict_resolution_to_string(resolution)),
errdetail("existing local tuple {%s} xid=%u,origin=%d,timestamp=%s; remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X",
errdetail("existing local tuple {%s} xid=%u,origin=%s,timestamp=%s; remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X",
localtup.data, local_tuple_xid,
found_local_origin ? (int) local_tuple_origin : -1,
local_origin_str,
local_tup_ts_str,
remotetup.data,
replorigin_session_origin,
Expand Down Expand Up @@ -572,15 +623,18 @@ spock_conflict_log_table(SpockConflictType conflict_type,
/* conflict_resolution */
values[6] = CStringGetTextDatum(conflict_resolution_to_string(resolution));
/* local_origin */
values[7] = Int32GetDatum(found_local_origin ? (int) local_tuple_origin : -1);
if (found_local_origin && local_tuple_origin != InvalidRepOriginId)
values[7] = Int32GetDatum((int) local_tuple_origin);
else
nulls[7] = true;

/* local_tuple */
if (localtuple != NULL)
{
Datum datum;

datum = heap_copy_tuple_as_datum(localtuple, desc);
values[8] = spock_conflict_row_to_json(datum, false, &nulls[7]);
values[8] = spock_conflict_row_to_json(datum, false, &nulls[8]);
}
else
nulls[8] = true;
Expand All @@ -607,7 +661,7 @@ spock_conflict_log_table(SpockConflictType conflict_type,
Datum datum;

datum = heap_copy_tuple_as_datum(remotetuple, desc);
values[12] = spock_conflict_row_to_json(datum, false, &nulls[11]);
values[12] = spock_conflict_row_to_json(datum, false, &nulls[12]);
}
else
nulls[12] = true;
Expand Down
13 changes: 12 additions & 1 deletion src/spock_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ typedef struct SubscriptionTuple
NameData sub_slot_name;
} SubscriptionTuple;

#define Natts_subscription 14
#define Natts_subscription 15
#define Anum_sub_id 1
#define Anum_sub_name 2
#define Anum_sub_origin 3
Expand All @@ -110,6 +110,7 @@ typedef struct SubscriptionTuple
#define Anum_sub_force_text_transfer 12
#define Anum_sub_skip_lsn 13
#define Anum_sub_skip_schema 14
#define Anum_sub_created_at 15

/*
* List of extensions and schemas that we should skip globally.
Expand Down Expand Up @@ -905,6 +906,8 @@ create_subscription(SpockSubscription *sub)
else
nulls[Anum_sub_skip_schema - 1] = true;

values[Anum_sub_created_at - 1] = TimestampTzGetDatum(GetCurrentTimestamp());

tup = heap_form_tuple(tupDesc, values, nulls);

/* Insert the tuple to the catalog. */
Expand Down Expand Up @@ -968,6 +971,7 @@ alter_subscription(SpockSubscription *sub)

replaces[Anum_sub_id - 1] = false;
replaces[Anum_sub_name - 1] = false;
replaces[Anum_sub_created_at - 1] = false;

values[Anum_sub_origin - 1] = ObjectIdGetDatum(sub->origin_if->nodeid);
values[Anum_sub_target - 1] = ObjectIdGetDatum(sub->target_if->nodeid);
Expand Down Expand Up @@ -1133,6 +1137,13 @@ subscription_fromtuple(HeapTuple tuple, TupleDesc desc)
sub->skip_schema = skip_schema_names;
}

/* Get created_at. */
d = heap_getattr(tuple, Anum_sub_created_at, desc, &isnull);
if (isnull)
sub->created_at = 0;
else
sub->created_at = DatumGetTimestampTz(d);

return sub;
}

Expand Down
9 changes: 4 additions & 5 deletions tests/regress/expected/tuple_origin.out
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,11 @@ SELECT * FROM basic_conflict ORDER BY id;
2 | B
(2 rows)

-- We should now see a conflict
-- Origin changes are no longer saved to resolutions
SELECT relname, conflict_type FROM spock.resolutions WHERE relname = 'public.basic_conflict';
relname | conflict_type
-----------------------+-----------------------
public.basic_conflict | update_origin_differs
(1 row)
relname | conflict_type
---------+---------------
(0 rows)

-- Clean
TRUNCATE spock.resolutions;
Expand Down
2 changes: 1 addition & 1 deletion tests/regress/sql/tuple_origin.sql
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ SELECT spock.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
SELECT * FROM basic_conflict ORDER BY id;

-- We should now see a conflict
-- Origin changes are no longer saved to resolutions
SELECT relname, conflict_type FROM spock.resolutions WHERE relname = 'public.basic_conflict';

-- Clean
Expand Down
3 changes: 3 additions & 0 deletions tests/tap/schedule
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ test: 009_zodan_add_remove_nodes
test: 010_zodan_add_remove_python
test: 012_zodan_basics

test: 013_origin_change_restore
test: 014_pgdump_restore_conflict

# Tests, consuming too much time to be launched on each check:
#test: 011_zodan_sync_third
#
Expand Down
Loading