Merge f6df164e5147f6ecce612d9542482eb10f6480de into 4854942dcedd226f6adbe9970afc4661dfdb1e6d

This commit is contained in:
Andrus Salumets 2026-05-02 08:03:33 +02:00 committed by GitHub
commit 898afd14f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 94 additions and 4 deletions

View File

@ -163,7 +163,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
pub fn stop_all(&self) {
let mut state = self.lock_state();
for node in &mut state.nodes {
node.start_kill();
node.stop_blocking();
}
state.nodes.clear();

View File

@ -7,7 +7,7 @@ use std::{
path::{Path, PathBuf},
process::Stdio,
thread,
time::Duration,
time::{Duration, Instant},
};
use fs_extra::dir::{CopyOptions, copy as copy_dir};
@ -18,6 +18,9 @@ use tokio::{
time::timeout,
};
const PROCESS_CLEANUP_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
const PROCESS_CLEANUP_POLL_INTERVAL: Duration = Duration::from_millis(20);
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub enum NodeEndpointPort {
TestingApi,
@ -197,6 +200,28 @@ impl<Config: Clone + Send + Sync + 'static, Client: Clone + Send + Sync + 'stati
let _ = self.child.start_kill();
}
pub fn stop_blocking(&mut self) {
let _ = self.child.start_kill();
let _ = self.wait_for_exit_blocking(PROCESS_CLEANUP_WAIT_TIMEOUT);
}
fn wait_for_exit_blocking(&mut self, wait_timeout: Duration) -> bool {
let deadline = Instant::now() + wait_timeout;
loop {
match self.child.try_wait() {
Ok(Some(_)) | Err(_) => return true,
Ok(None) => {}
}
if Instant::now() >= deadline {
return false;
}
thread::sleep(PROCESS_CLEANUP_POLL_INTERVAL);
}
}
pub fn keep_tempdir(&mut self) -> io::Result<()> {
let dir = mem::replace(&mut self.tempdir, tempfile::tempdir()?);
let _ = dir.keep();
@ -294,7 +319,8 @@ fn build_process_command(tempdir: &Path, launch: &LaunchSpec) -> Command {
.current_dir(tempdir)
.stdin(Stdio::null())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit());
.stderr(Stdio::inherit())
.kill_on_drop(true);
command
}
@ -310,7 +336,7 @@ impl<Config: Clone + Send + Sync + 'static, Client: Clone + Send + Sync + 'stati
if should_preserve_tempdir(self.keep_tempdir) {
let _ = self.keep_tempdir();
}
self.start_kill();
self.stop_blocking();
}
}
@ -423,6 +449,14 @@ fn create_tempdir(persist_dir: Option<&Path>) -> Result<TempDir, ProcessSpawnErr
#[cfg(test)]
mod tests {
#[cfg(unix)]
use std::{
process::{Command as StdCommand, Stdio},
time::{Duration, Instant},
};
#[cfg(unix)]
use super::{LaunchSpec, ProcessNode};
use super::{NodeEndpointPort, NodeEndpoints};
#[test]
@ -439,4 +473,60 @@ mod tests {
Some(9000)
);
}
#[cfg(unix)]
#[tokio::test]
async fn drop_stops_child_process() {
let node = ProcessNode::<(), ()>::spawn(
"node",
(),
|_, _, _| {
Ok(LaunchSpec {
binary: "/bin/sleep".into(),
args: vec!["60".into()],
..LaunchSpec::default()
})
},
|_| Ok(NodeEndpoints::default()),
false,
None,
None,
|_| Ok(()),
)
.await
.expect("process should spawn");
let pid = node.pid();
drop(node);
assert!(
wait_until_process_exits(pid, Duration::from_secs(1)),
"process {pid} should exit on drop"
);
}
#[cfg(unix)]
fn wait_until_process_exits(pid: u32, timeout: Duration) -> bool {
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if !process_exists(pid) {
return true;
}
std::thread::sleep(Duration::from_millis(20));
}
!process_exists(pid)
}
#[cfg(unix)]
fn process_exists(pid: u32) -> bool {
StdCommand::new("kill")
.arg("-0")
.arg(pid.to_string())
.stderr(Stdio::null())
.status()
.is_ok_and(|status| status.success())
}
}