diff --git a/conanfile.py b/conanfile.py index 0d20a477d..b5db85ddd 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "7.5.9" + version = "7.5.10" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index 3d3ec3b0f..55dd2ffff 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -324,6 +324,27 @@ table Consensus { // Log frequency for gc_repl_reqs messages (log every N times) // 60 * repl_dev_cleanup_interval_sec (60s) means every 1 hour we will log the gc repl reqs info. gc_repl_reqs_log_frequency: uint32 = 60 (hotswap); + + // Controls whether senders compute and attach a CRC32 checksum to pushdata/fetchdata payloads. + // The receiver always skips CRC verification when the checksum field is zero, so this flag only + // affects the send side. + // + // ROLLING UPGRADE SAFETY — two directions to consider: + // Safe: old sender → new receiver. Old nodes send raw block data with no FlatBuffer header; + // new receivers detect the absence of the "FDRS" identifier and fall back to legacy + // processing. checksum=0 (FlatBuffer field default) causes CRC verification to be + // skipped automatically. + // UNSAFE: new sender (data_checksum_enabled=true) → old receiver. An old follower has no + // try-and-fallback logic; it will treat the prepended FlatBuffer header bytes as raw + // block data and write them to disk — SILENT DATA CORRUPTION with no error logged. + // + // DEPLOYMENT RULE: do NOT enable this flag via hotswap until every node in the cluster has been + // upgraded to a version that contains the try-and-fallback FetchData receiver. Enable only + // after the rolling upgrade is complete. 
+ // + // Not enabled by default: TCP's checksum already catches most in-transit corruption; the CRC is opt-in for + // environments requiring additional end-to-end data-integrity guarantees. + data_checksum_enabled: bool = false (hotswap); } table HomeStoreSettings { diff --git a/src/lib/replication/fetch_data_rpc.fbs b/src/lib/replication/fetch_data_rpc.fbs index e809cde42..044e188b1 100644 --- a/src/lib/replication/fetch_data_rpc.fbs +++ b/src/lib/replication/fetch_data_rpc.fbs @@ -19,6 +19,7 @@ table ResponseEntry { dsn : uint64; // Data Sequence number raft_term : uint64; // Raft term number data_size : uint32; // Size of the data which is sent as separate non flatbuffer + checksum: uint32; // CRC32 over the data for this entry; 0 when checksum is disabled } table FetchDataResponse { diff --git a/src/lib/replication/push_data_rpc.fbs b/src/lib/replication/push_data_rpc.fbs index d9a981e7c..5b31d0e1d 100644 --- a/src/lib/replication/push_data_rpc.fbs +++ b/src/lib/replication/push_data_rpc.fbs @@ -10,6 +10,7 @@ table PushDataRequest { user_key : [ubyte]; // User key data data_size : uint32; // Data size, actual data is sent as separate blob not by flatbuffer time_ms: uint64; // time point when originator pushed this request; + checksum: uint32; // CRC32 over the data payload; 0 when checksum is disabled } root_type PushDataRequest; diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index b83a9993b..5e7106c06 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -27,6 +27,14 @@ #include namespace homestore { + +// 4-byte FlatBuffer file identifier embedded by new senders in every FetchDataResponse buffer. +// Not declared in fetch_data_rpc.fbs because FetchDataResponse is a nested table, not the schema +// root_type. 
The identifier is set programmatically on the send side and checked via +// BufferHasIdentifier on the receive side; Verifier is given nullptr to skip the schema-level +// identifier check (which would require FetchDataResponse to be the root_type). +static constexpr char kFetchDataRespIdentifier[flatbuffers::kFileIdentifierLength + 1] = "FDRS"; + std::atomic< uint64_t > RaftReplDev::s_next_group_ordinal{1}; RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk >&& rd_sb, bool load_existing) : @@ -1097,11 +1105,24 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list const& data) { auto& builder = rreq->create_fb_builder(); + // Compute CRC32 over the data payload before serialising so the follower can detect corruption in transit. + // checksum=0 is the sentinel for "not computed"; the receiver skips verification when it sees 0. + // A legitimately computed CRC of 0 (~1 in 4B) is indistinguishable from the sentinel and will + // also skip verification for that packet — an accepted limitation of this design. 
+ uint32_t checksum = 0; + if (HS_DYNAMIC_CONFIG(consensus.data_checksum_enabled)) { + checksum = init_crc32; + for (auto const& iov : data.iovs) { + checksum = crc32_ieee(checksum, r_cast< const unsigned char* >(iov.iov_base), iov.iov_len); + } + } + // Prepare the rpc request packet with all repl_reqs details builder.FinishSizePrefixed(CreatePushDataRequest( builder, rreq->traceID(), server_id(), rreq->term(), rreq->dsn(), builder.CreateVector(rreq->header().cbytes(), rreq->header().size()), - builder.CreateVector(rreq->key().cbytes(), rreq->key().size()), data.size, get_time_since_epoch_ms())); + builder.CreateVector(rreq->key().cbytes(), rreq->key().size()), data.size, get_time_since_epoch_ms(), + checksum)); rreq->m_pkts = sisl::io_blob::sg_list_to_ioblob_list(data); rreq->m_pkts.insert(rreq->m_pkts.begin(), sisl::io_blob{builder.GetBufferPointer(), builder.GetSize(), false}); @@ -1144,9 +1165,27 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d rpc_data->send_response(); return; } + if (incoming_buf.size() < sizeof(flatbuffers::uoffset_t)) { + RD_LOGW(NO_TRACE_ID, "Data Channel: PushData received buffer too small ({}), ignoring", incoming_buf.size()); + rpc_data->send_response(); + return; + } - auto const fb_size = - flatbuffers::ReadScalar< flatbuffers::uoffset_t >(incoming_buf.cbytes()) + sizeof(flatbuffers::uoffset_t); + auto const fb_size = static_cast< uint64_t >( + flatbuffers::ReadScalar< flatbuffers::uoffset_t >(incoming_buf.cbytes())) + + sizeof(flatbuffers::uoffset_t); + if (fb_size > static_cast< uint64_t >(incoming_buf.size())) { + RD_LOGW(NO_TRACE_ID, "Data Channel: PushData received with oversized FlatBuffer header ({}), ignoring", + fb_size); + rpc_data->send_response(); + return; + } + flatbuffers::Verifier push_verifier{incoming_buf.cbytes(), static_cast< size_t >(fb_size)}; + if (!push_verifier.VerifySizePrefixedBuffer< PushDataRequest >(nullptr)) { + RD_LOGW(NO_TRACE_ID, "Data Channel: PushData FlatBuffer 
verification failed, ignoring"); + rpc_data->send_response(); + return; + } auto push_req = GetSizePrefixedPushDataRequest(incoming_buf.cbytes()); if (fb_size + push_req->data_size() != incoming_buf.size()) { RD_LOGW(NO_TRACE_ID, @@ -1155,6 +1194,13 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d rpc_data->send_response(); return; } + // user_header and user_key are optional FlatBuffer fields; guard against null even though + // a well-formed sender always sets them — the Verifier above only checks structural integrity. + if (!push_req->user_header() || !push_req->user_key()) { + RD_LOGW(NO_TRACE_ID, "Data Channel: PushData missing user_header or user_key, ignoring"); + rpc_data->send_response(); + return; + } sisl::blob header = sisl::blob{push_req->user_header()->Data(), push_req->user_header()->size()}; sisl::blob key = sisl::blob{push_req->user_key()->Data(), push_req->user_key()->size()}; repl_key rkey{.server_id = push_req->issuer_replica_id(), @@ -1165,6 +1211,23 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d RD_LOGD(rkey.traceID, "Data Channel: PushData received: time diff={} ms.", get_elapsed_time_ms(req_orig_time_ms)); + if (push_req->checksum() != 0) { + auto const data_ptr = r_cast< const unsigned char* >(incoming_buf.cbytes() + fb_size); + auto computed = crc32_ieee(init_crc32, data_ptr, push_req->data_size()); +#ifdef _PRERELEASE + if (iomgr_flip::instance()->test_flip("corrupt_push_data_checksum")) { computed ^= 0xdeadbeef; } +#endif + if (computed != push_req->checksum()) { + COUNTER_INCREMENT(m_metrics, push_data_checksum_mismatch_cnt, 1); + RD_LOGE(rkey.traceID, + "Data Channel: PushData checksum mismatch dsn={}, expected={:#010x}, computed={:#010x}, dropping " + "(follower will fetch from remote on next Raft retry)", + push_req->dsn(), push_req->checksum(), computed); + rpc_data->send_response(); + return; + } + } + #ifdef _PRERELEASE if 
(iomgr_flip::instance()->test_flip("drop_push_data_request")) { RD_LOGI(rkey.traceID, @@ -1516,7 +1579,27 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data) { auto const& incoming_buf = rpc_data->request_blob(); if (!incoming_buf.cbytes()) { - RD_LOGW(NO_TRACE_ID, "Data Channel: PushData received with empty buffer, ignoring this call"); + RD_LOGW(NO_TRACE_ID, "Data Channel: FetchData received with empty buffer, ignoring this call"); + rpc_data->send_response(); + return; + } + if (incoming_buf.size() < sizeof(flatbuffers::uoffset_t)) { + RD_LOGW(NO_TRACE_ID, "Data Channel: FetchData received buffer too small ({}), ignoring", incoming_buf.size()); + rpc_data->send_response(); + return; + } + auto const fetch_fb_size = static_cast< uint64_t >( + flatbuffers::ReadScalar< flatbuffers::uoffset_t >(incoming_buf.cbytes())) + + sizeof(flatbuffers::uoffset_t); + if (fetch_fb_size > static_cast< uint64_t >(incoming_buf.size())) { + RD_LOGW(NO_TRACE_ID, "Data Channel: FetchData received with oversized FlatBuffer header ({}), ignoring", + fetch_fb_size); + rpc_data->send_response(); + return; + } + flatbuffers::Verifier fetch_verifier{incoming_buf.cbytes(), static_cast< size_t >(fetch_fb_size)}; + if (!fetch_verifier.VerifySizePrefixedBuffer< FetchData >(nullptr)) { + RD_LOGW(NO_TRACE_ID, "Data Channel: FetchData FlatBuffer verification failed, ignoring"); rpc_data->send_response(); return; } @@ -1525,9 +1608,16 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ RD_LOGT(NO_TRACE_ID, "Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size()); + struct FetchEntryMeta { + int64_t lsn; + uint64_t dsn; + uint64_t raft_term; + }; std::vector< sisl::sg_list > sgs_vec; + std::vector< FetchEntryMeta > entry_metas; std::vector< folly::Future< std::error_code > > futs; 
sgs_vec.reserve(fetch_req->request()->entries()->size()); + entry_metas.reserve(fetch_req->request()->entries()->size()); futs.reserve(fetch_req->request()->entries()->size()); for (auto const& req : *(fetch_req->request()->entries())) { @@ -1543,8 +1633,9 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ sgs.iovs.emplace_back( iovec{.iov_base = iomanager.iobuf_alloc(get_blk_size(), total_size), .iov_len = total_size}); - // accumulate the sgs for later use (send back to the requester)); + // accumulate the sgs and per-entry metadata for later use (send back to the requester); sgs_vec.push_back(sgs); + entry_metas.push_back({req->lsn(), req->dsn(), req->raft_term()}); if (originator != server_id()) { RD_LOGD(NO_TRACE_ID, "non-originator FetchData received: dsn={} lsn={} originator={}, my_server_id={}", @@ -1560,7 +1651,8 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ } folly::collectAllUnsafe(futs).thenValue( - [this, rpc_data = std::move(rpc_data), sgs_vec = std::move(sgs_vec)](auto&& vf) { + [this, rpc_data = std::move(rpc_data), sgs_vec = std::move(sgs_vec), + entry_metas = std::move(entry_metas)](auto&& vf) { for (auto const& err_c : vf) { const auto& err = err_c.value(); if (err) { @@ -1582,20 +1674,58 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ RD_LOGT(NO_TRACE_ID, "Data Channel: FetchData data read completed for {} buffers", sgs_vec.size()); + // Emit a size-prefixed FetchDataResponse FlatBuffer before the block data only when + // data_checksum_enabled is true. When disabled, send raw block data (pre-checksum wire + // format) so old receivers are never surprised by an unexpected header during rolling + // upgrades. Receivers use try-and-fallback to detect the FlatBuffer transparently. 
+ std::unique_ptr< uint8_t[] > hdr_buf; + uint32_t hdr_size = 0; + + if (HS_DYNAMIC_CONFIG(consensus.data_checksum_enabled)) { + flatbuffers::FlatBufferBuilder resp_builder; + std::vector< flatbuffers::Offset< ResponseEntry > > resp_entries; + for (size_t i = 0; i < sgs_vec.size(); ++i) { + auto const& sgs = sgs_vec[i]; + uint32_t checksum = init_crc32; + for (auto const& iov : sgs.iovs) { + checksum = crc32_ieee(checksum, r_cast< const unsigned char* >(iov.iov_base), iov.iov_len); + } + int64_t const lsn = (i < entry_metas.size()) ? entry_metas[i].lsn : 0; + uint64_t const dsn = (i < entry_metas.size()) ? entry_metas[i].dsn : 0; + uint64_t const raft_term = (i < entry_metas.size()) ? entry_metas[i].raft_term : 0; + resp_entries.push_back( + CreateResponseEntry(resp_builder, lsn, dsn, raft_term, + static_cast< uint32_t >(sgs.size), checksum)); + } + resp_builder.FinishSizePrefixed( + CreateFetchDataResponse(resp_builder, server_id(), resp_builder.CreateVector(resp_entries)), + kFetchDataRespIdentifier); + + // Heap-copy the FlatBuffer so it outlives resp_builder until the send completion callback. + hdr_size = resp_builder.GetSize(); + hdr_buf = std::make_unique< uint8_t[] >(hdr_size); + std::memcpy(hdr_buf.get(), resp_builder.GetBufferPointer(), hdr_size); + } + // now prepare the io_blob_list to response back to requester; nuraft_mesg::io_blob_list_t pkts = sisl::io_blob_list_t{}; + if (hdr_buf) { pkts.emplace_back(sisl::io_blob{hdr_buf.get(), hdr_size, false}); } for (auto const& sgs : sgs_vec) { auto const ret = sisl::io_blob::sg_list_to_ioblob_list(sgs); pkts.insert(pkts.end(), ret.begin(), ret.end()); } + // All potential throws are past; release ownership of hdr_buf to the completion lambda. 
+ auto* raw_hdr = hdr_buf.release(); - rpc_data->set_comp_cb([sgs_vec = std::move(sgs_vec)](boost::intrusive_ptr< sisl::GenericRpcData >&) { - for (auto const& sgs : sgs_vec) { - for (auto const& iov : sgs.iovs) { - iomanager.iobuf_free(reinterpret_cast< uint8_t* >(iov.iov_base)); + rpc_data->set_comp_cb( + [sgs_vec = std::move(sgs_vec), raw_hdr](boost::intrusive_ptr< sisl::GenericRpcData >&) { + delete[] raw_hdr; // delete[] nullptr is a no-op when checksums are disabled + for (auto const& sgs : sgs_vec) { + for (auto const& iov : sgs.iovs) { + iomanager.iobuf_free(reinterpret_cast< uint8_t* >(iov.iov_base)); + } } - } - }); + }); rpc_data->send_response(pkts); }); @@ -1612,13 +1742,78 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons return; } + // New senders embed a "FDRS" file identifier at bytes 8-11 of the size-prefixed FlatBuffer. + // Old senders emit raw block data with no prefix. A collision (raw data containing "FDRS" at + // exactly bytes 8-11) is astronomically unlikely and is ruled out by a further structural check + // via flatbuffers::Verifier even if a collision does occur. 
+ const flatbuffers::Vector< flatbuffers::Offset< ResponseEntry > >* resp_entries = nullptr; + + static constexpr size_t kMinFetchRespHdrSize = + 2 * sizeof(flatbuffers::uoffset_t) + flatbuffers::kFileIdentifierLength; + if (total_size >= kMinFetchRespHdrSize && + flatbuffers::BufferHasIdentifier(raw_data + sizeof(flatbuffers::uoffset_t), kFetchDataRespIdentifier)) { + auto const fb_hdr_size = static_cast< uint64_t >( + flatbuffers::ReadScalar< flatbuffers::uoffset_t >(raw_data)) + + sizeof(flatbuffers::uoffset_t); + if (fb_hdr_size <= static_cast< uint64_t >(total_size)) { + flatbuffers::Verifier verifier{raw_data, fb_hdr_size}; + if (verifier.VerifySizePrefixedBuffer< FetchDataResponse >(nullptr)) { + auto const fetch_resp = flatbuffers::GetSizePrefixedRoot< FetchDataResponse >(raw_data); + raw_data += fb_hdr_size; + total_size -= fb_hdr_size; + if (fetch_resp->entries()) { + resp_entries = fetch_resp->entries(); + if (resp_entries->size() != rreqs.size()) { + RD_LOGW(NO_TRACE_ID, + "Data Channel: FetchData response entry count {} != request count {}, " + "some entries will not be checksum-verified", + resp_entries->size(), rreqs.size()); + } + } + } else { + RD_LOGD(NO_TRACE_ID, + "Data Channel: FetchData response FlatBuffer verifier failed despite matching identifier, " + "treating as legacy raw-block format"); + } + } + } + + // Count only actual block data bytes (framing header excluded). 
COUNTER_INCREMENT(m_metrics, fetch_total_blk_size, total_size); RD_LOGD(NO_TRACE_ID, "Data Channel: FetchData completed for {} requests", rreqs.size()); - for (auto const& rreq : rreqs) { + std::vector< repl_req_ptr_t > checksum_mismatch_rreqs; + for (size_t i = 0; i < rreqs.size(); ++i) { + auto const& rreq = rreqs[i]; auto const data_size = rreq->remote_blkid().blkid.blk_count() * get_blk_size(); + if (data_size > total_size) { + RD_LOGE(NO_TRACE_ID, + "Data Channel: FetchData response truncated: need {} bytes for dsn={} but only {} bytes remain, " + "aborting response processing", + data_size, rreq->dsn(), total_size); + return; + } + + if (resp_entries && i < resp_entries->size() && (*resp_entries)[i]->checksum() != 0) { + auto computed = crc32_ieee(init_crc32, r_cast< const unsigned char* >(raw_data), data_size); +#ifdef _PRERELEASE + if (iomgr_flip::instance()->test_flip("corrupt_fetch_data_checksum")) { computed ^= 0xdeadbeef; } +#endif + if (computed != (*resp_entries)[i]->checksum()) { + COUNTER_INCREMENT(m_metrics, fetch_data_checksum_mismatch_cnt, 1); + RD_LOGE(rreq->traceID(), + "Data Channel: FetchData checksum mismatch dsn={}, expected={:#010x}, computed={:#010x}; " + "re-fetching immediately.", + rreq->dsn(), (*resp_entries)[i]->checksum(), computed); + raw_data += data_size; + total_size -= data_size; + checksum_mismatch_rreqs.emplace_back(rreq); + continue; + } + } + if (!rreq->save_fetched_data(response, raw_data, data_size)) { RD_DBG_ASSERT(rreq->local_blkid().is_valid(), "Invalid blkid for rreq={}", rreq->to_string()); auto const local_size = rreq->local_blkid().blk_count() * get_blk_size(); @@ -1666,6 +1861,12 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons total_size -= data_size; } + if (!checksum_mismatch_rreqs.empty()) { + RD_LOGD(NO_TRACE_ID, "Data Channel: Re-fetching {} rreqs that had checksum mismatches", + checksum_mismatch_rreqs.size()); + 
+ check_and_fetch_remote_data(std::move(checksum_mismatch_rreqs)); + } + RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed"); } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index c27155025..ed75509e5 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -71,6 +71,10 @@ class RaftReplDevMetrics : public sisl::MetricsGroup { REGISTER_COUNTER(read_err_cnt, "total read error count", "read_err_cnt", {"op", "read"}); REGISTER_COUNTER(write_err_cnt, "total write error count", "write_err_cnt", {"op", "write"}); REGISTER_COUNTER(fetch_err_cnt, "total fetch data error count", "fetch_err_cnt", {"op", "fetch"}); + REGISTER_COUNTER(push_data_checksum_mismatch_cnt, "CRC32 mismatches on push data channel", + "push_data_checksum_mismatch_cnt", {"op", "checksum_push"}); + REGISTER_COUNTER(fetch_data_checksum_mismatch_cnt, "CRC32 mismatches on fetch data channel", + "fetch_data_checksum_mismatch_cnt", {"op", "checksum_fetch"}); REGISTER_COUNTER(fetch_rreq_cnt, "total fetch data count", "fetch_data_req_cnt", {"op", "fetch"}); REGISTER_COUNTER(fetch_total_blk_size, "total fetch data blocks size", "fetch_total_blk_size", {"op", "fetch"}); diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index eafad14a8..29e0c0489 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -110,7 +110,127 @@ TEST_F(RaftReplDevTest, Follower_Fetch_OnActive_ReplicaGroup) { g_helper->sync_for_cleanup_start(); if (g_helper->replica_num() != 0) { g_helper->remove_flip("drop_push_data_request"); } } +// The test above also validates the try-and-fallback receive path: with data_checksum_enabled=false (default), +// no FetchDataResponse FlatBuffer header is emitted by the sender. 
The new receiver code must +// treat the raw block bytes as old-format data — exactly what an old sender would produce during +// a rolling upgrade. Passes = old-format fallback is correct. +#endif + +// Verifies the happy path with checksums explicitly enabled: writes should commit correctly +// and all replicas should hold the same data. +TEST_F(RaftReplDevTest, Checksum_Enabled_PushData_Path) { + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + g_helper->sync_for_test_start(); + + LOGINFO("Enabling data_checksum_enabled"); + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.data_checksum_enabled = true; }); + HS_SETTINGS_FACTORY().save(); + + this->write_on_leader(20, true /* wait_for_commit */); + + g_helper->sync_for_verify_start(); + LOGINFO("Validate all data written so far by reading them"); + this->validate_data(); + + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.data_checksum_enabled = false; }); + HS_SETTINGS_FACTORY().save(); + g_helper->sync_for_cleanup_start(); +} + +#ifdef _PRERELEASE +// Verifies that the fetch path works correctly with checksums enabled. +// Drops all push-data on non-leader replicas so they are forced to fetch, then checks that +// the framing header is correctly parsed and data arrives intact. 
+TEST_F(RaftReplDevTest, Checksum_Enabled_FetchData_Path) { + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + g_helper->sync_for_test_start(); + + LOGINFO("Enabling data_checksum_enabled"); + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.data_checksum_enabled = true; }); + HS_SETTINGS_FACTORY().save(); + + if (g_helper->replica_num() != 0) { + LOGINFO("Drop all push-data so follower {} must fetch with checksum header", g_helper->replica_num()); + g_helper->set_basic_flip("drop_push_data_request"); + } + + this->write_on_leader(20, true /* wait_for_commit */); + + g_helper->sync_for_verify_start(); + LOGINFO("Validate all data written so far by reading them"); + this->validate_data(); + + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.data_checksum_enabled = false; }); + HS_SETTINGS_FACTORY().save(); + g_helper->sync_for_cleanup_start(); + if (g_helper->replica_num() != 0) { g_helper->remove_flip("drop_push_data_request"); } +} +// Verifies that a PushData checksum mismatch is detected and the follower recovers correctly. +// The corrupt_push_data_checksum flip causes the receiver to treat a valid CRC as wrong, +// simulating bit corruption in transit. Followers drop the packet and fall back to fetch; +// data must still commit correctly on all replicas. 
+TEST_F(RaftReplDevTest, Checksum_Mismatch_PushData_Path) { + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + g_helper->sync_for_test_start(); + + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.data_checksum_enabled = true; }); + HS_SETTINGS_FACTORY().save(); + + if (g_helper->replica_num() != 0) { + LOGINFO("Enabling corrupt_push_data_checksum on follower {} to simulate CRC mismatch", + g_helper->replica_num()); + g_helper->set_basic_flip("corrupt_push_data_checksum"); + } + + this->write_on_leader(10, true /* wait_for_commit */); + + g_helper->sync_for_verify_start(); + LOGINFO("Validate data: follower must have recovered via fetch despite push checksum mismatch"); + this->validate_data(); + + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.data_checksum_enabled = false; }); + HS_SETTINGS_FACTORY().save(); + g_helper->sync_for_cleanup_start(); + if (g_helper->replica_num() != 0) { g_helper->remove_flip("corrupt_push_data_checksum"); } +} + +// Verifies that a FetchData checksum mismatch triggers an immediate re-fetch and the follower +// recovers correctly. Drops push-data to force the fetch path, then arms corrupt_fetch_data_checksum +// as a one-shot flip (count=1) so a single entry in the first fetch response looks corrupted. +// The immediate re-fetch (check_and_fetch_remote_data) must succeed and data must commit. +TEST_F(RaftReplDevTest, Checksum_Mismatch_FetchData_Path) { + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + g_helper->sync_for_test_start(); + + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.data_checksum_enabled = true; }); + HS_SETTINGS_FACTORY().save(); + + if (g_helper->replica_num() != 0) { + LOGINFO("Follower {}: dropping push-data and injecting one-shot fetch checksum corruption", + g_helper->replica_num()); + g_helper->set_basic_flip("drop_push_data_request"); + // Fire once: first fetch response is checksum-corrupted; the immediate re-fetch succeeds. 
+ g_helper->set_basic_flip("corrupt_fetch_data_checksum", 1, 100); + } + + this->write_on_leader(10, true /* wait_for_commit */); + + g_helper->sync_for_verify_start(); + LOGINFO("Validate data: follower must have recovered via re-fetch after fetch checksum mismatch"); + this->validate_data(); + + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.data_checksum_enabled = false; }); + HS_SETTINGS_FACTORY().save(); + g_helper->sync_for_cleanup_start(); + if (g_helper->replica_num() != 0) { + g_helper->remove_flip("drop_push_data_request"); + g_helper->remove_flip("corrupt_fetch_data_checksum"); + } +} +#endif + +#ifdef _PRERELEASE TEST_F(RaftReplDevTest, Write_With_Diabled_Leader_Push_Data) { g_helper->set_basic_flip("disable_leader_push_data", std::numeric_limits< int >::max(), 100); LOGINFO("Homestore replica={} setup completed, all the push_data from leader are disabled",