Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "4.1.18"
version = "4.2.0"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeStore"
Expand Down
3 changes: 2 additions & 1 deletion src/include/homeobject/shard_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ struct ShardInfo {
shard_id_t id;
pg_id_t placement_group;
State state;
uint64_t lsn; // created_lsn
uint64_t lsn; // created_lsn
uint64_t sealed_lsn{INT64_MAX}; // sealed_lsn
uint64_t created_time;
uint64_t last_modified_time;
uint64_t available_capacity_bytes;
Expand Down
15 changes: 13 additions & 2 deletions src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,5 +163,16 @@ target_link_libraries(homestore_test_gc PUBLIC homeobject_homestore ${COMMON_TES
add_test(NAME HomestoreTestGC COMMAND homestore_test_gc -csv error --executor immediate --config_path ./
--override_config hs_backend_config.enable_gc=true
--override_config hs_backend_config.gc_enable_read_verify=true
--override_config hs_backend_config.gc_garbage_rate_threshold=0
--override_config hs_backend_config.gc_scan_interval_sec=5)
--override_config hs_backend_config.gc_garbage_rate_threshold=0
--override_config hs_backend_config.gc_scan_interval_sec=5
--gtest_filter=-HomeObjectFixture.StalePChunkRouteAfterGC:-HomeObjectFixture.StaleBlobRouteAfterSealAndGC)

# Shard-race repro tests: require chunks_per_pg=1 (forces vchunk reuse after seal) and gc_garbage_rate_threshold=0.
# Run together since they share the same config requirements.
add_test(NAME HomestoreTestGC_ShardRaceTests
COMMAND homestore_test_gc -csv error --executor immediate --config_path ./
--chunks_per_pg 1
--override_config hs_backend_config.enable_gc=true
--override_config hs_backend_config.gc_garbage_rate_threshold=0
--override_config hs_backend_config.gc_scan_interval_sec=5
--gtest_filter=HomeObjectFixture.StalePChunkRouteAfterGC:HomeObjectFixture.StaleBlobRouteAfterSealAndGC)
1 change: 0 additions & 1 deletion src/lib/homestore_backend/gc_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,6 @@ bool GCManager::pdev_gc_actor::copy_valid_data(

// prepare a shard header for this shard in move_to_chunk
sisl::sg_list header_sgs = generate_shard_super_blk_sg_list(shard_id);

// We ignore the state in the shard header blk: the shard header is never read back, since we don't know where it
// is (its pba is not recorded in the index table).
#if 0
Expand Down
35 changes: 8 additions & 27 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,8 @@ void HeapChunkSelector::add_chunk_internal(const chunk_num_t p_chunk_id, bool ad
}
}

// select_chunk will only be called in homestore when creating a shard.
csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const homestore::blk_alloc_hints& hint) {
auto& chunkIdHint = hint.chunk_id_hint;
if (chunkIdHint.has_value()) {
LOGWARNMOD(homeobject, "should not allocated a chunk with exiting chunkIdHint={} in hint!",
chunkIdHint.value());
return nullptr;
}

if (!hint.application_hint.has_value()) {
LOGWARNMOD(homeobject, "should not allocated a chunk without exiting application_hint in hint!");
return nullptr;
} else {
// Both chunk_num_t and pg_id_t are of type uint16_t.
static_assert(std::is_same< pg_id_t, uint16_t >::value, "pg_id_t is not uint16_t");
static_assert(std::is_same< homestore::chunk_num_t, uint16_t >::value, "chunk_num_t is not uint16_t");
auto application_hint = hint.application_hint.value();
pg_id_t pg_id = (uint16_t)(application_hint >> 16 & 0xFFFF);
homestore::chunk_num_t v_chunk_id = (uint16_t)(application_hint & 0xFFFF);
return select_specific_chunk(pg_id, v_chunk_id);
}
RELEASE_ASSERT(false, "never be used in HOMEOBJECT");
}

bool HeapChunkSelector::try_mark_chunk_to_gc_state(const chunk_num_t chunk_id, bool force) {
Expand Down Expand Up @@ -106,24 +87,24 @@ void HeapChunkSelector::mark_chunk_out_of_gc_state(const chunk_num_t chunk_id, c
final_state);
}

csharedChunk HeapChunkSelector::select_specific_chunk(const pg_id_t pg_id, const chunk_num_t v_chunk_id) {
std::optional< homestore::chunk_num_t > HeapChunkSelector::select_specific_chunk(const pg_id_t pg_id,
const chunk_num_t v_chunk_id) {
homestore::shared< ExtendedVChunk > chunk;

while (true) {
{
std::unique_lock lock_guard(m_chunk_selector_mtx);
auto pg_it = m_per_pg_chunks.find(pg_id);
if (pg_it == m_per_pg_chunks.end()) {
LOGWARNMOD(homeobject, "No pg found for pg={}", pg_id);
return nullptr;
return std::nullopt;
}

auto pg_chunk_collection = pg_it->second;
auto& pg_chunks = pg_chunk_collection->m_pg_chunks;
std::scoped_lock lock(pg_chunk_collection->mtx);
if (v_chunk_id >= pg_chunks.size()) {
LOGWARNMOD(homeobject, "No chunk found for v_chunk_id={}", v_chunk_id);
return nullptr;
return std::nullopt;
}

chunk = pg_chunks[v_chunk_id];
Expand All @@ -145,9 +126,9 @@ csharedChunk HeapChunkSelector::select_specific_chunk(const pg_id_t pg_id, const
std::this_thread::sleep_for(std::chrono::seconds(1));
}

LOGDEBUGMOD(homeobject, "chunk={} is selected for v_chunk_id={}, pg={}", chunk->get_chunk_id(), v_chunk_id, pg_id);
LOGDEBUGMOD(homeobject, "pchunk={} is selected for v_chunk_id={}, pg={}", chunk->get_chunk_id(), v_chunk_id, pg_id);

return chunk->get_internal_chunk();
return chunk->get_chunk_id();
}

void HeapChunkSelector::foreach_chunks(std::function< void(csharedChunk&) >&& cb) {
Expand Down Expand Up @@ -510,7 +491,7 @@ std::shared_ptr< const std::vector< homestore::chunk_num_t > > HeapChunkSelector
return p_chunk_ids;
}

std::optional< homestore::chunk_num_t > HeapChunkSelector::get_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id) {
std::optional< homestore::chunk_num_t > HeapChunkSelector::pick_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id) {
std::shared_lock lock_guard(m_chunk_selector_mtx);
auto pg_it = m_per_pg_chunks.find(pg_id);
if (pg_it == m_per_pg_chunks.end()) {
Expand Down
4 changes: 2 additions & 2 deletions src/lib/homestore_backend/heap_chunk_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class HeapChunkSelector : public homestore::ChunkSelector {

// This function is used by the create-shard and recovery flows to mark one specific chunk as busy. The caller is
// responsible for calling release_chunk() to release the chunk once it is no longer needed.
csharedChunk select_specific_chunk(const pg_id_t pg_id, const chunk_num_t v_chunk_id);
std::optional< chunk_num_t > select_specific_chunk(const pg_id_t pg_id, const chunk_num_t v_chunk_id);

/**
* try to mark a chunk as gc state, so that it will not be selected by any creating shard.
Expand Down Expand Up @@ -126,7 +126,7 @@ class HeapChunkSelector : public homestore::ChunkSelector {
* @param pg_id The ID of the pg.
* @return An optional chunk_num_t value representing v_chunk_id, or std::nullopt if no space left.
*/
std::optional< chunk_num_t > get_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id);
std::optional< chunk_num_t > pick_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id);

// this should be called on each pg meta blk found
bool recover_pg_chunks(pg_id_t pg_id, std::vector< chunk_num_t >&& p_chunk_ids);
Expand Down
56 changes: 55 additions & 1 deletion src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include "lib/blob_route.hpp"
#include <homestore/homestore.hpp>
#include <homestore/blkdata_service.hpp>
#ifdef _PRERELEASE
#include <iomgr/iomgr_flip.hpp>
#endif

SISL_LOGGING_DECL(blobmgr)

Expand Down Expand Up @@ -266,11 +269,46 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
return;
}

#ifdef _PRERELEASE
// Pause PUT_BLOB commit at function entry. While paused the test can seal the shard; the
// sealed_lsn guard then rejects this late blob when the gate releases.
iomgr_flip::instance()->callback_flip("pause_put_blob_commit");
#endif

const auto shard_id = msg_header->shard_id;
auto const blob_id = *(reinterpret_cast< blob_id_t* >(const_cast< uint8_t* >(key.cbytes())));

int64_t shard_sealed_lsn;
{
std::scoped_lock lock_guard(_shard_lock);
auto iter = _shard_map.find(shard_id);
RELEASE_ASSERT(iter != _shard_map.end(), "shardID=0x{:x}, pg={}, shard=0x{:x}, shard does not exist", shard_id,
(shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask));
shard_sealed_lsn = (*iter->second)->info.sealed_lsn;
}

if (lsn >= shard_sealed_lsn) {
homestore::data_service().async_free_blk(pbas).thenValue([lsn, shard_id, blob_id, tid, &pbas](auto&& err) {
if (err) {
BLOGW(tid, shard_id, blob_id, "failed to free blob data blk, err={}, lsn={}, blkid={}", err.message(),
lsn, pbas.to_string());
} else {
BLOGD(tid, shard_id, blob_id, "succeed to free blob data blk, lsn={}, blkid={}", lsn, pbas.to_string());
}
});

BLOGD(tid, shard_id, blob_id,
"try to commit put_blob message to a non-open shard, lsn={}, shard_sealed_lsn={}, skip it!", lsn,
shard_sealed_lsn);

if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::SEALED_SHARD))); }
return;
}

