use super::list;
use futures::Poll;
use loom::{
futures::AtomicTask,
sync::atomic::AtomicUsize,
sync::{Arc, CausalCell},
};
use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
pub(crate) struct Tx<T, S: Semaphore> {
inner: Arc<Chan<T, S>>,
permit: S::Permit,
}
impl<T, S: Semaphore> fmt::Debug for Tx<T, S>
where
S::Permit: fmt::Debug,
S: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Tx")
.field("inner", &self.inner)
.field("permit", &self.permit)
.finish()
}
}
pub(crate) struct Rx<T, S: Semaphore> {
inner: Arc<Chan<T, S>>,
}
impl<T, S: Semaphore> fmt::Debug for Rx<T, S>
where
S: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Rx").field("inner", &self.inner).finish()
}
}
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum TrySendError {
Closed,
NoPermits,
}
pub(crate) trait Semaphore {
type Permit;
fn new_permit() -> Self::Permit;
fn drop_permit(&self, permit: &mut Self::Permit);
fn is_idle(&self) -> bool;
fn add_permit(&self);
fn poll_acquire(&self, permit: &mut Self::Permit) -> Poll<(), ()>;
fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;
fn forget(&self, permit: &mut Self::Permit);
fn close(&self);
}
struct Chan<T, S> {
tx: list::Tx<T>,
semaphore: S,
rx_task: AtomicTask,
tx_count: AtomicUsize,
rx_fields: CausalCell<RxFields<T>>,
}
impl<T, S> fmt::Debug for Chan<T, S>
where
S: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Chan")
.field("tx", &self.tx)
.field("semaphore", &self.semaphore)
.field("rx_task", &self.rx_task)
.field("tx_count", &self.tx_count)
.field("rx_fields", &"...")
.finish()
}
}
struct RxFields<T> {
list: list::Rx<T>,
rx_closed: bool,
}
impl<T> fmt::Debug for RxFields<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("RxFields")
.field("list", &self.list)
.field("rx_closed", &self.rx_closed)
.finish()
}
}
unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)
where
S: Semaphore,
{
let (tx, rx) = list::channel();
let chan = Arc::new(Chan {
tx,
semaphore,
rx_task: AtomicTask::new(),
tx_count: AtomicUsize::new(1),
rx_fields: CausalCell::new(RxFields {
list: rx,
rx_closed: false,
}),
});
(Tx::new(chan.clone()), Rx::new(chan))
}
impl<T, S> Tx<T, S>
where
S: Semaphore,
{
fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
Tx {
inner: chan,
permit: S::new_permit(),
}
}
pub(crate) fn poll_ready(&mut self) -> Poll<(), ()> {
self.inner.semaphore.poll_acquire(&mut self.permit)
}
pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
if let Err(e) = self.inner.semaphore.try_acquire(&mut self.permit) {
return Err((value, e));
}
self.inner.tx.push(value);
self.inner.rx_task.notify();
self.inner.semaphore.forget(&mut self.permit);
Ok(())
}
}
impl<T, S> Clone for Tx<T, S>
where
S: Semaphore,
{
fn clone(&self) -> Tx<T, S> {
self.inner.tx_count.fetch_add(1, Relaxed);
Tx {
inner: self.inner.clone(),
permit: S::new_permit(),
}
}
}
impl<T, S> Drop for Tx<T, S>
where
S: Semaphore,
{
fn drop(&mut self) {
self.inner.semaphore.drop_permit(&mut self.permit);
if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
return;
}
self.inner.tx.close();
self.inner.rx_task.notify();
}
}
impl<T, S> Rx<T, S>
where
S: Semaphore,
{
fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
Rx { inner: chan }
}
pub(crate) fn close(&mut self) {
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
if rx_fields.rx_closed {
return;
}
rx_fields.rx_closed = true;
});
self.inner.semaphore.close();
}
pub(crate) fn recv(&mut self) -> Poll<Option<T>, ()> {
use super::block::Read::*;
use futures::Async::*;
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
macro_rules! try_recv {
() => {
match rx_fields.list.pop(&self.inner.tx) {
Some(Value(value)) => {
self.inner.semaphore.add_permit();
return Ok(Ready(Some(value)));
}
Some(Closed) => {
assert!(self.inner.semaphore.is_idle());
return Ok(Ready(None));
}
None => {}
}
};
}
try_recv!();
self.inner.rx_task.register();
try_recv!();
debug!(
"recv; rx_closed = {:?}; is_idle = {:?}",
rx_fields.rx_closed,
self.inner.semaphore.is_idle()
);
if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
Ok(Ready(None))
} else {
Ok(NotReady)
}
})
}
}
impl<T, S> Drop for Rx<T, S>
where
S: Semaphore,
{
fn drop(&mut self) {
use super::block::Read::Value;
self.close();
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
self.inner.semaphore.add_permit();
}
})
}
}
impl<T, S> Drop for Chan<T, S> {
fn drop(&mut self) {
use super::block::Read::Value;
self.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
unsafe { rx_fields.list.free_blocks() };
});
}
}
use semaphore::TryAcquireError;
impl From<TryAcquireError> for TrySendError {
fn from(src: TryAcquireError) -> TrySendError {
if src.is_closed() {
TrySendError::Closed
} else if src.is_no_permits() {
TrySendError::NoPermits
} else {
unreachable!();
}
}
}
use semaphore::Permit;
impl Semaphore for (::semaphore::Semaphore, usize) {
type Permit = Permit;
fn new_permit() -> Permit {
Permit::new()
}
fn drop_permit(&self, permit: &mut Permit) {
permit.release(&self.0);
}
fn add_permit(&self) {
self.0.add_permits(1)
}
fn is_idle(&self) -> bool {
self.0.available_permits() == self.1
}
fn poll_acquire(&self, permit: &mut Permit) -> Poll<(), ()> {
permit.poll_acquire(&self.0).map_err(|_| ())
}
fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
permit.try_acquire(&self.0)?;
Ok(())
}
fn forget(&self, permit: &mut Self::Permit) {
permit.forget()
}
fn close(&self) {
self.0.close();
}
}
use std::sync::atomic::Ordering::{Acquire, Release};
use std::usize;
impl Semaphore for AtomicUsize {
type Permit = ();
fn new_permit() {}
fn drop_permit(&self, _permit: &mut ()) {}
fn add_permit(&self) {
let prev = self.fetch_sub(2, Release);
if prev >> 1 == 0 {
process::abort();
}
}
fn is_idle(&self) -> bool {
self.load(Acquire) >> 1 == 0
}
fn poll_acquire(&self, permit: &mut ()) -> Poll<(), ()> {
use futures::Async::Ready;
self.try_acquire(permit).map(Ready).map_err(|_| ())
}
fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {
let mut curr = self.load(Acquire);
loop {
if curr & 1 == 1 {
return Err(TrySendError::Closed);
}
if curr == usize::MAX ^ 1 {
process::abort()
}
match self.compare_exchange(curr, curr + 2, AcqRel, Acquire) {
Ok(_) => return Ok(()),
Err(actual) => {
curr = actual;
}
}
}
}
fn forget(&self, _permit: &mut ()) {}
fn close(&self) {
self.fetch_or(1, Release);
}
}