From 937bc5976f08daf0cc7f13f140cfca9d8017d43c Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Thu, 7 May 2026 12:14:22 +0000 Subject: [PATCH 1/2] fix(replication): distribute destination omni among the subscribers --- .../logical/publisher/publisher_impl.rs | 7 +- .../replication/logical/subscriber/mod.rs | 1 + .../logical/subscriber/omni_ownership.rs | 112 ++++++++++++++++++ .../replication/logical/subscriber/stream.rs | 13 +- .../replication/logical/subscriber/tests.rs | 102 ++++++++++++---- 5 files changed, 211 insertions(+), 24 deletions(-) create mode 100644 pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index 8b25df02a..8241f9c28 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -13,6 +13,7 @@ use tracing::{debug, info, warn}; use super::super::{ensure_validation, publisher::Table, Error}; use super::ReplicationSlot; +use crate::backend::replication::logical::subscriber::omni_ownership::OmniOwnership; use crate::backend::replication::logical::subscriber::stream::StreamSubscriber; use crate::backend::replication::publisher::progress::Progress; use crate::backend::replication::publisher::Lsn; @@ -196,6 +197,7 @@ impl Publisher { self.create_slots(source).await?; } + let n_sources = source.shards().len(); for (number, _) in source.shards().iter().enumerate() { // Use table offsets from data sync // or from loading them above. @@ -204,7 +206,10 @@ impl Publisher { .get(&number) .ok_or(Error::NoReplicationTables(number))?; // Handles the logical replication stream messages. - let mut stream = StreamSubscriber::new(dest, tables); + // Each subscriber owns a partition of destination shards for omni-table DML + // (dest_shard % n_sources == source_shard), preventing cross-subscriber deadlocks. + let mut stream = + StreamSubscriber::new(dest, tables, OmniOwnership::new(number, n_sources)); // Take ownership of the slot for replication. let mut slot = self diff --git a/pgdog/src/backend/replication/logical/subscriber/mod.rs b/pgdog/src/backend/replication/logical/subscriber/mod.rs index 9df30191b..27d1853b9 100644 --- a/pgdog/src/backend/replication/logical/subscriber/mod.rs +++ b/pgdog/src/backend/replication/logical/subscriber/mod.rs @@ -1,5 +1,6 @@ pub mod context; pub mod copy; +pub mod omni_ownership; pub mod parallel_connection; pub mod stream; diff --git a/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs b/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs new file mode 100644 index 000000000..1aaa3a410 --- /dev/null +++ b/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs @@ -0,0 +1,112 @@ +/// Controls which destination shards a subscriber writes to for omni (unsharded) tables. +/// +/// Partitions destinations via `dest_shard % n_sources == source_shard` so that each +/// subscriber owns a disjoint subset, preventing cross-subscriber row-lock deadlocks. +#[derive(Debug, Clone, Copy)] +pub struct OmniOwnership { + source_shard: usize, + n_sources: usize, +} + +impl OmniOwnership { + pub fn new(source_shard: usize, n_sources: usize) -> Self { + Self { + source_shard, + n_sources, + } + } + + /// Returns true if this subscriber should write omni-table DML to `dest_shard`. 
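+    ///
+    /// For example, with `n_sources = 2` source shard 0 owns the even-numbered
+    /// destination shards and source shard 1 owns the odd-numbered ones; with
+    /// `n_sources <= 1` every destination is owned (see the tests below).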
+ pub fn owns(&self, dest_shard: usize) -> bool { + if self.n_sources <= 1 { + return true; + } + dest_shard % self.n_sources == self.source_shard + } +} + +impl Default for OmniOwnership { + fn default() -> Self { + Self::new(0, 1) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn omni_single_source_owns_all_dests() { + let p = OmniOwnership::new(0, 1); + assert!(p.owns(0)); + assert!(p.owns(1)); + assert!(p.owns(2)); + assert!(p.owns(7)); + } + + #[test] + fn omni_zero_sources_owns_all_dests() { + let p = OmniOwnership::new(0, 0); + assert!(p.owns(0)); + assert!(p.owns(1)); + assert!(p.owns(3)); + } + + #[test] + fn omni_equal_sources_and_dests() { + // n_sources == n_dests == 3: strict 1:1, each source owns only its own index. + assert!(OmniOwnership::new(0, 3).owns(0)); + assert!(OmniOwnership::new(1, 3).owns(1)); + assert!(OmniOwnership::new(2, 3).owns(2)); + + assert!(!OmniOwnership::new(1, 3).owns(0)); + assert!(!OmniOwnership::new(2, 3).owns(0)); + assert!(!OmniOwnership::new(0, 3).owns(1)); + assert!(!OmniOwnership::new(2, 3).owns(1)); + assert!(!OmniOwnership::new(0, 3).owns(2)); + assert!(!OmniOwnership::new(1, 3).owns(2)); + } + + #[test] + fn omni_fewer_sources_than_dests() { + // n_sources=2, n_dests=4: sub-0 owns even dests, sub-1 owns odd dests. + let p0 = OmniOwnership::new(0, 2); + assert!(p0.owns(0)); + assert!(p0.owns(2)); + assert!(!p0.owns(1)); + assert!(!p0.owns(3)); + + let p1 = OmniOwnership::new(1, 2); + assert!(p1.owns(1)); + assert!(p1.owns(3)); + assert!(!p1.owns(0)); + assert!(!p1.owns(2)); + } + + #[test] + fn omni_more_sources_than_dests_all_dests_covered() { + // n_sources=5, n_dests=3: subs 0-2 each own their matching dest exclusively. + assert!(OmniOwnership::new(0, 5).owns(0)); + assert!(OmniOwnership::new(1, 5).owns(1)); + assert!(OmniOwnership::new(2, 5).owns(2)); + + assert!(!OmniOwnership::new(1, 5).owns(0)); + assert!(!OmniOwnership::new(2, 5).owns(0)); + assert!(!OmniOwnership::new(0, 5).owns(1)); + assert!(!OmniOwnership::new(2, 5).owns(1)); + assert!(!OmniOwnership::new(0, 5).owns(2)); + assert!(!OmniOwnership::new(1, 5).owns(2)); + } + + #[test] + fn omni_more_sources_than_dests_excess_sources_idle() { + // n_sources=5, n_dests=3: subs 3 and 4 own no destinations. + assert!(!OmniOwnership::new(3, 5).owns(0)); + assert!(!OmniOwnership::new(3, 5).owns(1)); + assert!(!OmniOwnership::new(3, 5).owns(2)); + + assert!(!OmniOwnership::new(4, 5).owns(0)); + assert!(!OmniOwnership::new(4, 5).owns(1)); + assert!(!OmniOwnership::new(4, 5).owns(2)); + } +} diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs index 4d4052701..834ebf378 100644 --- a/pgdog/src/backend/replication/logical/subscriber/stream.rs +++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs @@ -18,6 +18,7 @@ use super::super::publisher::{tables_missing_unique_index, NonIdentityColumnsPre use super::super::{ ensure_validation, publisher::Table, Error, TableValidationError, TableValidationErrorKind, }; +use super::omni_ownership::OmniOwnership; use super::StreamContext; use crate::net::messages::replication::logical::tuple_data::{Identifier, TupleData}; use crate::net::messages::replication::logical::update::Update as XLogUpdate; @@ -128,10 +129,13 @@ pub struct StreamSubscriber { // Missed rows. missed_rows: MissedRows, + + // Determines which destination shards this subscriber owns for omni tables. 
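+    // send() consults this so that omni-table rows fanned out to all shards are
+    // written only to the destination shards this subscriber owns.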
+ partition: OmniOwnership, } impl StreamSubscriber { - pub fn new(cluster: &Cluster, tables: &[Table]) -> Self { + pub fn new(cluster: &Cluster, tables: &[Table], partition: OmniOwnership) -> Self { let cluster = cluster.logical_stream(); Self { cluster, @@ -158,6 +162,7 @@ impl StreamSubscriber { in_transaction: false, missed_rows: MissedRows::default(), keys: HashMap::default(), + partition, } } @@ -223,6 +228,8 @@ impl StreamSubscriber { // Dispatch a pre-built bind to the matching shard(s). async fn send(&mut self, val: &Shard, bind: &Bind) -> Result<(), Error> { + // Locals avoid borrowing self inside the iter_mut closure. + let partition = self.partition; let mut conns: Vec<_> = self .connections .iter_mut() @@ -230,7 +237,7 @@ impl StreamSubscriber { .filter(|(shard, _)| match val { Shard::Direct(direct) => *shard == *direct, Shard::Multi(multi) => multi.contains(shard), - _ => true, + Shard::All => partition.owns(*shard), }) .map(|(_, server)| server) .collect(); @@ -995,7 +1002,7 @@ mod tests { fn make_subscriber() -> StreamSubscriber { let cluster = Cluster::new_test(&config()); - StreamSubscriber::new(&cluster, &[]) + StreamSubscriber::new(&cluster, &[], OmniOwnership::default()) } #[test] diff --git a/pgdog/src/backend/replication/logical/subscriber/tests.rs b/pgdog/src/backend/replication/logical/subscriber/tests.rs index c70a371ca..eb5e0b45a 100644 --- a/pgdog/src/backend/replication/logical/subscriber/tests.rs +++ b/pgdog/src/backend/replication/logical/subscriber/tests.rs @@ -30,6 +30,7 @@ use crate::{ }, }; +use super::omni_ownership::OmniOwnership; use super::stream::StreamSubscriber; fn random_id() -> String { @@ -254,23 +255,26 @@ fn x_update(u: XLogUpdate) -> CopyData { fn make_subscriber() -> StreamSubscriber { let cluster = Cluster::new_test(&config()); let tables = vec![make_sharded_table(), make_sharded_test_b_table()]; - StreamSubscriber::new(&cluster, &tables) + StreamSubscriber::new(&cluster, &tables, OmniOwnership::default()) } fn make_subscriber_with_tables(tables: Vec) -> StreamSubscriber { let cluster = Cluster::new_test(&config()); - StreamSubscriber::new(&cluster, &tables) + StreamSubscriber::new(&cluster, &tables, OmniOwnership::default()) } -fn make_subscriber_with_tables_two_databases(tables: Vec
) -> StreamSubscriber {
+fn make_subscriber_with_tables_two_databases(
+    tables: Vec<Table>
, + partition: OmniOwnership, +) -> StreamSubscriber { let cluster = Cluster::new_test_two_databases(&config()); - StreamSubscriber::new(&cluster, &tables) + StreamSubscriber::new(&cluster, &tables, partition) } fn make_subscriber_single_shard() -> StreamSubscriber { let cluster = Cluster::new_test_single_shard(&config()); let tables = vec![make_sharded_table(), make_sharded_test_b_table()]; - StreamSubscriber::new(&cluster, &tables) + StreamSubscriber::new(&cluster, &tables, OmniOwnership::default()) } /// Count rows matching the given `WHERE` predicate using a separate connection. @@ -582,7 +586,7 @@ async fn partition_leaves_share_destination() { leaf_b.table.parent_name = "sharded".to_string(); let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[leaf_a, leaf_b]); + let mut sub = StreamSubscriber::new(&cluster, &[leaf_a, leaf_b], OmniOwnership::default()); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1523,7 +1527,11 @@ fn omni_insert_copy_data(oid: Oid, a: &str, b: &str) -> CopyData { #[tokio::test] async fn full_identity_nothing_rejected() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_replica_identity_nothing_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_replica_identity_nothing_table()], + OmniOwnership::default(), + ); sub.connect().await.unwrap(); let oid = Oid(16390); @@ -1557,7 +1565,11 @@ async fn full_identity_nothing_rejected() { #[tokio::test] async fn full_identity_omni_no_unique_index_rejected() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_omni_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_full_identity_omni_table()], + OmniOwnership::default(), + ); // Enforce precondition: the table must exist but have no qualifying unique index. 
// A stale unique index from a prior run would make tables_missing_unique_index() return empty, @@ -1596,7 +1608,11 @@ async fn full_identity_omni_no_unique_index_rejected() { #[tokio::test] async fn full_identity_insert_sharded() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_full_identity_sharded_table()], + OmniOwnership::default(), + ); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1622,7 +1638,11 @@ async fn full_identity_insert_sharded() { #[tokio::test] async fn full_identity_update_fast_path() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_full_identity_sharded_table()], + OmniOwnership::default(), + ); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1677,7 +1697,11 @@ async fn full_identity_update_fast_path() { #[tokio::test] async fn full_identity_update_slow_path() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_full_identity_sharded_table()], + OmniOwnership::default(), + ); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1737,7 +1761,11 @@ async fn full_identity_update_slow_path() { #[tokio::test] async fn full_identity_update_slow_path_realistic_old_tuple() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_full_identity_sharded_table()], + OmniOwnership::default(), + ); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1794,7 +1822,11 @@ async fn full_identity_update_slow_path_realistic_old_tuple() { #[tokio::test] async fn full_identity_update_all_toasted_is_noop() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_full_identity_sharded_table()], + OmniOwnership::default(), + ); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1836,7 +1868,11 @@ async fn full_identity_update_all_toasted_is_noop() { #[tokio::test] async fn full_identity_delete() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_full_identity_sharded_table()], + OmniOwnership::default(), + ); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1874,7 +1910,11 @@ async fn full_identity_delete() { #[tokio::test] async fn full_identity_insert_omni_dedup() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_omni_dedup_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_full_identity_omni_dedup_table()], + OmniOwnership::default(), + ); let mut verify = test_server().await; // Ensure destination table exists with unique index before relation() runs. 
@@ -1933,7 +1973,11 @@ async fn full_identity_insert_omni_dedup() { #[tokio::test] async fn full_identity_update_duplicate_rows() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_full_identity_dup_rows_table()], + OmniOwnership::default(), + ); let mut verify = test_server().await; ensure_table(&mut verify, "public.full_dup_rows").await; @@ -1999,7 +2043,11 @@ async fn full_identity_update_duplicate_rows() { #[tokio::test] async fn full_identity_delete_duplicate_rows() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_full_identity_dup_rows_table()], + OmniOwnership::default(), + ); let mut verify = test_server().await; ensure_table(&mut verify, "public.full_dup_rows").await; @@ -2066,7 +2114,11 @@ async fn full_identity_delete_duplicate_rows() { #[tokio::test] async fn full_identity_update_matches_null_column() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_full_identity_dup_rows_table()], + OmniOwnership::default(), + ); let mut verify = test_server().await; // full_dup_rows has no NOT NULL on value — we can seed a NULL row. @@ -2127,7 +2179,11 @@ async fn full_identity_update_matches_null_column() { #[tokio::test] async fn full_identity_delete_matches_null_column() { let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]); + let mut sub = StreamSubscriber::new( + &cluster, + &[make_full_identity_dup_rows_table()], + OmniOwnership::default(), + ); let mut verify = test_server().await; ensure_table(&mut verify, "public.full_dup_rows").await; @@ -2302,7 +2358,13 @@ async fn cross_subscriber_omni_deadlock_two_databases() { let id1 = id1.clone(); let round_barrier = Arc::clone(&round_barrier); tokio::spawn(async move { - let mut sub = make_subscriber_with_tables_two_databases(vec![make_settings_table()]); + // Each subscriber owns a disjoint subset of destination shards: + // sub-0 → dest-0, sub-1 → dest-1 (dest_shard % 2 == sub_idx). + // This is the destination-partitioned apply fix for the cross-subscriber deadlock. + let mut sub = make_subscriber_with_tables_two_databases( + vec![make_settings_table()], + OmniOwnership::new(sub_idx, 2), + ); sub.connect().await.unwrap(); // Distinct LSN ranges so neither subscriber's LSN gating skips the other's events. 
From c2a94ec8d24fcc8e502f8d49772c145224e884a9 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Sun, 10 May 2026 11:15:05 +0000 Subject: [PATCH 2/2] add test & fix single shard --- integration/resharding/dev.sh | 32 ++++++++- .../issues/omni-table-subscriber-deadlock.md | 70 ++++++++++++------- .../logical/subscriber/omni_ownership.rs | 4 ++ .../replication/logical/subscriber/stream.rs | 11 ++- 4 files changed, 84 insertions(+), 33 deletions(-) diff --git a/integration/resharding/dev.sh b/integration/resharding/dev.sh index 012ae23e1..ca27fbf82 100644 --- a/integration/resharding/dev.sh +++ b/integration/resharding/dev.sh @@ -151,10 +151,16 @@ wait_for_no_copy_rows tasks title wait_for_no_copy_rows task_comments body wait_for_no_copy_rows settings name + +# pg_count PORT TABLE — row count via a direct postgres connection (bypasses pgdog). +pg_count() { PGPASSWORD=pgdog psql -h 127.0.0.1 -p "$1" -U pgdog -d postgres -tAc "SELECT COUNT(*) FROM $2"; } + +# check_row_count_matches TABLE +# Verifies total row count matches between source (via pgdog) and destination (via pgdog). +# Suitable for sharded tables where pgdog aggregates across all shards. check_row_count_matches() { local table="$1" - local source_count - local destination_count + local source_count destination_count source_count=$(psql -d source -tAc "SELECT COUNT(*) FROM ${table}") destination_count=$(psql -d destination -tAc "SELECT COUNT(*) FROM ${table}") @@ -167,11 +173,31 @@ check_row_count_matches() { echo "OK ${table}: ${source_count} rows" } +# check_omni_each_shard TABLE +# For omni (non-sharded) tables: queries each destination shard directly and asserts +# it holds the full source row count. A query through pgdog hits one shard and cannot +# detect a shard that is missing rows. +check_omni_each_shard() { + local table="$1" + local source_count dest0_count dest1_count + + source_count=$(pg_count 15432 "${table}") + dest0_count=$(pg_count 15434 "${table}") + dest1_count=$(pg_count 15435 "${table}") + + if [ "${source_count}" -ne "${dest0_count}" ] || [ "${source_count}" -ne "${dest1_count}" ]; then + echo "MISMATCH omni ${table}: source=${source_count} dest-0(15434)=${dest0_count} dest-1(15435)=${dest1_count} (expected ${source_count} on each)" + exit 1 + fi + + echo "OK omni ${table}: ${source_count} rows on each shard" +} + check_row_count_matches tenants check_row_count_matches accounts check_row_count_matches projects check_row_count_matches tasks check_row_count_matches task_comments -check_row_count_matches settings +check_omni_each_shard settings cleanup diff --git a/pgdog/docs/issues/omni-table-subscriber-deadlock.md b/pgdog/docs/issues/omni-table-subscriber-deadlock.md index bbfd8b50c..9a94ac3d3 100644 --- a/pgdog/docs/issues/omni-table-subscriber-deadlock.md +++ b/pgdog/docs/issues/omni-table-subscriber-deadlock.md @@ -137,24 +137,7 @@ errors. Combined with Solution 1, it catches the multi-row cases sequential appl | Both | ✓ | bounded | ✓ | Ship `lock_timeout` first — lower-risk, immediate protection against every deadlock shape. -Sequential apply reduces single-row contention and is a prerequisite for any -destination-partitioned apply (below). - -### Further: destination-partitioned apply (long-term) - -Each destination shard is written by exactly one source subscriber for omni-table DML: - -``` -owns_omni_destination(source_shard, dest_shard, n_sources) = dest_shard % n_sources == source_shard -``` - -Removes cross-subscriber row-lock contention entirely. 
Requires every source subscriber to see -every omni-table WAL event. If omni writes can originate from a single source only, partitioning -silently leaves other destinations stale — routing must first establish source-presence before -partitioning is safe. - ---- - +Sequential apply reduces single-row contention. Both are superseded by Solution 6. ### Solution 3: per-table async writer task One long-lived Tokio task per replicated table. Subscribers buffer the full WAL transaction in @@ -304,21 +287,54 @@ Treat as a `send()`-loop optimization parallel to Solutions 1+2, not part of the --- +### Solution 6: modulo-partition subscriber ownership (implemented) + +The root cause is that every subscriber writes every omni-table row to every destination. +The fix assigns each subscriber a disjoint subset of destination shards so no two subscribers +ever hold locks on the same row at the same time. + +The partitioning rule lives in `OmniOwnership::owns()` (`omni_ownership.rs`): subscriber +`source_shard` owns destination shard `d` when `d % n_sources == source_shard`. With two +subscribers and two destinations, sub-0 owns even-indexed destinations and sub-1 owns +odd-indexed ones — they never touch the same row on the same server simultaneously. + +Enforcement happens in `send()` (`stream.rs`). The connection filter always passes each +shard through `partition.owns()`, with one exception: when there are multiple destination +connections, `Shard::Direct` and `Shard::Multi` carry an explicit shard computed from the +row's shard key and must land precisely — ownership filtering doesn't apply there. + +The edge case that required care: with a single destination shard, the query router +collapses `Shard::All` to `Shard::Direct(0)` for every table, including omni tables that +have no shard key. Without a guard, the ownership check would be bypassed and all +subscribers would write to the one destination — the original bug, just on a smaller +cluster. The `n_conns == 1` path in the filter catches this, routing single-connection +dispatches through `partition.owns()` regardless of how the shard was computed. + +**Correctness prerequisite:** every subscriber must receive every omni-table WAL event. +Logical replication publishes each WAL record to all subscribers of the same publication, +so this holds in any standard deployment. + +**Sharded tables are unaffected.** `OmniOwnership` is constructed with `n_sources = 1` +for sharded-table subscribers, causing `owns()` to return `true` unconditionally — the +behavior is identical to the pre-fix code. 
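+
+For concreteness, a minimal self-contained sketch of the ownership rule described above,
+including the `n_sources <= 1` pass-through (the free function `owns_dest` is illustrative
+only; the real check is `OmniOwnership::owns()`):
+
+```rust
+/// Illustrative sketch, not the pgdog API: destination `d` belongs to subscriber
+/// `source_shard` when `d % n_sources == source_shard`; with zero or one sources,
+/// every destination is owned.
+fn owns_dest(source_shard: usize, n_sources: usize, d: usize) -> bool {
+    if n_sources <= 1 {
+        return true;
+    }
+    d % n_sources == source_shard
+}
+
+fn main() {
+    // Two sources, four destinations: sub-0 → dests 0 and 2, sub-1 → dests 1 and 3.
+    assert!(owns_dest(0, 2, 0) && owns_dest(0, 2, 2));
+    assert!(owns_dest(1, 2, 1) && owns_dest(1, 2, 3));
+    assert!(!owns_dest(0, 2, 1) && !owns_dest(1, 2, 0));
+}
+```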
+ +--- + ### Comparison | | Deadlock fixed | Cross-table/shard atomicity | Throughput impact | Memory risk | Complexity | Recommendation | |---|:---:|:---:|:---:|:---:|:---:|---| -| Solution 1: sequential per-dest apply | single-row only | preserved | minor (serial dest RTTs per row) | none | low | Ship | -| Solution 2: `lock_timeout` | bounded recovery | preserved | none | none | low | Ship first | -| **Solutions 1 + 2 combined** | **yes (bounded for multi-row)** | **preserved** | **minor** | **none** | **low** | **Recommended** | -| Solution 3: per-table writer | yes | broken | severe: N-way → 1-way per omni table | unbounded | high | Not recommended | -| Solution 4: per-shard writer | silent divergence† | broken | severe: removes all destination parallelism | unbounded | high | **Discard** | -| Solution 5: buffer + `Sync` | no | preserved | minor (apply lag) | unbounded | low | Optional optimization | -| Destination-partitioned apply | yes | preserved | none | none | high | Long-term structural fix | +| Solution 1: sequential per-dest apply | single-row only | preserved | minor | none | low | Superseded | +| Solution 2: `lock_timeout` | bounded recovery | preserved | none | none | low | Superseded | +| Solution 3: per-table writer | yes | broken | severe | unbounded | high | Not recommended | +| Solution 4: per-shard writer | silent divergence† | broken | severe | unbounded | high | Discard | +| Solution 5: buffer + `Sync` | no | preserved | minor | unbounded | low | Optional optimization | +| **Modulo-partition subscriber ownership** | **yes (structural)** | **preserved** | **none** | **none** | **low** | **Implemented** | †Solution 4 produces persistent, undetectable row-level disagreement across shards when two subscribers race on the same omni row. The deadlock is observable and recoverable; this divergence is neither. -Ship 1+2 short-term; pursue destination-partitioned apply long-term. Solutions 3 and 4 trade the -deadlock for worse failure modes and throughput. Solution 5 is orthogonal. +Modulo-partition subscriber ownership is the structural fix and is now implemented in +`send()`. Solutions 1 and 2 remain valid as defence-in-depth. Solutions 3 and 4 trade the +deadlock for worse failure modes and throughput. Solution 5 is orthogonal. \ No newline at end of file diff --git a/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs b/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs index 1aaa3a410..a29b2c9d6 100644 --- a/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs +++ b/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs @@ -10,6 +10,10 @@ pub struct OmniOwnership { impl OmniOwnership { pub fn new(source_shard: usize, n_sources: usize) -> Self { + debug_assert!( + n_sources == 0 || source_shard < n_sources, + "source_shard ({source_shard}) must be < n_sources ({n_sources})" + ); Self { source_shard, n_sources, diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs index 834ebf378..f9a450c45 100644 --- a/pgdog/src/backend/replication/logical/subscriber/stream.rs +++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs @@ -230,14 +230,19 @@ impl StreamSubscriber { async fn send(&mut self, val: &Shard, bind: &Bind) -> Result<(), Error> { // Locals avoid borrowing self inside the iter_mut closure. 
let partition = self.partition; + let n_conns = self.connections.len(); let mut conns: Vec<_> = self .connections .iter_mut() .enumerate() .filter(|(shard, _)| match val { - Shard::Direct(direct) => *shard == *direct, - Shard::Multi(multi) => multi.contains(shard), - Shard::All => partition.owns(*shard), + // With a single destination shard the router collapses Shard::All + // to Direct(0), bypassing the partition ownership check. Apply + // partition.owns() for all variants when there is only one connection + // so that omni-table writes are still partitioned across subscribers. + Shard::Direct(direct) if n_conns > 1 => *shard == *direct, + Shard::Multi(multi) if n_conns > 1 => multi.contains(shard), + _ => partition.owns(*shard), }) .map(|(_, server)| server) .collect();