Add webrtc transport multidim interop (#100)

* update interop rust test lib to use tokio

to allow us add the WebRTC transport test (which is only available with
the tokio runtime)
Clean up the test and update Cargo.toml to 0.2.0 to avoid conflicting with the older tests, and allow us to import lib.rs
on the rust-libp2p repo and have the master tests there.

* simplify build_builder function

* replace deprecated ping keepalive

with KeepAlive Behaviour

* add WebRTC support for rust tests

* add webrtc support on interop generator.

* fix typo on generators.ts

webrtc instead of webtransport.

* update rust test plans to use enums instead.

* update rust tests enum to match the ones by generator.ts

* fix webrtc testcases to have proper security and muxer.

* review: address suggestions:

- undo generator changes, let's do that later on
- don't parse muxer and sec protocol where we don't need it

* ident generator.ts.
This commit is contained in:
João Oliveira 2023-01-18 11:56:09 +00:00 committed by GitHub
parent 150c5ca618
commit b0f3a2d2ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1411 additions and 585 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1,16 +1,19 @@
[package] [package]
edition = "2021" edition = "2021"
name = "testplan" name = "testplan"
version = "0.1.0" version = "0.2.0"
[dependencies] [dependencies]
async-trait = "0.1.58"
anyhow = "1" anyhow = "1"
async-std = { version = "1.10", features = ["attributes", "tokio1"] } async-trait = "0.1.58"
env_logger = "0.9.0" env_logger = "0.9.0"
futures = "0.3.1"
if-addrs = "0.7.0" if-addrs = "0.7.0"
log = "0.4" log = "0.4"
futures = "0.3.1" redis = { version = "0.22.1", features = ["tokio-native-tls-comp", "tokio-comp"] }
tokio = { version = "1.24.1", features = ["full"] }
libp2pv0500 = { package = "libp2p", version = "0.50.0", features = ["websocket", "webrtc", "quic", "mplex", "yamux", "tcp", "tokio", "ping", "noise", "tls", "dns", "rsa", "macros"] }
rand = "0.8.5"
strum = { version = "0.24.1", features = ["derive"] }
libp2pv0500 = { package = "libp2p", version = "0.50.0", features = ["websocket", "quic", "mplex", "yamux", "tcp", "async-std", "ping", "noise", "tls", "dns", "rsa", "macros"] }
redis = { version = "0.22.1", features = ["async-std-tls-comp"] }

View File

@ -1,144 +1,148 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite, StreamExt};
use libp2p::core::transport::Boxed;
use libp2p::core::upgrade::EitherUpgrade;
use libp2p::noise::NoiseOutput;
use libp2p::tls::TlsStream;
use libp2p::{core::muxing::StreamMuxerBox, swarm::derive_prelude::EitherOutput};
use libp2pv0500 as libp2p;
use libp2pv0500::swarm::SwarmEvent;
use libp2pv0500::websocket::WsConfig;
use libp2pv0500::*;
use std::collections::HashSet; use std::collections::HashSet;
use std::env; use std::env;
use std::time::Duration; use std::time::Duration;
use testplan::{run_ping_redis, PingSwarm};
use anyhow::{Context, Result};
use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite, StreamExt};
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed;
use libp2p::core::upgrade::EitherUpgrade;
use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent};
use libp2p::websocket::WsConfig;
use libp2p::{
core, identity, mplex, noise, ping, webrtc, yamux, Multiaddr, PeerId, Swarm, Transport as _,
};
use libp2pv0500 as libp2p;
use testplan::{run_ping, Muxer, PingSwarm, SecProtocol, Transport};
fn build_builder<T, C>( fn build_builder<T, C>(
builder: core::transport::upgrade::Builder<T>, builder: core::transport::upgrade::Builder<T>,
secure_channel_param: String, secure_channel_param: SecProtocol,
muxer_param: String, muxer_param: Muxer,
local_key: identity::Keypair, local_key: &identity::Keypair,
) -> Boxed<(libp2p::PeerId, StreamMuxerBox)> ) -> Boxed<(libp2p::PeerId, StreamMuxerBox)>
where where
T: Transport<Output = C> + Send + Unpin + 'static, T: libp2p::Transport<Output = C> + Send + Unpin + 'static,
<T as libp2p::Transport>::Error: Sync + Send, <T as libp2p::Transport>::Error: Sync + Send + 'static,
<T as libp2p::Transport>::Error: 'static,
<T as libp2p::Transport>::ListenerUpgrade: Send, <T as libp2p::Transport>::ListenerUpgrade: Send,
<T as libp2p::Transport>::Dial: Send, <T as libp2p::Transport>::Dial: Send,
C: AsyncRead + AsyncWrite + Send + Unpin + 'static, C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
let secure_upgrade = match secure_channel_param.as_str() { let mux_upgrade = match muxer_param {
"noise" => EitherUpgrade::A(libp2p::noise::NoiseAuthenticated::xx(&local_key).unwrap()), Muxer::Yamux => EitherUpgrade::A(yamux::YamuxConfig::default()),
"tls" => EitherUpgrade::B(libp2p::tls::Config::new(&local_key).unwrap()), Muxer::Mplex => EitherUpgrade::B(mplex::MplexConfig::default()),
_ => panic!("Unsupported secure channel"),
}; };
trait AsyncRW: 'static + AsyncRead + AsyncWrite + Unpin + Send {} let timeout = Duration::from_secs(5);
impl<T> AsyncRW for T where T: 'static + AsyncRead + AsyncWrite + Unpin + Send {}
let f = |x: EitherOutput< match secure_channel_param {
(libp2p::PeerId, NoiseOutput<core::Negotiated<C>>), SecProtocol::Noise => builder
(libp2p::PeerId, TlsStream<core::Negotiated<C>>), .authenticate(noise::NoiseAuthenticated::xx(&local_key).unwrap())
>| .multiplex(mux_upgrade)
-> (PeerId, Box<dyn AsyncRW>) { .timeout(timeout)
match x { .boxed(),
EitherOutput::First((p_id, out)) => (p_id, Box::new(out)), SecProtocol::Tls => builder
EitherOutput::Second((p_id, out)) => (p_id, Box::new(out)), .authenticate(libp2p::tls::Config::new(&local_key).unwrap())
} .multiplex(mux_upgrade)
}; .timeout(timeout)
.boxed(),
let secure_upgrade = secure_upgrade.map_outbound(f).map_inbound(f); }
let authenticated = builder.authenticate(secure_upgrade);
let mux_upgrade = match muxer_param.as_str() {
"yamux" => EitherUpgrade::A(yamux::YamuxConfig::default()),
"mplex" => EitherUpgrade::B(mplex::MplexConfig::default()),
_ => panic!("Unsupported muxer"),
};
authenticated
.multiplex(mux_upgrade)
.timeout(Duration::from_secs(5))
.boxed()
} }
#[async_std::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
let local_key = identity::Keypair::generate_ed25519(); let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public()); let local_peer_id = PeerId::from(local_key.public());
let transport_param = let transport_param: Transport =
env::var("transport").context("transport environment variable is not set")?; testplan::from_env("transport").context("unsupported transport")?;
let secure_channel_param =
env::var("security").context("security environment variable is not set")?;
let muxer_param = env::var("muxer").context("muxer environment variable is not set")?;
let ip = env::var("ip").context("ip environment variable is not set")?; let ip = env::var("ip").context("ip environment variable is not set")?;
let is_dialer = env::var("is_dialer")
.unwrap_or("true".into())
.parse::<bool>()?;
let redis_addr = env::var("REDIS_ADDR") let redis_addr = env::var("REDIS_ADDR")
.map(|addr| format!("redis://{addr}")) .map(|addr| format!("redis://{addr}"))
.unwrap_or("redis://redis:6379".into()); .unwrap_or("redis://redis:6379".into());
let client = redis::Client::open(redis_addr).context("Could not connect to redis")?; let client = redis::Client::open(redis_addr).context("Could not connect to redis")?;
let (boxed_transport, local_addr) = match transport_param.as_str() { let (boxed_transport, local_addr) = match transport_param {
"quic-v1" => { Transport::QuicV1 => {
let builder = let builder =
libp2p::quic::async_std::Transport::new(libp2p::quic::Config::new(&local_key)) libp2p::quic::tokio::Transport::new(libp2p::quic::Config::new(&local_key))
.map(|(p, c), _| (p, StreamMuxerBox::new(c))); .map(|(p, c), _| (p, StreamMuxerBox::new(c)));
(builder.boxed(), format!("/ip4/{ip}/udp/0/quic-v1")) (builder.boxed(), format!("/ip4/{ip}/udp/0/quic-v1"))
} }
"tcp" => { Transport::Tcp => {
let builder = libp2p::tcp::async_io::Transport::new(libp2p::tcp::Config::new()) let builder = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new())
.upgrade(libp2p::core::upgrade::Version::V1Lazy); .upgrade(libp2p::core::upgrade::Version::V1Lazy);
let secure_channel_param: SecProtocol =
testplan::from_env("security").context("unsupported secure channel")?;
let muxer_param: Muxer =
testplan::from_env("muxer").context("unsupported multiplexer")?;
( (
build_builder( build_builder(builder, secure_channel_param, muxer_param, &local_key),
builder,
secure_channel_param,
muxer_param,
local_key.clone(),
),
format!("/ip4/{ip}/tcp/0"), format!("/ip4/{ip}/tcp/0"),
) )
} }
"ws" => { Transport::Ws => {
let builder = WsConfig::new(libp2p::tcp::async_io::Transport::new( let builder = WsConfig::new(libp2p::tcp::tokio::Transport::new(
libp2p::tcp::Config::new(), libp2p::tcp::Config::new(),
)) ))
.upgrade(libp2p::core::upgrade::Version::V1Lazy); .upgrade(libp2p::core::upgrade::Version::V1Lazy);
let secure_channel_param: SecProtocol =
testplan::from_env("security").context("unsupported secure channel")?;
let muxer_param: Muxer =
testplan::from_env("muxer").context("unsupported multiplexer")?;
( (
build_builder( build_builder(builder, secure_channel_param, muxer_param, &local_key),
builder,
secure_channel_param,
muxer_param,
local_key.clone(),
),
format!("/ip4/{ip}/tcp/0/ws"), format!("/ip4/{ip}/tcp/0/ws"),
) )
} }
_ => panic!("Unsupported"), Transport::Webrtc => (
webrtc::tokio::Transport::new(
local_key,
webrtc::tokio::Certificate::generate(&mut rand::thread_rng())?,
)
.map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn)))
.boxed(),
format!("/ip4/{ip}/udp/0/webrtc"),
),
}; };
let swarm = OrphanRuleWorkaround(Swarm::with_async_std_executor( let swarm = OrphanRuleWorkaround(Swarm::with_tokio_executor(
boxed_transport, boxed_transport,
ping::Behaviour::new( Behaviour {
#[allow(deprecated)] ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
// TODO: Fixing this deprecation requires https://github.com/libp2p/rust-libp2p/pull/3055. keep_alive: keep_alive::Behaviour,
ping::Config::new() },
.with_interval(Duration::from_secs(1))
.with_keep_alive(true),
),
local_peer_id, local_peer_id,
)); ));
run_ping_redis(client, swarm, &local_addr, local_peer_id).await?; // Use peer id as a String so that `run_ping` does not depend on a specific libp2p version.
let local_peer_id = local_peer_id.to_string();
run_ping(client, swarm, &local_addr, &local_peer_id, is_dialer).await?;
Ok(()) Ok(())
} }
struct OrphanRuleWorkaround(Swarm<ping::Behaviour>); #[derive(NetworkBehaviour)]
#[behaviour(prelude = "libp2pv0500::swarm::derive_prelude")]
struct Behaviour {
ping: ping::Behaviour,
keep_alive: keep_alive::Behaviour,
}
struct OrphanRuleWorkaround(Swarm<Behaviour>);
#[async_trait] #[async_trait]
impl PingSwarm for OrphanRuleWorkaround { impl PingSwarm for OrphanRuleWorkaround {
@ -181,10 +185,10 @@ impl PingSwarm for OrphanRuleWorkaround {
let mut received_pings = Vec::with_capacity(number); let mut received_pings = Vec::with_capacity(number);
while received_pings.len() < number { while received_pings.len() < number {
if let Some(SwarmEvent::Behaviour(ping::Event { if let Some(SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
peer: _, peer: _,
result: Ok(ping::Success::Ping { rtt }), result: Ok(ping::Success::Ping { rtt }),
})) = self.0.next().await }))) = self.0.next().await
{ {
received_pings.push(rtt); received_pings.push(rtt);
} }

View File

@ -1,14 +1,54 @@
use std::{env, str::FromStr, time::Duration};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use env_logger::Env; use env_logger::Env;
use libp2pv0500::PeerId;
use log::info; use log::info;
use redis::{AsyncCommands, Client as Rclient}; use redis::{AsyncCommands, Client as Rclient};
use std::{env, time::Duration}; use strum::EnumString;
const REDIS_TIMEOUT: usize = 10; const REDIS_TIMEOUT: usize = 10;
/// Supported transports by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum Transport {
Tcp,
QuicV1,
Webrtc,
Ws,
}
/// Supported stream multiplexers by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum Muxer {
Mplex,
Yamux,
}
/// Supported security protocols by rust-libp2p.
#[derive(Clone, Debug, EnumString)]
#[strum(serialize_all = "kebab-case")]
pub enum SecProtocol {
Noise,
Tls,
}
/// Helper function to get a ENV variable into an test parameter like `Transport`.
pub fn from_env<T>(env_var: &str) -> Result<T>
where
T: FromStr,
T::Err: std::error::Error + Send + Sync + 'static,
{
env::var(env_var)
.with_context(|| format!("{env_var} environment variable is not set"))?
.parse()
.map_err(Into::into)
}
/// PingSwarm allows us to abstract over libp2p versions for `run_ping`.
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait PingSwarm: Sized { pub trait PingSwarm: Sized + Send + 'static {
async fn listen_on(&mut self, address: &str) -> Result<String>; async fn listen_on(&mut self, address: &str) -> Result<String>;
fn dial(&mut self, address: &str) -> Result<()>; fn dial(&mut self, address: &str) -> Result<()>;
@ -22,17 +62,15 @@ pub trait PingSwarm: Sized {
fn local_peer_id(&self) -> String; fn local_peer_id(&self) -> String;
} }
pub enum TransportKind { /// Run a ping interop test. Based on `is_dialer`, either dial the address
Tcp, /// retrieved via `listenAddr` key over the redis connection. Or wait to be pinged and have
WebSocket, /// `dialerDone` key ready on the redis connection.
Quic, pub async fn run_ping<S>(
}
pub async fn run_ping_redis<S>(
client: Rclient, client: Rclient,
mut swarm: S, mut swarm: S,
local_addr: &str, local_addr: &str,
local_peer_id: PeerId, local_peer_id: &str,
is_dialer: bool,
) -> Result<()> ) -> Result<()>
where where
S: PingSwarm, S: PingSwarm,
@ -42,9 +80,11 @@ where
info!("Running ping test: {}", swarm.local_peer_id()); info!("Running ping test: {}", swarm.local_peer_id());
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let is_dialer = env::var("is_dialer") info!(
.unwrap_or("true".into()) "Test instance, listening for incoming connections on: {:?}.",
.parse::<bool>()?; local_addr
);
let local_addr = swarm.listen_on(local_addr).await?;
if is_dialer { if is_dialer {
let result: Vec<String> = conn.blpop("listenerAddr", REDIS_TIMEOUT).await?; let result: Vec<String> = conn.blpop("listenerAddr", REDIS_TIMEOUT).await?;
@ -65,17 +105,17 @@ where
results.first().expect("Should have a ping result") results.first().expect("Should have a ping result")
); );
} else { } else {
let local_addr = swarm.listen_on(local_addr).await?;
let ma = format!("{local_addr}/p2p/{local_peer_id}"); let ma = format!("{local_addr}/p2p/{local_peer_id}");
conn.rpush("listenerAddr", ma).await?; conn.rpush("listenerAddr", ma).await?;
info!(
"Test instance, listening for incoming connections on: {:?}.",
local_addr
);
swarm.await_connections(1).await; // Drive Swarm in the background while we await for `dialerDone` to be ready.
tokio::spawn(async move {
swarm.loop_on_next().await;
});
let _done: Vec<String> = conn.blpop("dialerDone", REDIS_TIMEOUT).await?; let done: Vec<String> = conn.blpop("dialerDone", REDIS_TIMEOUT).await?;
done.get(1)
.context("Failed to wait for dialer conclusion")?;
info!("Ping successful"); info!("Ping successful");
} }

