Struct tower_util::CallAll [−][src]
This is a futures::Stream of responses resulting from calling the wrapped tower::Service
for each request received on the wrapped Stream.
use futures_util::future::{ready, Ready};
use futures_util::StreamExt;
use tower_service::Service;
use tower_util::ServiceExt;
use tokio::prelude::*;

// First, we need to have a Service to process our requests.
#[derive(Debug, Eq, PartialEq)]
struct FirstLetter;
impl Service<&'static str> for FirstLetter {
    type Response = &'static str;
    type Error = Box<dyn Error + Send + Sync>;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: &'static str) -> Self::Future {
        ready(Ok(&req[..1]))
    }
}

#[tokio::main]
async fn main() {
    // Next, we need a Stream of requests.
    let (mut reqs, rx) = tokio::sync::mpsc::unbounded_channel();
    // Note that we have to help Rust out here by telling it what error type to use.
    // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
    let mut rsps = FirstLetter.call_all(rx);

    // Now, let's send a few requests and then check that we get the corresponding responses.
    reqs.send("one");
    reqs.send("two");
    reqs.send("three");
    drop(reqs);

    // We then loop over the response Stream that we get back from call_all.
    let mut i = 0usize;
    while let Some(rsp) = rsps.next().await {
        // Each response is a Result (we could also have used TryStream::try_next)
        match (i + 1, rsp.unwrap()) {
            (1, "o") | (2, "t") | (3, "t") => {}
            (n, i) => {
                unreachable!("{}. response was '{}'", n, i);
            }
        }
        i += 1;
    }

    // And at the end, we can get the Service back when there are no more requests.
    assert_eq!(rsps.into_inner(), FirstLetter);
}
Implementations
impl<Svc, S> CallAll<Svc, S> where
Svc: Service<S::Item>,
Svc::Error: Into<Box<dyn Error + Send + Sync>>,
S: Stream, [src]
Svc: Service<S::Item>,
Svc::Error: Into<Box<dyn Error + Send + Sync>>,
S: Stream,
pub fn new(service: Svc, stream: S) -> CallAll<Svc, S>[src]
Create new CallAll combinator.
Each request yielded by stream is passed to service, and the resulting responses are
yielded in the same order by the implementation of Stream for CallAll.
pub fn into_inner(self) -> Svc[src]
pub fn take_service(self: Pin<&mut Self>) -> Svc[src]
Extract the wrapped Service.
This CallAll can no longer be used after this function has been called.
Panics
Panics if take_service was already called.
pub fn unordered(self) -> CallAllUnordered<Svc, S>[src]
Return responses as they are ready, regardless of the initial order.
This function must be called before the stream is polled.
Panics
Panics if the stream has already been polled (i.e. poll_next was called).
Trait Implementations
impl<Svc: Debug, S: Debug> Debug for CallAll<Svc, S> where
Svc: Service<S::Item>,
S: Stream,
Svc::Future: Debug, [src]
Svc: Service<S::Item>,
S: Stream,
Svc::Future: Debug,
impl<Svc, S> PinnedDrop for CallAll<Svc, S> where
Svc: Service<S::Item>,
S: Stream, [src]
Svc: Service<S::Item>,
S: Stream,
impl<Svc, S> Stream for CallAll<Svc, S> where
Svc: Service<S::Item>,
Svc::Error: Into<Box<dyn Error + Send + Sync>>,
S: Stream, [src]
Svc: Service<S::Item>,
Svc::Error: Into<Box<dyn Error + Send + Sync>>,
S: Stream,
type Item = Result<Svc::Response, Box<dyn Error + Send + Sync>>
Values yielded by the stream.
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>[src]
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>
pub fn size_hint(&self) -> (usize, Option<usize>)[src]
impl<'pin, Svc, S> Unpin for CallAll<Svc, S> where
Svc: Service<S::Item>,
S: Stream,
__CallAll<'pin, Svc, S>: Unpin, [src]
Svc: Service<S::Item>,
S: Stream,
__CallAll<'pin, Svc, S>: Unpin,
impl<Svc, S> UnsafeUnpin for CallAll<Svc, S> where
Svc: Service<S::Item>,
S: Stream, [src]
Svc: Service<S::Item>,
S: Stream,
Auto Trait Implementations
impl<Svc, S> !RefUnwindSafe for CallAll<Svc, S>
impl<Svc, S> Send for CallAll<Svc, S> where
S: Send,
Svc: Send,
<Svc as Service<<S as Stream>::Item>>::Error: Send,
<Svc as Service<<S as Stream>::Item>>::Future: Send,
<Svc as Service<<S as Stream>::Item>>::Response: Send,
S: Send,
Svc: Send,
<Svc as Service<<S as Stream>::Item>>::Error: Send,
<Svc as Service<<S as Stream>::Item>>::Future: Send,
<Svc as Service<<S as Stream>::Item>>::Response: Send,
impl<Svc, S> Sync for CallAll<Svc, S> where
S: Sync,
Svc: Sync,
<Svc as Service<<S as Stream>::Item>>::Error: Sync,
<Svc as Service<<S as Stream>::Item>>::Future: Sync,
<Svc as Service<<S as Stream>::Item>>::Response: Sync,
S: Sync,
Svc: Sync,
<Svc as Service<<S as Stream>::Item>>::Error: Sync,
<Svc as Service<<S as Stream>::Item>>::Future: Sync,
<Svc as Service<<S as Stream>::Item>>::Response: Sync,
impl<Svc, S> !UnwindSafe for CallAll<Svc, S>
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized, [src]
T: 'static + ?Sized,
impl<T> Borrow<T> for T where
T: ?Sized, [src]
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized, [src]
T: ?Sized,
pub fn borrow_mut(&mut self) -> &mut T[src]
impl<T> From<T> for T[src]
impl<T, U> Into<U> for T where
U: From<T>, [src]
U: From<T>,
impl<T> StreamExt for T where
T: Stream + ?Sized, [src]
T: Stream + ?Sized,
pub fn next(&mut self) -> Next<'_, Self> where
Self: Unpin, [src]
Self: Unpin,
pub fn into_future(self) -> StreamFuture<Self> where
Self: Unpin, [src]
Self: Unpin,
pub fn map<T, F>(self, f: F) -> Map<Self, F> where
F: FnMut(Self::Item) -> T, [src]
F: FnMut(Self::Item) -> T,
pub fn enumerate(self) -> Enumerate<Self>[src]
pub fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>, [src]
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>,
pub fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = Option<T>>, [src]
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = Option<T>>,
pub fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where
F: FnMut(Self::Item) -> Fut,
Fut: Future, [src]
F: FnMut(Self::Item) -> Fut,
Fut: Future,
pub fn collect<C>(self) -> Collect<Self, C> where
C: Default + Extend<Self::Item>, [src]
C: Default + Extend<Self::Item>,
pub fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> where
Self: Stream<Item = (A, B)>,
FromA: Default + Extend<A>,
FromB: Default + Extend<B>, [src]
Self: Stream<Item = (A, B)>,
FromA: Default + Extend<A>,
FromB: Default + Extend<B>,
pub fn concat(self) -> Concat<Self> where
Self::Item: Extend<<Self::Item as IntoIterator>::Item>,
Self::Item: IntoIterator,
Self::Item: Default, [src]
Self::Item: Extend<<Self::Item as IntoIterator>::Item>,
Self::Item: IntoIterator,
Self::Item: Default,
pub fn cycle(self) -> Cycle<Self> where
Self: Clone, [src]
Self: Clone,
pub fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where
F: FnMut(T, Self::Item) -> Fut,
Fut: Future<Output = T>, [src]
F: FnMut(T, Self::Item) -> Fut,
Fut: Future<Output = T>,
pub fn flatten(self) -> Flatten<Self> where
Self::Item: Stream, [src]
Self::Item: Stream,
pub fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where
U: Stream,
F: FnMut(Self::Item) -> U, [src]
U: Stream,
F: FnMut(Self::Item) -> U,
pub fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where
F: FnMut(&mut S, Self::Item) -> Fut,
Fut: Future<Output = Option<B>>, [src]
F: FnMut(&mut S, Self::Item) -> Fut,
Fut: Future<Output = Option<B>>,
pub fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>, [src]
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>,
pub fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>, [src]
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>,
pub fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where
Fut: Future, [src]
Fut: Future,
pub fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = ()>, [src]
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = ()>,
pub fn for_each_concurrent<Fut, F>(
self,
limit: impl Into<Option<usize>>,
f: F
) -> ForEachConcurrent<Self, Fut, F> where
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = ()>, [src]
self,
limit: impl Into<Option<usize>>,
f: F
) -> ForEachConcurrent<Self, Fut, F> where
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = ()>,
pub fn take(self, n: usize) -> Take<Self>[src]
pub fn skip(self, n: usize) -> Skip<Self>[src]
pub fn fuse(self) -> Fuse<Self>[src]
pub fn by_ref(&mut self) -> &mut Self[src]
pub fn catch_unwind(self) -> CatchUnwind<Self> where
Self: UnwindSafe, [src]
Self: UnwindSafe,
pub fn boxed<'a>(
self
) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a + Send, Global>> where
Self: Send + 'a, [src]
self
) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a + Send, Global>> where
Self: Send + 'a,
pub fn boxed_local<'a>(
self
) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a, Global>> where
Self: 'a, [src]
self
) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a, Global>> where
Self: 'a,
pub fn buffered(self, n: usize) -> Buffered<Self> where
Self::Item: Future, [src]
Self::Item: Future,
pub fn buffer_unordered(self, n: usize) -> BufferUnordered<Self> where
Self::Item: Future, [src]
Self::Item: Future,
pub fn zip<St>(self, other: St) -> Zip<Self, St> where
St: Stream, [src]
St: Stream,
pub fn chain<St>(self, other: St) -> Chain<Self, St> where
St: Stream<Item = Self::Item>, [src]
St: Stream<Item = Self::Item>,
pub fn peekable(self) -> Peekable<Self>[src]
pub fn chunks(self, capacity: usize) -> Chunks<Self>[src]
pub fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>[src]
pub fn forward<S>(self, sink: S) -> Forward<Self, S> where
Self: TryStream,
S: Sink<Self::Ok, Error = Self::Error>, [src]
Self: TryStream,
S: Sink<Self::Ok, Error = Self::Error>,
pub fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where
Self: Sink<Item>, [src]
Self: Sink<Item>,
pub fn inspect<F>(self, f: F) -> Inspect<Self, F> where
F: FnMut(&Self::Item), [src]
F: FnMut(&Self::Item),
pub fn left_stream<B>(self) -> Either<Self, B> where
B: Stream<Item = Self::Item>, [src]
B: Stream<Item = Self::Item>,
pub fn right_stream<B>(self) -> Either<B, Self> where
B: Stream<Item = Self::Item>, [src]
B: Stream<Item = Self::Item>,
pub fn poll_next_unpin(
&mut self,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> where
Self: Unpin, [src]
&mut self,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> where
Self: Unpin,
pub fn select_next_some(&mut self) -> SelectNextSome<'_, Self> where
Self: Unpin + FusedStream, [src]
Self: Unpin + FusedStream,
impl<T, U> TryFrom<U> for T where
U: Into<T>, [src]
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
pub fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>[src]
impl<T, U> TryInto<U> for T where
U: TryFrom<T>, [src]
U: TryFrom<T>,
type Error = <U as TryFrom<T>>::Error
The type returned in the event of a conversion error.
pub fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>[src]
impl<S, T, E> TryStream for S where
S: Stream<Item = Result<T, E>> + ?Sized, [src]
S: Stream<Item = Result<T, E>> + ?Sized,
type Ok = T
The type of successful values yielded by this stream
type Error = E
The type of failures yielded by this stream
pub fn try_poll_next(
self: Pin<&mut S>,
cx: &mut Context<'_>
) -> Poll<Option<Result<<S as TryStream>::Ok, <S as TryStream>::Error>>>[src]
self: Pin<&mut S>,
cx: &mut Context<'_>
) -> Poll<Option<Result<<S as TryStream>::Ok, <S as TryStream>::Error>>>
impl<S> TryStreamExt for S where
S: TryStream + ?Sized, [src]
S: TryStream + ?Sized,
pub fn err_into<E>(self) -> ErrInto<Self, E> where
Self::Error: Into<E>, [src]
Self::Error: Into<E>,
pub fn map_ok<T, F>(self, f: F) -> MapOk<Self, F> where
F: FnMut(Self::Ok) -> T, [src]
F: FnMut(Self::Ok) -> T,
pub fn map_err<E, F>(self, f: F) -> MapErr<Self, F> where
F: FnMut(Self::Error) -> E, [src]
F: FnMut(Self::Error) -> E,
pub fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> where
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Error = Self::Error>, [src]
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Error = Self::Error>,
pub fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> where
F: FnMut(Self::Error) -> Fut,
Fut: TryFuture<Ok = Self::Ok>, [src]
F: FnMut(Self::Error) -> Fut,
Fut: TryFuture<Ok = Self::Ok>,
pub fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F> where
F: FnMut(&Self::Ok), [src]
F: FnMut(&Self::Ok),
pub fn inspect_err<F>(self, f: F) -> InspectErr<Self, F> where
F: FnMut(&Self::Error), [src]
F: FnMut(&Self::Error),
pub fn into_stream(self) -> IntoStream<Self>[src]
pub fn try_next(&mut self) -> TryNext<'_, Self> where
Self: Unpin, [src]
Self: Unpin,
pub fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> where
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = Self::Error>, [src]
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = Self::Error>,
pub fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> where
F: FnMut(&Self::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = Self::Error>, [src]
F: FnMut(&Self::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = Self::Error>,
pub fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> where
F: FnMut(&Self::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = Self::Error>, [src]
F: FnMut(&Self::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = Self::Error>,
pub fn try_for_each_concurrent<Fut, F>(
self,
limit: impl Into<Option<usize>>,
f: F
) -> TryForEachConcurrent<Self, Fut, F> where
F: FnMut(Self::Ok) -> Fut,
Fut: Future<Output = Result<(), Self::Error>>, [src]
self,
limit: impl Into<Option<usize>>,
f: F
) -> TryForEachConcurrent<Self, Fut, F> where
F: FnMut(Self::Ok) -> Fut,
Fut: Future<Output = Result<(), Self::Error>>,
pub fn try_collect<C>(self) -> TryCollect<Self, C> where
C: Default + Extend<Self::Ok>, [src]
C: Default + Extend<Self::Ok>,
pub fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F> where
F: FnMut(&Self::Ok) -> Fut,
Fut: Future<Output = bool>, [src]
F: FnMut(&Self::Ok) -> Fut,
Fut: Future<Output = bool>,
pub fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F> where
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, [src]
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
pub fn try_flatten(self) -> TryFlatten<Self> where
Self::Ok: TryStream,
<Self::Ok as TryStream>::Error: From<Self::Error>, [src]
Self::Ok: TryStream,
<Self::Ok as TryStream>::Error: From<Self::Error>,
pub fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F> where
F: FnMut(T, Self::Ok) -> Fut,
Fut: TryFuture<Ok = T, Error = Self::Error>, [src]
F: FnMut(T, Self::Ok) -> Fut,
Fut: TryFuture<Ok = T, Error = Self::Error>,
pub fn try_concat(self) -> TryConcat<Self> where
Self::Ok: Extend<<Self::Ok as IntoIterator>::Item>,
Self::Ok: IntoIterator,
Self::Ok: Default, [src]
Self::Ok: Extend<<Self::Ok as IntoIterator>::Item>,
Self::Ok: IntoIterator,
Self::Ok: Default,
pub fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> where
Self::Ok: TryFuture,
<Self::Ok as TryFuture>::Error == Self::Error, [src]
Self::Ok: TryFuture,
<Self::Ok as TryFuture>::Error == Self::Error,
pub fn try_buffered(self, n: usize) -> TryBuffered<Self> where
Self::Ok: TryFuture,
<Self::Ok as TryFuture>::Error == Self::Error, [src]
Self::Ok: TryFuture,
<Self::Ok as TryFuture>::Error == Self::Error,
pub fn try_poll_next_unpin(
&mut self,
cx: &mut Context<'_>
) -> Poll<Option<Result<Self::Ok, Self::Error>>> where
Self: Unpin, [src]
&mut self,
cx: &mut Context<'_>
) -> Poll<Option<Result<Self::Ok, Self::Error>>> where
Self: Unpin,
pub fn into_async_read(self) -> IntoAsyncRead<Self> where
Self: TryStreamExt<Error = Error> + Unpin,
Self::Ok: AsRef<[u8]>, [src]
Self: TryStreamExt<Error = Error> + Unpin,
Self::Ok: AsRef<[u8]>,