Outbound relay into_sink

Inbound relay as stream
This commit is contained in:
Daniel Sanchez Quiros 2022-12-13 16:28:22 +01:00
parent d6cc851ee6
commit 6af204fcc4
3 changed files with 19 additions and 1 deletions

1
Cargo.lock generated
View File

@ -818,6 +818,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
"tracing",
]

View File

@ -28,6 +28,7 @@ futures = "0.3"
thiserror = "1.0"
tokio = { version = "1.17", features = ["rt-multi-thread", "sync", "time"] }
tokio-stream = {version ="0.1", features = ["sync"] }
tokio-util = "0.7"
tracing = "0.1"
[dev-dependencies]

View File

@ -2,10 +2,14 @@
use std::any::Any;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
// crates
use futures::{poll, Sink, SinkExt, Stream};
use thiserror::Error;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot;
use tokio_util::sync::PollSender;
use tracing::{error, instrument};
// internal
use crate::overwatch::commands::{OverwatchCommand, RelayCommand, ReplyChannel};
@ -105,7 +109,7 @@ impl<M> InboundRelay<M> {
}
}
impl<M> OutboundRelay<M> {
impl<M: Send + 'static> OutboundRelay<M> {
/// Send a message to the relay connection
pub async fn send(&self, message: M) -> Result<(), (RelayError, M)> {
self.sender
@ -130,6 +134,10 @@ impl<M> OutboundRelay<M> {
.blocking_send(message)
.map_err(|e| (RelayError::Send, e.0))
}
pub fn into_sink(self) -> impl Sink<M> + Send + 'static {
PollSender::new(self.sender)
}
}
impl<S: ServiceCore> Relay<S> {
@ -174,3 +182,11 @@ impl<S: ServiceCore> Relay<S> {
}
}
}
impl<M> Stream for InboundRelay<M> {
type Item = M;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}