Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "4.1.18"
version = "4.1.19"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeStore"
Expand Down
11 changes: 5 additions & 6 deletions src/lib/homestore_backend/gc_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ folly::SemiFuture< bool > GCManager::pdev_gc_actor::add_gc_task(uint8_t priority
const auto pg_id = EXvchunk->m_pg_id.value();
m_hs_home_object->gc_manager()->incr_pg_pending_gc_task(pg_id);

if (!m_hs_home_object->can_chunks_in_pg_be_gc(pg_id)) {
if (!m_hs_home_object->is_pg_alive(pg_id)) {
LOGDEBUGMOD(gcmgr, "chunk_id={} belongs to pg {}, which is not eligible for gc at this moment!",
move_from_chunk, pg_id)
m_hs_home_object->gc_manager()->decr_pg_pending_gc_task(pg_id);
Expand Down Expand Up @@ -504,7 +504,7 @@ void GCManager::pdev_gc_actor::handle_recovered_gc_task(
}

// we have no gc_task_guard for recovered gc task, so we need to do this manually to make sure the gc task can be
// marked as completed and the pg can be marked as available for new gc task
// marked as completed
on_gc_task_completed(priority, pg_id, move_from_chunk, move_to_chunk, vchunk_id, true, 0);

GCLOGD(RECOVERD_GC_TASK_ID, pg_id, NO_SHARD_ID,
Expand Down Expand Up @@ -797,9 +797,8 @@ bool GCManager::pdev_gc_actor::copy_valid_data(
move_from_chunk);
}

// check if all the pbas in the valid_blob_indexes are in move_from_chunk, if not, it means the
// shard is being modified during gc, we can not guarantee the data consistency, so we fail this gc
// task and let it be retried later.
// check if all the pbas in the valid_blob_indexes are in move_from_chunk, if not, we cancel this task and retry
// later.
for (const auto& [blob, v] : valid_blob_indexes) {
auto pba = v.pbas();
if (pba.chunk_num() != move_from_chunk) {
Expand Down Expand Up @@ -1100,7 +1099,7 @@ bool GCManager::pdev_gc_actor::purge_reserved_chunk(chunk_id_t chunk, const uint
RELEASE_ASSERT(!vchunk->m_pg_id.has_value(),
"chunk_id={} is expected to be a reserved chunk, and not belong to a pg", chunk);
RELEASE_ASSERT(vchunk->m_state == ChunkState::GC,
"chunk_id={} is a reserved chunk, expected to have a GC state, but actuall state is {} ", chunk,
"chunk_id={} is a reserved chunk, expected to have a GC state, but the actual state is {} ", chunk,
vchunk->m_state);

// Clear all rreqs on the reserved chunk BEFORE reset() resets its allocator.
Expand Down
6 changes: 4 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,9 @@ class HSHomeObject : public HomeObjectImpl {
*
* @param pg_id The ID of the PG to be destroyed.
*/
bool pg_destroy(pg_id_t pg_id, bool need_to_pause_pg_state_machine = false);
bool pg_destroy(pg_id_t pg_id);

void destroy_pg_resource(pg_id_t pg_id);

bool pause_pg_state_machine(pg_id_t pg_id);

Expand Down Expand Up @@ -977,7 +979,7 @@ class HSHomeObject : public HomeObjectImpl {
* @param pg_id The ID of the PG whose shards are to be destroyed.
* @return True if the chunks in the PG can be garbage collected, false otherwise.
*/
bool can_chunks_in_pg_be_gc(pg_id_t pg_id) const;
bool is_pg_alive(pg_id_t pg_id) const;

bool pg_exists(pg_id_t pg_id) const;

Expand Down
47 changes: 31 additions & 16 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -688,18 +688,7 @@ std::optional< pg_id_t > HSHomeObject::get_pg_id_with_group_id(group_id_t group_

void HSHomeObject::_destroy_pg(pg_id_t pg_id) { pg_destroy(pg_id); }

bool HSHomeObject::pg_destroy(pg_id_t pg_id, bool need_to_pause_pg_state_machine) {
if (need_to_pause_pg_state_machine && !pause_pg_state_machine(pg_id)) {
LOGI("Failed to pause pg state machine, pg_id={}", pg_id);
return false;
}
LOGI("Destroying pg={}", pg_id);
mark_pg_destroyed(pg_id);

// we have the assumption that after pg is marked as destroyed, it will not be marked as alive again.
// TODO:: if this assumption is broken, we need to handle it.
gc_mgr_->drain_pg_pending_gc_task(pg_id);

void HSHomeObject::destroy_pg_resource(pg_id_t pg_id) {
destroy_shards(pg_id);
destroy_hs_resources(pg_id);
destroy_pg_index_table(pg_id);
Expand All @@ -709,8 +698,35 @@ bool HSHomeObject::pg_destroy(pg_id_t pg_id, bool need_to_pause_pg_state_machine
// which must be done after destroying pg super blk to avoid multiple pg use same chunks
bool res = chunk_selector_->return_pg_chunks_to_dev_heap(pg_id);
RELEASE_ASSERT(res, "Failed to return pg={} chunks to dev_heap", pg_id);
LOGI("resource of pg={} is destroyed", pg_id);
}

LOGI("pg={} is destroyed", pg_id);
bool HSHomeObject::pg_destroy(pg_id_t pg_id) {
auto hs_pg = const_cast< HS_PG* >(get_hs_pg(pg_id));
RELEASE_ASSERT(hs_pg, "pg={} is null", pg_id);
auto repl_dev = hs_pg ? hs_pg->repl_dev_ : nullptr;
RELEASE_ASSERT(repl_dev, "repl_dev for pg={} is null", pg_id);

// when reaching here, we will not receive any new log, both for BR or leave_group case. we wait for all logs to be
// committed and trigger a cp, so that when restart, no log will be replayed since cp_lsn is equal to
// last_append_lsn.
while (true) {
if (repl_dev->get_last_commit_lsn() == repl_dev->get_last_append_lsn()) { break; }
LOGI("Waiting for pg={} to be idle before destroying, last_append_lsn={}, last_commit_lsn={}", pg_id,
repl_dev->get_last_append_lsn(), repl_dev->get_last_commit_lsn());
std::this_thread::sleep_for(std::chrono::seconds(1));
}

RELEASE_ASSERT(homestore::hs()->cp_mgr().trigger_cp_flush(true).get(),
"Failed to trigger checkpoint flush before destroying pg={}", pg_id);

LOGI("Destroying pg={}", pg_id);
mark_pg_destroyed(pg_id);

// we have the assumption that after pg is marked as destroyed, it will not be marked as alive again.
// TODO:: if this assumption is broken, we need to handle it.
gc_mgr_->drain_pg_pending_gc_task(pg_id);
destroy_pg_resource(pg_id);
return true;
}

Expand Down Expand Up @@ -800,7 +816,7 @@ void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
LOGD("pg={} is marked as destroyed", pg_id);
}

bool HSHomeObject::can_chunks_in_pg_be_gc(pg_id_t pg_id) const {
bool HSHomeObject::is_pg_alive(pg_id_t pg_id) const {
auto lg = std::scoped_lock(_pg_lock);
auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id));
if (hs_pg == nullptr) {
Expand Down Expand Up @@ -949,8 +965,7 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c
hs_pg->index_table_ = it->second.index_table;
it->second.pg_id = pg_id;
} else {
RELEASE_ASSERT(hs_pg->pg_sb_->state == PGState::DESTROYED, "IndexTable should be recovered before PG");
hs_pg->index_table_ = nullptr;
RELEASE_ASSERT(hs_pg->pg_sb_->state == PGState::DESTROYED, "IndexTable should be recovered for alive PG");
LOGI("Index table not found for destroyed pg={}, index_table_uuid={}", pg_id, uuid_str);
}

Expand Down
16 changes: 14 additions & 2 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,10 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
set_snapshot_context(context); // Update the snapshot context in case apply_snapshot is not called
auto hs_pg = home_object_->get_hs_pg(m_snp_rcv_handler->get_context_pg_id());
hs_pg->pg_state_.clear_state(PGStateMask::BASELINE_RESYNC);
// we only reset this if destroying pg happens in BR case. for other cases (on_destroy and _exit_pg),
// since this replica will leave the PG and no later logs will be received, no need to reset this.
reset_no_space_left_error_info();
repl_dev()->reset_latch_lsn();
return;
}

Expand Down Expand Up @@ -499,7 +503,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
if (home_object_->pg_exists(pg_data->pg_id())) {
LOGI("pg already exists, clean pg resources before snapshot, pg={} {}", pg_data->pg_id(), log_suffix);
// Need to pause state machine before destroying the PG, if fail, let raft retry.
if (!home_object_->pg_destroy(pg_data->pg_id(), true /* pause state machine */)) {
if (!home_object_->pg_destroy(pg_data->pg_id())) {
LOGE("failed to destroy existing pg, let raft retry, pg={} {}", pg_data->pg_id(), log_suffix);
return;
}
Expand Down Expand Up @@ -1030,7 +1034,15 @@ void ReplicationStateMachine::on_log_replay_done(const homestore::group_id_t& gr
const auto pg_id = pg_id_opt.value();
RELEASE_ASSERT(home_object_->pg_exists(pg_id), "pg={} should exist, but not! fatal error!", pg_id);

const auto& shards_in_pg = (const_cast< HSHomeObject::HS_PG* >(home_object_->_get_hs_pg_unlocked(pg_id)))->shards_;
const auto hs_pg = (const_cast< HSHomeObject::HS_PG* >(home_object_->get_hs_pg(pg_id)));
RELEASE_ASSERT(hs_pg, "Failed to get pg={} when log replay done", pg_id);
if (hs_pg->pg_sb_->state == PGState::DESTROYED) {
// pg resources were not cleaned up on the previous restart, clean them up now.
home_object_->destroy_pg_resource(pg_id);
return;
}

const auto& shards_in_pg = hs_pg->shards_;
auto chunk_selector = home_object_->chunk_selector();

for (const auto& shard_iter : shards_in_pg) {
Expand Down
Loading