Skip to content

Commit eb6685d

Browse files
committed
feedback
1 parent 74ddb58 commit eb6685d

1 file changed

Lines changed: 85 additions & 57 deletions

File tree

  • openvmm/openvmm_entry/src/ttrpc

openvmm/openvmm_entry/src/ttrpc/mod.rs

Lines changed: 85 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -188,56 +188,79 @@ impl VmService {
188188
}
189189
};
190190

191-
futures::select! { // merge semantics
192-
message = vm_service_recv.next() => {
193-
self.vm_controller_events = ctrl_events;
194-
match message {
195-
Some((ctx, message)) => {
196-
match self.handle(ctx, message).await {
197-
HandleAction::None => (),
198-
HandleAction::Quit => break true,
199-
}
200-
}
201-
None => {
202-
tracing::debug!("no more ttrpc requests");
203-
break false;
204-
}
205-
}
206-
},
207-
message = inspect_service_recv.next() => {
208-
self.vm_controller_events = ctrl_events;
209-
match message {
210-
Some((ctx, message)) => {
211-
self.handle_inspect(ctx, message).await;
212-
}
213-
None => {
214-
tracing::debug!("no more ttrpc requests");
215-
break false;
216-
}
217-
}
218-
},
219-
request = recv.recv().fuse() => {
220-
self.vm_controller_events = ctrl_events;
221-
match request {
222-
Ok(WorkerRpc::Restart(rpc)) => rpc.complete(Err(RemoteError::new(anyhow::anyhow!("not supported")))),
223-
Ok(WorkerRpc::Inspect(_)) => (),
224-
Ok(WorkerRpc::Stop) => {
225-
tracing::info!("ttrpc worker stopping");
226-
break false;
227-
}
228-
Err(err) => {
229-
tracing::info!(error = &err as &dyn std::error::Error, "ttrpc worker tearing down");
230-
break false;
231-
}
191+
// Clone the WaitVm cancel context so we can poll it without
192+
// borrowing self.
193+
let mut wait_cancel_ctx = self.wait_vm_response.as_mut().map(|(ctx, _)| ctx.clone());
194+
let wait_cancel_fut = async {
195+
match &mut wait_cancel_ctx {
196+
Some(ctx) => Some(ctx.cancelled().await),
197+
None => std::future::pending().await,
198+
}
199+
};
200+
201+
enum Action {
202+
VmService(Box<Option<(mesh::CancelContext, vmservice::Vm)>>),
203+
InspectService(Option<(mesh::CancelContext, InspectService)>),
204+
WorkerRpc(Result<WorkerRpc<()>, mesh::RecvError>),
205+
ControllerEvent(Option<VmControllerEvent>),
206+
WaitVmCancelled,
207+
}
208+
209+
let action = futures::select! { // merge semantics
210+
m = vm_service_recv.next() => Action::VmService(Box::new(m)),
211+
m = inspect_service_recv.next() => Action::InspectService(m),
212+
r = recv.recv().fuse() => Action::WorkerRpc(r),
213+
e = ctrl_fut.fuse() => Action::ControllerEvent(e),
214+
_reason = wait_cancel_fut.fuse() => Action::WaitVmCancelled,
215+
};
216+
217+
// Restore controller events (unless the channel closed).
218+
if let Action::ControllerEvent(None) = &action {
219+
tracing::debug!("controller event channel closed");
220+
} else {
221+
self.vm_controller_events = ctrl_events;
222+
}
223+
224+
match action {
225+
Action::VmService(message) => match *message {
226+
Some((ctx, message)) => match self.handle(ctx, message).await {
227+
HandleAction::None => (),
228+
HandleAction::Quit => break true,
229+
},
230+
None => {
231+
tracing::debug!("no more ttrpc requests");
232+
break false;
232233
}
233234
},
234-
event = ctrl_fut.fuse() => {
235-
self.vm_controller_events = ctrl_events;
236-
if let Some(event) = event {
237-
self.handle_controller_event(event);
238-
} else {
239-
tracing::debug!("controller event channel closed");
240-
}
235+
Action::InspectService(Some((ctx, message))) => {
236+
self.handle_inspect(ctx, message).await;
237+
}
238+
Action::InspectService(None) => {
239+
tracing::debug!("no more ttrpc requests");
240+
break false;
241+
}
242+
Action::WorkerRpc(Ok(WorkerRpc::Restart(rpc))) => {
243+
rpc.complete(Err(RemoteError::new(anyhow::anyhow!("not supported"))));
244+
}
245+
Action::WorkerRpc(Ok(WorkerRpc::Inspect(_))) => (),
246+
Action::WorkerRpc(Ok(WorkerRpc::Stop)) => {
247+
tracing::info!("ttrpc worker stopping");
248+
break false;
249+
}
250+
Action::WorkerRpc(Err(err)) => {
251+
tracing::info!(
252+
error = &err as &dyn std::error::Error,
253+
"ttrpc worker tearing down"
254+
);
255+
break false;
256+
}
257+
Action::ControllerEvent(Some(event)) => {
258+
self.handle_controller_event(event);
259+
}
260+
Action::ControllerEvent(None) => {} // handled above
261+
Action::WaitVmCancelled => {
262+
tracing::debug!("WaitVm client cancelled");
263+
self.wait_vm_response.take();
241264
}
242265
}
243266
};
@@ -253,7 +276,7 @@ impl VmService {
253276
}
254277

255278
// Complete any pending WaitVm with an error.
256-
if let Some(response) = self.wait_vm_response.take() {
279+
if let Some((_, response)) = self.wait_vm_response.take() {
257280
response.send(Err(grpc_error(anyhow!("server shutting down"))));
258281
}
259282

@@ -297,7 +320,7 @@ struct VmService {
297320
vm_controller: Option<mesh::Sender<VmControllerRpc>>,
298321
vm_controller_events: Option<mesh::Receiver<VmControllerEvent>>,
299322
controller_task: Option<Task<()>>,
300-
wait_vm_response: Option<mesh::OneshotSender<Result<(), Status>>>,
323+
wait_vm_response: Option<(mesh::CancelContext, mesh::OneshotSender<Result<(), Status>>)>,
301324
rpc_tasks: Vec<Task<()>>,
302325
transport: ResolvedTransport,
303326
}
@@ -331,7 +354,7 @@ enum HandleAction {
331354
}
332355

333356
impl VmService {
334-
async fn handle(&mut self, _ctx: mesh::CancelContext, request: vmservice::Vm) -> HandleAction {
357+
async fn handle(&mut self, ctx: mesh::CancelContext, request: vmservice::Vm) -> HandleAction {
335358
tracing::debug!(?request, "request");
336359
match request {
337360
vmservice::Vm::CreateVm(request, response) => {
@@ -350,7 +373,7 @@ impl VmService {
350373
}
351374
self.vm.take();
352375
self.vm_controller_events.take();
353-
if let Some(wait_response) = self.wait_vm_response.take() {
376+
if let Some((_, wait_response)) = self.wait_vm_response.take() {
354377
wait_response.send(Err(grpc_error(anyhow!("VM quit"))));
355378
}
356379
response.send(Ok(()));
@@ -377,7 +400,7 @@ impl VmService {
377400
if self.wait_vm_response.is_some() {
378401
response.send(Err(grpc_error(anyhow!("wait VM already in flight"))));
379402
} else {
380-
self.wait_vm_response = Some(response);
403+
self.wait_vm_response = Some((ctx.clone(), response));
381404
}
382405
}
383406
vmservice::Vm::ModifyResource(request, response) => {
@@ -706,7 +729,7 @@ impl VmService {
706729
}
707730
self.vm.take();
708731
self.vm_controller_events.take();
709-
if let Some(response) = self.wait_vm_response.take() {
732+
if let Some((_, response)) = self.wait_vm_response.take() {
710733
response.send(Err(grpc_error(anyhow!("VM torn down"))));
711734
}
712735
Ok(())
@@ -726,7 +749,7 @@ impl VmService {
726749
match event {
727750
VmControllerEvent::GuestHalt(reason) => {
728751
tracing::info!(%reason, "guest halted (via controller)");
729-
if let Some(response) = self.wait_vm_response.take() {
752+
if let Some((_, response)) = self.wait_vm_response.take() {
730753
response.send(Ok(()));
731754
}
732755
}
@@ -736,8 +759,13 @@ impl VmService {
736759
} else {
737760
tracing::info!("VM worker stopped");
738761
}
739-
if let Some(response) = self.wait_vm_response.take() {
740-
response.send(Err(grpc_error(anyhow!("VM worker stopped"))));
762+
if let Some((_, response)) = self.wait_vm_response.take() {
763+
let status = if let Some(err) = &error {
764+
grpc_error(anyhow!("VM worker stopped: {}", err))
765+
} else {
766+
grpc_error(anyhow!("VM worker stopped"))
767+
};
768+
response.send(Err(status));
741769
}
742770
// Clear VM state since the worker is gone. The controller
743771
// task will be awaited during final cleanup.

0 commit comments

Comments
 (0)