Skip to content

Commit f902c1a

Browse files
committed
register subscriber variable in CompressionReductionMethod during inquire
1 parent 4a07017 commit f902c1a

4 files changed

Lines changed: 23 additions & 0 deletions

File tree

include/dtlmod/CompressionReductionMethod.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ class CompressionReductionMethod : public ReductionMethod {
7979

8080
[[nodiscard]] double get_flop_amount_to_reduce_variable(const Variable& var) const override;
8181
[[nodiscard]] double get_flop_amount_to_decompress_variable(const Variable& var) const override;
82+
83+
/// @brief Copy a publisher variable's parameterization to a subscriber variable.
84+
/// Called by Stream::inquire_variable so that Engine::get() can compute decompression costs.
85+
void propagate_for_subscriber(const Variable& publisher_var, const Variable& subscriber_var);
8286
};
8387
/// \endcond
8488
} // namespace dtlmod

src/CompressionReductionMethod.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,13 @@ void CompressionReductionMethod::parameterize_for_variable(
139139
// Always (re)create the parameterization — avoids field-by-field update complexity.
140140
per_variable_parameterizations_[&var] = std::make_shared<ParameterizedCompression>(var, std::move(cfg));
141141
}
142+
143+
void CompressionReductionMethod::propagate_for_subscriber(const Variable& publisher_var, const Variable& subscriber_var)
144+
{
145+
auto it = per_variable_parameterizations_.find(&publisher_var);
146+
if (it != per_variable_parameterizations_.end())
147+
per_variable_parameterizations_[&subscriber_var] =
148+
std::make_shared<ParameterizedCompression>(subscriber_var, it->second->get_config());
149+
}
150+
142151
} // namespace dtlmod

src/Stream.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,10 @@ std::shared_ptr<Variable> Stream::inquire_variable(std::string_view name) const
410410
if (var->second->is_reduced()) {
411411
new_var->is_reduced_with_ = var->second->get_reduction_method();
412412
new_var->reduction_origin_ = var->second->reduction_origin_;
413+
// Register the subscriber's variable in the compression method's map so that
414+
// Engine::get() can compute decompression costs via get_flop_amount_to_decompress_variable.
415+
if (auto compressor = std::dynamic_pointer_cast<CompressionReductionMethod>(new_var->is_reduced_with_))
416+
compressor->propagate_for_subscriber(*var->second, *new_var);
413417
}
414418

415419
return new_var;

test/dtl_reduction.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,8 +571,14 @@ TEST_F(DTLReductionTest, CompressionStagingEngine)
571571
auto dtl = dtlmod::DTL::connect();
572572
auto stream = dtl->add_stream("my-output");
573573
auto engine = stream->open("my-output", dtlmod::Stream::Mode::Subscribe);
574+
XBT_INFO("Wait for the publisher to have set the compression reduction operation");
575+
sg4::this_actor::sleep_for(1);
574576
auto var = stream->inquire_variable("var");
575577

578+
XBT_INFO("Verify that the subscriber variable carries the publisher compression state");
579+
ASSERT_TRUE(var->is_reduced());
580+
ASSERT_TRUE(var->is_reduced_by_publisher());
581+
576582
XBT_INFO("Get the compressed variable (decompression cost should be applied on subscriber)");
577583
engine->begin_transaction();
578584
ASSERT_NO_THROW(engine->get(var));

0 commit comments

Comments
 (0)