Cli: Deserialization and runtime fixes (#854)

* Deserialization and runtime fixes for cli

* Disable dissemination test
This commit is contained in:
gusto 2024-10-25 14:56:37 +03:00 committed by GitHub
parent 461a7beaf8
commit bebb15f921
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 49 additions and 25 deletions

View File

@ -1,5 +1,6 @@
// std
use std::path::PathBuf;
use std::sync::mpsc::Sender;
// crates
use clap::Args;
use reqwest::Url;
@ -54,13 +55,7 @@ impl Disseminate {
let metadata = Metadata::new(app_id, self.index.into());
let (res_sender, res_receiver) = std::sync::mpsc::channel();
tokio::spawn(async move {
let res = client
.publish_blob(bytes, metadata)
.await
.map_err(|err| format!("Failed to publish blob: {:?}", err));
res_sender.send(res).unwrap();
});
std::thread::spawn(move || disperse_data(res_sender, client, bytes, metadata));
match res_receiver.recv() {
Ok(update) => match update {
@ -80,3 +75,17 @@ impl Disseminate {
Ok(())
}
}
#[tokio::main]
async fn disperse_data(
res_sender: Sender<Result<(), String>>,
client: ExecutorHttpClient,
bytes: Vec<u8>,
metadata: Metadata,
) {
let res = client
.publish_blob(bytes, metadata)
.await
.map_err(|err| format!("Failed to publish blob: {:?}", err));
res_sender.send(res).unwrap();
}

View File

@ -1,4 +1,5 @@
// std
use std::sync::mpsc::Sender;
use std::{error::Error, ops::Range};
// crates
use clap::Args;
@ -6,8 +7,11 @@ use reqwest::{Client, Url};
use serde::{de::DeserializeOwned, Serialize};
// internal
use kzgrs_backend::{common::blob::DaBlob, dispersal::Index};
use nomos_core::da::blob::{self, metadata};
use nomos_core::da::blob::metadata;
use nomos_node::api::{handlers::GetRangeReq, paths};
use nomos_node::wire;
type RetrievalRes<Index> = Result<Vec<(Index, Vec<Vec<u8>>)>, Box<dyn Error + Send + Sync>>;
#[derive(Args, Debug)]
pub struct Retrieve {
@ -33,21 +37,12 @@ impl Retrieve {
let app_id: [u8; 32] = hex::decode(&self.app_id)?
.try_into()
.map_err(|_| "Invalid app_id")?;
let addr = self.addr;
let from: Index = self.from.into();
let to: Index = self.to.into();
let (res_sender, res_receiver) = std::sync::mpsc::channel();
tokio::spawn(async move {
let res = get_app_data_range_from_node::<DaBlob, kzgrs_backend::dispersal::Metadata>(
reqwest::Client::new(),
self.addr.clone(),
app_id,
from..to,
)
.await
.map_err(|err| format!("Failed to retrieve data form appid {app_id:?}: {err:?}",));
res_sender.send(res).unwrap();
});
std::thread::spawn(move || retrieve_data(res_sender, addr, app_id, from..to));
match res_receiver.recv() {
Ok(update) => match update {
@ -55,18 +50,19 @@ impl Retrieve {
for (index, blobs) in app_blobs.iter() {
tracing::info!("Index {:?} has {:} blobs", (index), blobs.len());
for blob in blobs.iter() {
let blob = wire::deserialize::<DaBlob>(blob).unwrap();
tracing::info!("Index {:?}; Blob: {blob:?}", index.to_u64());
}
}
}
Err(e) => {
tracing::error!("Error receiving data: {e}");
return Err(e.into());
return Err(e);
}
},
Err(e) => {
tracing::error!("Failed to receive from client thread: {e}");
return Err(e.into());
return Err(Box::new(e));
}
}
@ -75,14 +71,30 @@ impl Retrieve {
}
}
pub async fn get_app_data_range_from_node<Blob, Metadata>(
#[tokio::main]
async fn retrieve_data(
res_sender: Sender<RetrievalRes<Index>>,
url: Url,
app_id: [u8; 32],
range: Range<Index>,
) {
let res = get_app_data_range_from_node::<kzgrs_backend::dispersal::Metadata>(
reqwest::Client::new(),
url,
app_id,
range,
)
.await;
res_sender.send(res).unwrap();
}
async fn get_app_data_range_from_node<Metadata>(
client: Client,
url: Url,
app_id: Metadata::AppId,
range: Range<Metadata::Index>,
) -> Result<Vec<(Metadata::Index, Vec<Blob>)>, Box<dyn Error>>
) -> RetrievalRes<Metadata::Index>
where
Blob: blob::Blob + DeserializeOwned,
Metadata: metadata::Metadata + Serialize,
<Metadata as metadata::Metadata>::Index: Serialize + DeserializeOwned,
<Metadata as metadata::Metadata>::AppId: Serialize + DeserializeOwned,
@ -99,7 +111,7 @@ where
.send()
.await
.unwrap()
.json::<Vec<(Metadata::Index, Vec<Blob>)>>()
.json::<Vec<(Metadata::Index, Vec<Vec<u8>>)>>()
.await
.unwrap())
}

View File

@ -6,6 +6,7 @@ export CFG_FILE_PATH="/config.yaml" \
CFG_SERVER_ADDR="http://cfgsync:4400" \
CFG_HOST_IP=$(hostname -i) \
CFG_HOST_KIND="executor" \
CFG_HOST_IDENTIFIER="executor-$(hostname -i)" \
LOG_LEVEL="INFO" \
RISC0_DEV_MODE=true

View File

@ -5,6 +5,7 @@ set -e
export CFG_FILE_PATH="/config.yaml" \
CFG_SERVER_ADDR="http://cfgsync:4400" \
CFG_HOST_IP=$(hostname -i) \
CFG_HOST_IDENTIFIER="validator-$(hostname -i)" \
LOG_LEVEL="INFO" \
RISC0_DEV_MODE=true

View File

@ -28,6 +28,7 @@ async fn disseminate(executor: &Executor, data: &[u8]) {
client.publish_blob(data.to_vec(), metadata).await.unwrap();
}
#[ignore = "todo: reenable after mixnet is tested"]
#[tokio::test]
async fn disseminate_and_retrieve() {
let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await;