diff --git a/src/capture.rs b/src/capture.rs index 3691382..d8aedfa 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -184,26 +184,33 @@ async fn do_capture( }); /* create barriers for active clients */ - for (handle, pos) in clients { - tokio::select! { - r = capture.create(handle, to_capture_pos(pos)) => match r { - Ok(_) => {}, - Err(e) => { - capture.terminate().await?; - return Err(e.into()); - }, - }, - _ = wait_for_termination(request_rx) => { - capture.terminate().await?; - return Ok(()); - }, - } + if let Err(e) = create_clients(&mut capture, clients, request_rx).await { + capture.terminate().await?; + return Err(e.into()); } - let res = do_capture_session(active, &mut capture, conn, event_tx, request_rx, service).await; - // FIXME replace with async drop when stabilized - capture.terminate().await?; - res + if let Err(e) = + do_capture_session(active, &mut capture, conn, event_tx, request_rx, service).await + { + // FIXME replace with async drop when stabilized + capture.terminate().await?; + return Err(e); + } + Ok(()) +} + +async fn create_clients( + capture: &mut InputCapture, + clients: impl Iterator, + request_rx: &mut Receiver, +) -> Result<(), CaptureError> { + for (handle, pos) in clients { + tokio::select! { + r = capture.create(handle, to_capture_pos(pos)) => r?, + _ = wait_for_termination(request_rx) => return Ok(()), + } + } + Ok(()) } async fn do_capture_session(