Skip to content

Commit f799476

Browse files
committed
refactor Engine.hpp by adding ActorRegistry class
1 parent fa52a4b commit f799476

6 files changed

Lines changed: 98 additions & 57 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ set(SOURCE_FILES
152152
)
153153

154154
set(HEADER_FILES
155+
include/dtlmod/ActorRegistry.hpp
155156
include/dtlmod/DTL.hpp
156157
include/dtlmod/DTLException.hpp
157158
include/dtlmod/Engine.hpp

include/dtlmod/ActorRegistry.hpp

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/* Copyright (c) 2026. The SWAT Team. All rights reserved. */
2+
3+
/* This program is free software; you can redistribute it and/or modify it
4+
* under the terms of the license (GNU LGPL) which comes with this package. */
5+
6+
#ifndef __DTLMOD_ACTOR_REGISTRY_HPP__
7+
#define __DTLMOD_ACTOR_REGISTRY_HPP__
8+
9+
#include <simgrid/s4u/Actor.hpp>
10+
#include <simgrid/s4u/Barrier.hpp>
11+
#include <simgrid/s4u/Mutex.hpp>
12+
13+
#include <set>
14+
15+
namespace sg4 = simgrid::s4u;
16+
17+
namespace dtlmod {
18+
19+
/// \cond EXCLUDE_FROM_DOCUMENTATION
20+
/// @brief A class that manages a registry of actors (publishers or subscribers) for an Engine.
21+
/// This class encapsulates actor management logic including addition, removal, lookup,
22+
/// and synchronization via barriers.
23+
24+
class ActorRegistry {
25+
friend class Engine;
26+
sg4::MutexPtr mutex_ = sg4::Mutex::create();
27+
std::set<sg4::ActorPtr> actors_;
28+
sg4::BarrierPtr barrier_ = nullptr;
29+
30+
public:
31+
ActorRegistry() = default;
32+
33+
void add(sg4::ActorPtr actor) { actors_.insert(actor); }
34+
35+
void remove(sg4::ActorPtr actor) noexcept { actors_.erase(actor); }
36+
37+
[[nodiscard]] bool contains(sg4::ActorPtr actor) const noexcept { return actors_.find(actor) != actors_.end(); }
38+
39+
[[nodiscard]] size_t count() const noexcept { return actors_.size(); }
40+
[[nodiscard]] const std::set<sg4::ActorPtr>& get_all() const noexcept { return actors_; }
41+
[[nodiscard]] bool is_empty() const noexcept { return actors_.empty(); }
42+
[[nodiscard]] sg4::BarrierPtr get_or_create_barrier()
43+
{
44+
if (!barrier_)
45+
barrier_ = sg4::Barrier::create(actors_.size());
46+
return barrier_;
47+
}
48+
[[nodiscard]] bool is_last_at_barrier() { return barrier_ && barrier_->wait(); }
49+
[[nodiscard]] sg4::MutexPtr get_mutex() noexcept { return mutex_; };
50+
};
51+
52+
} // namespace dtlmod
53+
#endif

include/dtlmod/Engine.hpp

Lines changed: 22 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include <string>
2020

21+
#include "dtlmod/ActorRegistry.hpp"
2122
#include "dtlmod/Transport.hpp"
2223
#include "dtlmod/Variable.hpp"
2324

@@ -45,66 +46,52 @@ class Engine {
4546

4647
friend class Stream;
4748

48-
protected:
4949
std::string name_;
5050
Type type_ = Type::Undefined;
5151
std::shared_ptr<Transport> transport_ = nullptr;
5252
std::weak_ptr<Stream> stream_;
5353

54-
sg4::MutexPtr pub_mutex_ = sg4::Mutex::create();
55-
std::set<sg4::ActorPtr> publishers_;
54+
ActorRegistry publishers_;
55+
ActorRegistry subscribers_;
56+
std::string metadata_file_;
57+
58+
protected:
5659
sg4::ActivitySet pub_transaction_;
5760
sg4::ConditionVariablePtr pub_transaction_completed_ = sg4::ConditionVariable::create();
58-
sg4::BarrierPtr pub_barrier_ = nullptr;
5961
unsigned int current_pub_transaction_id_ = 0;
6062
unsigned int completed_pub_transaction_id_ = 0;
6163
bool pub_transaction_in_progress_ = false;
62-
virtual void begin_pub_transaction() = 0;
63-
virtual void end_pub_transaction() = 0;
64-
virtual void pub_close() = 0;
6564

66-
sg4::MutexPtr sub_mutex_ = sg4::Mutex::create();
67-
std::set<sg4::ActorPtr> subscribers_;
6865
sg4::ActivitySet sub_transaction_;
69-
70-
sg4::BarrierPtr sub_barrier_ = nullptr;
7166
unsigned int current_sub_transaction_id_ = 0;
7267
bool sub_transaction_in_progress_ = false;
73-
virtual void begin_sub_transaction() = 0;
74-
virtual void end_sub_transaction() = 0;
75-
virtual void sub_close() = 0;
76-
77-
std::string metadata_file_;
78-
void export_metadata_to_file() const;
7968

80-
/// \cond EXCLUDE_FROM_DOCUMENTATION
8169
void close_stream() const;
8270

8371
virtual void create_transport(const Transport::Method& transport_method) = 0;
84-
8572
void set_transport(std::shared_ptr<Transport> transport) noexcept { transport_ = transport; }
8673
[[nodiscard]] std::shared_ptr<Transport> get_transport() const noexcept { return transport_; }
8774

8875
void add_publisher(sg4::ActorPtr actor);
89-
void rm_publisher(sg4::ActorPtr actor) noexcept { publishers_.erase(actor); }
90-
[[nodiscard]] bool is_publisher(sg4::ActorPtr actor) const noexcept
91-
{
92-
return publishers_.find(actor) != publishers_.end();
93-
}
76+
void rm_publisher(sg4::ActorPtr actor) noexcept { publishers_.remove(actor); }
77+
[[nodiscard]] bool is_publisher(sg4::ActorPtr actor) const noexcept { return publishers_.contains(actor); }
9478
// Synchronize publishers on engine closing
95-
[[nodiscard]] int is_last_publisher() const { return (pub_barrier_ && pub_barrier_->wait()); }
79+
[[nodiscard]] bool is_last_publisher() { return publishers_.is_last_at_barrier(); }
80+
virtual void begin_pub_transaction() = 0;
81+
virtual void end_pub_transaction() = 0;
82+
virtual void pub_close() = 0;
9683

9784
void add_subscriber(sg4::ActorPtr actor);
98-
void rm_subscriber(sg4::ActorPtr actor) noexcept { subscribers_.erase(actor); }
99-
[[nodiscard]] bool is_subscriber(sg4::ActorPtr actor) const noexcept
100-
{
101-
return subscribers_.find(actor) != subscribers_.end();
102-
}
85+
void rm_subscriber(sg4::ActorPtr actor) noexcept { subscribers_.remove(actor); }
86+
[[nodiscard]] bool is_subscriber(sg4::ActorPtr actor) const noexcept { return subscribers_.contains(actor); }
10387
// Synchronize subscribers on engine closing
104-
[[nodiscard]] int is_last_subscriber() const noexcept { return subscribers_.empty(); }
88+
[[nodiscard]] bool is_last_subscriber() const noexcept { return subscribers_.is_empty(); }
89+
virtual void begin_sub_transaction() = 0;
90+
virtual void end_sub_transaction() = 0;
91+
virtual void sub_close() = 0;
10592

10693
void set_metadata_file_name();
107-
/// \endcond
94+
void export_metadata_to_file() const;
10895

10996
public:
11097
/// \cond EXCLUDE_FROM_DOCUMENTATION
@@ -118,9 +105,9 @@ class Engine {
118105
[[nodiscard]] sg4::ActivitySet& get_pub_transaction() noexcept { return pub_transaction_; }
119106
[[nodiscard]] const sg4::ActivitySet& get_sub_transaction() const noexcept { return sub_transaction_; }
120107
[[nodiscard]] sg4::ActivitySet& get_sub_transaction() noexcept { return sub_transaction_; }
121-
[[nodiscard]] const std::set<sg4::ActorPtr>& get_publishers() const noexcept { return publishers_; }
122-
[[nodiscard]] size_t get_num_publishers() const noexcept { return publishers_.size(); }
123-
[[nodiscard]] size_t get_num_subscribers() const noexcept { return subscribers_.size(); }
108+
[[nodiscard]] const std::set<sg4::ActorPtr>& get_publishers() const noexcept { return publishers_.get_all(); }
109+
[[nodiscard]] size_t get_num_publishers() const noexcept { return publishers_.count(); }
110+
[[nodiscard]] size_t get_num_subscribers() const noexcept { return subscribers_.count(); }
124111
/// \endcond
125112

126113
/// @brief Helper function to print out the name of the Engine.

src/Engine.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,14 @@ void Engine::close()
7575
/// \cond EXCLUDE_FROM_DOCUMENTATION
7676
void Engine::add_publisher(sg4::ActorPtr actor)
7777
{
78-
transport_->add_publisher(publishers_.size());
79-
publishers_.insert(actor);
78+
transport_->add_publisher(publishers_.count());
79+
publishers_.add(actor);
8080
}
8181

8282
void Engine::add_subscriber(sg4::ActorPtr actor)
8383
{
84-
transport_->add_subscriber(subscribers_.size());
85-
subscribers_.insert(actor);
84+
transport_->add_subscriber(subscribers_.count());
85+
subscribers_.add(actor);
8686
}
8787

8888
void Engine::export_metadata_to_file() const

src/FileEngine.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ void FileEngine::begin_pub_transaction()
9292
XBT_DEBUG("Wait for the completion of %u publish activities from the previous transaction",
9393
file_pub_transaction_[self].size());
9494
while (file_pub_transaction_[self].size() > 0)
95-
pub_activities_completed_->wait(std::unique_lock(*pub_mutex_));
95+
pub_activities_completed_->wait(std::unique_lock(*publishers_.get_mutex()));
9696
XBT_DEBUG("All on-flight publish activities are completed. Proceed with the current transaction.");
9797
transport->clear_to_write_in_transaction(self);
9898
}
@@ -104,9 +104,9 @@ void FileEngine::end_pub_transaction()
104104
auto transport = std::static_pointer_cast<FileTransport>(transport_);
105105

106106
// This is the end of the first transaction, create a barrier
107-
if (!pub_barrier_) {
108-
XBT_DEBUG("Create a barrier for %zu publishers", publishers_.size());
109-
pub_barrier_ = sg4::Barrier::create(publishers_.size());
107+
auto pub_barrier = publishers_.get_or_create_barrier();
108+
if (pub_barrier) {
109+
XBT_DEBUG("Barrier created for %zu publishers", publishers_.count());
110110
}
111111

112112
// Publisher gets the list of files and size to write that has been build during the put() operations
@@ -143,7 +143,7 @@ void FileEngine::pub_close()
143143
XBT_DEBUG("[%s] Wait for the completion of %u publish activities from the previous transaction", get_cname(),
144144
file_pub_transaction_[self].size());
145145
while (file_pub_transaction_[self].size() > 0)
146-
pub_activities_completed_->wait(std::unique_lock(*pub_mutex_));
146+
pub_activities_completed_->wait(std::unique_lock(*publishers_.get_mutex()));
147147
transport->clear_to_write_in_transaction(self);
148148

149149
rm_publisher(self);
@@ -172,7 +172,7 @@ void FileEngine::begin_sub_transaction()
172172

173173
// We have publishers on that stream, wait for them to complete a transaction first
174174
if (get_num_publishers() > 0) {
175-
std::unique_lock lock(*sub_mutex_);
175+
std::unique_lock lock(*subscribers_.get_mutex());
176176
while (completed_pub_transaction_id_ < current_sub_transaction_id_) {
177177
XBT_DEBUG("Wait for publishers to end the transaction I need");
178178
pub_transaction_completed_->wait(lock);
@@ -190,7 +190,7 @@ void FileEngine::end_sub_transaction()
190190
// activities
191191
if (current_sub_transaction_id_ == current_pub_transaction_id_ && get_num_publishers() > 0) {
192192
XBT_DEBUG("Wait for the completion of publish activities from the current transaction");
193-
pub_activities_completed_->wait(std::unique_lock(*sub_mutex_));
193+
pub_activities_completed_->wait(std::unique_lock(*subscribers_.get_mutex()));
194194
XBT_DEBUG("All on-flight publish activities are completed. Proceed with the subscribe activities.");
195195
}
196196

src/StagingEngine.cpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ void StagingEngine::begin_pub_transaction()
4747
}
4848

4949
// Only one publisher has to do this
50-
std::unique_lock lock(*pub_mutex_);
50+
std::unique_lock lock(*publishers_.get_mutex());
5151
if (current_pub_transaction_id_ > 1) { // This is not the first transaction.
5252
// Wait for the completion of the Publish activities from the previous transaction
5353
XBT_DEBUG("[T %d] (%d) Wait for the completion of %u publish activities from the previous transaction",
@@ -69,9 +69,9 @@ void StagingEngine::begin_pub_transaction()
6969
void StagingEngine::end_pub_transaction()
7070
{
7171
// This is the end of the first transaction, create a barrier
72-
if (!pub_barrier_) {
73-
XBT_DEBUG("Create a barrier for %zu publishers", publishers_.size());
74-
pub_barrier_ = sg4::Barrier::create(publishers_.size());
72+
auto pub_barrier = publishers_.get_or_create_barrier();
73+
if (pub_barrier) {
74+
XBT_DEBUG("Barrier created for %zu publishers", publishers_.count());
7575
}
7676

7777
// A new pub transaction has been completed, notify subscribers that they can starting getting variables
@@ -117,7 +117,7 @@ void StagingEngine::begin_sub_transaction()
117117
{
118118
if (current_sub_transaction_id_ == 0) { // This is the first transaction
119119
// Wait for at least one publisher to start a tran
120-
std::unique_lock lock(*sub_mutex_);
120+
std::unique_lock lock(*subscribers_.get_mutex());
121121
while (current_pub_transaction_id_ == 0)
122122
first_pub_transaction_started_->wait(lock);
123123
XBT_DEBUG("Publishers have started a transaction, create rendez-vous points");
@@ -141,28 +141,28 @@ void StagingEngine::begin_sub_transaction()
141141
sub_transaction_started_->notify_all();
142142
}
143143

144-
std::unique_lock lock(*sub_mutex_);
144+
std::unique_lock lock(*subscribers_.get_mutex());
145145
while (completed_pub_transaction_id_ < current_sub_transaction_id_)
146146
pub_transaction_completed_->wait(lock);
147147
}
148148

149149
void StagingEngine::end_sub_transaction()
150150
{
151151
// This is the end of the first transaction, create a barrier
152-
if (!sub_barrier_) {
153-
XBT_DEBUG("Create a barrier for %zu subscribers", subscribers_.size());
154-
sub_barrier_ = sg4::Barrier::create(subscribers_.size());
152+
auto sub_barrier = subscribers_.get_or_create_barrier();
153+
if (sub_barrier) {
154+
XBT_DEBUG("Barrier created for %zu subscribers", subscribers_.count());
155155
}
156156

157-
if (sub_barrier_->wait()) {
157+
if (subscribers_.is_last_at_barrier()) {
158158
XBT_DEBUG("Wait for the %d subscribe activities for the transaction", sub_transaction_.size());
159159
sub_transaction_.wait_all();
160160
XBT_DEBUG("All on-flight subscribe activities are completed. Proceed with the current transaction.");
161161
sub_transaction_.clear();
162162
}
163163

164164
// Prevent subscribers to start a new transaction before this one is really over
165-
if (sub_barrier_->wait())
165+
if (subscribers_.is_last_at_barrier())
166166
// Mark this transaction as over
167167
sub_transaction_in_progress_ = false;
168168
// Decrease counter for next iteration

0 commit comments

Comments
 (0)