Relay sink + stream (#13)
* Outbound relay into_sink Inbound relay as stream * Cleanup imports * Remove not necessary bounds
This commit is contained in:
parent
2827f0c6a2
commit
c9e9429b60
|
@ -818,6 +818,7 @@ dependencies = [
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
|
"tokio-util",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ futures = "0.3"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
tokio = { version = "1.17", features = ["rt-multi-thread", "sync", "time"] }
|
tokio = { version = "1.17", features = ["rt-multi-thread", "sync", "time"] }
|
||||||
tokio-stream = {version ="0.1", features = ["sync"] }
|
tokio-stream = {version ="0.1", features = ["sync"] }
|
||||||
|
tokio-util = "0.7"
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
|
|
@ -2,10 +2,14 @@
|
||||||
use std::any::Any;
|
use std::any::Any;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
// crates
|
// crates
|
||||||
|
use futures::{Sink, Stream};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
|
use tokio_util::sync::PollSender;
|
||||||
use tracing::{error, instrument};
|
use tracing::{error, instrument};
|
||||||
// internal
|
// internal
|
||||||
use crate::overwatch::commands::{OverwatchCommand, RelayCommand, ReplyChannel};
|
use crate::overwatch::commands::{OverwatchCommand, RelayCommand, ReplyChannel};
|
||||||
|
@ -105,7 +109,7 @@ impl<M> InboundRelay<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M> OutboundRelay<M> {
|
impl<M: Send + 'static> OutboundRelay<M> {
|
||||||
/// Send a message to the relay connection
|
/// Send a message to the relay connection
|
||||||
pub async fn send(&self, message: M) -> Result<(), (RelayError, M)> {
|
pub async fn send(&self, message: M) -> Result<(), (RelayError, M)> {
|
||||||
self.sender
|
self.sender
|
||||||
|
@ -130,6 +134,10 @@ impl<M> OutboundRelay<M> {
|
||||||
.blocking_send(message)
|
.blocking_send(message)
|
||||||
.map_err(|e| (RelayError::Send, e.0))
|
.map_err(|e| (RelayError::Send, e.0))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn into_sink(self) -> impl Sink<M> {
|
||||||
|
PollSender::new(self.sender)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: ServiceCore> Relay<S> {
|
impl<S: ServiceCore> Relay<S> {
|
||||||
|
@ -174,3 +182,11 @@ impl<S: ServiceCore> Relay<S> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<M> Stream for InboundRelay<M> {
|
||||||
|
type Item = M;
|
||||||
|
|
||||||
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
self.receiver.poll_recv(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue