From 937bc5976f08daf0cc7f13f140cfca9d8017d43c Mon Sep 17 00:00:00 2001
From: meskill <8974488+meskill@users.noreply.github.com>
Date: Thu, 7 May 2026 12:14:22 +0000
Subject: [PATCH 1/2] fix(replication): distribute omni-table destination shards
 among the subscribers
---
.../logical/publisher/publisher_impl.rs | 7 +-
.../replication/logical/subscriber/mod.rs | 1 +
.../logical/subscriber/omni_ownership.rs | 112 ++++++++++++++++++
.../replication/logical/subscriber/stream.rs | 13 +-
.../replication/logical/subscriber/tests.rs | 102 ++++++++++++----
5 files changed, 211 insertions(+), 24 deletions(-)
create mode 100644 pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs
diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs
index 8b25df02a..8241f9c28 100644
--- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs
+++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs
@@ -13,6 +13,7 @@ use tracing::{debug, info, warn};
use super::super::{ensure_validation, publisher::Table, Error};
use super::ReplicationSlot;
+use crate::backend::replication::logical::subscriber::omni_ownership::OmniOwnership;
use crate::backend::replication::logical::subscriber::stream::StreamSubscriber;
use crate::backend::replication::publisher::progress::Progress;
use crate::backend::replication::publisher::Lsn;
@@ -196,6 +197,7 @@ impl Publisher {
self.create_slots(source).await?;
}
+ let n_sources = source.shards().len();
for (number, _) in source.shards().iter().enumerate() {
// Use table offsets from data sync
// or from loading them above.
@@ -204,7 +206,10 @@ impl Publisher {
.get(&number)
.ok_or(Error::NoReplicationTables(number))?;
// Handles the logical replication stream messages.
- let mut stream = StreamSubscriber::new(dest, tables);
+ // Each subscriber owns a partition of destination shards for omni-table DML
+ // (dest_shard % n_sources == source_shard), preventing cross-subscriber deadlocks.
+ let mut stream =
+ StreamSubscriber::new(dest, tables, OmniOwnership::new(number, n_sources));
// Take ownership of the slot for replication.
let mut slot = self
diff --git a/pgdog/src/backend/replication/logical/subscriber/mod.rs b/pgdog/src/backend/replication/logical/subscriber/mod.rs
index 9df30191b..27d1853b9 100644
--- a/pgdog/src/backend/replication/logical/subscriber/mod.rs
+++ b/pgdog/src/backend/replication/logical/subscriber/mod.rs
@@ -1,5 +1,6 @@
pub mod context;
pub mod copy;
+pub mod omni_ownership;
pub mod parallel_connection;
pub mod stream;
diff --git a/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs b/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs
new file mode 100644
index 000000000..1aaa3a410
--- /dev/null
+++ b/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs
@@ -0,0 +1,112 @@
+/// Controls which destination shards a subscriber writes to for omni (unsharded) tables.
+///
+/// Partitions destinations via `dest_shard % n_sources == source_shard` so that each
+/// subscriber owns a disjoint subset, preventing cross-subscriber row-lock deadlocks.
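+///
+/// # Examples
+///
+/// A quick illustration of the rule (marked `ignore` since the import path depends on crate layout):
+///
+/// ```ignore
+/// let sub1 = OmniOwnership::new(1, 2);
+/// assert!(sub1.owns(1) && sub1.owns(3));
+/// assert!(!sub1.owns(0) && !sub1.owns(2));
+/// ```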
+#[derive(Debug, Clone, Copy)]
+pub struct OmniOwnership {
+ source_shard: usize,
+ n_sources: usize,
+}
+
+impl OmniOwnership {
+ pub fn new(source_shard: usize, n_sources: usize) -> Self {
+ Self {
+ source_shard,
+ n_sources,
+ }
+ }
+
+ /// Returns true if this subscriber should write omni-table DML to `dest_shard`.
+ pub fn owns(&self, dest_shard: usize) -> bool {
+ if self.n_sources <= 1 {
+ return true;
+ }
+ dest_shard % self.n_sources == self.source_shard
+ }
+}
+
+impl Default for OmniOwnership {
+ fn default() -> Self {
+ Self::new(0, 1)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn omni_single_source_owns_all_dests() {
+ let p = OmniOwnership::new(0, 1);
+ assert!(p.owns(0));
+ assert!(p.owns(1));
+ assert!(p.owns(2));
+ assert!(p.owns(7));
+ }
+
+ #[test]
+ fn omni_zero_sources_owns_all_dests() {
+ let p = OmniOwnership::new(0, 0);
+ assert!(p.owns(0));
+ assert!(p.owns(1));
+ assert!(p.owns(3));
+ }
+
+ #[test]
+ fn omni_equal_sources_and_dests() {
+ // n_sources == n_dests == 3: strict 1:1, each source owns only its own index.
+ assert!(OmniOwnership::new(0, 3).owns(0));
+ assert!(OmniOwnership::new(1, 3).owns(1));
+ assert!(OmniOwnership::new(2, 3).owns(2));
+
+ assert!(!OmniOwnership::new(1, 3).owns(0));
+ assert!(!OmniOwnership::new(2, 3).owns(0));
+ assert!(!OmniOwnership::new(0, 3).owns(1));
+ assert!(!OmniOwnership::new(2, 3).owns(1));
+ assert!(!OmniOwnership::new(0, 3).owns(2));
+ assert!(!OmniOwnership::new(1, 3).owns(2));
+ }
+
+ #[test]
+ fn omni_fewer_sources_than_dests() {
+ // n_sources=2, n_dests=4: sub-0 owns even dests, sub-1 owns odd dests.
+ let p0 = OmniOwnership::new(0, 2);
+ assert!(p0.owns(0));
+ assert!(p0.owns(2));
+ assert!(!p0.owns(1));
+ assert!(!p0.owns(3));
+
+ let p1 = OmniOwnership::new(1, 2);
+ assert!(p1.owns(1));
+ assert!(p1.owns(3));
+ assert!(!p1.owns(0));
+ assert!(!p1.owns(2));
+ }
+
+ #[test]
+ fn omni_more_sources_than_dests_all_dests_covered() {
+ // n_sources=5, n_dests=3: subs 0-2 each own their matching dest exclusively.
+ assert!(OmniOwnership::new(0, 5).owns(0));
+ assert!(OmniOwnership::new(1, 5).owns(1));
+ assert!(OmniOwnership::new(2, 5).owns(2));
+
+ assert!(!OmniOwnership::new(1, 5).owns(0));
+ assert!(!OmniOwnership::new(2, 5).owns(0));
+ assert!(!OmniOwnership::new(0, 5).owns(1));
+ assert!(!OmniOwnership::new(2, 5).owns(1));
+ assert!(!OmniOwnership::new(0, 5).owns(2));
+ assert!(!OmniOwnership::new(1, 5).owns(2));
+ }
+
+ #[test]
+ fn omni_more_sources_than_dests_excess_sources_idle() {
+ // n_sources=5, n_dests=3: subs 3 and 4 own no destinations.
+ assert!(!OmniOwnership::new(3, 5).owns(0));
+ assert!(!OmniOwnership::new(3, 5).owns(1));
+ assert!(!OmniOwnership::new(3, 5).owns(2));
+
+ assert!(!OmniOwnership::new(4, 5).owns(0));
+ assert!(!OmniOwnership::new(4, 5).owns(1));
+ assert!(!OmniOwnership::new(4, 5).owns(2));
+ }
+}
diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs
index 4d4052701..834ebf378 100644
--- a/pgdog/src/backend/replication/logical/subscriber/stream.rs
+++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs
@@ -18,6 +18,7 @@ use super::super::publisher::{tables_missing_unique_index, NonIdentityColumnsPre
use super::super::{
ensure_validation, publisher::Table, Error, TableValidationError, TableValidationErrorKind,
};
+use super::omni_ownership::OmniOwnership;
use super::StreamContext;
use crate::net::messages::replication::logical::tuple_data::{Identifier, TupleData};
use crate::net::messages::replication::logical::update::Update as XLogUpdate;
@@ -128,10 +129,13 @@ pub struct StreamSubscriber {
// Missed rows.
missed_rows: MissedRows,
+
+ // Determines which destination shards this subscriber owns for omni tables.
+ partition: OmniOwnership,
}
impl StreamSubscriber {
- pub fn new(cluster: &Cluster, tables: &[Table]) -> Self {
+ pub fn new(cluster: &Cluster, tables: &[Table], partition: OmniOwnership) -> Self {
let cluster = cluster.logical_stream();
Self {
cluster,
@@ -158,6 +162,7 @@ impl StreamSubscriber {
in_transaction: false,
missed_rows: MissedRows::default(),
keys: HashMap::default(),
+ partition,
}
}
@@ -223,6 +228,8 @@ impl StreamSubscriber {
// Dispatch a pre-built bind to the matching shard(s).
async fn send(&mut self, val: &Shard, bind: &Bind) -> Result<(), Error> {
+ // Locals avoid borrowing self inside the iter_mut closure.
+ let partition = self.partition;
let mut conns: Vec<_> = self
.connections
.iter_mut()
@@ -230,7 +237,7 @@ impl StreamSubscriber {
.filter(|(shard, _)| match val {
Shard::Direct(direct) => *shard == *direct,
Shard::Multi(multi) => multi.contains(shard),
- _ => true,
+ Shard::All => partition.owns(*shard),
})
.map(|(_, server)| server)
.collect();
@@ -995,7 +1002,7 @@ mod tests {
fn make_subscriber() -> StreamSubscriber {
let cluster = Cluster::new_test(&config());
- StreamSubscriber::new(&cluster, &[])
+ StreamSubscriber::new(&cluster, &[], OmniOwnership::default())
}
#[test]
diff --git a/pgdog/src/backend/replication/logical/subscriber/tests.rs b/pgdog/src/backend/replication/logical/subscriber/tests.rs
index c70a371ca..eb5e0b45a 100644
--- a/pgdog/src/backend/replication/logical/subscriber/tests.rs
+++ b/pgdog/src/backend/replication/logical/subscriber/tests.rs
@@ -30,6 +30,7 @@ use crate::{
},
};
+use super::omni_ownership::OmniOwnership;
use super::stream::StreamSubscriber;
fn random_id() -> String {
@@ -254,23 +255,26 @@ fn x_update(u: XLogUpdate) -> CopyData {
fn make_subscriber() -> StreamSubscriber {
let cluster = Cluster::new_test(&config());
let tables = vec![make_sharded_table(), make_sharded_test_b_table()];
- StreamSubscriber::new(&cluster, &tables)
+ StreamSubscriber::new(&cluster, &tables, OmniOwnership::default())
}
fn make_subscriber_with_tables(tables: Vec<Table>) -> StreamSubscriber {
let cluster = Cluster::new_test(&config());
- StreamSubscriber::new(&cluster, &tables)
+ StreamSubscriber::new(&cluster, &tables, OmniOwnership::default())
}
-fn make_subscriber_with_tables_two_databases(tables: Vec<Table>) -> StreamSubscriber {
+fn make_subscriber_with_tables_two_databases(
+    tables: Vec<Table>,
+ partition: OmniOwnership,
+) -> StreamSubscriber {
let cluster = Cluster::new_test_two_databases(&config());
- StreamSubscriber::new(&cluster, &tables)
+ StreamSubscriber::new(&cluster, &tables, partition)
}
fn make_subscriber_single_shard() -> StreamSubscriber {
let cluster = Cluster::new_test_single_shard(&config());
let tables = vec![make_sharded_table(), make_sharded_test_b_table()];
- StreamSubscriber::new(&cluster, &tables)
+ StreamSubscriber::new(&cluster, &tables, OmniOwnership::default())
}
/// Count rows matching the given `WHERE` predicate using a separate connection.
@@ -582,7 +586,7 @@ async fn partition_leaves_share_destination() {
leaf_b.table.parent_name = "sharded".to_string();
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[leaf_a, leaf_b]);
+ let mut sub = StreamSubscriber::new(&cluster, &[leaf_a, leaf_b], OmniOwnership::default());
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1523,7 +1527,11 @@ fn omni_insert_copy_data(oid: Oid, a: &str, b: &str) -> CopyData {
#[tokio::test]
async fn full_identity_nothing_rejected() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_replica_identity_nothing_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_replica_identity_nothing_table()],
+ OmniOwnership::default(),
+ );
sub.connect().await.unwrap();
let oid = Oid(16390);
@@ -1557,7 +1565,11 @@ async fn full_identity_nothing_rejected() {
#[tokio::test]
async fn full_identity_omni_no_unique_index_rejected() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_omni_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_omni_table()],
+ OmniOwnership::default(),
+ );
// Enforce precondition: the table must exist but have no qualifying unique index.
// A stale unique index from a prior run would make tables_missing_unique_index() return empty,
@@ -1596,7 +1608,11 @@ async fn full_identity_omni_no_unique_index_rejected() {
#[tokio::test]
async fn full_identity_insert_sharded() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_sharded_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1622,7 +1638,11 @@ async fn full_identity_insert_sharded() {
#[tokio::test]
async fn full_identity_update_fast_path() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_sharded_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1677,7 +1697,11 @@ async fn full_identity_update_fast_path() {
#[tokio::test]
async fn full_identity_update_slow_path() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_sharded_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1737,7 +1761,11 @@ async fn full_identity_update_slow_path() {
#[tokio::test]
async fn full_identity_update_slow_path_realistic_old_tuple() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_sharded_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1794,7 +1822,11 @@ async fn full_identity_update_slow_path_realistic_old_tuple() {
#[tokio::test]
async fn full_identity_update_all_toasted_is_noop() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_sharded_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1836,7 +1868,11 @@ async fn full_identity_update_all_toasted_is_noop() {
#[tokio::test]
async fn full_identity_delete() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_sharded_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1874,7 +1910,11 @@ async fn full_identity_delete() {
#[tokio::test]
async fn full_identity_insert_omni_dedup() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_omni_dedup_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_omni_dedup_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
// Ensure destination table exists with unique index before relation() runs.
@@ -1933,7 +1973,11 @@ async fn full_identity_insert_omni_dedup() {
#[tokio::test]
async fn full_identity_update_duplicate_rows() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_dup_rows_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
ensure_table(&mut verify, "public.full_dup_rows").await;
@@ -1999,7 +2043,11 @@ async fn full_identity_update_duplicate_rows() {
#[tokio::test]
async fn full_identity_delete_duplicate_rows() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_dup_rows_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
ensure_table(&mut verify, "public.full_dup_rows").await;
@@ -2066,7 +2114,11 @@ async fn full_identity_delete_duplicate_rows() {
#[tokio::test]
async fn full_identity_update_matches_null_column() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_dup_rows_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
// full_dup_rows has no NOT NULL on value — we can seed a NULL row.
@@ -2127,7 +2179,11 @@ async fn full_identity_update_matches_null_column() {
#[tokio::test]
async fn full_identity_delete_matches_null_column() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_dup_rows_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
ensure_table(&mut verify, "public.full_dup_rows").await;
@@ -2302,7 +2358,13 @@ async fn cross_subscriber_omni_deadlock_two_databases() {
let id1 = id1.clone();
let round_barrier = Arc::clone(&round_barrier);
tokio::spawn(async move {
- let mut sub = make_subscriber_with_tables_two_databases(vec![make_settings_table()]);
+ // Each subscriber owns a disjoint subset of destination shards:
+ // sub-0 → dest-0, sub-1 → dest-1 (dest_shard % 2 == sub_idx).
+ // This is the destination-partitioned apply fix for the cross-subscriber deadlock.
+ let mut sub = make_subscriber_with_tables_two_databases(
+ vec![make_settings_table()],
+ OmniOwnership::new(sub_idx, 2),
+ );
sub.connect().await.unwrap();
// Distinct LSN ranges so neither subscriber's LSN gating skips the other's events.
From c2a94ec8d24fcc8e502f8d49772c145224e884a9 Mon Sep 17 00:00:00 2001
From: meskill <8974488+meskill@users.noreply.github.com>
Date: Sun, 10 May 2026 11:15:05 +0000
Subject: [PATCH 2/2] add test & fix single shard
---
integration/resharding/dev.sh | 32 ++++++++-
.../issues/omni-table-subscriber-deadlock.md | 70 ++++++++++++-------
.../logical/subscriber/omni_ownership.rs | 4 ++
.../replication/logical/subscriber/stream.rs | 11 ++-
4 files changed, 84 insertions(+), 33 deletions(-)
diff --git a/integration/resharding/dev.sh b/integration/resharding/dev.sh
index 012ae23e1..ca27fbf82 100644
--- a/integration/resharding/dev.sh
+++ b/integration/resharding/dev.sh
@@ -151,10 +151,16 @@ wait_for_no_copy_rows tasks title
wait_for_no_copy_rows task_comments body
wait_for_no_copy_rows settings name
+
+# pg_count PORT TABLE — row count via a direct postgres connection (bypasses pgdog).
+pg_count() { PGPASSWORD=pgdog psql -h 127.0.0.1 -p "$1" -U pgdog -d postgres -tAc "SELECT COUNT(*) FROM $2"; }
+
+# check_row_count_matches TABLE
+# Verifies total row count matches between source (via pgdog) and destination (via pgdog).
+# Suitable for sharded tables where pgdog aggregates across all shards.
check_row_count_matches() {
local table="$1"
- local source_count
- local destination_count
+ local source_count destination_count
source_count=$(psql -d source -tAc "SELECT COUNT(*) FROM ${table}")
destination_count=$(psql -d destination -tAc "SELECT COUNT(*) FROM ${table}")
@@ -167,11 +173,31 @@ check_row_count_matches() {
echo "OK ${table}: ${source_count} rows"
}
+# check_omni_each_shard TABLE
+# For omni (non-sharded) tables: queries each destination shard directly and asserts
+# it holds the full source row count. A query through pgdog hits one shard and cannot
+# detect a shard that is missing rows.
+check_omni_each_shard() {
+ local table="$1"
+ local source_count dest0_count dest1_count
+
+ source_count=$(pg_count 15432 "${table}")
+ dest0_count=$(pg_count 15434 "${table}")
+ dest1_count=$(pg_count 15435 "${table}")
+
+ if [ "${source_count}" -ne "${dest0_count}" ] || [ "${source_count}" -ne "${dest1_count}" ]; then
+ echo "MISMATCH omni ${table}: source=${source_count} dest-0(15434)=${dest0_count} dest-1(15435)=${dest1_count} (expected ${source_count} on each)"
+ exit 1
+ fi
+
+ echo "OK omni ${table}: ${source_count} rows on each shard"
+}
+
check_row_count_matches tenants
check_row_count_matches accounts
check_row_count_matches projects
check_row_count_matches tasks
check_row_count_matches task_comments
-check_row_count_matches settings
+check_omni_each_shard settings
cleanup
diff --git a/pgdog/docs/issues/omni-table-subscriber-deadlock.md b/pgdog/docs/issues/omni-table-subscriber-deadlock.md
index bbfd8b50c..9a94ac3d3 100644
--- a/pgdog/docs/issues/omni-table-subscriber-deadlock.md
+++ b/pgdog/docs/issues/omni-table-subscriber-deadlock.md
@@ -137,24 +137,7 @@ errors. Combined with Solution 1, it catches the multi-row cases sequential appl
| Both | ✓ | bounded | ✓ |
Ship `lock_timeout` first — lower-risk, immediate protection against every deadlock shape.
-Sequential apply reduces single-row contention and is a prerequisite for any
-destination-partitioned apply (below).
-
-### Further: destination-partitioned apply (long-term)
-
-Each destination shard is written by exactly one source subscriber for omni-table DML:
-
-```
-owns_omni_destination(source_shard, dest_shard, n_sources) = dest_shard % n_sources == source_shard
-```
-
-Removes cross-subscriber row-lock contention entirely. Requires every source subscriber to see
-every omni-table WAL event. If omni writes can originate from a single source only, partitioning
-silently leaves other destinations stale — routing must first establish source-presence before
-partitioning is safe.
-
----
-
+Sequential apply reduces single-row contention. Both are superseded by Solution 6.
### Solution 3: per-table async writer task
One long-lived Tokio task per replicated table. Subscribers buffer the full WAL transaction in
@@ -304,21 +287,54 @@ Treat as a `send()`-loop optimization parallel to Solutions 1+2, not part of the
---
+### Solution 6: modulo-partition subscriber ownership (implemented)
+
+The root cause is that every subscriber writes every omni-table row to every destination.
+The fix assigns each subscriber a disjoint subset of destination shards so no two subscribers
+ever hold locks on the same row at the same time.
+
+The partitioning rule lives in `OmniOwnership::owns()` (`omni_ownership.rs`): subscriber
+`source_shard` owns destination shard `d` when `d % n_sources == source_shard`. With two
+subscribers and two destinations, sub-0 owns even-indexed destinations and sub-1 owns
+odd-indexed ones — they never touch the same row on the same server simultaneously.
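+
+A minimal standalone sketch of the rule (illustrative only, not the pgdog types):
+
+```rust
+// Restatement of the ownership rule implemented by OmniOwnership::owns().
+fn owns(source_shard: usize, n_sources: usize, dest_shard: usize) -> bool {
+    // A single source (or none) owns every destination.
+    if n_sources <= 1 {
+        return true;
+    }
+    dest_shard % n_sources == source_shard
+}
+
+fn main() {
+    // Two sources, two destinations: ownership is disjoint.
+    assert!(owns(0, 2, 0) && !owns(0, 2, 1)); // sub-0 owns dest-0 only
+    assert!(owns(1, 2, 1) && !owns(1, 2, 0)); // sub-1 owns dest-1 only
+}
+```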
+
+Enforcement happens in `send()` (`stream.rs`). The connection filter always passes each
+shard through `partition.owns()`, with one exception: when there are multiple destination
+connections, `Shard::Direct` and `Shard::Multi` carry an explicit shard computed from the
+row's shard key and must land precisely — ownership filtering doesn't apply there.
+
+The edge case that required care: with a single destination shard, the query router
+collapses `Shard::All` to `Shard::Direct(0)` for every table, including omni tables that
+have no shard key. Without a guard, the ownership check would be bypassed and all
+subscribers would write to the one destination — the original bug, just on a smaller
+cluster. The `n_conns == 1` path in the filter catches this, routing single-connection
+dispatches through `partition.owns()` regardless of how the shard was computed.
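+
+A hedged sketch of that guard (hypothetical helper mirroring the filter shape, not the actual `send()` code):
+
+```rust
+// Decide whether connection `dest` should receive a row, given how it was routed.
+fn applies(sub: usize, n_sources: usize, n_conns: usize, dest: usize, direct: Option<usize>) -> bool {
+    let owns = |d: usize| n_sources <= 1 || d % n_sources == sub;
+    match direct {
+        // Explicit shard-key routing only bypasses ownership when there are
+        // multiple destination connections.
+        Some(d) if n_conns > 1 => dest == d,
+        // Single connection (or Shard::All): ownership always decides.
+        _ => owns(dest),
+    }
+}
+
+fn main() {
+    // One destination, two sources: the router collapses omni rows to Direct(0),
+    // but only sub-0 applies them; sub-1 skips the duplicate write.
+    assert!(applies(0, 2, 1, 0, Some(0)));
+    assert!(!applies(1, 2, 1, 0, Some(0)));
+}
+```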
+
+**Correctness prerequisite:** every subscriber must receive every omni-table WAL event.
+Logical replication publishes each WAL record to all subscribers of the same publication,
+so this holds in any standard deployment.
+
+**Sharded tables are unaffected.** `OmniOwnership` is constructed with `n_sources = 1`
+for sharded-table subscribers, causing `owns()` to return `true` unconditionally — the
+behavior is identical to the pre-fix code.
+
+---
+
### Comparison
| | Deadlock fixed | Cross-table/shard atomicity | Throughput impact | Memory risk | Complexity | Recommendation |
|---|:---:|:---:|:---:|:---:|:---:|---|
-| Solution 1: sequential per-dest apply | single-row only | preserved | minor (serial dest RTTs per row) | none | low | Ship |
-| Solution 2: `lock_timeout` | bounded recovery | preserved | none | none | low | Ship first |
-| **Solutions 1 + 2 combined** | **yes (bounded for multi-row)** | **preserved** | **minor** | **none** | **low** | **Recommended** |
-| Solution 3: per-table writer | yes | broken | severe: N-way → 1-way per omni table | unbounded | high | Not recommended |
-| Solution 4: per-shard writer | silent divergence† | broken | severe: removes all destination parallelism | unbounded | high | **Discard** |
-| Solution 5: buffer + `Sync` | no | preserved | minor (apply lag) | unbounded | low | Optional optimization |
-| Destination-partitioned apply | yes | preserved | none | none | high | Long-term structural fix |
+| Solution 1: sequential per-dest apply | single-row only | preserved | minor | none | low | Superseded |
+| Solution 2: `lock_timeout` | bounded recovery | preserved | none | none | low | Superseded |
+| Solution 3: per-table writer | yes | broken | severe | unbounded | high | Not recommended |
+| Solution 4: per-shard writer | silent divergence† | broken | severe | unbounded | high | Discard |
+| Solution 5: buffer + `Sync` | no | preserved | minor | unbounded | low | Optional optimization |
+| **Modulo-partition subscriber ownership** | **yes (structural)** | **preserved** | **none** | **none** | **low** | **Implemented** |
†Solution 4 produces persistent, undetectable row-level disagreement across shards when two
subscribers race on the same omni row. The deadlock is observable and recoverable; this divergence
is neither.
-Ship 1+2 short-term; pursue destination-partitioned apply long-term. Solutions 3 and 4 trade the
-deadlock for worse failure modes and throughput. Solution 5 is orthogonal.
+Modulo-partition subscriber ownership is the structural fix and is now implemented in
+`send()`. Solutions 1 and 2 remain valid as defence-in-depth. Solutions 3 and 4 trade the
+deadlock for worse failure modes and throughput. Solution 5 is orthogonal.
\ No newline at end of file
diff --git a/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs b/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs
index 1aaa3a410..a29b2c9d6 100644
--- a/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs
+++ b/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs
@@ -10,6 +10,10 @@ pub struct OmniOwnership {
impl OmniOwnership {
pub fn new(source_shard: usize, n_sources: usize) -> Self {
+ debug_assert!(
+ n_sources == 0 || source_shard < n_sources,
+ "source_shard ({source_shard}) must be < n_sources ({n_sources})"
+ );
Self {
source_shard,
n_sources,
diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs
index 834ebf378..f9a450c45 100644
--- a/pgdog/src/backend/replication/logical/subscriber/stream.rs
+++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs
@@ -230,14 +230,19 @@ impl StreamSubscriber {
async fn send(&mut self, val: &Shard, bind: &Bind) -> Result<(), Error> {
// Locals avoid borrowing self inside the iter_mut closure.
let partition = self.partition;
+ let n_conns = self.connections.len();
let mut conns: Vec<_> = self
.connections
.iter_mut()
.enumerate()
.filter(|(shard, _)| match val {
- Shard::Direct(direct) => *shard == *direct,
- Shard::Multi(multi) => multi.contains(shard),
- Shard::All => partition.owns(*shard),
+ // With a single destination shard the router collapses Shard::All
+ // to Direct(0), bypassing the partition ownership check. Apply
+ // partition.owns() for all variants when there is only one connection
+ // so that omni-table writes are still partitioned across subscribers.
+ Shard::Direct(direct) if n_conns > 1 => *shard == *direct,
+ Shard::Multi(multi) if n_conns > 1 => multi.contains(shard),
+ _ => partition.owns(*shard),
})
.map(|(_, server)| server)
.collect();