use failure::format_err;
use futures::prelude::*;
use futures::sync::{mpsc, oneshot};
use jsonrpc_core::Id;
use jsonrpc_pubsub::SubscriptionId;
use log::debug;
use serde_json::Value;
use std::collections::HashMap;
use std::collections::VecDeque;
use super::RequestBuilder;
use crate::{RpcChannel, RpcError, RpcMessage};
/// Client-side bookkeeping for a single subscription.
struct Subscription {
/// Subscription id parsed from the subscribe response; `None` until that response is processed.
id: Option<SubscriptionId>,
/// Notification method name; used together with the id as the key in `Duplex::subscriptions`.
notification: String,
/// Method name handed to `RequestBuilder::unsubscribe_request` when the subscription is torn down.
unsubscribe: String,
/// Sender through which results for this subscription are forwarded to the subscriber.
channel: mpsc::Sender<Result<Value, RpcError>>,
}
impl Subscription {
    /// Builds a subscription record with no id assigned yet.
    ///
    /// `channel` receives the results for this subscription, `notification` is the
    /// notification method name and `unsubscribe` the method used to cancel it.
    fn new(channel: mpsc::Sender<Result<Value, RpcError>>, notification: String, unsubscribe: String) -> Self {
        // The id is filled in later, once the subscribe response has been parsed.
        let id = None;
        Self {
            id,
            notification,
            unsubscribe,
            channel,
        }
    }
}
/// A request that has been issued and is still waiting for the response with the matching id.
enum PendingRequest {
/// A plain call; the response is delivered once through the oneshot sender.
Call(oneshot::Sender<Result<Value, RpcError>>),
/// A subscribe request; the response is expected to carry the subscription id.
Subscription(Subscription),
}
/// Future that shuttles JSON-RPC messages between a client channel, an outgoing sink and an
/// incoming stream.
pub struct Duplex<TSink, TStream> {
/// Builds the outgoing request strings (calls, subscribe/unsubscribe requests, notifications).
request_builder: RequestBuilder,
/// Receiver of messages from the client side; set to `None` once that channel has ended.
channel: Option<mpsc::Receiver<RpcMessage>>,
/// Requests awaiting a response, keyed by request id.
pending_requests: HashMap<Id, PendingRequest>,
/// Established subscriptions, keyed by (subscription id, notification method).
subscriptions: HashMap<(SubscriptionId, String), Subscription>,
/// Source of incoming message strings.
stream: TStream,
/// Parsed incoming messages (id, result, method, subscription id) waiting to be dispatched.
incoming: VecDeque<(Id, Result<Value, RpcError>, Option<String>, Option<SubscriptionId>)>,
/// Request strings waiting to be written to `sink`.
outgoing: VecDeque<String>,
/// Destination for outgoing request strings.
sink: TSink,
}
impl<TSink, TStream> Duplex<TSink, TStream> {
fn new(sink: TSink, stream: TStream, channel: mpsc::Receiver<RpcMessage>) -> Self {
log::debug!("open");
Duplex {
request_builder: RequestBuilder::new(),
channel: Some(channel),
pending_requests: Default::default(),
subscriptions: Default::default(),
stream,
incoming: Default::default(),
outgoing: Default::default(),
sink,
}
}
}
/// Creates a `Duplex` over the given sink/stream pair together with the `RpcChannel`
/// used to feed it messages.
///
/// The two are connected by an `mpsc` channel created with a buffer size of 0.
pub fn duplex<TSink, TStream>(sink: TSink, stream: TStream) -> (Duplex<TSink, TStream>, RpcChannel) {
    let (tx, rx) = mpsc::channel(0);
    (Duplex::new(sink, stream, rx), tx.into())
}
/// Drives the duplex connection.
///
/// Each call to `poll` runs five phases in order:
///
/// 1. drain messages from the client channel, turn them into request strings and queue
///    them in `outgoing`;
/// 2. read strings from `stream`, parse them and queue them in `incoming`;
/// 3. dispatch queued incoming messages to pending calls and subscriptions;
/// 4. push queued outgoing strings into `sink`;
/// 5. flush `sink`.
///
/// The future resolves when the stream ends, or when the client channel has ended and no
/// queued messages, pending requests or subscriptions remain and the sink is flushed.
impl<TSink, TStream> Future for Duplex<TSink, TStream>
where
    TSink: Sink<SinkItem = String, SinkError = RpcError>,
    TStream: Stream<Item = String, Error = RpcError>,
{
    type Item = ();
    type Error = RpcError;

    /// Makes as much progress as possible without blocking.
    ///
    /// # Errors
    ///
    /// Returns an error when the stream or sink reports one, when an incoming message
    /// cannot be parsed, or when the receiver of a pending call has been dropped.
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        // Phase 1: turn client messages into outgoing request strings.
        log::debug!("handle requests from client");
        while let Some(channel) = self.channel.as_mut() {
            let msg = match channel.poll() {
                Ok(Async::Ready(Some(msg))) => msg,
                Ok(Async::Ready(None)) => {
                    // The client side is gone; remember that so the future may finish.
                    self.channel.take();
                    break;
                }
                Ok(Async::NotReady) => break,
                Err(()) => continue,
            };
            let request_str = match msg {
                RpcMessage::Call(msg) => {
                    let (id, request_str) = self.request_builder.call_request(&msg);
                    if self
                        .pending_requests
                        .insert(id.clone(), PendingRequest::Call(msg.sender))
                        .is_some()
                    {
                        log::error!("reuse of request id {:?}", id);
                    }
                    request_str
                }
                RpcMessage::Subscribe(msg) => {
                    let crate::Subscription {
                        subscribe,
                        subscribe_params,
                        notification,
                        unsubscribe,
                    } = msg.subscription;
                    let (id, request_str) = self.request_builder.subscribe_request(subscribe, subscribe_params);
                    log::debug!("subscribing to {}", notification);
                    let subscription = Subscription::new(msg.sender, notification, unsubscribe);
                    if self
                        .pending_requests
                        .insert(id.clone(), PendingRequest::Subscription(subscription))
                        .is_some()
                    {
                        log::error!("reuse of request id {:?}", id);
                    }
                    request_str
                }
                RpcMessage::Notify(msg) => self.request_builder.notification(&msg),
            };
            log::debug!("outgoing: {}", request_str);
            self.outgoing.push_back(request_str);
        }

        // Phase 2: read and parse everything the stream currently has to offer.
        log::debug!("handle stream");
        loop {
            let response_str = match self.stream.poll() {
                Ok(Async::Ready(Some(response_str))) => response_str,
                Ok(Async::Ready(None)) => {
                    // End of stream finishes the future immediately.
                    debug!("connection closed");
                    return Ok(Async::Ready(()));
                }
                Ok(Async::NotReady) => break,
                Err(err) => return Err(err),
            };
            log::debug!("incoming: {}", response_str);
            let (id, result, method, sid) = super::parse_response(&response_str)?;
            log::debug!(
                "id: {:?} (sid: {:?}) result: {:?} method: {:?}",
                id,
                sid,
                result,
                method
            );
            self.incoming.push_back((id, result, method, sid));
        }

        // Phase 3: route parsed messages to whoever is waiting for them.
        log::debug!("handle incoming");
        while let Some((id, result, method, sid)) = self.incoming.pop_front() {
            let sid_and_method = sid.and_then(|sid| method.map(|method| (sid, method)));
            // Handle the response to a pending request (call or subscribe).
            match self.pending_requests.remove(&id) {
                Some(PendingRequest::Call(tx)) => {
                    // A dropped receiver aborts the whole poll with an error.
                    tx.send(result)
                        .map_err(|_| RpcError::Other(format_err!("oneshot channel closed")))?;
                    continue;
                }
                Some(PendingRequest::Subscription(mut subscription)) => {
                    let sid = result.as_ref().ok().and_then(|res| SubscriptionId::parse_value(res));
                    let method = subscription.notification.clone();
                    if let Some(sid) = sid {
                        // Subscribe succeeded: the subscription becomes active under its id.
                        subscription.id = Some(sid.clone());
                        if self
                            .subscriptions
                            .insert((sid.clone(), method.clone()), subscription)
                            .is_some()
                        {
                            log::warn!(
                                "Overwriting existing subscription under {:?} ({:?}). \
                                 Seems that server returned the same subscription id.",
                                sid,
                                method,
                            );
                        }
                    } else {
                        // No usable subscription id: forward the raw result to the subscriber.
                        let err = RpcError::Other(format_err!(
                            "Subscription {:?} ({:?}) rejected: {:?}",
                            id,
                            method,
                            result,
                        ));
                        match subscription.channel.poll_ready() {
                            Ok(Async::Ready(())) => {
                                subscription
                                    .channel
                                    .try_send(result)
                                    .expect("The channel is ready; qed");
                            }
                            Ok(Async::NotReady) => {
                                // Put the subscription back into `pending_requests` before
                                // re-queueing the response. Without it the retried response
                                // would match no pending request and be discarded as
                                // unexpected, so the subscriber would never see the rejection.
                                self.pending_requests
                                    .insert(id.clone(), PendingRequest::Subscription(subscription));
                                self.incoming.push_back((id, result, Some(method), sid));
                                break;
                            }
                            Err(_) => {
                                log::warn!("{}, but the reply channel has closed.", err);
                            }
                        };
                    }
                    continue;
                }
                None if sid_and_method.is_none() => {
                    log::warn!("Got unexpected response with id {:?} ({:?})", id, sid_and_method);
                    continue;
                }
                None => {}
            };

            // Not a response to a pending request: treat it as a subscription notification.
            let sid_and_method = if let Some(x) = sid_and_method {
                x
            } else {
                continue;
            };
            if let Some(subscription) = self.subscriptions.get_mut(&sid_and_method) {
                match subscription.channel.poll_ready() {
                    Ok(Async::Ready(())) => {
                        subscription
                            .channel
                            .try_send(result)
                            .expect("The channel is ready; qed");
                    }
                    Ok(Async::NotReady) => {
                        // The subscriber is not keeping up; retry on a later poll.
                        let (sid, method) = sid_and_method;
                        self.incoming.push_back((id, result, Some(method), Some(sid)));
                        break;
                    }
                    Err(_) => {
                        // The subscriber went away: drop the subscription and tell the server.
                        let subscription = self
                            .subscriptions
                            .remove(&sid_and_method)
                            .expect("Subscription was just polled; qed");
                        let sid = subscription.id.expect(
                            "Every subscription that ends up in `self.subscriptions` has id already \
                             assigned; assignment happens during response to subscribe request.",
                        );
                        let (_id, request_str) =
                            self.request_builder.unsubscribe_request(subscription.unsubscribe, sid);
                        log::debug!("outgoing: {}", request_str);
                        self.outgoing.push_back(request_str);
                        log::debug!("unsubscribed from {:?}", sid_and_method);
                    }
                }
            } else {
                log::warn!("Received unexpected subscription notification: {:?}", sid_and_method);
            }
        }

        // Phase 4: hand queued requests to the sink until it pushes back.
        log::debug!("handle outgoing");
        while let Some(request) = self.outgoing.pop_front() {
            if let AsyncSink::NotReady(request) = self.sink.start_send(request)? {
                // Keep the request at the front so ordering is preserved on retry.
                self.outgoing.push_front(request);
                break;
            }
        }

        // Phase 5: flush the sink.
        log::debug!("handle sink");
        let sink_empty = match self.sink.poll_complete()? {
            Async::Ready(()) => true,
            Async::NotReady => false,
        };

        log::debug!("{:?}", self);
        // Finish only once nothing is left to send, receive or dispatch.
        if self.channel.is_none()
            && self.outgoing.is_empty()
            && self.incoming.is_empty()
            && self.pending_requests.is_empty()
            && self.subscriptions.is_empty()
            && sink_empty
        {
            log::debug!("close");
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }
}
/// Summarises the internal state as one `name: value` line per item: whether the client
/// channel has ended, followed by the sizes of the queues and lookup tables.
impl<TSink, TStream> std::fmt::Debug for Duplex<TSink, TStream> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let channel_gone = self.channel.is_none();
        let queued_out = self.outgoing.len();
        let queued_in = self.incoming.len();
        let pending = self.pending_requests.len();
        let active = self.subscriptions.len();

        writeln!(f, "channel is none: {}", channel_gone)?;
        writeln!(f, "outgoing: {}", queued_out)?;
        writeln!(f, "incoming: {}", queued_in)?;
        writeln!(f, "pending_requests: {}", pending)?;
        // The last write's result is the result of the whole call.
        writeln!(f, "subscriptions: {}", active)
    }
}