1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
use super::super::service; use super::connection::Connection; use crate::transport::Endpoint; use std::{ future::Future, hash::Hash, pin::Pin, task::{Context, Poll}, }; use tokio::{stream::Stream, sync::mpsc::Receiver}; use tower::discover::{Change, Discover}; pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> { changes: Receiver<Change<K, Endpoint>>, connecting: Option<( K, Pin<Box<dyn Future<Output = Result<Connection, crate::Error>> + Send + 'static>>, )>, } impl<K: Hash + Eq + Clone> DynamicServiceStream<K> { pub(crate) fn new(changes: Receiver<Change<K, Endpoint>>) -> Self { Self { changes, connecting: None, } } } impl<K: Hash + Eq + Clone> Discover for DynamicServiceStream<K> { type Key = K; type Service = Connection; type Error = crate::Error; fn poll_discover( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> { loop { if let Some((key, connecting)) = &mut self.connecting { let svc = futures_core::ready!(Pin::new(connecting).poll(cx))?; let key = key.to_owned(); self.connecting = None; let change = Ok(Change::Insert(key, svc)); return Poll::Ready(change); }; let c = &mut self.changes; match Pin::new(&mut *c).poll_next(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(None) => { return Poll::Pending; } Poll::Ready(Some(change)) => match change { Change::Insert(k, endpoint) => { let mut http = hyper::client::connect::HttpConnector::new(); http.set_nodelay(endpoint.tcp_nodelay); http.set_keepalive(endpoint.tcp_keepalive); http.enforce_http(false); #[cfg(feature = "tls")] let connector = service::connector(http, endpoint.tls.clone()); #[cfg(not(feature = "tls"))] let connector = service::connector(http); let fut = Connection::connect(connector, endpoint); self.connecting = Some((k, Box::pin(fut))); continue; } Change::Remove(k) => return Poll::Ready(Ok(Change::Remove(k))), }, } } } } impl<K: Hash + Eq + Clone> Unpin for DynamicServiceStream<K> {}