32 changes: 29 additions & 3 deletions integration/resharding/dev.sh
@@ -151,10 +151,16 @@ wait_for_no_copy_rows tasks title
wait_for_no_copy_rows task_comments body
wait_for_no_copy_rows settings name


# pg_count PORT TABLE — row count via a direct postgres connection (bypasses pgdog).
pg_count() { PGPASSWORD=pgdog psql -h 127.0.0.1 -p "$1" -U pgdog -d postgres -tAc "SELECT COUNT(*) FROM $2"; }

# check_row_count_matches TABLE
# Verifies total row count matches between source (via pgdog) and destination (via pgdog).
# Suitable for sharded tables where pgdog aggregates across all shards.
check_row_count_matches() {
local table="$1"
local source_count
local destination_count
local source_count destination_count

source_count=$(psql -d source -tAc "SELECT COUNT(*) FROM ${table}")
destination_count=$(psql -d destination -tAc "SELECT COUNT(*) FROM ${table}")
@@ -167,11 +173,31 @@ check_row_count_matches() {
echo "OK ${table}: ${source_count} rows"
}

# check_omni_each_shard TABLE
# For omni (non-sharded) tables: queries each destination shard directly and asserts
# it holds the full source row count. A query routed through pgdog hits only one
# shard, so it cannot detect a different shard that is missing rows.
check_omni_each_shard() {
local table="$1"
local source_count dest0_count dest1_count

source_count=$(pg_count 15432 "${table}")
dest0_count=$(pg_count 15434 "${table}")
dest1_count=$(pg_count 15435 "${table}")

if [ "${source_count}" -ne "${dest0_count}" ] || [ "${source_count}" -ne "${dest1_count}" ]; then
echo "MISMATCH omni ${table}: source=${source_count} dest-0(15434)=${dest0_count} dest-1(15435)=${dest1_count} (expected ${source_count} on each)"
exit 1
fi

echo "OK omni ${table}: ${source_count} rows on each shard"
}

check_row_count_matches tenants
check_row_count_matches accounts
check_row_count_matches projects
check_row_count_matches tasks
check_row_count_matches task_comments
check_row_count_matches settings
check_omni_each_shard settings

cleanup
70 changes: 43 additions & 27 deletions pgdog/docs/issues/omni-table-subscriber-deadlock.md
@@ -137,24 +137,7 @@ errors. Combined with Solution 1, it catches the multi-row cases sequential appl
| Both | ✓ | bounded | ✓ |

Ship `lock_timeout` first — lower-risk, immediate protection against every deadlock shape.
Sequential apply reduces single-row contention and is a prerequisite for any
destination-partitioned apply (below).

### Further: destination-partitioned apply (long-term)

Each destination shard is written by exactly one source subscriber for omni-table DML:

```
owns_omni_destination(source_shard, dest_shard, n_sources) = dest_shard % n_sources == source_shard
```

Removes cross-subscriber row-lock contention entirely. Requires every source subscriber to see
every omni-table WAL event. If omni writes can originate from a single source only, partitioning
silently leaves other destinations stale — routing must first establish source-presence before
partitioning is safe.

---

Sequential apply reduces single-row contention. Both are superseded by Solution 6.
### Solution 3: per-table async writer task

One long-lived Tokio task per replicated table. Subscribers buffer the full WAL transaction in
@@ -304,21 +287,54 @@ Treat as a `send()`-loop optimization parallel to Solutions 1+2, not part of the

---

### Solution 6: modulo-partition subscriber ownership (implemented)

The root cause is that every subscriber writes every omni-table row to every destination.
The fix assigns each subscriber a disjoint subset of destination shards so no two subscribers
ever hold locks on the same row at the same time.

The partitioning rule lives in `OmniOwnership::owns()` (`omni_ownership.rs`): subscriber
`source_shard` owns destination shard `d` when `d % n_sources == source_shard`. With two
subscribers and two destinations, sub-0 owns even-indexed destinations and sub-1 owns
odd-indexed ones — they never touch the same row on the same server simultaneously.
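
For concreteness, here is a standalone sketch of the rule (illustrative only; the
authoritative logic is `OmniOwnership::owns()` in the diff below):

```rust
// Standalone restatement of the ownership rule from omni_ownership.rs (sketch only).
// n_sources <= 1 means a lone subscriber owns every destination.
fn owns(source_shard: usize, n_sources: usize, dest_shard: usize) -> bool {
    n_sources <= 1 || dest_shard % n_sources == source_shard
}

fn main() {
    // Two subscribers, two destinations: sub-0 takes even shards, sub-1 odd.
    assert!(owns(0, 2, 0) && !owns(0, 2, 1));
    assert!(owns(1, 2, 1) && !owns(1, 2, 0));
    // Exactly one subscriber owns any given destination shard.
    for dest in 0..4 {
        assert_ne!(owns(0, 2, dest), owns(1, 2, dest));
    }
}
```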

Enforcement happens in `send()` (`stream.rs`). The connection filter always passes each
shard through `partition.owns()`, with one exception: when there are multiple destination
connections, `Shard::Direct` and `Shard::Multi` carry an explicit shard computed from the
row's shard key and must land precisely — ownership filtering doesn't apply there.

The edge case that required care: with a single destination shard, the query router
collapses `Shard::All` to `Shard::Direct(0)` for every table, including omni tables that
have no shard key. Without a guard, the ownership check would be bypassed and all
subscribers would write to the one destination — the original bug, just on a smaller
cluster. The `n_conns == 1` path in the filter catches this, routing single-connection
dispatches through `partition.owns()` regardless of how the shard was computed.
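
A minimal sketch of that filter decision, with `Shard` reduced to a hypothetical
three-variant stand-in for pgdog's type (the real filter is in the `stream.rs` diff below):

```rust
// Simplified stand-in for pgdog's Shard enum (names assumed for illustration).
#[allow(dead_code)]
enum Shard {
    All,
    Direct(usize),
    Multi(Vec<usize>),
}

// Mirrors the send() filter: explicit targets are honored only when there is
// more than one destination connection; everything else goes through ownership.
fn should_send(val: &Shard, shard: usize, n_conns: usize, owns: impl Fn(usize) -> bool) -> bool {
    match val {
        Shard::Direct(direct) if n_conns > 1 => shard == *direct,
        Shard::Multi(multi) if n_conns > 1 => multi.contains(&shard),
        _ => owns(shard),
    }
}

fn main() {
    let sub0_of_2 = |d: usize| d % 2 == 0; // sub-0's ownership under two sources
    // Multiple connections: an explicit Direct target lands exactly where computed.
    assert!(should_send(&Shard::Direct(1), 1, 2, sub0_of_2));
    // Single connection: Direct(0) still goes through the ownership check,
    // so a subscriber owning only odd shards skips the lone destination.
    assert!(should_send(&Shard::Direct(0), 0, 1, sub0_of_2));
    assert!(!should_send(&Shard::Direct(0), 0, 1, |d: usize| d % 2 == 1));
}
```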

**Correctness prerequisite:** every subscriber must receive every omni-table WAL event.
Logical replication delivers every change to a published table to all subscriptions on
that publication, so this holds in any standard deployment.

**Sharded tables are unaffected.** `OmniOwnership` is constructed with `n_sources = 1`
for sharded-table subscribers, causing `owns()` to return `true` unconditionally — the
behavior is identical to the pre-fix code.
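
The same invariant written as a test sketch that could sit alongside the existing tests
in `omni_ownership.rs` below (it overlaps with `omni_single_source_owns_all_dests`; the
loop bound is arbitrary):

```rust
#[test]
fn omni_sharded_table_subscriber_owns_every_dest() {
    // n_sources = 1 makes owns() unconditionally true, so sharded-table
    // subscribers route exactly as they did before the fix.
    let p = OmniOwnership::new(0, 1);
    for dest in 0..8 {
        assert!(p.owns(dest), "dest {dest} must be owned");
    }
}
```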

---

### Comparison

| | Deadlock fixed | Cross-table/shard atomicity | Throughput impact | Memory risk | Complexity | Recommendation |
|---|:---:|:---:|:---:|:---:|:---:|---|
| Solution 1: sequential per-dest apply | single-row only | preserved | minor (serial dest RTTs per row) | none | low | Ship |
| Solution 2: `lock_timeout` | bounded recovery | preserved | none | none | low | Ship first |
| **Solutions 1 + 2 combined** | **yes (bounded for multi-row)** | **preserved** | **minor** | **none** | **low** | **Recommended** |
| Solution 3: per-table writer | yes | broken | severe: N-way → 1-way per omni table | unbounded | high | Not recommended |
| Solution 4: per-shard writer | silent divergence† | broken | severe: removes all destination parallelism | unbounded | high | **Discard** |
| Solution 5: buffer + `Sync` | no | preserved | minor (apply lag) | unbounded | low | Optional optimization |
| Destination-partitioned apply | yes | preserved | none | none | high | Long-term structural fix |
| Solution 1: sequential per-dest apply | single-row only | preserved | minor | none | low | Superseded |
| Solution 2: `lock_timeout` | bounded recovery | preserved | none | none | low | Superseded |
| Solution 3: per-table writer | yes | broken | severe | unbounded | high | Not recommended |
| Solution 4: per-shard writer | silent divergence† | broken | severe | unbounded | high | Discard |
| Solution 5: buffer + `Sync` | no | preserved | minor | unbounded | low | Optional optimization |
| **Modulo-partition subscriber ownership** | **yes (structural)** | **preserved** | **none** | **none** | **low** | **Implemented** |

†Solution 4 produces persistent, undetectable row-level disagreement across shards when two
subscribers race on the same omni row. The deadlock is observable and recoverable; this divergence
is neither.

Ship 1+2 short-term; pursue destination-partitioned apply long-term. Solutions 3 and 4 trade the
deadlock for worse failure modes and throughput. Solution 5 is orthogonal.
Modulo-partition subscriber ownership is the structural fix and is now implemented in
`send()`. Solutions 1 and 2 remain valid as defence-in-depth. Solutions 3 and 4 trade the
deadlock for worse failure modes and throughput. Solution 5 is orthogonal.
@@ -13,6 +13,7 @@ use tracing::{debug, info, warn};
use super::super::{ensure_validation, publisher::Table, Error};
use super::ReplicationSlot;

use crate::backend::replication::logical::subscriber::omni_ownership::OmniOwnership;
use crate::backend::replication::logical::subscriber::stream::StreamSubscriber;
use crate::backend::replication::publisher::progress::Progress;
use crate::backend::replication::publisher::Lsn;
@@ -196,6 +197,7 @@ impl Publisher {
self.create_slots(source).await?;
}

let n_sources = source.shards().len();
for (number, _) in source.shards().iter().enumerate() {
// Use table offsets from data sync
// or from loading them above.
@@ -204,7 +206,10 @@
.get(&number)
.ok_or(Error::NoReplicationTables(number))?;
// Handles the logical replication stream messages.
let mut stream = StreamSubscriber::new(dest, tables);
// Each subscriber owns a partition of destination shards for omni-table DML
// (dest_shard % n_sources == source_shard), preventing cross-subscriber deadlocks.
let mut stream =
StreamSubscriber::new(dest, tables, OmniOwnership::new(number, n_sources));

// Take ownership of the slot for replication.
let mut slot = self
1 change: 1 addition & 0 deletions pgdog/src/backend/replication/logical/subscriber/mod.rs
@@ -1,5 +1,6 @@
pub mod context;
pub mod copy;
pub mod omni_ownership;
pub mod parallel_connection;
pub mod stream;

116 changes: 116 additions & 0 deletions pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs
@@ -0,0 +1,116 @@
/// Controls which destination shards a subscriber writes to for omni (unsharded) tables.
///
/// Partitions destinations via `dest_shard % n_sources == source_shard` so that each
/// subscriber owns a disjoint subset, preventing cross-subscriber row-lock deadlocks.
#[derive(Debug, Clone, Copy)]
pub struct OmniOwnership {
source_shard: usize,
n_sources: usize,
}

impl OmniOwnership {
pub fn new(source_shard: usize, n_sources: usize) -> Self {
debug_assert!(
n_sources == 0 || source_shard < n_sources,
"source_shard ({source_shard}) must be < n_sources ({n_sources})"
);
Self {
source_shard,
n_sources,
}
}

/// Returns true if this subscriber should write omni-table DML to `dest_shard`.
pub fn owns(&self, dest_shard: usize) -> bool {
if self.n_sources <= 1 {
return true;
}
dest_shard % self.n_sources == self.source_shard
}
}

impl Default for OmniOwnership {
fn default() -> Self {
Self::new(0, 1)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn omni_single_source_owns_all_dests() {
let p = OmniOwnership::new(0, 1);
assert!(p.owns(0));
assert!(p.owns(1));
assert!(p.owns(2));
assert!(p.owns(7));
}

#[test]
fn omni_zero_sources_owns_all_dests() {
let p = OmniOwnership::new(0, 0);
assert!(p.owns(0));
assert!(p.owns(1));
assert!(p.owns(3));
}

#[test]
fn omni_equal_sources_and_dests() {
// n_sources == n_dests == 3: strict 1:1, each source owns only its own index.
assert!(OmniOwnership::new(0, 3).owns(0));
assert!(OmniOwnership::new(1, 3).owns(1));
assert!(OmniOwnership::new(2, 3).owns(2));

assert!(!OmniOwnership::new(1, 3).owns(0));
assert!(!OmniOwnership::new(2, 3).owns(0));
assert!(!OmniOwnership::new(0, 3).owns(1));
assert!(!OmniOwnership::new(2, 3).owns(1));
assert!(!OmniOwnership::new(0, 3).owns(2));
assert!(!OmniOwnership::new(1, 3).owns(2));
}

#[test]
fn omni_fewer_sources_than_dests() {
// n_sources=2, n_dests=4: sub-0 owns even dests, sub-1 owns odd dests.
let p0 = OmniOwnership::new(0, 2);
assert!(p0.owns(0));
assert!(p0.owns(2));
assert!(!p0.owns(1));
assert!(!p0.owns(3));

let p1 = OmniOwnership::new(1, 2);
assert!(p1.owns(1));
assert!(p1.owns(3));
assert!(!p1.owns(0));
assert!(!p1.owns(2));
}

#[test]
fn omni_more_sources_than_dests_all_dests_covered() {
// n_sources=5, n_dests=3: subs 0-2 each own their matching dest exclusively.
assert!(OmniOwnership::new(0, 5).owns(0));
assert!(OmniOwnership::new(1, 5).owns(1));
assert!(OmniOwnership::new(2, 5).owns(2));

assert!(!OmniOwnership::new(1, 5).owns(0));
assert!(!OmniOwnership::new(2, 5).owns(0));
assert!(!OmniOwnership::new(0, 5).owns(1));
assert!(!OmniOwnership::new(2, 5).owns(1));
assert!(!OmniOwnership::new(0, 5).owns(2));
assert!(!OmniOwnership::new(1, 5).owns(2));
}

#[test]
fn omni_more_sources_than_dests_excess_sources_idle() {
// n_sources=5, n_dests=3: subs 3 and 4 own no destinations.
assert!(!OmniOwnership::new(3, 5).owns(0));
assert!(!OmniOwnership::new(3, 5).owns(1));
assert!(!OmniOwnership::new(3, 5).owns(2));

assert!(!OmniOwnership::new(4, 5).owns(0));
assert!(!OmniOwnership::new(4, 5).owns(1));
assert!(!OmniOwnership::new(4, 5).owns(2));
}
}
22 changes: 17 additions & 5 deletions pgdog/src/backend/replication/logical/subscriber/stream.rs
@@ -18,6 +18,7 @@ use super::super::publisher::{tables_missing_unique_index, NonIdentityColumnsPre
use super::super::{
ensure_validation, publisher::Table, Error, TableValidationError, TableValidationErrorKind,
};
use super::omni_ownership::OmniOwnership;
use super::StreamContext;
use crate::net::messages::replication::logical::tuple_data::{Identifier, TupleData};
use crate::net::messages::replication::logical::update::Update as XLogUpdate;
@@ -128,10 +129,13 @@ pub struct StreamSubscriber {

// Missed rows.
missed_rows: MissedRows,

// Determines which destination shards this subscriber owns for omni tables.
partition: OmniOwnership,
}

impl StreamSubscriber {
pub fn new(cluster: &Cluster, tables: &[Table]) -> Self {
pub fn new(cluster: &Cluster, tables: &[Table], partition: OmniOwnership) -> Self {
let cluster = cluster.logical_stream();
Self {
cluster,
@@ -158,6 +162,7 @@
in_transaction: false,
missed_rows: MissedRows::default(),
keys: HashMap::default(),
partition,
}
}

@@ -223,14 +228,21 @@

// Dispatch a pre-built bind to the matching shard(s).
async fn send(&mut self, val: &Shard, bind: &Bind) -> Result<(), Error> {
// Locals avoid borrowing self inside the iter_mut closure.
let partition = self.partition;
let n_conns = self.connections.len();
let mut conns: Vec<_> = self
.connections
.iter_mut()
.enumerate()
.filter(|(shard, _)| match val {
Shard::Direct(direct) => *shard == *direct,
Shard::Multi(multi) => multi.contains(shard),
_ => true,
// With a single destination shard the router collapses Shard::All
// to Direct(0), bypassing the partition ownership check. Apply
// partition.owns() for all variants when there is only one connection
// so that omni-table writes are still partitioned across subscribers.
Shard::Direct(direct) if n_conns > 1 => *shard == *direct,
Shard::Multi(multi) if n_conns > 1 => multi.contains(shard),
_ => partition.owns(*shard),
})
.map(|(_, server)| server)
.collect();
@@ -995,7 +1007,7 @@ mod tests {

fn make_subscriber() -> StreamSubscriber {
let cluster = Cluster::new_test(&config());
StreamSubscriber::new(&cluster, &[])
StreamSubscriber::new(&cluster, &[], OmniOwnership::default())
}

#[test]