@@ -453,6 +453,139 @@ TEST_F(DTLReductionTest, CompressionWithDerivedRatio)
453453 });
454454}
455455
456+ TEST_F (DTLReductionTest, DecimationStagingEngine)
457+ {
458+ DO_TEST_WITH_FORK ([this ]() {
459+ // Build a two-host platform with a network link (required for staging transport)
460+ auto * zone = sg4::Engine::get_instance ()->get_netzone_root ()->add_netzone_star (" zone" );
461+ auto * pub_host = zone->add_host (" pub_host" , " 6Gf" );
462+ auto * sub_host = zone->add_host (" sub_host" , " 6Gf" );
463+ auto * backbone = zone->add_link (" backbone" , " 10Gbps" )->set_latency (" 10us" );
464+ auto * link_pub = zone->add_link (" link_pub" , " 10Gbps" )->set_latency (" 10us" );
465+ auto * link_sub = zone->add_link (" link_sub" , " 10Gbps" )->set_latency (" 10us" );
466+ zone->add_route (pub_host, nullptr , std::vector<const sg4::Link*>{link_pub, backbone});
467+ zone->add_route (sub_host, nullptr , std::vector<const sg4::Link*>{link_sub, backbone});
468+ zone->seal ();
469+ dtlmod::DTL::create ();
470+
471+ pub_host->add_actor (" Publisher" , []() {
472+ auto dtl = dtlmod::DTL::connect ();
473+ auto stream = dtl->add_stream (" my-output" );
474+ stream->set_engine_type (dtlmod::Engine::Type::Staging);
475+ stream->set_transport_method (dtlmod::Transport::Method::MQ);
476+ XBT_INFO (" Create a 2D variable with 10kx10k doubles" );
477+ auto var = stream->define_variable (" var" , {10000 , 10000 }, {0 , 0 }, {10000 , 10000 }, sizeof (double ));
478+ auto decimator = stream->define_reduction_method (" decimation" );
479+ auto engine = stream->open (" my-output" , dtlmod::Stream::Mode::Publish);
480+ sg4::this_actor::sleep_for (0.5 );
481+
482+ XBT_INFO (" Assign decimation with stride 2,2" );
483+ ASSERT_NO_THROW (var->set_reduction_operation (decimator, {{" stride" , " 2,2" }}));
484+ ASSERT_TRUE (var->is_reduced ());
485+ ASSERT_TRUE (var->is_reduced_by_publisher ());
486+
487+ XBT_INFO (" Verify reduced shape: 5000x5000" );
488+ auto shape = decimator->get_reduced_variable_shape (*var);
489+ ASSERT_EQ (shape[0 ], 5000u );
490+ ASSERT_EQ (shape[1 ], 5000u );
491+
492+ engine->begin_transaction ();
493+ ASSERT_NO_THROW (engine->put (var));
494+ engine->end_transaction ();
495+ sg4::this_actor::sleep_for (1 );
496+ engine->close ();
497+ dtlmod::DTL::disconnect ();
498+ });
499+
500+ sub_host->add_actor (" Subscriber" , []() {
501+ auto dtl = dtlmod::DTL::connect ();
502+ auto stream = dtl->add_stream (" my-output" );
503+ auto engine = stream->open (" my-output" , dtlmod::Stream::Mode::Subscribe);
504+ auto var = stream->inquire_variable (" var" );
505+
506+ XBT_INFO (" Get the decimated variable" );
507+ engine->begin_transaction ();
508+ ASSERT_NO_THROW (engine->get (var));
509+ engine->end_transaction ();
510+
511+ engine->close ();
512+ dtlmod::DTL::disconnect ();
513+ });
514+
515+ ASSERT_NO_THROW (sg4::Engine::get_instance ()->run ());
516+ });
517+ }
518+
519+ TEST_F (DTLReductionTest, CompressionStagingEngine)
520+ {
521+ DO_TEST_WITH_FORK ([this ]() {
522+ // Build a two-host platform with a network link (required for staging transport)
523+ auto * zone = sg4::Engine::get_instance ()->get_netzone_root ()->add_netzone_star (" zone" );
524+ auto * pub_host = zone->add_host (" pub_host" , " 6Gf" );
525+ auto * sub_host = zone->add_host (" sub_host" , " 6Gf" );
526+ auto * backbone = zone->add_link (" backbone" , " 10Gbps" )->set_latency (" 10us" );
527+ auto * link_pub = zone->add_link (" link_pub" , " 10Gbps" )->set_latency (" 10us" );
528+ auto * link_sub = zone->add_link (" link_sub" , " 10Gbps" )->set_latency (" 10us" );
529+ zone->add_route (pub_host, nullptr , std::vector<const sg4::Link*>{link_pub, backbone});
530+ zone->add_route (sub_host, nullptr , std::vector<const sg4::Link*>{link_sub, backbone});
531+ zone->seal ();
532+ dtlmod::DTL::create ();
533+
534+ pub_host->add_actor (" Publisher" , []() {
535+ auto dtl = dtlmod::DTL::connect ();
536+ auto stream = dtl->add_stream (" my-output" );
537+ stream->set_engine_type (dtlmod::Engine::Type::Staging);
538+ stream->set_transport_method (dtlmod::Transport::Method::MQ);
539+ XBT_INFO (" Create a 2D variable with 10kx10k doubles" );
540+ auto var = stream->define_variable (" var" , {10000 , 10000 }, {0 , 0 }, {10000 , 10000 }, sizeof (double ));
541+ auto compressor = stream->define_reduction_method (" compression" );
542+ auto engine = stream->open (" my-output" , dtlmod::Stream::Mode::Publish);
543+ sg4::this_actor::sleep_for (0.5 );
544+
545+ XBT_INFO (" Assign compression with ratio 5 and explicit costs" );
546+ ASSERT_NO_THROW (var->set_reduction_operation (compressor, {{" compression_ratio" , " 5" },
547+ {" compression_cost_per_element" , " 3" },
548+ {" decompression_cost_per_element" , " 1" }}));
549+ ASSERT_TRUE (var->is_reduced ());
550+ ASSERT_TRUE (var->is_reduced_by_publisher ());
551+
552+ XBT_INFO (" Verify compressed sizes" );
553+ size_t expected_reduced = static_cast <size_t >(std::ceil (sizeof (double ) * 10000.0 * 10000.0 / 5.0 ));
554+ ASSERT_EQ (compressor->get_reduced_variable_global_size (*var), expected_reduced);
555+ ASSERT_EQ (compressor->get_reduced_variable_local_size (*var), expected_reduced);
556+
557+ XBT_INFO (" Verify shape is unchanged (compression preserves shape)" );
558+ auto shape = compressor->get_reduced_variable_shape (*var);
559+ ASSERT_EQ (shape[0 ], 10000u );
560+ ASSERT_EQ (shape[1 ], 10000u );
561+
562+ engine->begin_transaction ();
563+ ASSERT_NO_THROW (engine->put (var));
564+ engine->end_transaction ();
565+ sg4::this_actor::sleep_for (1 );
566+ engine->close ();
567+ dtlmod::DTL::disconnect ();
568+ });
569+
570+ sub_host->add_actor (" Subscriber" , []() {
571+ auto dtl = dtlmod::DTL::connect ();
572+ auto stream = dtl->add_stream (" my-output" );
573+ auto engine = stream->open (" my-output" , dtlmod::Stream::Mode::Subscribe);
574+ auto var = stream->inquire_variable (" var" );
575+
576+ XBT_INFO (" Get the compressed variable (decompression cost should be applied on subscriber)" );
577+ engine->begin_transaction ();
578+ ASSERT_NO_THROW (engine->get (var));
579+ engine->end_transaction ();
580+
581+ engine->close ();
582+ dtlmod::DTL::disconnect ();
583+ });
584+
585+ ASSERT_NO_THROW (sg4::Engine::get_instance ()->run ());
586+ });
587+ }
588+
456589TEST_F (DTLReductionTest, DoubleReductionForbidden)
457590{
458591 DO_TEST_WITH_FORK ([this ]() {
0 commit comments