auto const pg_id = msg_header->pg_id;

BlobInfo blob_info;
blob_info.shard_id = msg_header->shard_id;
blob_info.shard_id = shard_id;
blob_info.blob_id = blob_id;
blob_info.pbas = pbas;

Expand Down Expand Up @@ -464,6 +502,14 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

if (msg_header->msg_type != ReplicationMessageType::PUT_BLOB_MSG) {
LOGW("traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, unsupported message type {}, reject it!", tid,
msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width),
(msg_header->shard_id & homeobject::shard_mask), msg_header->pg_id, msg_header->msg_type);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::UNSUPPORTED_OP))); }
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

auto hs_pg = get_hs_pg(msg_header->pg_id);
if (hs_pg == nullptr) {
LOGW("traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, Received a blob_put on an unknown pg={}, underlying "
Expand All @@ -488,6 +534,14 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<

auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());

if (hs_shard->sb_->info.state != ShardInfo::State::OPEN) {
LOGW("traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, Received a blob_put on an unopen shard, reject it!", tid,
msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width),
(msg_header->shard_id & homeobject::shard_mask));
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::SEALED_SHARD))); }
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

homestore::blk_alloc_hints hints;
hints.chunk_id_hint = hs_shard->sb_->p_chunk_id;
if (hs_ctx->is_proposer()) { hints.reserved_blks = get_reserved_blks(); }
Expand Down
17 changes: 3 additions & 14 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,8 +766,7 @@ class HSHomeObject : public HomeObjectImpl {

static ShardInfo deserialize_shard_info(const char* shard_info_str, size_t size);
static std::string serialize_shard_info(const ShardInfo& info);
void local_create_shard(ShardInfo shard_info, homestore::chunk_num_t v_chunk_id, homestore::chunk_num_t p_chunk_id,
homestore::blk_count_t blk_count, trace_id_t tid = 0);
void local_create_shard(ShardInfo shard_info, homestore::chunk_num_t v_chunk_id, trace_id_t tid = 0);
void add_new_shard_to_map(std::unique_ptr< HS_Shard > shard);
void update_shard_in_map(const ShardInfo& shard_info);

Expand Down Expand Up @@ -901,8 +900,8 @@ class HSHomeObject : public HomeObjectImpl {
* @param repl_dev The replication device.
* @param hs_ctx The replication request context.
*/
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

bool on_shard_message_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
Expand Down Expand Up @@ -949,16 +948,6 @@ class HSHomeObject : public HomeObjectImpl {
*/
void on_replica_restart();

/**
* @brief Extracts the physical chunk ID for create shard from the message.
*
* @param header The message header that includes the shard_info_superblk, which contains the data necessary for
* extracting and mapping the chunk ID.
* @return An optional virtual chunk id if the extraction and mapping process is successful, otherwise an empty
* optional.
*/
std::optional< homestore::chunk_num_t > resolve_v_chunk_id_from_msg(sisl::blob const& header);

/**
* @brief Releases a chunk based on the information provided in a CREATE_SHARD message.
*
Expand Down
Loading