track network route

This commit is contained in:
Youngjoon Lee 2025-02-04 12:39:58 +09:00
parent a8ee51eacf
commit 88f51730b6
No known key found for this signature in database
GPG Key ID: D94003D91DE12141
3 changed files with 105 additions and 31 deletions

View File

@ -27,3 +27,4 @@ sha2 = "0.10"
uuid = { version = "1", features = ["fast-rng", "v4"] }
tracing-appender = "0.2"
cached = "0.54.0"
bincode = "1.3.3"

View File

@ -114,6 +114,7 @@ impl SimulationApp {
create_boxed_blendnode(
node_id,
&mut network,
regions_data.clone(),
settings.simulation_settings.clone(),
no_netcap,
BlendnodeSettings {
@ -155,6 +156,7 @@ impl SimulationApp {
fn create_boxed_blendnode(
node_id: NodeId,
network: &mut Network<BlendMessage>,
regions_data: RegionsData,
simulation_settings: SimulationSettings,
no_netcap: bool,
blendnode_settings: BlendnodeSettings,
@ -190,6 +192,7 @@ fn create_boxed_blendnode(
node_id,
blendnode_settings,
network_interface,
regions_data,
))
}

View File

@ -15,6 +15,7 @@ use futures::Stream;
use lottery::StakeLottery;
use message::{Payload, PayloadId};
use multiaddr::Multiaddr;
use netrunner::network::regions::RegionsData;
use netrunner::network::NetworkMessage;
use netrunner::node::{Node, NodeId, NodeIdExt};
use netrunner::{
@ -41,8 +42,20 @@ use state::BlendnodeState;
use std::{pin::pin, task::Poll, time::Duration};
use stream_wrapper::CrossbeamReceiverStream;
#[derive(Debug, Clone)]
pub struct BlendMessage(Vec<u8>);
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlendMessage {
message: Vec<u8>,
network_route: Vec<(NodeId, Duration)>,
}
impl BlendMessage {
pub fn new(message: Vec<u8>, origin: NodeId) -> Self {
Self {
message,
network_route: vec![(origin, Duration::ZERO)],
}
}
}
impl PayloadSize for BlendMessage {
fn size_bytes(&self) -> u32 {
@ -50,6 +63,11 @@ impl PayloadSize for BlendMessage {
}
}
struct BlendOutgoingMessageWithRoute {
outgoing_message: BlendOutgoingMessage,
network_route: Vec<(NodeId, Duration)>,
}
#[derive(Deserialize)]
pub struct BlendnodeSettings {
pub connected_peers: Vec<NodeId>,
@ -72,6 +90,7 @@ pub struct BlendNode {
state: BlendnodeState,
settings: BlendnodeSettings,
network_interface: InMemoryNetworkInterface<BlendMessage>,
regions_data: RegionsData,
message_cache: TimedCache<Sha256Hash, ()>,
data_msg_lottery_update_time_sender: channel::Sender<Duration>,
@ -87,10 +106,10 @@ pub struct BlendNode {
Interval,
>,
crypto_processor: CryptographicProcessor<ChaCha12Rng, MockBlendMessage>,
temporal_sender: channel::Sender<BlendOutgoingMessage>,
temporal_sender: channel::Sender<BlendOutgoingMessageWithRoute>,
temporal_update_time_sender: channel::Sender<Duration>,
temporal_processor_messages:
TemporalStream<CrossbeamReceiverStream<BlendOutgoingMessage>, TemporalScheduler>,
TemporalStream<CrossbeamReceiverStream<BlendOutgoingMessageWithRoute>, TemporalScheduler>,
epoch_update_sender: channel::Sender<Duration>,
slot_update_sender: channel::Sender<Duration>,
cover_traffic: CoverTraffic<Epoch, Slot, MockBlendMessage>,
@ -101,6 +120,7 @@ impl BlendNode {
id: NodeId,
settings: BlendnodeSettings,
network_interface: InMemoryNetworkInterface<BlendMessage>,
regions_data: RegionsData,
) -> Self {
let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed);
@ -178,6 +198,7 @@ impl BlendNode {
Self {
id,
network_interface,
regions_data,
// We're not coupling this lifespan with the steps now, but it's okay
// We expected that a message will be delivered to most of nodes within 60s.
message_cache: TimedCache::with_lifespan(60),
@ -224,7 +245,8 @@ impl BlendNode {
self.network_interface
.send_message(*node_id, message.clone())
}
self.message_cache.cache_set(Self::sha256(&message.0), ());
self.message_cache
.cache_set(Self::sha256(&message.message), ());
}
fn receive(&mut self) -> Vec<NetworkMessage<BlendMessage>> {
@ -234,7 +256,7 @@ impl BlendNode {
// Retain only messages that have not been seen before
.filter(|msg| {
self.message_cache
.cache_set(Self::sha256(&msg.payload().0), ())
.cache_set(Self::sha256(&msg.payload().message), ())
.is_none()
})
.collect()
@ -246,23 +268,29 @@ impl BlendNode {
hasher.finalize().into()
}
fn schedule_persistent_transmission(&mut self, message: Vec<u8>) {
fn schedule_persistent_transmission(&mut self, message: BlendMessage) {
self.log_message(
"PersistentTransmissionScheduled",
&Self::parse_payload(&message),
&Self::parse_payload(&message.message),
);
self.persistent_sender.send(message).unwrap();
self.persistent_sender
.send(bincode::serialize(&message).unwrap())
.unwrap();
}
fn handle_incoming_message(&mut self, message: Vec<u8>) {
match self.crypto_processor.unwrap_message(&message) {
fn handle_incoming_message(&mut self, message: BlendMessage) {
match self.crypto_processor.unwrap_message(&message.message) {
Ok((unwrapped_message, fully_unwrapped)) => {
let temporal_message = if fully_unwrapped {
BlendOutgoingMessage::FullyUnwrapped(unwrapped_message)
} else {
BlendOutgoingMessage::Outbound(unwrapped_message)
};
self.schedule_temporal_processor(temporal_message);
self.schedule_temporal_processor(BlendOutgoingMessageWithRoute {
outgoing_message: temporal_message,
network_route: message.network_route,
});
}
Err(e) => {
tracing::debug!("Failed to unwrap message: {:?}", e);
@ -270,8 +298,8 @@ impl BlendNode {
}
}
fn schedule_temporal_processor(&mut self, message: BlendOutgoingMessage) {
let payload = match &message {
fn schedule_temporal_processor(&mut self, message: BlendOutgoingMessageWithRoute) {
let payload = match &message.outgoing_message {
BlendOutgoingMessage::FullyUnwrapped(payload) => Payload::load(payload.clone()),
BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg),
};
@ -301,8 +329,25 @@ impl BlendNode {
self.log_message(format!("MessageReleasedFrom{}", from).as_str(), payload);
}
fn log_message_fully_unwrapped(&self, payload: &Payload) {
self.log_message("MessageFullyUnwrapped", payload);
fn log_message_fully_unwrapped(
&self,
payload: &Payload,
network_route: Vec<(NodeId, Duration)>,
) {
log!(
"MessageFullyUnwrapped",
MessageWithRouteLog {
message: MessageLog {
payload_id: payload.id(),
step_id: self.state.step_id,
node_id: self.id.index(),
},
network_route: network_route
.into_iter()
.map(|(id, delay)| (id.index(), delay.as_millis()))
.collect(),
}
);
}
fn log_message(&self, topic: &str, payload: &Payload) {
@ -356,33 +401,52 @@ impl Node for BlendNode {
.crypto_processor
.wrap_message(payload.as_bytes())
.unwrap();
self.schedule_persistent_transmission(message);
self.schedule_persistent_transmission(BlendMessage::new(message, self.id));
}
}
// Handle incoming messages
for network_message in self.receive() {
for mut network_message in self.receive() {
assert_eq!(
network_message.payload().network_route.last().unwrap().0,
network_message.from
);
network_message.payload.network_route.push((
self.id,
self.regions_data
.network_behaviour(network_message.from, self.id)
.delay(),
));
self.forward(
network_message.payload().clone(),
Some(network_message.from),
None,
);
self.handle_incoming_message(network_message.into_payload().0);
self.handle_incoming_message(network_message.into_payload());
}
// Proceed temporal processor
if let Poll::Ready(Some(msg)) =
if let Poll::Ready(Some(outgoing_msg_with_route)) =
pin!(&mut self.temporal_processor_messages).poll_next(&mut cx)
{
match msg {
BlendOutgoingMessage::Outbound(msg) => {
self.log_message_released_from("TemporalProcessor", &Self::parse_payload(&msg));
self.schedule_persistent_transmission(msg);
match outgoing_msg_with_route.outgoing_message {
BlendOutgoingMessage::Outbound(message) => {
self.log_message_released_from(
"TemporalProcessor",
&Self::parse_payload(&message),
);
self.schedule_persistent_transmission(BlendMessage {
message,
network_route: outgoing_msg_with_route.network_route,
});
}
BlendOutgoingMessage::FullyUnwrapped(payload) => {
let payload = Payload::load(payload);
self.log_message_released_from("TemporalProcessor", &payload);
self.log_message_fully_unwrapped(&payload);
self.log_message_fully_unwrapped(
&payload,
outgoing_msg_with_route.network_route,
);
self.state.num_messages_fully_unwrapped += 1;
//TODO: create a tracing event
}
@ -397,19 +461,19 @@ impl Node for BlendNode {
.crypto_processor
.wrap_message(payload.as_bytes())
.unwrap();
self.schedule_persistent_transmission(message);
self.schedule_persistent_transmission(BlendMessage::new(message, self.id));
}
// Proceed persistent transmission
if let Poll::Ready(Some(msg)) =
pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx)
{
self.log_message_released_from("PersistentTransmission", &Self::parse_payload(&msg));
self.forward(
BlendMessage(msg),
None,
Some(self.new_emission_log("FromPersistent")),
let msg: BlendMessage = bincode::deserialize(&msg).unwrap();
self.log_message_released_from(
"PersistentTransmission",
&Self::parse_payload(&msg.message),
);
self.forward(msg, None, Some(self.new_emission_log("FromPersistent")));
}
self.state.step_id += 1;
@ -433,6 +497,12 @@ struct MessageLog {
node_id: usize,
}
#[derive(Debug, Serialize, Deserialize)]
struct MessageWithRouteLog {
message: MessageLog,
network_route: Vec<(usize, u128)>,
}
#[derive(Debug, Serialize, Deserialize)]
struct EmissionLog {
emission_type: String,