This commit is contained in:
Youngjoon Lee 2025-02-05 19:46:31 +09:00
parent 95365b8282
commit 551ba09900
No known key found for this signature in database
GPG Key ID: D94003D91DE12141

View File

@ -288,11 +288,7 @@ impl BlendNode {
.filter(|&id| Some(*id) != exclude_node)
{
let mut message = message.clone();
message.history.push(MessageEvent::NetworkSent {
from_node_id: self.id,
to_node_id: *node_id,
step_id: self.state.step_id,
});
self.record_network_sent_event(&mut message.history, *node_id);
self.network_interface.send_message(*node_id, message)
}
self.message_cache
@ -319,13 +315,7 @@ impl BlendNode {
}
fn schedule_persistent_transmission(&mut self, mut message: BlendMessage) {
message
.history
.push(MessageEvent::PersistentTransmissionScheduled {
node_id: self.id,
step_id: self.state.step_id,
index: self.state.cur_num_persistent_scheduled,
});
self.record_persistent_scheduled_event(&mut message.history);
self.persistent_sender.send(message).unwrap();
self.state.cur_num_persistent_scheduled += 1;
}
@ -351,13 +341,7 @@ impl BlendNode {
}
fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithHistory) {
message
.history
.push(MessageEvent::TemporalProcessorScheduled {
node_id: self.id,
step_id: self.state.step_id,
index: self.state.cur_num_temporal_scheduled,
});
self.record_temporal_scheduled_event(&mut message.history);
self.temporal_sender.send(message).unwrap();
self.state.cur_num_temporal_scheduled += 1;
}
@ -386,6 +370,82 @@ impl BlendNode {
);
}
fn record_network_sent_event(&self, history: &mut Vec<MessageEvent>, to: NodeId) {
history.push(MessageEvent::NetworkSent {
from_node_id: self.id,
to_node_id: to,
step_id: self.state.step_id,
})
}
fn record_network_received_event(&self, history: &mut Vec<MessageEvent>, from: NodeId) {
match history.last().unwrap() {
MessageEvent::NetworkSent {
from_node_id,
to_node_id,
step_id,
} => {
assert_eq!(*from_node_id, from);
assert_eq!(*to_node_id, self.id);
history.push(MessageEvent::NetworkReceived {
from_node_id: from,
to_node_id: self.id,
step_id: self.state.step_id,
latency: self.duration_between(*step_id, self.state.step_id),
});
}
event => panic!("Unexpected message event: {:?}", event),
}
}
fn record_persistent_scheduled_event(&mut self, history: &mut Vec<MessageEvent>) {
history.push(MessageEvent::PersistentTransmissionScheduled {
node_id: self.id,
step_id: self.state.step_id,
index: self.state.cur_num_persistent_scheduled,
})
}
fn record_persistent_released_event(&mut self, history: &mut Vec<MessageEvent>) {
match history.last().unwrap() {
MessageEvent::PersistentTransmissionScheduled {
node_id, step_id, ..
} => {
assert_eq!(*node_id, self.id);
history.push(MessageEvent::PersistentTransmissionReleased {
node_id: self.id,
step_id: self.state.step_id,
duration: self.duration_between(*step_id, self.state.step_id),
});
}
event => panic!("Unexpected message event: {:?}", event),
}
}
fn record_temporal_scheduled_event(&mut self, history: &mut Vec<MessageEvent>) {
history.push(MessageEvent::TemporalProcessorScheduled {
node_id: self.id,
step_id: self.state.step_id,
index: self.state.cur_num_temporal_scheduled,
})
}
fn record_temporal_released_event(&mut self, history: &mut Vec<MessageEvent>) {
match history.last().unwrap() {
MessageEvent::TemporalProcessorScheduled {
node_id, step_id, ..
} => {
assert_eq!(*node_id, self.id);
history.push(MessageEvent::TemporalProcessorReleased {
node_id: self.id,
step_id: self.state.step_id,
duration: self.duration_between(*step_id, self.state.step_id),
});
}
event => panic!("Unexpected message event: {:?}", event),
}
}
fn duration_between(&self, from_step: usize, to_step: usize) -> Duration {
self.step_time
.mul((to_step - from_step).try_into().unwrap())
@ -428,25 +488,10 @@ impl Node for BlendNode {
// Handle incoming messages
for mut network_message in self.receive() {
match network_message.payload.history.last().unwrap() {
MessageEvent::NetworkSent {
to_node_id,
step_id,
..
} => {
assert_eq!(*to_node_id, self.id);
network_message
.payload
.history
.push(MessageEvent::NetworkReceived {
from_node_id: network_message.from,
to_node_id: self.id,
step_id: self.state.step_id,
latency: self.duration_between(*step_id, self.state.step_id),
});
}
event => panic!("Unexpected message history event: {:?}", event),
}
self.record_network_received_event(
&mut network_message.payload.history,
network_message.from,
);
if network_message.payload().is_drop() {
continue;
@ -460,38 +505,23 @@ impl Node for BlendNode {
}
// Proceed temporal processor
if let Poll::Ready(Some(mut outgoing_msg_with_route)) =
if let Poll::Ready(Some(mut outgoing_msg_with_history)) =
pin!(&mut self.temporal_processor_messages).poll_next(&mut cx)
{
// Add a TemporalProcessorReleased history event
match outgoing_msg_with_route.history.last().unwrap() {
MessageEvent::TemporalProcessorScheduled {
node_id, step_id, ..
} => {
assert_eq!(*node_id, self.id);
outgoing_msg_with_route
.history
.push(MessageEvent::TemporalProcessorReleased {
node_id: self.id,
step_id: self.state.step_id,
duration: self.duration_between(*step_id, self.state.step_id),
});
self.state.cur_num_temporal_scheduled -= 1;
}
event => panic!("Unexpected message history event: {:?}", event),
}
self.record_temporal_released_event(&mut outgoing_msg_with_history.history);
self.state.cur_num_temporal_scheduled -= 1;
// Proceed the message
match outgoing_msg_with_route.outgoing_message {
match outgoing_msg_with_history.outgoing_message {
BlendOutgoingMessage::Outbound(message) => {
self.schedule_persistent_transmission(BlendMessage {
message,
history: outgoing_msg_with_route.history,
history: outgoing_msg_with_history.history,
});
}
BlendOutgoingMessage::FullyUnwrapped(payload) => {
let payload = Payload::load(payload);
self.log_message_fully_unwrapped(&payload, outgoing_msg_with_route.history);
self.log_message_fully_unwrapped(&payload, outgoing_msg_with_history.history);
self.state.num_messages_fully_unwrapped += 1;
}
}
@ -515,22 +545,8 @@ impl Node for BlendNode {
if let Poll::Ready(Some(mut msg)) =
pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx)
{
// Add a PersistentTransmissionReleased history event
match msg.history.last().unwrap() {
MessageEvent::PersistentTransmissionScheduled {
node_id, step_id, ..
} => {
assert_eq!(*node_id, self.id);
msg.history
.push(MessageEvent::PersistentTransmissionReleased {
node_id: self.id,
step_id: self.state.step_id,
duration: self.duration_between(*step_id, self.state.step_id),
});
self.state.cur_num_persistent_scheduled -= 1;
}
event => panic!("Unexpected message history event: {:?}", event),
}
self.record_persistent_released_event(&mut msg.history);
self.state.cur_num_persistent_scheduled -= 1;
self.forward(msg, None);
}