@@ -306,3 +306,206 @@ TEST_F(DTLReductionTest, SinglePubSingleSubDecimationOnRead)
306306 ASSERT_NO_THROW (sg4::Engine::get_instance ()->run ());
307307 });
308308}
309+
310+ TEST_F (DTLReductionTest, BogusCompressionSetting)
311+ {
312+ DO_TEST_WITH_FORK ([this ]() {
313+ this ->setup_platform ();
314+ host_->add_actor (" TestActor" , [this ]() {
315+ std::shared_ptr<dtlmod::ReductionMethod> compressor;
316+ XBT_INFO (" Connect to the DTL" );
317+ auto dtl = dtlmod::DTL::connect ();
318+ XBT_INFO (" Create a stream" );
319+ auto stream = dtl->add_stream (" my-output" );
320+ stream->set_transport_method (dtlmod::Transport::Method::File);
321+ stream->set_engine_type (dtlmod::Engine::Type::File);
322+ XBT_INFO (" Create a 3D variable" );
323+ auto var = stream->define_variable (" var3D" , {640 , 640 , 640 }, {0 , 0 , 0 }, {640 , 640 , 640 }, sizeof (double ));
324+ XBT_INFO (" Define a Compression Reduction Method" );
325+ ASSERT_NO_THROW (compressor = stream->define_reduction_method (" compression" ));
326+ XBT_INFO (" Assign the compression method with a bogus option, should fail" );
327+ ASSERT_THROW (var->set_reduction_operation (compressor, {{" bogus" , " 1" }}),
328+ dtlmod::UnknownCompressionOptionException);
329+ XBT_INFO (" Assign the compression method with 'fixed' profile but no ratio, should fail" );
330+ ASSERT_THROW (var->set_reduction_operation (compressor, {{" compressor" , " fixed" }}),
331+ dtlmod::InconsistentCompressionRatioException);
332+ XBT_INFO (" Assign the compression method with ratio < 1, should fail" );
333+ ASSERT_THROW (var->set_reduction_operation (compressor, {{" compression_ratio" , " 0.5" }}),
334+ dtlmod::InconsistentCompressionRatioException);
335+ XBT_INFO (" Assign the compression method with unknown compressor profile, should fail" );
336+ ASSERT_THROW (var->set_reduction_operation (compressor, {{" compressor" , " bogus" }}),
337+ dtlmod::UnknownCompressionOptionException);
338+
339+ XBT_INFO (" Disconnect the actor from the DTL" );
340+ dtlmod::DTL::disconnect ();
341+ });
342+
343+ // Run the simulation
344+ ASSERT_NO_THROW (sg4::Engine::get_instance ()->run ());
345+ });
346+ }
347+
348+ TEST_F (DTLReductionTest, SimpleCompressionFileEngine)
349+ {
350+ DO_TEST_WITH_FORK ([this ]() {
351+ this ->setup_platform ();
352+ host_->add_actor (" Publisher" , [this ]() {
353+ XBT_INFO (" Connect to the DTL" );
354+ auto dtl = dtlmod::DTL::connect ();
355+ XBT_INFO (" Create a stream" );
356+ auto stream = dtl->add_stream (" my-output" );
357+ stream->set_transport_method (dtlmod::Transport::Method::File);
358+ stream->set_engine_type (dtlmod::Engine::Type::File);
359+ XBT_INFO (" Create a 2D variable with 1000x1000 doubles" );
360+ auto var = stream->define_variable (" var2D" , {1000 , 1000 }, {0 , 0 }, {1000 , 1000 }, sizeof (double ));
361+ XBT_INFO (" Define a Compression Reduction Method" );
362+ auto compressor = stream->define_reduction_method (" compression" );
363+
364+ XBT_INFO (" Open the stream in Publish mode" );
365+ auto engine = stream->open (" zone:my_fs:/host/scratch/my-working-dir/my-output" , dtlmod::Stream::Mode::Publish);
366+ sg4::this_actor::sleep_for (1 );
367+
368+ XBT_INFO (" Assign compression with fixed ratio of 10" );
369+ ASSERT_NO_THROW (var->set_reduction_operation (compressor, {{" compression_ratio" , " 10" },
370+ {" compression_cost_per_element" , " 5" },
371+ {" decompression_cost_per_element" , " 2" }}));
372+ ASSERT_TRUE (var->is_reduced ());
373+ ASSERT_TRUE (var->is_reduced_by_publisher ());
374+ XBT_INFO (" Verify reduced sizes" );
375+ size_t original_global_size = sizeof (double ) * 1000 * 1000 ;
376+ size_t expected_reduced = static_cast <size_t >(std::ceil (original_global_size / 10.0 ));
377+ ASSERT_EQ (compressor->get_reduced_variable_global_size (*var), expected_reduced);
378+ ASSERT_EQ (compressor->get_reduced_variable_local_size (*var), expected_reduced);
379+ XBT_INFO (" Verify that shape is unchanged" );
380+ auto reduced_shape = compressor->get_reduced_variable_shape (*var);
381+ ASSERT_EQ (reduced_shape.size (), 2u );
382+ ASSERT_EQ (reduced_shape[0 ], 1000u );
383+ ASSERT_EQ (reduced_shape[1 ], 1000u );
384+ XBT_INFO (" Verify compression flop cost" );
385+ double expected_flops = 5.0 * 1000 * 1000 ; // cost_per_element * num_elements
386+ ASSERT_DOUBLE_EQ (compressor->get_flop_amount_to_reduce_variable (*var), expected_flops);
387+ XBT_INFO (" Verify decompression flop cost" );
388+ double expected_decomp_flops = 2.0 * 1000 * 1000 ;
389+ ASSERT_DOUBLE_EQ (compressor->get_flop_amount_to_decompress_variable (*var), expected_decomp_flops);
390+ engine->begin_transaction ();
391+ ASSERT_NO_THROW (engine->put (var));
392+ engine->end_transaction ();
393+ sg4::this_actor::sleep_for (1 );
394+ engine->close ();
395+
396+ XBT_INFO (" Disconnect the actor from the DTL" );
397+ dtlmod::DTL::disconnect ();
398+ });
399+
400+ // Run the simulation
401+ ASSERT_NO_THROW (sg4::Engine::get_instance ()->run ());
402+ });
403+ }
404+
405+ TEST_F (DTLReductionTest, CompressionWithDerivedRatio)
406+ {
407+ DO_TEST_WITH_FORK ([this ]() {
408+ this ->setup_platform ();
409+ host_->add_actor (" TestActor" , [this ]() {
410+ XBT_INFO (" Connect to the DTL" );
411+ auto dtl = dtlmod::DTL::connect ();
412+ auto stream = dtl->add_stream (" my-output" );
413+ stream->set_transport_method (dtlmod::Transport::Method::File);
414+ stream->set_engine_type (dtlmod::Engine::Type::File);
415+ auto var = stream->define_variable (" var2D" , {1000 , 1000 }, {0 , 0 }, {1000 , 1000 }, sizeof (double ));
416+ size_t orig_size = sizeof (double ) * 1000 * 1000 ;
417+
418+ XBT_INFO (" Test SZ profile: accuracy=1e-3, data_smoothness=0.5" );
419+ auto sz_compressor = stream->define_reduction_method (" compression" );
420+ ASSERT_NO_THROW (var->set_reduction_operation (
421+ sz_compressor, {{" compressor" , " sz" }, {" accuracy" , " 1e-3" }, {" data_smoothness" , " 0.5" }}));
422+ ASSERT_TRUE (var->is_reduced ());
423+ // SZ model: ratio = 3.0 * pow(3, 0.8) * 1.0 ≈ 7.22
424+ size_t sz_reduced = sz_compressor->get_reduced_variable_global_size (*var);
425+ ASSERT_GT (sz_reduced, 0u );
426+ ASSERT_LT (sz_reduced, orig_size);
427+ XBT_INFO (" SZ reduced size: %zu (original: %zu, ratio: %.2f)" , sz_reduced, orig_size,
428+ static_cast <double >(orig_size) / sz_reduced);
429+
430+ XBT_INFO (" Test ZFP profile: accuracy=1e-6" );
431+ auto stream2 = dtl->add_stream (" my-output-2" );
432+ stream2->set_transport_method (dtlmod::Transport::Method::File);
433+ stream2->set_engine_type (dtlmod::Engine::Type::File);
434+ auto var2 = stream2->define_variable (" var2D" , {1000 , 1000 }, {0 , 0 }, {1000 , 1000 }, sizeof (double ));
435+ auto zfp_compressor = stream2->define_reduction_method (" compression" );
436+ ASSERT_NO_THROW (var2->set_reduction_operation (zfp_compressor, {{" compressor" , " zfp" }, {" accuracy" , " 1e-6" }}));
437+ // ZFP model: rate = max(1.0, -log2(1e-6) + 1.0) ≈ 20.93, ratio = 64.0 / 20.93 ≈ 3.06
438+ size_t zfp_reduced = zfp_compressor->get_reduced_variable_global_size (*var2);
439+ ASSERT_GT (zfp_reduced, 0u );
440+ ASSERT_LT (zfp_reduced, orig_size);
441+ XBT_INFO (" ZFP reduced size: %zu (original: %zu, ratio: %.2f)" , zfp_reduced, orig_size,
442+ static_cast <double >(orig_size) / zfp_reduced);
443+
444+ XBT_INFO (" Verify SZ gives higher compression than ZFP at these settings" );
445+ ASSERT_LT (sz_reduced, zfp_reduced);
446+
447+ XBT_INFO (" Disconnect the actor from the DTL" );
448+ dtlmod::DTL::disconnect ();
449+ });
450+
451+ // Run the simulation
452+ ASSERT_NO_THROW (sg4::Engine::get_instance ()->run ());
453+ });
454+ }
455+
456+ TEST_F (DTLReductionTest, DoubleReductionForbidden)
457+ {
458+ DO_TEST_WITH_FORK ([this ]() {
459+ this ->setup_platform ();
460+ host_->add_actor (" TestActor" , [this ]() {
461+ XBT_INFO (" Connect to the DTL" );
462+ auto dtl = dtlmod::DTL::connect ();
463+ auto stream = dtl->add_stream (" my-output" );
464+ stream->set_transport_method (dtlmod::Transport::Method::File);
465+ stream->set_engine_type (dtlmod::Engine::Type::File);
466+ auto var = stream->define_variable (" var" , {20000 , 20000 }, {0 , 0 }, {20000 , 20000 }, sizeof (double ));
467+ auto compressor = stream->define_reduction_method (" compression" );
468+ auto engine = stream->open (" zone:my_fs:/host/scratch/my-working-dir/my-output" , dtlmod::Stream::Mode::Publish);
469+ sg4::this_actor::sleep_for (1 );
470+ XBT_INFO (" Apply publisher-side compression" );
471+ ASSERT_NO_THROW (var->set_reduction_operation (compressor, {{" compression_ratio" , " 5" }}));
472+ ASSERT_TRUE (var->is_reduced_by_publisher ());
473+
474+ XBT_INFO (" Re-parameterize the same reduction method (allowed — updates parameters)" );
475+ ASSERT_NO_THROW (var->set_reduction_operation (compressor, {{" compression_ratio" , " 10" }}));
476+ ASSERT_TRUE (var->is_reduced_by_publisher ());
477+
478+ engine->begin_transaction ();
479+ ASSERT_NO_THROW (engine->put (var));
480+ engine->end_transaction ();
481+ sg4::this_actor::sleep_for (1 );
482+ engine->close ();
483+ dtlmod::DTL::disconnect ();
484+
485+ XBT_INFO (" Wait and reconnect as subscriber" );
486+ sg4::this_actor::sleep_until (10 );
487+ dtl = dtlmod::DTL::connect ();
488+ engine = stream->open (" zone:my_fs:/host/scratch/my-working-dir/my-output" , dtlmod::Stream::Mode::Subscribe);
489+ auto var_sub = stream->inquire_variable (" var" );
490+
491+ XBT_INFO (" Verify that var_sub carries publisher reduction state" );
492+ ASSERT_TRUE (var_sub->is_reduced ());
493+ ASSERT_TRUE (var_sub->is_reduced_by_publisher ());
494+
495+ XBT_INFO (" Attempt subscriber-side compression, should fail (compression is publisher-side only)" );
496+ auto sub_compressor = stream->define_reduction_method (" compression" );
497+ ASSERT_THROW (var_sub->set_reduction_operation (sub_compressor, {{" compression_ratio" , " 2" }}),
498+ dtlmod::SubscriberSideCompressionException);
499+
500+ XBT_INFO (" Attempt subscriber-side decimation on a publisher-reduced variable, should fail (double reduction)" );
501+ auto decimator = stream->define_reduction_method (" decimation" );
502+ ASSERT_THROW (var_sub->set_reduction_operation (decimator, {{" stride" , " 2,2" }}), dtlmod::DoubleReductionException);
503+
504+ engine->close ();
505+ dtlmod::DTL::disconnect ();
506+ });
507+
508+ // Run the simulation
509+ ASSERT_NO_THROW (sg4::Engine::get_instance ()->run ());
510+ });
511+ }
0 commit comments