use crate::{
context, trace, util::Compact, util::TimeUntil, ClientMessage, PollIo, Request, Response,
ServerError, Transport,
};
use fnv::FnvHashMap;
use futures::{
channel::mpsc,
future::{AbortHandle, AbortRegistration, Abortable},
prelude::*,
ready,
stream::Fuse,
task::*,
};
use humantime::format_rfc3339;
use log::{debug, trace};
use pin_project::pin_project;
use std::{fmt, hash::Hash, io, marker::PhantomData, pin::Pin, time::SystemTime};
use tokio::time::Timeout;
mod filter;
#[cfg(test)]
mod testing;
mod throttle;
pub use self::{
filter::ChannelFilter,
throttle::{Throttler, ThrottlerStream},
};
/// Entry point for building channels that serve requests of type `Req` with responses of
/// type `Resp`.
#[derive(Debug)]
pub struct Server<Req, Resp> {
/// Configuration that is cloned into every channel produced by `incoming`.
config: Config,
/// Zero-sized marker that ties the `Req` and `Resp` type parameters to the struct.
ghost: PhantomData<(Req, Resp)>,
}
impl<Req, Resp> Default for Server<Req, Resp> {
    /// Builds a server that uses the default [`Config`].
    fn default() -> Self {
        let config = Config::default();
        new(config)
    }
}
/// Settings that control how a channel behaves.
#[derive(Clone, Debug)]
pub struct Config {
/// Capacity of the bounded queue that holds finished responses until they are written to
/// the channel. `Config::default` sets it to 100.
pub pending_response_buffer: usize,
}
impl Default for Config {
fn default() -> Self {
Config {
pending_response_buffer: 100,
}
}
}
impl Config {
    /// Consumes this configuration and wraps `transport` in a [`BaseChannel`] that uses it.
    pub fn channel<Req, Resp, T>(self, transport: T) -> BaseChannel<Req, Resp, T>
    where
        T: Transport<Response<Resp>, ClientMessage<Req>>,
    {
        let config = self;
        BaseChannel::new(config, transport)
    }
}
/// Creates a [`Server`] that hands `config` to every channel it produces.
pub fn new<Req, Resp>(config: Config) -> Server<Req, Resp> {
    let ghost = PhantomData;
    Server { config, ghost }
}
impl<Req, Resp> Server<Req, Resp> {
pub fn config(&self) -> &Config {
&self.config
}
pub fn incoming<S, T>(self, listener: S) -> impl Stream<Item = BaseChannel<Req, Resp, T>>
where
S: Stream<Item = T>,
T: Transport<Response<Resp>, ClientMessage<Req>>,
{
listener.map(move |t| BaseChannel::new(self.config.clone(), t))
}
}
/// Something that can turn a request of type `Req` into a future resolving to a response.
///
/// Implementors must be `Clone`; the request handling code in this module clones the
/// implementor once per request before calling `serve`.
pub trait Serve<Req>: Sized + Clone {
/// The response type produced for each request.
type Resp;
/// The future returned by `serve`; it resolves to the response.
type Fut: Future<Output = Self::Resp>;
/// Consumes `self` and starts handling `req` under the request context `ctx`.
fn serve(self, ctx: context::Context, req: Req) -> Self::Fut;
}
/// Lets any cloneable `FnOnce(context::Context, Req) -> Fut` be used as a [`Serve`]
/// implementation.
impl<Req, Resp, Fut, F> Serve<Req> for F
where
F: FnOnce(context::Context, Req) -> Fut + Clone,
Fut: Future<Output = Resp>,
{
type Resp = Resp;
type Fut = Fut;
/// Calls the closure with the request context and the request payload.
fn serve(self, ctx: context::Context, req: Req) -> Self::Fut {
self(ctx, req)
}
}
/// Adapter methods available on any stream whose items are [`Channel`]s.
pub trait Handler<C>
where
Self: Sized + Stream<Item = C>,
C: Channel,
{
/// Wraps this stream in a [`ChannelFilter`] constructed from `n` and `keymaker`.
///
/// `keymaker` derives a key of type `K` from each channel.
/// NOTE(review): the name suggests `n` caps the number of channels per key; the actual
/// enforcement lives in `filter::ChannelFilter` — confirm there.
fn max_channels_per_key<K, KF>(self, n: u32, keymaker: KF) -> filter::ChannelFilter<Self, K, KF>
where
K: fmt::Display + Eq + Hash + Clone + Unpin,
KF: Fn(&C) -> K,
{
ChannelFilter::new(self, n, keymaker)
}
/// Wraps this stream in a [`ThrottlerStream`] constructed from `n`.
///
/// NOTE(review): presumably `n` bounds concurrent requests on each yielded channel; the
/// enforcement lives in the `throttle` module — confirm there.
fn max_concurrent_requests_per_channel(self, n: usize) -> ThrottlerStream<Self> {
ThrottlerStream::new(self, n)
}
/// Pairs this stream of channels with `server`, returning a [`Running`] future.
///
/// Only available when the `tokio1` feature is enabled.
#[cfg(feature = "tokio1")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
fn respond_with<S>(self, server: S) -> Running<Self, S>
where
S: Serve<C::Req, Resp = C::Resp>,
{
Running {
incoming: self,
server,
}
}
}
/// Blanket implementation: every sized stream of channels is a [`Handler`]; all methods come
/// from the trait's provided defaults.
impl<S, C> Handler<C> for S
where
S: Sized + Stream<Item = C>,
C: Channel,
{
}
/// A channel built directly on a transport `T` that tracks which requests are in flight so
/// they can be canceled.
#[pin_project]
#[derive(Debug)]
pub struct BaseChannel<Req, Resp, T> {
/// Configuration for this channel.
config: Config,
/// The underlying transport, wrapped in `Fuse`.
#[pin]
transport: Fuse<T>,
/// Abort handles keyed by request ID. An entry is inserted by `start_request` and removed
/// when the response is sent or the request is canceled.
in_flight_requests: FnvHashMap<u64, AbortHandle>,
/// Zero-sized marker that ties the `Req` and `Resp` type parameters to the struct.
ghost: PhantomData<(Req, Resp)>,
}
impl<Req, Resp, T> BaseChannel<Req, Resp, T>
where
    T: Transport<Response<Resp>, ClientMessage<Req>>,
{
    /// Creates a channel over `transport` with the given configuration and no requests in
    /// flight.
    pub fn new(config: Config, transport: T) -> Self {
        let transport = transport.fuse();
        Self {
            config,
            transport,
            in_flight_requests: FnvHashMap::default(),
            ghost: PhantomData,
        }
    }

    /// Creates a channel over `transport` using [`Config::default`].
    pub fn with_defaults(transport: T) -> Self {
        let config = Config::default();
        Self::new(config, transport)
    }

    /// Returns a shared reference to the wrapped transport.
    pub fn get_ref(&self) -> &T {
        let fused = &self.transport;
        fused.get_ref()
    }

    /// Returns a pinned mutable reference to the wrapped transport.
    pub fn get_pin_ref(self: Pin<&mut Self>) -> Pin<&mut T> {
        let this = self.project();
        this.transport.get_pin_mut()
    }

    /// Cancels the in-flight request `request_id`, if it is still being tracked.
    ///
    /// When an abort handle is found it is removed from the map, the map is compacted, and
    /// the handle is aborted. When none is found, only a trace message is logged.
    fn cancel_request(self: Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) {
        let in_flight = self.project().in_flight_requests;
        match in_flight.remove(&request_id) {
            Some(abort_handle) => {
                in_flight.compact(0.1);
                abort_handle.abort();
                let still_in_flight = in_flight.len();
                trace!(
                    "[{}] Request canceled. In-flight requests = {}",
                    trace_context.trace_id,
                    still_in_flight,
                );
            }
            None => {
                trace!(
                    "[{}] Received cancellation, but response handler \
                     is already complete.",
                    trace_context.trace_id,
                );
            }
        }
    }
}
/// A transport that receives requests and sends responses, and that tracks the requests
/// currently being processed.
pub trait Channel
where
    Self: Transport<Response<<Self as Channel>::Resp>, Request<<Self as Channel>::Req>>,
{
    /// Type of the request payload received over this channel.
    type Req;

    /// Type of the response payload sent over this channel.
    type Resp;

    /// Returns the configuration of this channel.
    fn config(&self) -> &Config;

    /// Returns the number of requests that are currently in flight.
    fn in_flight_requests(self: Pin<&mut Self>) -> usize;

    /// Wraps this channel in a [`Throttler`] constructed from `n`.
    fn max_concurrent_requests(self, n: usize) -> Throttler<Self>
    where
        Self: Sized,
    {
        Throttler::new(self, n)
    }

    /// Marks `request_id` as in flight and returns the registration used to make the
    /// request's handler abortable.
    fn start_request(self: Pin<&mut Self>, request_id: u64) -> AbortRegistration;

    /// Pairs this channel with `server`, producing a [`ClientHandler`].
    ///
    /// A bounded queue with capacity `pending_response_buffer` (taken from this channel's
    /// config) is created to carry finished responses back to the handler.
    fn respond_with<S>(self, server: S) -> ClientHandler<Self, S>
    where
        S: Serve<Self::Req, Resp = Self::Resp>,
        Self: Sized,
    {
        let capacity = self.config().pending_response_buffer;
        let (responses_tx, responses_rx) = mpsc::channel(capacity);
        let pending_responses = responses_rx.fuse();
        ClientHandler {
            channel: self,
            server,
            pending_responses,
            responses_tx,
        }
    }
}
impl<Req, Resp, T> Stream for BaseChannel<Req, Resp, T>
where
    T: Transport<Response<Resp>, ClientMessage<Req>>,
{
    type Item = io::Result<Request<Req>>;

    /// Yields the next request read from the transport.
    ///
    /// Cancellation messages are handled internally via `cancel_request` and never
    /// surfaced; polling continues until a request arrives, the transport is pending, an
    /// error occurs, or the transport ends.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        while let Some(message) = ready!(self.as_mut().project().transport.poll_next(cx)?) {
            match message {
                ClientMessage::Request(request) => return Poll::Ready(Some(Ok(request))),
                ClientMessage::Cancel {
                    trace_context,
                    request_id,
                } => {
                    // Keep looping: a cancellation produces no item for the caller.
                    self.as_mut().cancel_request(&trace_context, request_id);
                }
            }
        }
        Poll::Ready(None)
    }
}
impl<Req, Resp, T> Sink<Response<Resp>> for BaseChannel<Req, Resp, T>
where
T: Transport<Response<Resp>, ClientMessage<Req>>,
{
type Error = io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.project().transport.poll_ready(cx)
}
fn start_send(mut self: Pin<&mut Self>, response: Response<Resp>) -> Result<(), Self::Error> {
if self
.as_mut()
.project()
.in_flight_requests
.remove(&response.request_id)
.is_some()
{
self.as_mut().project().in_flight_requests.compact(0.1);
}
self.project().transport.start_send(response)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.project().transport.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.project().transport.poll_close(cx)
}
}
/// Exposes the wrapped transport by shared reference, without requiring any bound on `T`.
impl<Req, Resp, T> AsRef<T> for BaseChannel<Req, Resp, T> {
/// Returns a shared reference to the transport inside the `Fuse` wrapper.
fn as_ref(&self) -> &T {
self.transport.get_ref()
}
}
/// [`Channel`] implementation backed by the `in_flight_requests` map.
impl<Req, Resp, T> Channel for BaseChannel<Req, Resp, T>
where
T: Transport<Response<Resp>, ClientMessage<Req>>,
{
type Req = Req;
type Resp = Resp;
/// Returns the configuration this channel was created with.
fn config(&self) -> &Config {
&self.config
}
/// Returns the number of entries in the in-flight request map.
fn in_flight_requests(mut self: Pin<&mut Self>) -> usize {
self.as_mut().project().in_flight_requests.len()
}
/// Creates a fresh abort handle/registration pair, stores the handle under `request_id`,
/// and returns the registration.
///
/// # Panics
///
/// Panics if `request_id` is already present in the in-flight map. Note that the insert
/// has already replaced the previous handle by the time the assertion fires.
fn start_request(self: Pin<&mut Self>, request_id: u64) -> AbortRegistration {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
assert!(self
.project()
.in_flight_requests
.insert(request_id, abort_handle)
.is_none());
abort_registration
}
}
/// Pairs a channel with a server: reads requests from the channel, produces a handler future
/// for each, and writes finished responses back to the channel.
#[pin_project]
#[derive(Debug)]
pub struct ClientHandler<C, S>
where
C: Channel,
{
/// The channel that requests are read from and responses are written to.
#[pin]
channel: C,
/// Receiving end of the queue of finished responses waiting to be written to `channel`.
#[pin]
pending_responses: Fuse<mpsc::Receiver<(context::Context, Response<C::Resp>)>>,
/// Sending end of the response queue; a clone is given to each request's handler.
#[pin]
responses_tx: mpsc::Sender<(context::Context, Response<C::Resp>)>,
/// The server; cloned once per request to produce the response future.
server: S,
}
impl<C, S> ClientHandler<C, S>
where
C: Channel,
S: Serve<C::Req, Resp = C::Resp>,
{
pub fn get_pin_channel(self: Pin<&mut Self>) -> Pin<&mut C> {
self.project().channel
}
fn pump_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> PollIo<RequestHandler<S::Fut, C::Resp>> {
match ready!(self.as_mut().project().channel.poll_next(cx)?) {
Some(request) => Poll::Ready(Some(Ok(self.handle_request(request)))),
None => Poll::Ready(None),
}
}
fn pump_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
read_half_closed: bool,
) -> PollIo<()> {
match self.as_mut().poll_next_response(cx)? {
Poll::Ready(Some((ctx, response))) => {
trace!(
"[{}] Staging response. In-flight requests = {}.",
ctx.trace_id(),
self.as_mut().project().channel.in_flight_requests(),
);
self.as_mut().project().channel.start_send(response)?;
Poll::Ready(Some(Ok(())))
}
Poll::Ready(None) => {
ready!(self.as_mut().project().channel.poll_flush(cx)?);
Poll::Ready(None)
}
Poll::Pending => {
ready!(self.as_mut().project().channel.poll_flush(cx)?);
if read_half_closed && self.as_mut().project().channel.in_flight_requests() == 0 {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}
}
fn poll_next_response(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> PollIo<(context::Context, Response<C::Resp>)> {
while let Poll::Pending = self.as_mut().project().channel.poll_ready(cx)? {
ready!(self.as_mut().project().channel.poll_flush(cx)?);
}
match ready!(self.as_mut().project().pending_responses.poll_next(cx)) {
Some((ctx, response)) => Poll::Ready(Some(Ok((ctx, response)))),
None => {
Poll::Ready(None)
}
}
}
fn handle_request(
mut self: Pin<&mut Self>,
request: Request<C::Req>,
) -> RequestHandler<S::Fut, C::Resp> {
let request_id = request.id;
let deadline = request.context.deadline;
let timeout = deadline.time_until();
trace!(
"[{}] Received request with deadline {} (timeout {:?}).",
request.context.trace_id(),
format_rfc3339(deadline),
timeout,
);
let ctx = request.context;
let request = request.message;
let response = self.as_mut().project().server.clone().serve(ctx, request);
let response = Resp {
state: RespState::PollResp,
request_id,
ctx,
deadline,
f: tokio::time::timeout(timeout, response),
response: None,
response_tx: self.as_mut().project().responses_tx.clone(),
};
let abort_registration = self.as_mut().project().channel.start_request(request_id);
RequestHandler {
resp: Abortable::new(response, abort_registration),
}
}
}
/// Future that handles a single request; it can be aborted through the handle registered
/// with the channel.
#[pin_project]
#[derive(Debug)]
pub struct RequestHandler<F, R> {
/// The response-producing future, wrapped so that it can be aborted.
#[pin]
resp: Abortable<Resp<F, R>>,
}
impl<F, R> Future for RequestHandler<F, R>
where
    F: Future<Output = R>,
{
    type Output = ();

    /// Drives the wrapped abortable future, discarding whether it finished or was aborted.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.project();
        this.resp.poll(cx).map(|_outcome| ())
    }
}
/// State machine that awaits one response (bounded by a timeout) and forwards it over the
/// response queue.
#[pin_project]
#[derive(Debug)]
struct Resp<F, R> {
/// Which step of the state machine is being driven.
state: RespState,
/// ID of the request being answered; copied into the outgoing `Response`.
request_id: u64,
/// Context of the request; sent alongside the response and used for log messages.
ctx: context::Context,
/// Deadline of the request; reported in the timeout error message.
deadline: SystemTime,
/// The response future wrapped in a tokio timeout.
#[pin]
f: Timeout<F>,
/// The finished response, held here until the sender accepts it.
response: Option<Response<R>>,
/// Sender used to deliver the finished response.
#[pin]
response_tx: mpsc::Sender<(context::Context, Response<R>)>,
}
/// Steps of the `Resp` state machine, in the order they are visited.
#[derive(Debug)]
// Every variant names the operation being polled, so the shared `Poll` prefix is intentional.
#[allow(clippy::enum_variant_names)]
enum RespState {
/// Polling the (timeout-wrapped) response future.
PollResp,
/// Waiting for the response sender to be ready, then sending the response.
PollReady,
/// Flushing the response sender.
PollFlush,
}
impl<F, R> Future for Resp<F, R>
where
    F: Future<Output = R>,
{
    type Output = ();

    /// Drives the response through its three steps: await the result (or the timeout),
    /// send it over the response queue, then flush the sender.
    ///
    /// Always resolves to `()`. A failure to send or flush ends the future early without
    /// reporting the error, because an error from the sender means there is no longer
    /// anyone to deliver the response to.
    ///
    /// # Panics
    ///
    /// May panic if polled again after it has already returned `Poll::Ready`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            match self.as_mut().project().state {
                RespState::PollResp => {
                    let result = ready!(self.as_mut().project().f.poll(cx));
                    let message = match result {
                        Ok(message) => Ok(message),
                        Err(tokio::time::error::Elapsed { .. }) => {
                            // The deadline is rendered as an RFC 3339 timestamp, not a
                            // number of seconds, so it takes no unit suffix. The same text
                            // is logged and returned to the client.
                            let detail = format!(
                                "Response did not complete before deadline of {}.",
                                format_rfc3339(self.deadline)
                            );
                            debug!("[{}] {}", self.ctx.trace_id(), detail);
                            Err(ServerError {
                                kind: io::ErrorKind::TimedOut,
                                detail: Some(detail),
                            })
                        }
                    };
                    let response = Response {
                        request_id: self.request_id,
                        message,
                    };
                    *self.as_mut().project().response = Some(response);
                    *self.as_mut().project().state = RespState::PollReady;
                }
                RespState::PollReady => {
                    let ready = ready!(self.as_mut().project().response_tx.poll_ready(cx));
                    if ready.is_err() {
                        // The sender reported an error; the response cannot be delivered.
                        return Poll::Ready(());
                    }
                    let response = self
                        .as_mut()
                        .project()
                        .response
                        .take()
                        .expect("a response is always stored before entering PollReady");
                    let item = (self.ctx, response);
                    if self
                        .as_mut()
                        .project()
                        .response_tx
                        .start_send(item)
                        .is_err()
                    {
                        return Poll::Ready(());
                    }
                    *self.as_mut().project().state = RespState::PollFlush;
                }
                RespState::PollFlush => {
                    // Whether the flush succeeds or fails, this future is done.
                    let _ = ready!(self.as_mut().project().response_tx.poll_flush(cx));
                    return Poll::Ready(());
                }
            }
        }
    }
}
impl<C, S> Stream for ClientHandler<C, S>
where
    C: Channel,
    S: Serve<C::Req, Resp = C::Resp>,
{
    type Item = io::Result<RequestHandler<S::Fut, C::Resp>>;

    /// Alternates between reading requests and writing responses.
    ///
    /// Every pass pumps the read side and then the write side. A newly read request is
    /// yielded immediately; otherwise the loop repeats for as long as the write side keeps
    /// making progress. The stream ends only when both sides report completion.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let read = self.as_mut().pump_read(cx)?;
            let read_closed = matches!(read, Poll::Ready(None));
            // The write side is pumped on every pass, even when a request was just read.
            let write = self.as_mut().pump_write(cx, read_closed)?;

            if let Poll::Ready(Some(request_handler)) = read {
                return Poll::Ready(Some(Ok(request_handler)));
            }
            match write {
                // A response was staged; go around again.
                Poll::Ready(Some(())) => continue,
                Poll::Ready(None) if read_closed => return Poll::Ready(None),
                _ => return Poll::Pending,
            }
        }
    }
}
impl<C, S> ClientHandler<C, S>
where
    C: Channel + 'static,
    C::Req: Send + 'static,
    C::Resp: Send + 'static,
    S: Serve<C::Req, Resp = C::Resp> + Send + 'static,
    S::Fut: Send + 'static,
{
    /// Runs the handler to completion, spawning each request handler onto the tokio
    /// runtime.
    ///
    /// The returned future resolves once the handler stream ends or yields an error; the
    /// outcome is logged rather than returned.
    #[cfg(feature = "tokio1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
    pub fn execute(self) -> impl Future<Output = ()> {
        let spawn_all = self.try_for_each(|handler| async {
            tokio::spawn(handler);
            Ok(())
        });
        spawn_all
            .map_ok(|()| log::info!("ClientHandler finished."))
            .unwrap_or_else(|e| log::info!("ClientHandler errored out: {}", e))
    }
}
/// Future that accepts channels from a stream and spawns a handler for each one.
///
/// Only available when the `tokio1` feature is enabled.
#[pin_project]
#[derive(Debug)]
#[cfg(feature = "tokio1")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
pub struct Running<St, Se> {
/// The stream of incoming channels.
#[pin]
incoming: St,
/// The server; cloned once for every accepted channel.
server: Se,
}
#[cfg(feature = "tokio1")]
impl<St, C, Se> Future for Running<St, Se>
where
    St: Sized + Stream<Item = C>,
    C: Channel + Send + 'static,
    C::Req: Send + 'static,
    C::Resp: Send + 'static,
    Se: Serve<C::Req, Resp = C::Resp> + Send + 'static + Clone,
    Se::Fut: Send + 'static,
{
    type Output = ();

    /// Spawns a client handler task for every channel yielded by the incoming stream.
    ///
    /// Resolves once the incoming stream ends.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            let next = ready!(self.as_mut().project().incoming.poll_next(cx));
            match next {
                Some(channel) => {
                    let server = self.as_mut().project().server.clone();
                    tokio::spawn(channel.respond_with(server).execute());
                }
                None => {
                    log::info!("Server shutting down.");
                    return Poll::Ready(());
                }
            }
        }
    }
}