#![deny(missing_docs)]
use super::p2c::Balance;
use crate::error;
use futures_core::ready;
use pin_project::pin_project;
use slab::Slab;
use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_discover::{Change, Discover};
use tower_load::Load;
use tower_make::MakeService;
use tower_service::Service;
#[cfg(test)]
mod test;
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum Level {
Low,
Normal,
High,
}
#[pin_project]
pub struct PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
{
maker: MS,
#[pin]
making: Option<MS::Future>,
target: Target,
load: Level,
services: Slab<()>,
died_tx: tokio::sync::mpsc::UnboundedSender<usize>,
#[pin]
died_rx: tokio::sync::mpsc::UnboundedReceiver<usize>,
limit: Option<usize>,
}
impl<MS, Target, Request> fmt::Debug for PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request> + fmt::Debug,
Target: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PoolDiscoverer")
.field("maker", &self.maker)
.field("making", &self.making.is_some())
.field("target", &self.target)
.field("load", &self.load)
.field("services", &self.services)
.field("limit", &self.limit)
.finish()
}
}
impl<MS, Target, Request> Discover for PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
Target: Clone,
{
type Key = usize;
type Service = DropNotifyService<MS::Service>;
type Error = MS::MakeError;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
let mut this = self.project();
while let Poll::Ready(Some(sid)) = this.died_rx.as_mut().poll_recv(cx) {
this.services.remove(sid);
tracing::trace!(
pool.services = this.services.len(),
message = "removing dropped service"
);
}
if this.services.len() == 0 && this.making.is_none() {
let _ = ready!(this.maker.poll_ready(cx))?;
tracing::trace!("construct initial pool connection");
this.making
.set(Some(this.maker.make_service(this.target.clone())));
}
if let Level::High = this.load {
if this.making.is_none() {
if this
.limit
.map(|limit| this.services.len() >= limit)
.unwrap_or(false)
{
return Poll::Pending;
}
tracing::trace!(
pool.services = this.services.len(),
message = "decided to add service to loaded pool"
);
ready!(this.maker.poll_ready(cx))?;
tracing::trace!("making new service");
this.making
.set(Some(this.maker.make_service(this.target.clone())));
}
}
if let Some(fut) = this.making.as_mut().as_pin_mut() {
let svc = ready!(fut.poll(cx))?;
this.making.set(None);
let id = this.services.insert(());
let svc = DropNotifyService {
svc,
id,
notify: this.died_tx.clone(),
};
tracing::trace!(
pool.services = this.services.len(),
message = "finished creating new service"
);
*this.load = Level::Normal;
return Poll::Ready(Ok(Change::Insert(id, svc)));
}
match this.load {
Level::High => {
unreachable!("found high load but no Service being made");
}
Level::Normal => Poll::Pending,
Level::Low if this.services.len() == 1 => Poll::Pending,
Level::Low => {
*this.load = Level::Normal;
let rm = this.services.iter().next().unwrap().0;
tracing::trace!(
pool.services = this.services.len(),
message = "removing service for over-provisioned pool"
);
Poll::Ready(Ok(Change::Remove(rm)))
}
}
}
}
#[derive(Copy, Clone, Debug)]
pub struct Builder {
low: f64,
high: f64,
init: f64,
alpha: f64,
limit: Option<usize>,
}
impl Default for Builder {
fn default() -> Self {
Builder {
init: 0.1,
low: 0.00001,
high: 0.2,
alpha: 0.03,
limit: None,
}
}
}
impl Builder {
pub fn new() -> Self {
Self::default()
}
pub fn underutilized_below(&mut self, low: f64) -> &mut Self {
self.low = low;
self
}
pub fn loaded_above(&mut self, high: f64) -> &mut Self {
self.high = high;
self
}
pub fn initial(&mut self, init: f64) -> &mut Self {
self.init = init;
self
}
pub fn urgency(&mut self, alpha: f64) -> &mut Self {
self.alpha = alpha.max(0.0).min(1.0);
self
}
pub fn max_services(&mut self, limit: Option<usize>) -> &mut Self {
self.limit = limit;
self
}
pub fn build<MS, Target, Request>(
&self,
make_service: MS,
target: Target,
) -> Pool<MS, Target, Request>
where
MS: MakeService<Target, Request>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
Target: Clone,
{
let (died_tx, died_rx) = tokio::sync::mpsc::unbounded_channel();
let d = PoolDiscoverer {
maker: make_service,
making: None,
target,
load: Level::Normal,
services: Slab::new(),
died_tx,
died_rx,
limit: self.limit,
};
Pool {
balance: Balance::from_entropy(Box::pin(d)),
options: *self,
ewma: self.init,
}
}
}
pub struct Pool<MS, Target, Request>
where
MS: MakeService<Target, Request>,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
Target: Clone,
{
balance: Balance<Pin<Box<PoolDiscoverer<MS, Target, Request>>>, Request>,
options: Builder,
ewma: f64,
}
impl<MS, Target, Request> fmt::Debug for Pool<MS, Target, Request>
where
MS: MakeService<Target, Request> + fmt::Debug,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
Target: Clone + fmt::Debug,
MS::Service: fmt::Debug,
Request: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Pool")
.field("balance", &self.balance)
.field("options", &self.options)
.field("ewma", &self.ewma)
.finish()
}
}
impl<MS, Target, Request> Pool<MS, Target, Request>
where
MS: MakeService<Target, Request>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
Target: Clone,
{
pub fn new(make_service: MS, target: Target) -> Self {
Builder::new().build(make_service, target)
}
}
type PinBalance<S, Request> = Balance<Pin<Box<S>>, Request>;
impl<MS, Target, Req> Service<Req> for Pool<MS, Target, Req>
where
MS: MakeService<Target, Req>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
Target: Clone,
{
type Response = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Response;
type Error = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Error;
type Future = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Poll::Ready(()) = self.balance.poll_ready(cx)? {
self.ewma = (1.0 - self.options.alpha) * self.ewma;
let discover = self.balance.discover_mut().as_mut().project();
if self.ewma < self.options.low {
if *discover.load != Level::Low {
tracing::trace!({ ewma = %self.ewma }, "pool is over-provisioned");
}
*discover.load = Level::Low;
if discover.services.len() > 1 {
self.ewma = self.options.init;
}
} else {
if *discover.load != Level::Normal {
tracing::trace!({ ewma = %self.ewma }, "pool is appropriately provisioned");
}
*discover.load = Level::Normal;
}
return Poll::Ready(Ok(()));
}
let discover = self.balance.discover_mut().as_mut().project();
if discover.making.is_none() {
self.ewma = self.options.alpha + (1.0 - self.options.alpha) * self.ewma;
if self.ewma > self.options.high {
if *discover.load != Level::High {
tracing::trace!({ ewma = %self.ewma }, "pool is under-provisioned");
}
*discover.load = Level::High;
self.ewma = self.options.high;
return self.balance.poll_ready(cx);
} else {
*discover.load = Level::Normal;
}
}
Poll::Pending
}
fn call(&mut self, req: Req) -> Self::Future {
self.balance.call(req)
}
}
#[doc(hidden)]
#[derive(Debug)]
pub struct DropNotifyService<Svc> {
svc: Svc,
id: usize,
notify: tokio::sync::mpsc::UnboundedSender<usize>,
}
impl<Svc> Drop for DropNotifyService<Svc> {
fn drop(&mut self) {
let _ = self.notify.send(self.id).is_ok();
}
}
impl<Svc: Load> Load for DropNotifyService<Svc> {
type Metric = Svc::Metric;
fn load(&self) -> Self::Metric {
self.svc.load()
}
}
impl<Request, Svc: Service<Request>> Service<Request> for DropNotifyService<Svc> {
type Response = Svc::Response;
type Future = Svc::Future;
type Error = Svc::Error;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.svc.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
self.svc.call(req)
}
}