From bed0b9448df1bfa74cc1c81866ecd56a6323f715 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Mon, 19 Jun 2023 15:27:14 +0200 Subject: [PATCH] Simulation unhappy path (#193) * Use elapsed time * Added timeout * Extract tally * Missing elapsed time * Fix new view leader behaviour * Fix tests * Fix timeout double check * Fix logs * TimeoutHandler nitpicks * Clippy happy * Fix timeout sub * Modify discard messages comment --- simulations/src/node/carnot/event_builder.rs | 120 +++++++++++-------- simulations/src/node/carnot/mod.rs | 85 +++++++++---- simulations/src/node/carnot/tally.rs | 37 ++++++ simulations/src/node/carnot/timeout.rs | 34 ++++++ simulations/src/node/dummy.rs | 46 +++---- simulations/src/node/dummy_streaming.rs | 3 +- simulations/src/node/mod.rs | 4 +- simulations/src/runner/async_runner.rs | 4 +- simulations/src/runner/glauber_runner.rs | 4 +- simulations/src/runner/layered_runner.rs | 4 +- simulations/src/runner/mod.rs | 6 +- simulations/src/runner/sync_runner.rs | 8 +- 12 files changed, 249 insertions(+), 106 deletions(-) create mode 100644 simulations/src/node/carnot/tally.rs create mode 100644 simulations/src/node/carnot/timeout.rs diff --git a/simulations/src/node/carnot/event_builder.rs b/simulations/src/node/carnot/event_builder.rs index a170afea..1153d6c5 100644 --- a/simulations/src/node/carnot/event_builder.rs +++ b/simulations/src/node/carnot/event_builder.rs @@ -1,11 +1,14 @@ -use crate::node::carnot::messages::CarnotMessage; +use crate::node::carnot::{messages::CarnotMessage, tally::Tally, timeout::TimeoutHandler}; use crate::util::parse_idx; -use consensus_engine::{Carnot, NewView, Overlay, Qc, StandardQc, Timeout, TimeoutQc, View, Vote}; +use consensus_engine::{ + AggregateQc, Carnot, NewView, Overlay, Qc, StandardQc, Timeout, TimeoutQc, View, Vote, +}; use nomos_consensus::network::messages::{NewViewMsg, TimeoutMsg, VoteMsg}; use nomos_consensus::NodeId; use nomos_core::block::Block; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::hash::Hash; +use std::time::Duration; pub type CarnotTx = [u8; 32]; @@ -14,28 +17,54 @@ pub(crate) struct EventBuilder { leader_vote_message: Tally, vote_message: Tally, timeout_message: Tally, + leader_new_view_message: Tally, new_view_message: Tally, + timeout_handler: TimeoutHandler, pub(crate) current_view: View, } impl EventBuilder { - pub fn new(id: NodeId) -> Self { + pub fn new(id: NodeId, timeout: Duration) -> Self { Self { vote_message: Default::default(), leader_vote_message: Default::default(), timeout_message: Default::default(), + leader_new_view_message: Default::default(), new_view_message: Default::default(), current_view: View::default(), id, + timeout_handler: TimeoutHandler::new(timeout), + } + } + + fn local_timeout(&mut self, view: View, elapsed: Duration) -> bool { + if self.timeout_handler.step(view, elapsed) { + self.timeout_handler.prune_by_view(view); + true + } else { + false } } pub fn step( &mut self, - messages: Vec, + mut messages: Vec, engine: &Carnot, + elapsed: Duration, ) -> Vec> { let mut events = Vec::new(); + // check timeout and exit + if self.local_timeout(engine.current_view(), elapsed) { + events.push(Event::LocalTimeout); + // if we timeout discard incoming current view messages + messages.retain(|msg| { + matches!( + msg, + CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_) + ) + }); + } + // only run when the engine is in the genesis view if engine.highest_voted_view() == -1 && engine.overlay().is_member_of_leaf_committee(self.id) @@ -67,7 +96,14 @@ impl EventBuilder { events.push(Event::Proposal { block }) } CarnotMessage::TimeoutQc(msg) => { + let timeout_qc = msg.qc.clone(); events.push(Event::TimeoutQc { timeout_qc: msg.qc }); + if engine.overlay().is_member_of_leaf_committee(self.id) { + events.push(Event::NewView { + timeout_qc, + new_views: Default::default(), + }); + } } CarnotMessage::Vote(msg) => { let msg_view = msg.vote.view; @@ -138,66 +174,48 @@ impl EventBuilder { } CarnotMessage::NewView(msg) => { let msg_view = msg.vote.view; + let voter = msg.voter; let timeout_qc = msg.vote.timeout_qc.clone(); - self.current_view = core::cmp::max(self.current_view, msg_view); - // if we are the leader, then use the leader threshold, otherwise use the leaf threshold - let threshold = if engine.is_next_leader() { + let is_next_view_leader = engine.is_next_leader(); + let is_message_from_root_committee = + engine.overlay().is_member_of_root_committee(voter); + + let tally = if is_message_from_root_committee { + &mut self.leader_new_view_message + } else { + &mut self.new_view_message + }; + + // if the message comes from the root committee, then use the leader threshold, otherwise use the leaf threshold + let threshold = if is_message_from_root_committee { engine.leader_super_majority_threshold() } else { engine.super_majority_threshold() }; - if let Some(new_views) = - self.new_view_message.tally_by(msg_view, msg, threshold) - { - events.push(Event::NewView { - new_views: new_views.into_iter().map(|v| v.vote).collect(), - timeout_qc, - }) + if let Some(votes) = tally.tally_by(msg_view, msg, threshold) { + if is_next_view_leader && is_message_from_root_committee { + let high_qc = engine.high_qc(); + events.push(Event::ProposeBlock { + qc: Qc::Aggregated(AggregateQc { + high_qc, + view: msg_view + 1, + }), + }); + } else { + events.push(Event::NewView { + timeout_qc, + new_views: votes.into_iter().map(|v| v.vote).collect(), + }); + } } } } } - events } } -struct Tally { - cache: HashMap>, - threshold: usize, -} - -impl Default for Tally { - fn default() -> Self { - Self::new(0) - } -} - -impl Tally { - fn new(threshold: usize) -> Self { - Self { - cache: Default::default(), - threshold, - } - } - - fn tally(&mut self, view: View, message: T) -> Option> { - self.tally_by(view, message, self.threshold) - } - - fn tally_by(&mut self, view: View, message: T, threshold: usize) -> Option> { - let entries = self.cache.entry(view).or_default(); - entries.insert(message); - let entries = entries.len(); - if entries == threshold { - Some(self.cache.remove(&view).unwrap()) - } else { - None - } - } -} - pub enum Event { Proposal { block: Block, diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index 8b2f6522..ef900fd8 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -3,6 +3,8 @@ mod event_builder; mod message_cache; mod messages; +mod tally; +mod timeout; // std use std::hash::Hash; @@ -22,7 +24,7 @@ use consensus_engine::overlay::RandomBeaconState; use consensus_engine::{ Block, BlockId, Carnot, Committee, Overlay, Payload, Qc, StandardQc, TimeoutQc, View, Vote, }; -use nomos_consensus::network::messages::ProposalChunkMsg; +use nomos_consensus::network::messages::{ProposalChunkMsg, TimeoutQcMsg}; use nomos_consensus::{ leader_selection::UpdateableLeaderSelection, network::messages::{NewViewMsg, TimeoutMsg, VoteMsg}, @@ -117,6 +119,7 @@ impl CarnotNode { let overlay = O::new(overlay_settings); let engine = Carnot::from_genesis(id, genesis.header().clone(), overlay); let state = CarnotState::from(&engine); + let timeout = settings.timeout; // pk is generated in an insecure way, but for simulation purpouses using a rng like smallrng is more useful let mut pk_buff = [0; 32]; rng.fill_bytes(&mut pk_buff); @@ -127,7 +130,7 @@ impl CarnotNode { settings, network_interface, message_cache: MessageCache::new(), - event_builder: event_builder::EventBuilder::new(id), + event_builder: event_builder::EventBuilder::new(id, timeout), engine, random_beacon_pk, } @@ -186,8 +189,14 @@ impl CarnotNode { ); } } - Output::BroadcastTimeoutQc { .. } => { - unimplemented!() + Output::BroadcastTimeoutQc { timeout_qc } => { + self.network_interface.send_message( + self.id, + CarnotMessage::TimeoutQc(TimeoutQcMsg { + source: self.id, + qc: timeout_qc, + }), + ); } Output::BroadcastProposal { proposal } => { for node in &self.settings.nodes { @@ -221,7 +230,7 @@ impl> Node for Car &self.state } - fn step(&mut self) { + fn step(&mut self, elapsed: Duration) { // split messages per view, we just one to process the current engine processing view or proposals or timeoutqcs let (mut current_view_messages, other_view_messages): (Vec<_>, Vec<_>) = self .network_interface @@ -236,7 +245,9 @@ impl> Node for Car self.message_cache.update(other_view_messages); current_view_messages.append(&mut self.message_cache.retrieve(self.engine.current_view())); - let events = self.event_builder.step(current_view_messages, &self.engine); + let events = self + .event_builder + .step(current_view_messages, &self.engine, elapsed); for event in events { let mut output: Vec> = vec![]; @@ -313,28 +324,60 @@ impl> Node for Car // This branch means we already get enough new view msgs for this qc // So we can just call approve_new_view Event::NewView { - timeout_qc: _, - new_views: _, + timeout_qc, + new_views, } => { - // let (new, out) = self.engine.approve_new_view(timeout_qc, new_views); - // output = Some(out); - // self.engine = new; - // let next_view = timeout_qc.view + 2; - // if self.engine.is_leader_for_view(next_view) { - // self.gather_new_views(&[self.id].into_iter().collect(), timeout_qc); - // } - tracing::error!("unimplemented new view branch"); - unimplemented!() + let (new, out) = self.engine.approve_new_view(timeout_qc.clone(), new_views); + output.push(Output::Send(out)); + self.engine = new; + tracing::info!( + node = parse_idx(&self.id), + current_view = self.engine.current_view(), + timeout_view = timeout_qc.view, + "receive new view message" + ); } Event::TimeoutQc { timeout_qc } => { - self.engine = self.engine.receive_timeout_qc(timeout_qc); + tracing::info!( + node = parse_idx(&self.id), + current_view = self.engine.current_view(), + timeout_view = timeout_qc.view, + "receive timeout qc message" + ); + self.engine = self.engine.receive_timeout_qc(timeout_qc.clone()); } Event::RootTimeout { timeouts } => { - println!("root timeouts: {timeouts:?}"); + tracing::debug!("root timeout {:?}", timeouts); + if self.engine.is_member_of_root_committee() { + assert!(timeouts + .iter() + .all(|t| t.view == self.engine.current_view())); + let high_qc = timeouts + .iter() + .map(|t| &t.high_qc) + .chain(std::iter::once(&self.engine.high_qc())) + .max_by_key(|qc| qc.view) + .expect("empty root committee") + .clone(); + let timeout_qc = TimeoutQc { + view: timeouts.iter().next().unwrap().view, + high_qc, + sender: self.id(), + }; + output.push(Output::BroadcastTimeoutQc { timeout_qc }); + } } Event::LocalTimeout => { - tracing::error!("unimplemented local timeout branch"); - unreachable!("local timeout will never be constructed") + tracing::info!( + node = parse_idx(&self.id), + current_view = self.engine.current_view(), + "receive local timeout message" + ); + let (new, out) = self.engine.local_timeout(); + self.engine = new; + if let Some(out) = out { + output.push(Output::Send(out)); + } } Event::None => { tracing::error!("unimplemented none branch"); diff --git a/simulations/src/node/carnot/tally.rs b/simulations/src/node/carnot/tally.rs new file mode 100644 index 00000000..6f3ecb3a --- /dev/null +++ b/simulations/src/node/carnot/tally.rs @@ -0,0 +1,37 @@ +use consensus_engine::View; +use std::collections::{HashMap, HashSet}; + +pub(crate) struct Tally { + cache: HashMap>, + threshold: usize, +} + +impl Default for Tally { + fn default() -> Self { + Self::new(0) + } +} + +impl Tally { + pub fn new(threshold: usize) -> Self { + Self { + cache: Default::default(), + threshold, + } + } + + pub fn tally(&mut self, view: View, message: T) -> Option> { + self.tally_by(view, message, self.threshold) + } + + pub fn tally_by(&mut self, view: View, message: T, threshold: usize) -> Option> { + let entries = self.cache.entry(view).or_default(); + entries.insert(message); + let entries = entries.len(); + if entries >= threshold { + Some(self.cache.remove(&view).unwrap()) + } else { + None + } + } +} diff --git a/simulations/src/node/carnot/timeout.rs b/simulations/src/node/carnot/timeout.rs new file mode 100644 index 00000000..c3fb67ba --- /dev/null +++ b/simulations/src/node/carnot/timeout.rs @@ -0,0 +1,34 @@ +use consensus_engine::View; +use polars::export::ahash::HashMap; +use std::time::Duration; + +pub(crate) struct TimeoutHandler { + pub timeout: Duration, + pub per_view: HashMap, +} + +impl TimeoutHandler { + pub fn new(timeout: Duration) -> Self { + Self { + timeout, + per_view: Default::default(), + } + } + + pub fn step(&mut self, view: View, elapsed: Duration) -> bool { + let timeout = self.per_view.entry(view).or_insert(self.timeout); + *timeout = timeout.saturating_sub(elapsed); + *timeout == Duration::ZERO + } + + pub fn is_timeout(&self, view: View) -> bool { + self.per_view + .get(&view) + .map(|t| t.is_zero()) + .unwrap_or(false) + } + + pub fn prune_by_view(&mut self, view: View) { + self.per_view.retain(|entry, _| entry > &view); + } +} diff --git a/simulations/src/node/dummy.rs b/simulations/src/node/dummy.rs index b6771c62..b08f5a8e 100644 --- a/simulations/src/node/dummy.rs +++ b/simulations/src/node/dummy.rs @@ -1,5 +1,6 @@ // std use std::collections::{BTreeMap, BTreeSet}; +use std::time::Duration; // crates use serde::{Deserialize, Serialize}; // internal @@ -362,7 +363,7 @@ impl Node for DummyNode { &self.state } - fn step(&mut self) { + fn step(&mut self, _: Duration) { let incoming_messages = self.network_interface.receive_messages(); self.state.message_count += incoming_messages.len(); @@ -565,19 +566,19 @@ mod tests { for (_, node) in nodes.iter() { assert_eq!(node.current_view(), 0); } - + let elapsed = Duration::from_millis(100); // 1. Leaders receive vote and broadcast new Proposal(Block) to all nodes. - network.dispatch_after(Duration::from_millis(100)); + network.dispatch_after(elapsed); nodes.iter_mut().for_each(|(_, node)| { - node.step(); + node.step(elapsed); }); network.collect_messages(); // 2. a) All nodes received proposal block. // b) Leaf nodes send vote to internal nodes. - network.dispatch_after(Duration::from_millis(100)); + network.dispatch_after(elapsed); nodes.iter_mut().for_each(|(_, node)| { - node.step(); + node.step(elapsed); }); network.collect_messages(); @@ -598,9 +599,9 @@ mod tests { assert!(nodes[&node_id(6)].state().view_state[&1].vote_sent); // Leaf // 3. Internal nodes send vote to root node. - network.dispatch_after(Duration::from_millis(100)); + network.dispatch_after(elapsed); nodes.iter_mut().for_each(|(_, node)| { - node.step(); + node.step(elapsed); }); network.collect_messages(); @@ -616,9 +617,9 @@ mod tests { assert!(nodes[&node_id(6)].state().view_state[&1].vote_sent); // Leaf // 4. Root node send vote to next view leader nodes. - network.dispatch_after(Duration::from_millis(100)); + network.dispatch_after(elapsed); nodes.iter_mut().for_each(|(_, node)| { - node.step(); + node.step(elapsed); }); network.collect_messages(); @@ -632,9 +633,9 @@ mod tests { assert!(nodes[&node_id(6)].state().view_state[&1].vote_sent); // Leaf // 5. Leaders receive vote and broadcast new Proposal(Block) to all nodes. - network.dispatch_after(Duration::from_millis(100)); + network.dispatch_after(elapsed); nodes.iter_mut().for_each(|(_, node)| { - node.step(); + node.step(elapsed); }); network.collect_messages(); @@ -645,9 +646,9 @@ mod tests { // 6. a) All nodes received proposal block. // b) Leaf nodes send vote to internal nodes. - network.dispatch_after(Duration::from_millis(100)); + network.dispatch_after(elapsed); nodes.iter_mut().for_each(|(_, node)| { - node.step(); + node.step(elapsed); }); network.collect_messages(); @@ -705,11 +706,11 @@ mod tests { for (_, node) in nodes.iter() { assert_eq!(node.current_view(), 0); } - + let elapsed = Duration::from_millis(100); for _ in 0..7 { - network.dispatch_after(Duration::from_millis(100)); + network.dispatch_after(elapsed); nodes.iter_mut().for_each(|(_, node)| { - node.step(); + node.step(elapsed); }); network.collect_messages(); } @@ -755,11 +756,11 @@ mod tests { for (_, node) in nodes.iter() { assert_eq!(node.current_view(), 0); } - + let elapsed = Duration::from_millis(100); for _ in 0..7 { - network.dispatch_after(Duration::from_millis(100)); + network.dispatch_after(elapsed); nodes.iter_mut().for_each(|(_, node)| { - node.step(); + node.step(elapsed); }); network.collect_messages(); } @@ -803,10 +804,11 @@ mod tests { network.collect_messages(); let nodes = Arc::new(RwLock::new(nodes)); + let elapsed = Duration::from_millis(100); for _ in 0..9 { - network.dispatch_after(Duration::from_millis(100)); + network.dispatch_after(elapsed); nodes.write().par_iter_mut().for_each(|(_, node)| { - node.step(); + node.step(elapsed); }); network.collect_messages(); } diff --git a/simulations/src/node/dummy_streaming.rs b/simulations/src/node/dummy_streaming.rs index ba8bcbe5..9ab340cd 100644 --- a/simulations/src/node/dummy_streaming.rs +++ b/simulations/src/node/dummy_streaming.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use std::time::Duration; use super::{Node, NodeId}; @@ -42,7 +43,7 @@ impl Node for DummyStreamingNode { &self.state } - fn step(&mut self) { + fn step(&mut self, _: Duration) { self.state.current_view += 1; } } diff --git a/simulations/src/node/mod.rs b/simulations/src/node/mod.rs index 4d2e4c4a..fa19f098 100644 --- a/simulations/src/node/mod.rs +++ b/simulations/src/node/mod.rs @@ -154,7 +154,7 @@ pub trait Node { // TODO: View must be view whenever we integrate consensus engine fn current_view(&self) -> usize; fn state(&self) -> &Self::State; - fn step(&mut self); + fn step(&mut self, elapsed: Duration); } #[cfg(test)] @@ -174,7 +174,7 @@ impl Node for usize { self } - fn step(&mut self) { + fn step(&mut self, _: Duration) { use std::ops::AddAssign; self.add_assign(1); } diff --git a/simulations/src/runner/async_runner.rs b/simulations/src/runner/async_runner.rs index 28d585bd..b61b20fb 100644 --- a/simulations/src/runner/async_runner.rs +++ b/simulations/src/runner/async_runner.rs @@ -9,6 +9,7 @@ use rayon::prelude::*; use serde::Serialize; use std::collections::HashSet; use std::sync::Arc; +use std::time::Duration; use super::SimulationRunnerHandle; @@ -39,6 +40,7 @@ where let (stop_tx, stop_rx) = bounded(1); let p = runner.producer.clone(); let p1 = runner.producer; + let elapsed = Duration::from_millis(100); let handle = std::thread::spawn(move || { loop { select! { @@ -53,7 +55,7 @@ where .write() .par_iter_mut() .filter(|n| ids.contains(&n.id())) - .for_each(N::step); + .for_each(|node|node.step(elapsed)); p.send(R::try_from( &simulation_state, diff --git a/simulations/src/runner/glauber_runner.rs b/simulations/src/runner/glauber_runner.rs index e1c06e75..85f8f03c 100644 --- a/simulations/src/runner/glauber_runner.rs +++ b/simulations/src/runner/glauber_runner.rs @@ -9,6 +9,7 @@ use rand::prelude::IteratorRandom; use serde::Serialize; use std::collections::BTreeSet; use std::sync::Arc; +use std::time::Duration; use super::SimulationRunnerHandle; @@ -42,6 +43,7 @@ where let (stop_tx, stop_rx) = bounded(1); let p = runner.producer.clone(); let p1 = runner.producer; + let elapsed = Duration::from_millis(100); let handle = std::thread::spawn(move || { 'main: for chunk in iterations.chunks(update_rate) { select! { @@ -61,7 +63,7 @@ where let node: &mut N = shared_nodes .get_mut(parse_idx(&node_id)) .expect("Node should be present"); - node.step(); + node.step(elapsed); } // check if any condition makes the simulation stop diff --git a/simulations/src/runner/layered_runner.rs b/simulations/src/runner/layered_runner.rs index ca525f81..34e671ba 100644 --- a/simulations/src/runner/layered_runner.rs +++ b/simulations/src/runner/layered_runner.rs @@ -33,6 +33,7 @@ use crossbeam::select; use std::collections::BTreeSet; use std::ops::Not; use std::sync::Arc; +use std::time::Duration; // crates use fixed_slice_deque::FixedSliceDeque; use rand::prelude::{IteratorRandom, SliceRandom}; @@ -80,6 +81,7 @@ where let (stop_tx, stop_rx) = bounded(1); let p = runner.producer.clone(); let p1 = runner.producer; + let elapsed = Duration::from_millis(100); let handle = std::thread::spawn(move || { loop { select! { @@ -99,7 +101,7 @@ where .get_mut(parse_idx(&node_id)) .expect("Node should be present"); let prev_view = node.current_view(); - node.step(); + node.step(elapsed); let after_view = node.current_view(); if after_view > prev_view { // pass node to next step group diff --git a/simulations/src/runner/mod.rs b/simulations/src/runner/mod.rs index c65a4c91..4fd01486 100644 --- a/simulations/src/runner/mod.rs +++ b/simulations/src/runner/mod.rs @@ -80,15 +80,15 @@ where .any(|x| x) } - fn step(&mut self, nodes: &mut [N]) + fn step(&mut self, nodes: &mut [N], elapsed: Duration) where N: Node + Send + Sync, N::Settings: Clone + Send, N::State: Serialize, { - self.network.dispatch_after(Duration::from_millis(100)); + self.network.dispatch_after(elapsed); nodes.par_iter_mut().for_each(|node| { - node.step(); + node.step(elapsed); }); self.network.collect_messages(); } diff --git a/simulations/src/runner/sync_runner.rs b/simulations/src/runner/sync_runner.rs index bf5a8f8e..e81478c2 100644 --- a/simulations/src/runner/sync_runner.rs +++ b/simulations/src/runner/sync_runner.rs @@ -5,6 +5,7 @@ use crate::warding::SimulationState; use crate::{node::Node, output_processors::Record}; use crossbeam::channel::{bounded, select}; use std::sync::Arc; +use std::time::Duration; /// Simulate with sending the network state to any subscriber pub fn simulate( @@ -31,6 +32,7 @@ where let (stop_tx, stop_rx) = bounded(1); let p = runner.producer.clone(); let p1 = runner.producer; + let elapsed = Duration::from_millis(100); let handle = std::thread::spawn(move || { p.send(R::try_from(&state)?)?; loop { @@ -44,7 +46,7 @@ where // then dead lock will occur { let mut nodes = nodes.write(); - inner_runner.step(&mut nodes); + inner_runner.step(&mut nodes, elapsed); } p.send(R::try_from(&state)?)?; @@ -147,7 +149,7 @@ mod tests { let mut runner: SimulationRunner = SimulationRunner::new(network, nodes, producer, settings).unwrap(); let mut nodes = runner.nodes.write(); - runner.inner.step(&mut nodes); + runner.inner.step(&mut nodes, Duration::from_millis(100)); drop(nodes); let nodes = runner.nodes.read(); @@ -194,7 +196,7 @@ mod tests { SimulationRunner::new(network, nodes, Default::default(), settings).unwrap(); let mut nodes = runner.nodes.write(); - runner.inner.step(&mut nodes); + runner.inner.step(&mut nodes, Duration::from_millis(100)); drop(nodes); let nodes = runner.nodes.read();