View File

@ -48,6 +48,7 @@ export async function buildTestSpecs(versions: Array<Version>): Promise<Array<Co
AND ma.muxer == mb.muxer AND ma.muxer == mb.muxer
-- quic only uses its own muxer/securechannel -- quic only uses its own muxer/securechannel
AND a.transport != "webtransport" AND a.transport != "webtransport"
AND a.transport != "webrtc"
AND a.transport != "quic" AND a.transport != "quic"
AND a.transport != "quic-v1";`); AND a.transport != "quic-v1";`);
const quicQueryResults = const quicQueryResults =
@ -68,6 +69,12 @@ export async function buildTestSpecs(versions: Array<Version>): Promise<Array<Co
WHERE a.transport == b.transport WHERE a.transport == b.transport
-- Only webtransport transports -- Only webtransport transports
AND a.transport == "webtransport";`); AND a.transport == "webtransport";`);
const webrtcQueryResults =
await db.all(`SELECT DISTINCT a.id as id1, b.id as id2, a.transport
FROM transports a, transports b
WHERE a.transport == b.transport
-- Only webrtc transports
AND a.transport == "webrtc";`);
await db.close(); await db.close();
const testSpecs = queryResults.map((test): ComposeSpecification => ( const testSpecs = queryResults.map((test): ComposeSpecification => (
@ -79,14 +86,27 @@ export async function buildTestSpecs(versions: Array<Version>): Promise<Array<Co
muxer: test.muxer, muxer: test.muxer,
security: test.sec, security: test.sec,
}) })
)).concat(quicQueryResults.concat(quicV1QueryResults).concat(webtransportQueryResults).map((test): ComposeSpecification => buildSpec(containerImages, { )).concat(
name: `${test.id1} x ${test.id2} (${test.transport})`, quicQueryResults
dialerID: test.id1, .concat(quicV1QueryResults)
listenerID: test.id2, .concat(webtransportQueryResults)
transport: test.transport, .map((test): ComposeSpecification => buildSpec(containerImages, {
muxer: "quic", name: `${test.id1} x ${test.id2} (${test.transport})`,
security: "quic", dialerID: test.id1,
}))) listenerID: test.id2,
transport: test.transport,
muxer: "quic",
security: "quic",
})))
.concat(webrtcQueryResults
.map((test): ComposeSpecification => buildSpec(containerImages, {
name: `${test.id1} x ${test.id2} (${test.transport})`,
dialerID: test.id1,
listenerID: test.id2,
transport: test.transport,
muxer: "webrtc",
security: "webrtc",
})))
return testSpecs return testSpecs
} }

View File

@ -16,7 +16,7 @@ export const versions: Array<Version> = [
{ {
id: "rust-v0.50.0", id: "rust-v0.50.0",
containerImageID: rustv050.imageID, containerImageID: rustv050.imageID,
transports: ["ws", "tcp", "quic-v1"], transports: ["ws", "tcp", "quic-v1", "webrtc"],
secureChannels: ["tls", "noise"], secureChannels: ["tls", "noise"],
muxers: ["mplex", "yamux"], muxers: ["mplex", "yamux"],
}, },