Skip to content

Commit ae7c27d

Browse files
committed
ping() upon fill()ing
1 parent 2dd3a9c commit ae7c27d

1 file changed

Lines changed: 20 additions & 17 deletions

File tree

src/net/plex/plex_connector.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ impl PlexConnector {
8383
.map(|remote| {
8484
let connector = self.connector.clone();
8585
let connections_per_remote = self.settings.connections_per_remote;
86+
let multiplex_settings = self.settings.multiplex_settings.clone();
8687

8788
fuse.spawn(async move {
8889
let fuse = Fuse::new();
@@ -91,48 +92,50 @@ impl PlexConnector {
9192

9293
for _ in 0..connections_per_remote {
9394
let connector = connector.clone();
95+
let multiplex_settings = multiplex_settings.clone();
9496

95-
let connect_handle =
96-
fuse.spawn(async move { connector.connect(remote).await });
97+
let connect_handle = fuse.spawn(async move {
98+
connector.connect(remote).await.map(|connection| {
99+
let multiplex =
100+
Multiplex::new(Role::Connector, connection, multiplex_settings);
101+
102+
let (multiplex, _) = multiplex.split();
103+
multiplex.ping();
104+
105+
multiplex
106+
})
107+
});
97108

98109
connect_handles.push(connect_handle);
99110
time::sleep(interval).await;
100111
}
101112

102-
let mut connections = Vec::new();
113+
let mut new_multiplexes = Vec::new();
103114

104115
for connect_handle in connect_handles {
105-
if let Ok(connection) = connect_handle.await.unwrap().unwrap() {
106-
connections.push(connection)
116+
if let Ok(multiplex) = connect_handle.await.unwrap().unwrap() {
117+
new_multiplexes.push(multiplex)
107118
}
108119
}
109120

110-
(remote, connections)
121+
(remote, new_multiplexes)
111122
})
112123
})
113124
.collect::<Vec<_>>();
114125

115126
for remote_handle in remote_handles {
116-
let (remote, connections) = remote_handle.await.unwrap().unwrap();
127+
let (remote, new_multiplexes) = remote_handle.await.unwrap().unwrap();
117128

118129
let multiplexes = self.pool.lock().get_multiplexes(remote);
119130
let mut multiplexes = multiplexes.lock().await;
120131

121132
let missing = self.settings.connections_per_remote - multiplexes.len();
122133

123134
multiplexes.extend(
124-
connections
135+
new_multiplexes
125136
.into_iter()
126-
.map(|connection| {
127-
let multiplex = Multiplex::new(
128-
Role::Connector,
129-
connection,
130-
self.settings.multiplex_settings.clone(),
131-
);
132-
133-
let (multiplex, _) = multiplex.split();
137+
.map(|multiplex| {
134138
let id = self.cursor.fetch_add(1, Ordering::Relaxed);
135-
136139
(id, multiplex)
137140
})
138141
.take(missing),

0 commit comments

Comments
 (0)