1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
use super::{layer::ServiceBuilderExt, reconnect::Reconnect, AddOrigin}; use crate::{body::BoxBody, transport::Endpoint}; use http::Uri; use hyper::client::conn::Builder; use hyper::client::connect::Connection as HyperConnection; use hyper::client::service::Connect as HyperConnect; use std::{ fmt, future::Future, pin::Pin, task::{Context, Poll}, }; use tokio::io::{AsyncRead, AsyncWrite}; use tower::{ layer::Layer, limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer}, timeout::TimeoutLayer, util::BoxService, ServiceBuilder, ServiceExt, }; use tower_load::Load; use tower_service::Service; pub(crate) type Request = http::Request<BoxBody>; pub(crate) type Response = http::Response<hyper::Body>; pub(crate) struct Connection { inner: BoxService<Request, Response, crate::Error>, } impl Connection { pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error> where C: Service<Uri> + Send + 'static, C::Error: Into<crate::Error> + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, { let mut settings = Builder::new() .http2_initial_stream_window_size(endpoint.init_stream_window_size) .http2_initial_connection_window_size(endpoint.init_connection_window_size) .http2_only(true) .http2_keep_alive_interval(endpoint.http2_keep_alive_interval) .clone(); if let Some(val) = endpoint.http2_keep_alive_timeout { settings.http2_keep_alive_timeout(val); } if let Some(val) = endpoint.http2_keep_alive_while_idle { settings.http2_keep_alive_while_idle(val); } let settings = settings.clone(); let stack = ServiceBuilder::new() .layer_fn(|s| AddOrigin::new(s, endpoint.uri.clone())) .optional_layer(endpoint.timeout.map(TimeoutLayer::new)) .optional_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new)) .optional_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d))) .into_inner(); let connector = HyperConnect::new(connector, settings); let conn = Reconnect::new(connector, endpoint.uri.clone()); let inner = stack.layer(conn); Ok(Self { inner: BoxService::new(inner), }) } pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error> where C: Service<Uri> + Send + 'static, C::Error: Into<crate::Error> + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, { Self::new(connector, endpoint)?.ready_oneshot().await } } impl Service<Request> for Connection { type Response = Response; type Error = crate::Error; type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Service::poll_ready(&mut self.inner, cx).map_err(Into::into) } fn call(&mut self, req: Request) -> Self::Future { self.inner.call(req) } } impl Load for Connection { type Metric = usize; fn load(&self) -> Self::Metric { 0 } } impl fmt::Debug for Connection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection").finish() } }