1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use crate::PollIo;
use futures::{channel::mpsc, task::*, Sink, Stream};
use pin_project::pin_project;
use std::io;
use std::pin::Pin;
pub fn unbounded<SinkItem, Item>() -> (
UnboundedChannel<SinkItem, Item>,
UnboundedChannel<Item, SinkItem>,
) {
let (tx1, rx2) = mpsc::unbounded();
let (tx2, rx1) = mpsc::unbounded();
(
UnboundedChannel { tx: tx1, rx: rx1 },
UnboundedChannel { tx: tx2, rx: rx2 },
)
}
#[pin_project]
#[derive(Debug)]
pub struct UnboundedChannel<Item, SinkItem> {
#[pin]
rx: mpsc::UnboundedReceiver<Item>,
#[pin]
tx: mpsc::UnboundedSender<SinkItem>,
}
impl<Item, SinkItem> Stream for UnboundedChannel<Item, SinkItem> {
type Item = Result<Item, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<Item> {
self.project().rx.poll_next(cx).map(|option| option.map(Ok))
}
}
impl<Item, SinkItem> Sink<SinkItem> for UnboundedChannel<Item, SinkItem> {
type Error = io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project()
.tx
.poll_ready(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
}
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
self.project()
.tx
.start_send(item)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project()
.tx
.poll_flush(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project()
.tx
.poll_close(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
}
}
#[cfg(test)]
mod tests {
use crate::{
client, context,
server::{Handler, Server},
transport,
};
use assert_matches::assert_matches;
use futures::{prelude::*, stream};
use log::trace;
use std::io;
#[cfg(feature = "tokio1")]
#[tokio::test]
async fn integration() -> io::Result<()> {
let _ = env_logger::try_init();
let (client_channel, server_channel) = transport::channel::unbounded();
tokio::spawn(
Server::default()
.incoming(stream::once(future::ready(server_channel)))
.respond_with(|_ctx, request: String| {
future::ready(request.parse::<u64>().map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("{:?} is not an int", request),
)
}))
}),
);
let mut client = client::new(client::Config::default(), client_channel).spawn()?;
let response1 = client.call(context::current(), "123".into()).await?;
let response2 = client.call(context::current(), "abc".into()).await?;
trace!("response1: {:?}, response2: {:?}", response1, response2);
assert_matches!(response1, Ok(123));
assert_matches!(response2, Err(ref e) if e.kind() == io::ErrorKind::InvalidInput);
Ok(())
}
}