Receive blocks blobs in parallel (#554)

* Receive blocks blobs in parallel

* Update nomos-cli config
This commit is contained in:
gusto 2024-01-08 12:56:11 +02:00 committed by GitHub
parent 15c366c606
commit 285300f365
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 90 additions and 22 deletions

View File

@ -1,3 +1,45 @@
backend:
host: "127.0.0.1"
port: 8000
host: 0.0.0.0
port: 3019
log_level: "fatal"
# Node key needs to be unique for every client.
node_key: "0000000000000000000000000000000000000000000000000000000000001444"
discV5BootstrapNodes: []
initial_peers: ["/dns/testnet.nomos.tech/tcp/3000"]
relayTopics: []
# Mixclient configuration to communicate with mixnodes.
# The libp2p network backend always requires this mixclient configuration
# (cannot be disabled for now).
mixnet_client:
# A mixclient mode. For details, see the documentation of the "mixnet" crate.
# - Sender
# - !SenderReceiver [mixnode_client_listen_address]
mode: Sender
# A mixnet topology, which contains the information of all mixnodes in the mixnet.
# (The topology is static for now.)
topology:
# Each mixnet layer consists of a list of mixnodes.
layers:
- nodes:
- address: testnet.nomos.tech:7707 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
- nodes:
- address: testnet.nomos.tech:7717 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
- nodes:
- address: testnet.nomos.tech:7727 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
# A max number of connections that will stay connected to mixnodes in the first mixnet layer.
connection_pool_size: 255
max_retries: 5
retry_delay:
secs: 1
nanos: 0
# A range of total delay that will be set to each Sphinx packets
# sent to the mixnet for timing obfuscation.
# Panics if start > end.
mixnet_delay:
start: "0ms"
end: "0ms"

View File

@ -17,6 +17,7 @@ use clap::Args;
use full_replication::{
AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings,
};
use futures::{stream, StreamExt};
use nomos_core::{block::BlockId, da::DaProtocol, wire};
use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter};
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
@ -238,11 +239,36 @@ async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url) {
}
}
// Process a single block's blobs and return chat messages
async fn process_block_blobs(
node: Url,
block_id: &BlockId,
da_settings: DaSettings,
) -> Result<Vec<ChatMessage>, Box<dyn std::error::Error>> {
let blobs = get_block_blobs(&node, block_id).await?;
// Note that number of attestations is ignored here since we only use the da protocol to
// decode the blob data, not to validate the certificate
let mut da_protocol =
<FullReplication<AbsoluteNumber<Attestation, Certificate>> as DaProtocol>::new(da_settings);
let mut messages = Vec::new();
for blob in blobs {
da_protocol.recv_blob(blob);
let bytes = da_protocol.extract().unwrap();
if let Ok(message) = wire::deserialize::<ChatMessage>(&bytes) {
messages.push(message);
}
}
Ok(messages)
}
// Fetch new messages since the last tip
async fn fetch_new_messages(
last_tip: &BlockId,
node: &Url,
) -> Result<(BlockId, Vec<ChatMessage>), Box<dyn std::error::Error>> {
let mut new_messages = Vec::new();
// By only specifying the 'to' parameter we get all the blocks since the last tip
let mut new_blocks = get_blocks_info(node, None, Some(*last_tip))
.await?
@ -257,26 +283,26 @@ async fn fetch_new_messages(
// We already processed the last block so let's remove it
new_blocks.pop();
// Note that number of attestations is ignored here since we only use the da protocol to
// decode the blob data, not to validate the certificate
let mut da_protocol =
<FullReplication<AbsoluteNumber<Attestation, Certificate>> as DaProtocol>::new(
DaSettings {
num_attestations: 1,
voter: [0; 32], // voter is ignored as well
},
);
let da_settings = DaSettings {
num_attestations: 1,
voter: [0; 32],
};
for block in new_blocks.iter().rev() {
let blobs = get_block_blobs(node, block).await?;
for blob in blobs {
da_protocol.recv_blob(blob);
// Full replication only needs one blob to decode the data, so the unwrap is safe
let bytes = da_protocol.extract().unwrap();
if let Ok(message) = wire::deserialize::<ChatMessage>(&bytes) {
new_messages.push(message);
}
}
let block_stream = stream::iter(new_blocks.iter().rev());
let results: Vec<_> = block_stream
.map(|block| {
let node = node.clone();
let da_settings = da_settings.clone();
process_block_blobs(node, block, da_settings)
})
.buffer_unordered(new_blocks.len())
.collect::<Vec<_>>()
.await;
let mut new_messages = Vec::new();
for result in results {
new_messages.extend(result?);
}
Ok((new_tip, new_messages))