1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use super::Error;
use futures_core::{ready, Stream};
use pin_project::pin_project;
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};
use tower_service::Service;

/// Shared state for a stream that feeds every request produced by `stream` into `service` and
/// yields the resulting responses.
///
/// The futures returned by the service are handed to `queue` (see [`Drive`]), which is polled
/// for completed responses each time this stream is polled.
#[pin_project]
#[derive(Debug)]
pub(crate) struct CallAll<Svc, S, Q> {
    /// The wrapped service; becomes `None` once it has been extracted with `into_inner`,
    /// `take_service`, or `unordered`.
    service: Option<Svc>,
    /// The source of requests. Marked `#[pin]` so it can be polled through `Pin<&mut Self>`.
    #[pin]
    stream: S,
    /// Holds the response futures of requests that have been dispatched to the service.
    queue: Q,
    /// Set to `true` once `stream` has yielded `None`; no further requests are pulled after that.
    eof: bool,
}

/// A container of in-flight futures of type `F` that [`CallAll`] pushes response futures into
/// and polls for completed outputs.
pub(crate) trait Drive<F: Future> {
    /// Reports whether the container currently holds no futures.
    ///
    /// `CallAll` uses this after the request stream has ended to decide whether it is finished.
    fn is_empty(&self) -> bool;

    /// Hands `future` to the container so that later calls to `poll` can drive it.
    fn push(&mut self, future: F);

    /// Polls the contained futures for a completed output.
    ///
    /// `CallAll` yields the output of a `Poll::Ready(Some(_))` result and otherwise moves on to
    /// pulling more requests. NOTE(review): implementors presumably return `Poll::Ready(None)`
    /// when the container is empty — confirm against the concrete implementations.
    // NOTE: this implicitly requires Self: Unpin just like Service does
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>;
}

impl<Svc, S, Q> CallAll<Svc, S, Q>
where
    Svc: Service<S::Item>,
    Svc::Error: Into<Error>,
    S: Stream,
    Q: Drive<Svc::Future>,
{
    /// Builds a `CallAll` that owns `service`, pulls requests from `stream`, and stores the
    /// resulting response futures in `queue`. The end-of-stream flag starts out cleared.
    pub(crate) fn new(service: Svc, stream: S, queue: Q) -> Self {
        let service = Some(service);
        Self {
            service,
            stream,
            queue,
            eof: false,
        }
    }

    /// Consumes the `CallAll` and returns the wrapped `Service`.
    ///
    /// # Panics
    ///
    /// Panics if the service has already been taken.
    pub(crate) fn into_inner(mut self) -> Svc {
        let taken = self.service.take();
        taken.expect("Service already taken")
    }

    /// Removes the wrapped `Service` from a pinned `CallAll`, leaving `None` in its place.
    ///
    /// # Panics
    ///
    /// Panics if the service has already been taken.
    pub(crate) fn take_service(self: Pin<&mut Self>) -> Svc {
        let slot = self.project().service;
        slot.take().expect("Service already taken")
    }

    /// Converts this `CallAll` into a `CallAllUnordered` built from the same service and
    /// request stream.
    ///
    /// # Panics
    ///
    /// Panics if the queue is not empty, if the request stream has already reached its end,
    /// or if the service has already been taken.
    pub(crate) fn unordered(mut self) -> super::CallAllUnordered<Svc, S> {
        assert!(self.queue.is_empty() && !self.eof);

        let service = self.service.take().unwrap();
        super::CallAllUnordered::new(service, self.stream)
    }
}

impl<Svc, S, Q> Stream for CallAll<Svc, S, Q>
where
    Svc: Service<S::Item>,
    Svc::Error: Into<Error>,
    S: Stream,
    Q: Drive<Svc::Future>,
{
    type Item = Result<Svc::Response, Error>;

    /// Yields the next available response, dispatching further requests to the service as
    /// needed.
    ///
    /// Each iteration first polls the queue for a completed response. If none is available and
    /// the request stream has not ended, it waits for the service to become ready, pulls the
    /// next request, calls the service, and pushes the returned future onto the queue.
    ///
    /// Returns `Poll::Ready(None)` once the request stream has ended and the queue is empty.
    /// Errors from the service or from a response future are converted into [`Error`] and
    /// yielded as `Some(Err(_))`.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped service has already been taken out of this `CallAll`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        loop {
            // First, see if we have any responses to yield
            if let Poll::Ready(r) = this.queue.poll(cx) {
                if let Some(rsp) = r.transpose().map_err(Into::into)? {
                    return Poll::Ready(Some(Ok(rsp)));
                }
            }

            // If there are no more requests coming, check if we're done
            if *this.eof {
                return if this.queue.is_empty() {
                    Poll::Ready(None)
                } else {
                    Poll::Pending
                };
            }

            // Then, see that the service is ready for another request
            let svc = this
                .service
                .as_mut()
                .expect("Using CallAll after extracing inner Service");
            ready!(svc.poll_ready(cx)).map_err(Into::into)?;

            // If it is, gather the next request (if there is one). When the stream is not
            // ready we must return `Pending` rather than loop again: nothing can make progress
            // until a waker fires, and looping here would spin on the executor thread.
            //
            // TODO: We probably want to "release" the slot we reserved in Svc if the stream
            // returns `Pending`. It may be a while until we get around to actually using it.
            match ready!(this.stream.as_mut().poll_next(cx)) {
                Some(req) => {
                    this.queue.push(svc.call(req));
                }
                None => {
                    // We're all done once any outstanding requests have completed
                    *this.eof = true;
                }
            }
        }
    }
}