rename track to history

This commit is contained in:
Youngjoon Lee 2025-02-05 16:31:41 +09:00
parent 88e7b5e4a7
commit cc2fd20988
No known key found for this signature in database
GPG Key ID: D94003D91DE12141
2 changed files with 37 additions and 37 deletions

View File

@ -85,20 +85,20 @@
"stream_settings": {
"path": "test.json"
},
"node_count": 100,
"node_count": 10,
"seed": 0,
"record_settings": {},
"wards": [
{
"sum": 100
"sum": 10
}
],
"connected_peers_count": 4,
"data_message_lottery_interval": "20s",
"stake_proportion": 1.0,
"epoch_duration": "200s",
"epoch_duration": "20s",
"slot_duration": "1s",
"slots_per_epoch": 200,
"slots_per_epoch": 20,
"number_of_hops": 2,
"persistent_transmission": {
"max_emission_frequency": 1.0,

View File

@ -46,14 +46,14 @@ use stream_wrapper::CrossbeamReceiverStream;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlendMessage {
message: Vec<u8>,
tracks: Vec<MessageTrack>,
history: Vec<MessageHistoryEvent>,
}
impl BlendMessage {
pub fn new(message: Vec<u8>) -> Self {
Self {
message,
tracks: Vec::new(),
history: Vec::new(),
}
}
}
@ -65,7 +65,7 @@ impl PayloadSize for BlendMessage {
}
#[derive(Debug, Clone, Serialize, Deserialize)]
enum MessageTrack {
enum MessageHistoryEvent {
PersistentTransmissionScheduled {
#[serde(with = "node_id_serde")]
node_id: NodeId,
@ -103,9 +103,9 @@ enum MessageTrack {
},
}
struct BlendOutgoingMessageWithRoute {
struct BlendOutgoingMessageWithHistory {
outgoing_message: BlendOutgoingMessage,
tracks: Vec<MessageTrack>,
history: Vec<MessageHistoryEvent>,
}
#[derive(Deserialize)]
@ -149,10 +149,10 @@ pub struct BlendNode {
num_persistent_transmission_scheduled: usize,
crypto_processor: CryptographicProcessor<ChaCha12Rng, MockBlendMessage>,
temporal_sender: channel::Sender<BlendOutgoingMessageWithRoute>,
temporal_sender: channel::Sender<BlendOutgoingMessageWithHistory>,
temporal_update_time_sender: channel::Sender<Duration>,
temporal_processor_messages:
TemporalStream<CrossbeamReceiverStream<BlendOutgoingMessageWithRoute>, TemporalScheduler>,
TemporalStream<CrossbeamReceiverStream<BlendOutgoingMessageWithHistory>, TemporalScheduler>,
num_temporal_processor_scheduled: usize,
epoch_update_sender: channel::Sender<Duration>,
@ -323,8 +323,8 @@ impl BlendNode {
&Self::parse_payload(&message.message),
);
message
.tracks
.push(MessageTrack::PersistentTransmissionScheduled {
.history
.push(MessageHistoryEvent::PersistentTransmissionScheduled {
node_id: self.id,
step_id: self.state.step_id,
index: self.num_persistent_transmission_scheduled,
@ -344,9 +344,9 @@ impl BlendNode {
BlendOutgoingMessage::Outbound(unwrapped_message)
};
self.schedule_temporal_processor(BlendOutgoingMessageWithRoute {
self.schedule_temporal_processor(BlendOutgoingMessageWithHistory {
outgoing_message: temporal_message,
tracks: message.tracks,
history: message.history,
});
}
Err(e) => {
@ -355,15 +355,15 @@ impl BlendNode {
}
}
fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithRoute) {
fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithHistory) {
let payload = match &message.outgoing_message {
BlendOutgoingMessage::FullyUnwrapped(payload) => Payload::load(payload.clone()),
BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg),
};
self.log_message("TemporalProcessorScheduled", &payload);
message
.tracks
.push(MessageTrack::TemporalProcessorScheduled {
.history
.push(MessageHistoryEvent::TemporalProcessorScheduled {
node_id: self.id,
step_id: self.state.step_id,
index: self.num_temporal_processor_scheduled,
@ -394,16 +394,16 @@ impl BlendNode {
self.log_message(format!("MessageReleasedFrom{}", from).as_str(), payload);
}
fn log_message_fully_unwrapped(&self, payload: &Payload, tracks: Vec<MessageTrack>) {
fn log_message_fully_unwrapped(&self, payload: &Payload, history: Vec<MessageHistoryEvent>) {
log!(
"MessageFullyUnwrapped",
MessageWithTracks {
MessageWithHistory {
message: MessageLog {
payload_id: payload.id(),
step_id: self.state.step_id,
node_id: self.id.index(),
},
tracks,
history,
}
);
}
@ -472,8 +472,8 @@ impl Node for BlendNode {
for mut network_message in self.receive() {
network_message
.payload
.tracks
.push(MessageTrack::NetworkReceived {
.history
.push(MessageHistoryEvent::NetworkReceived {
from_node_id: network_message.from,
to_node_id: self.id,
delay: self
@ -495,18 +495,18 @@ impl Node for BlendNode {
pin!(&mut self.temporal_processor_messages).poll_next(&mut cx)
{
// Add a TemporalProcessorReleased track
match outgoing_msg_with_route.tracks.last().unwrap() {
MessageTrack::TemporalProcessorScheduled {
match outgoing_msg_with_route.history.last().unwrap() {
MessageHistoryEvent::TemporalProcessorScheduled {
node_id, step_id, ..
} => {
assert_eq!(*node_id, self.id);
outgoing_msg_with_route
.tracks
.push(MessageTrack::TemporalProcessorReleased {
outgoing_msg_with_route.history.push(
MessageHistoryEvent::TemporalProcessorReleased {
node_id: self.id,
step_id: self.state.step_id,
duration: self.duration_between(*step_id, self.state.step_id),
});
},
);
self.num_temporal_processor_scheduled -= 1;
}
track => panic!("Unexpected message track: {:?}", track),
@ -521,13 +521,13 @@ impl Node for BlendNode {
);
self.schedule_persistent_transmission(BlendMessage {
message,
tracks: outgoing_msg_with_route.tracks,
history: outgoing_msg_with_route.history,
});
}
BlendOutgoingMessage::FullyUnwrapped(payload) => {
let payload = Payload::load(payload);
self.log_message_released_from("TemporalProcessor", &payload);
self.log_message_fully_unwrapped(&payload, outgoing_msg_with_route.tracks);
self.log_message_fully_unwrapped(&payload, outgoing_msg_with_route.history);
self.state.num_messages_fully_unwrapped += 1;
//TODO: create a tracing event
}
@ -551,13 +551,13 @@ impl Node for BlendNode {
{
let mut msg: BlendMessage = bincode::deserialize(&msg).unwrap();
// Add a PersistentTransmissionReleased track
match msg.tracks.last().unwrap() {
MessageTrack::PersistentTransmissionScheduled {
match msg.history.last().unwrap() {
MessageHistoryEvent::PersistentTransmissionScheduled {
node_id, step_id, ..
} => {
assert_eq!(*node_id, self.id);
msg.tracks
.push(MessageTrack::PersistentTransmissionReleased {
msg.history
.push(MessageHistoryEvent::PersistentTransmissionReleased {
node_id: self.id,
step_id: self.state.step_id,
duration: self.duration_between(*step_id, self.state.step_id),
@ -595,9 +595,9 @@ struct MessageLog {
}
#[derive(Debug, Serialize, Deserialize)]
struct MessageWithTracks {
struct MessageWithHistory {
message: MessageLog,
tracks: Vec<MessageTrack>,
history: Vec<MessageHistoryEvent>,
}
#[derive(Debug, Serialize, Deserialize)]