Struct tower_util::CallAll [−][src]
This is a futures::Stream of responses resulting from calling the wrapped tower::Service for each request received on the wrapped Stream.
use futures_util::future::{ready, Ready}; use futures_util::StreamExt; use tower_service::Service; use tower_util::ServiceExt; use tokio::prelude::*; // First, we need to have a Service to process our requests. #[derive(Debug, Eq, PartialEq)] struct FirstLetter; impl Service<&'static str> for FirstLetter { type Response = &'static str; type Error = Box<dyn Error + Send + Sync>; type Future = Ready<Result<Self::Response, Self::Error>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } fn call(&mut self, req: &'static str) -> Self::Future { ready(Ok(&req[..1])) } } #[tokio::main] async fn main() { // Next, we need a Stream of requests. let (mut reqs, rx) = tokio::sync::mpsc::unbounded_channel(); // Note that we have to help Rust out here by telling it what error type to use. // Specifically, it has to be From<Service::Error> + From<Stream::Error>. let mut rsps = FirstLetter.call_all(rx); // Now, let's send a few requests and then check that we get the corresponding responses. reqs.send("one"); reqs.send("two"); reqs.send("three"); drop(reqs); // We then loop over the response Strem that we get back from call_all. let mut i = 0usize; while let Some(rsp) = rsps.next().await { // Each response is a Result (we could also have used TryStream::try_next) match (i + 1, rsp.unwrap()) { (1, "o") | (2, "t") | (3, "t") => {} (n, i) => { unreachable!("{}. response was '{}'", n, i); } } i += 1; } // And at the end, we can get the Service back when there are no more requests. assert_eq!(rsps.into_inner(), FirstLetter); }
Implementations
impl<Svc, S> CallAll<Svc, S> where
Svc: Service<S::Item>,
Svc::Error: Into<Box<dyn Error + Send + Sync>>,
S: Stream,
[src][−]
Svc: Service<S::Item>,
Svc::Error: Into<Box<dyn Error + Send + Sync>>,
S: Stream,
pub fn new(service: Svc, stream: S) -> CallAll<Svc, S>
[src][−]
Create a new CallAll combinator.
Each request yielded by stream is passed to service, and the resulting responses are yielded in the same order by the implementation of Stream for CallAll.
pub fn into_inner(self) -> Svc
[src][−]
pub fn take_service(self: Pin<&mut Self>) -> Svc
[src][−]
Extract the wrapped Service.
This CallAll can no longer be used after this function has been called.
Panics
Panics if take_service
was already called.
pub fn unordered(self) -> CallAllUnordered<Svc, S>
[src][−]
Return responses as they are ready, regardless of the initial order.
This function must be called before the stream is polled.
Panics
Panics if poll
was called.
Trait Implementations
impl<Svc: Debug, S: Debug> Debug for CallAll<Svc, S> where
Svc: Service<S::Item>,
S: Stream,
Svc::Future: Debug,
[src][+]
Svc: Service<S::Item>,
S: Stream,
Svc::Future: Debug,
impl<Svc, S> PinnedDrop for CallAll<Svc, S> where
Svc: Service<S::Item>,
S: Stream,
[src][+]
Svc: Service<S::Item>,
S: Stream,
impl<Svc, S> Stream for CallAll<Svc, S> where
Svc: Service<S::Item>,
Svc::Error: Into<Box<dyn Error + Send + Sync>>,
S: Stream,
[src][+]
Svc: Service<S::Item>,
Svc::Error: Into<Box<dyn Error + Send + Sync>>,
S: Stream,
impl<'pin, Svc, S> Unpin for CallAll<Svc, S> where
Svc: Service<S::Item>,
S: Stream,
__CallAll<'pin, Svc, S>: Unpin,
[src]
Svc: Service<S::Item>,
S: Stream,
__CallAll<'pin, Svc, S>: Unpin,
impl<Svc, S> UnsafeUnpin for CallAll<Svc, S> where
Svc: Service<S::Item>,
S: Stream,
[src]
Svc: Service<S::Item>,
S: Stream,
Auto Trait Implementations
impl<Svc, S> !RefUnwindSafe for CallAll<Svc, S>
impl<Svc, S> Send for CallAll<Svc, S> where
S: Send,
Svc: Send,
<Svc as Service<<S as Stream>::Item>>::Error: Send,
<Svc as Service<<S as Stream>::Item>>::Future: Send,
<Svc as Service<<S as Stream>::Item>>::Response: Send,
S: Send,
Svc: Send,
<Svc as Service<<S as Stream>::Item>>::Error: Send,
<Svc as Service<<S as Stream>::Item>>::Future: Send,
<Svc as Service<<S as Stream>::Item>>::Response: Send,
impl<Svc, S> Sync for CallAll<Svc, S> where
S: Sync,
Svc: Sync,
<Svc as Service<<S as Stream>::Item>>::Error: Sync,
<Svc as Service<<S as Stream>::Item>>::Future: Sync,
<Svc as Service<<S as Stream>::Item>>::Response: Sync,
S: Sync,
Svc: Sync,
<Svc as Service<<S as Stream>::Item>>::Error: Sync,
<Svc as Service<<S as Stream>::Item>>::Future: Sync,
<Svc as Service<<S as Stream>::Item>>::Response: Sync,
impl<Svc, S> !UnwindSafe for CallAll<Svc, S>
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized,
[src][+]
T: 'static + ?Sized,
impl<T> Borrow<T> for T where
T: ?Sized,
[src][+]
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
[src][+]
T: ?Sized,
impl<T> From<T> for T
[src][+]
impl<T, U> Into<U> for T where
U: From<T>,
[src][+]
U: From<T>,
impl<T> StreamExt for T where
T: Stream + ?Sized,
[src][+]
T: Stream + ?Sized,
impl<T, U> TryFrom<U> for T where
U: Into<T>,
[src][+]
U: Into<T>,
impl<T, U> TryInto<U> for T where
U: TryFrom<T>,
[src][+]
U: TryFrom<T>,
impl<S, T, E> TryStream for S where
S: Stream<Item = Result<T, E>> + ?Sized,
[src][+]
S: Stream<Item = Result<T, E>> + ?Sized,
impl<S> TryStreamExt for S where
S: TryStream + ?Sized,
[src][+]
S: TryStream + ?Sized,