diff --git a/nomos-cli/config.yaml b/nomos-cli/config.yaml index 98f59795..a1740c24 100644 --- a/nomos-cli/config.yaml +++ b/nomos-cli/config.yaml @@ -1,3 +1,45 @@ backend: - host: "127.0.0.1" - port: 8000 \ No newline at end of file + host: 0.0.0.0 + port: 3019 + log_level: "fatal" + # Node key needs to be unique for every client. + node_key: "0000000000000000000000000000000000000000000000000000000000001444" + discV5BootstrapNodes: [] + initial_peers: ["/dns/testnet.nomos.tech/tcp/3000"] + relayTopics: [] + # Mixclient configuration to communicate with mixnodes. + # The libp2p network backend always requires this mixclient configuration + # (cannot be disabled for now). + mixnet_client: + # A mixclient mode. For details, see the documentation of the "mixnet" crate. + # - Sender + # - !SenderReceiver [mixnode_client_listen_address] + mode: Sender + # A mixnet topology, which contains the information of all mixnodes in the mixnet. + # (The topology is static for now.) + topology: + # Each mixnet layer consists of a list of mixnodes. + layers: + - nodes: + - address: testnet.nomos.tech:7707 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + - nodes: + - address: testnet.nomos.tech:7717 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + + - nodes: + - address: testnet.nomos.tech:7727 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + + # A max number of connections that will stay connected to mixnodes in the first mixnet layer. + connection_pool_size: 255 + max_retries: 5 + retry_delay: + secs: 1 + nanos: 0 + # A range of total delay that will be set to each Sphinx packets + # sent to the mixnet for timing obfuscation. + # Panics if start > end. + mixnet_delay: + start: "0ms" + end: "0ms" diff --git a/nomos-cli/src/cmds/chat/mod.rs b/nomos-cli/src/cmds/chat/mod.rs index b0463b43..7e6aa575 100644 --- a/nomos-cli/src/cmds/chat/mod.rs +++ b/nomos-cli/src/cmds/chat/mod.rs @@ -17,6 +17,7 @@ use clap::Args; use full_replication::{ AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings, }; +use futures::{stream, StreamExt}; use nomos_core::{block::BlockId, da::DaProtocol, wire}; use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter}; use nomos_network::{backends::libp2p::Libp2p, NetworkService}; @@ -238,11 +239,36 @@ async fn check_for_messages(sender: Sender>, node: Url) { } } +// Process a single block's blobs and return chat messages +async fn process_block_blobs( + node: Url, + block_id: &BlockId, + da_settings: DaSettings, +) -> Result, Box> { + let blobs = get_block_blobs(&node, block_id).await?; + + // Note that number of attestations is ignored here since we only use the da protocol to + // decode the blob data, not to validate the certificate + let mut da_protocol = + > as DaProtocol>::new(da_settings); + let mut messages = Vec::new(); + + for blob in blobs { + da_protocol.recv_blob(blob); + let bytes = da_protocol.extract().unwrap(); + if let Ok(message) = wire::deserialize::(&bytes) { + messages.push(message); + } + } + + Ok(messages) +} + +// Fetch new messages since the last tip async fn fetch_new_messages( last_tip: &BlockId, node: &Url, ) -> Result<(BlockId, Vec), Box> { - let mut new_messages = Vec::new(); // By only specifying the 'to' parameter we get all the blocks since the last tip let mut new_blocks = get_blocks_info(node, None, Some(*last_tip)) .await? @@ -257,26 +283,26 @@ async fn fetch_new_messages( // We already processed the last block so let's remove it new_blocks.pop(); - // Note that number of attestations is ignored here since we only use the da protocol to - // decode the blob data, not to validate the certificate - let mut da_protocol = - > as DaProtocol>::new( - DaSettings { - num_attestations: 1, - voter: [0; 32], // voter is ignored as well - }, - ); + let da_settings = DaSettings { + num_attestations: 1, + voter: [0; 32], + }; - for block in new_blocks.iter().rev() { - let blobs = get_block_blobs(node, block).await?; - for blob in blobs { - da_protocol.recv_blob(blob); - // Full replication only needs one blob to decode the data, so the unwrap is safe - let bytes = da_protocol.extract().unwrap(); - if let Ok(message) = wire::deserialize::(&bytes) { - new_messages.push(message); - } - } + let block_stream = stream::iter(new_blocks.iter().rev()); + let results: Vec<_> = block_stream + .map(|block| { + let node = node.clone(); + let da_settings = da_settings.clone(); + + process_block_blobs(node, block, da_settings) + }) + .buffer_unordered(new_blocks.len()) + .collect::>() + .await; + + let mut new_messages = Vec::new(); + for result in results { + new_messages.extend(result?); } Ok((new_tip, new_messages))