1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
mod blocking;
mod blocking_state;
mod state;

pub(crate) use self::blocking::{Blocking, CanBlock};
use self::blocking_state::BlockingState;
use self::state::State;

use notifier::Notifier;
use pool::Pool;

use futures::executor::{self, Spawn};
use futures::{self, Async, Future};

use std::cell::{Cell, UnsafeCell};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::sync::atomic::{AtomicPtr, AtomicUsize};
use std::sync::Arc;
use std::{fmt, panic, ptr};

/// Harness around a future.
///
/// This also behaves as a node in the inbound work queue and the blocking
/// queue.
pub(crate) struct Task {
    /// Task lifecycle state
    state: AtomicUsize,

    /// Task blocking related state
    blocking: AtomicUsize,

    /// Next pointer in the queue of tasks pending blocking capacity.
    next_blocking: AtomicPtr<Task>,

    /// ID of the worker that polled this task first.
    ///
    /// This field can be a `Cell` because it's only accessed by the worker thread that is
    /// executing the task.
    ///
    /// The worker ID is represented by a `u32` rather than `usize` in order to save some space
    /// on 64-bit platforms.
    pub reg_worker: Cell<Option<u32>>,

    /// The key associated with this task in the `Slab` it was registered in.
    ///
    /// This field can be a `Cell` because it's only accessed by the worker thread that has
    /// registered the task.
    pub reg_index: Cell<usize>,

    /// Store the future at the head of the struct
    ///
    /// The future is dropped immediately when it transitions to Complete
    future: UnsafeCell<Option<Spawn<BoxFuture>>>,
}

#[derive(Debug)]
pub(crate) enum Run {
    Idle,
    Schedule,
    Complete,
}

type BoxFuture = Box<dyn Future<Item = (), Error = ()> + Send + 'static>;

// ===== impl Task =====

impl Task {
    /// Create a new `Task` as a harness for `future`.
    pub fn new(future: BoxFuture) -> Task {
        // Wrap the future with an execution context.
        let task_fut = executor::spawn(future);

        Task {
            state: AtomicUsize::new(State::new().into()),
            blocking: AtomicUsize::new(BlockingState::new().into()),
            next_blocking: AtomicPtr::new(ptr::null_mut()),
            reg_worker: Cell::new(None),
            reg_index: Cell::new(0),
            future: UnsafeCell::new(Some(task_fut)),
        }
    }

    /// Create a fake `Task` to be used as part of the intrusive mpsc channel
    /// algorithm.
    fn stub() -> Task {
        let future = Box::new(futures::empty()) as BoxFuture;
        let task_fut = executor::spawn(future);

        Task {
            state: AtomicUsize::new(State::stub().into()),
            blocking: AtomicUsize::new(BlockingState::new().into()),
            next_blocking: AtomicPtr::new(ptr::null_mut()),
            reg_worker: Cell::new(None),
            reg_index: Cell::new(0),
            future: UnsafeCell::new(Some(task_fut)),
        }
    }

    /// Execute the task returning `Run::Schedule` if the task needs to be
    /// scheduled again.
    pub fn run(&self, unpark: &Arc<Notifier>) -> Run {
        use self::State::*;

        // Transition task to running state. At this point, the task must be
        // scheduled.
        let actual: State = self
            .state
            .compare_and_swap(Scheduled.into(), Running.into(), AcqRel)
            .into();

        match actual {
            Scheduled => {}
            _ => panic!("unexpected task state; {:?}", actual),
        }

        trace!(
            "Task::run; state={:?}",
            State::from(self.state.load(Relaxed))
        );

        // The transition to `Running` done above ensures that a lock on the
        // future has been obtained.
        let fut = unsafe { &mut (*self.future.get()) };

        // This block deals with the future panicking while being polled.
        //
        // If the future panics, then the drop handler must be called such that
        // `thread::panicking() -> true`. To do this, the future is dropped from
        // within the catch_unwind block.
        let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            struct Guard<'a>(&'a mut Option<Spawn<BoxFuture>>, bool);

            impl<'a> Drop for Guard<'a> {
                fn drop(&mut self) {
                    // This drops the future
                    if self.1 {
                        let _ = self.0.take();
                    }
                }
            }

            let mut g = Guard(fut, true);

            let ret =
                g.0.as_mut()
                    .unwrap()
                    .poll_future_notify(unpark, self as *const _ as usize);

            g.1 = false;

            ret
        }));

        match res {
            Ok(Ok(Async::Ready(_))) | Ok(Err(_)) | Err(_) => {
                trace!("    -> task complete");

                // The future has completed. Drop it immediately to free
                // resources and run drop handlers.
                //
                // The `Task` harness will stay around longer if it is contained
                // by any of the various queues.
                self.drop_future();

                // Transition to the completed state
                self.state.store(State::Complete.into(), Release);

                if let Err(panic_err) = res {
                    if let Some(ref f) = unpark.pool.config.panic_handler {
                        f(panic_err);
                    }
                }

                Run::Complete
            }
            Ok(Ok(Async::NotReady)) => {
                trace!("    -> not ready");

                // Attempt to transition from Running -> Idle, if successful,
                // then the task does not need to be scheduled again. If the CAS
                // fails, then the task has been unparked concurrent to running,
                // in which case it transitions immediately back to scheduled
                // and we return `true`.
                let prev: State = self
                    .state
                    .compare_and_swap(Running.into(), Idle.into(), AcqRel)
                    .into();

                match prev {
                    Running => Run::Idle,
                    Notified => {
                        self.state.store(Scheduled.into(), Release);
                        Run::Schedule
                    }
                    _ => unreachable!(),
                }
            }
        }
    }

    /// Aborts this task.
    ///
    /// This is called when the threadpool shuts down and the task has already beed polled but not
    /// completed.
    pub fn abort(&self) {
        use self::State::*;

        let mut state = self.state.load(Acquire).into();

        loop {
            match state {
                Idle | Scheduled => {}
                Running | Notified | Complete | Aborted => {
                    // It is assumed that no worker threads are running so the task must be either
                    // in the idle or scheduled state.
                    panic!("unexpected state while aborting task: {:?}", state);
                }
            }

            let actual = self
                .state
                .compare_and_swap(state.into(), Aborted.into(), AcqRel)
                .into();

            if actual == state {
                // The future has been aborted. Drop it immediately to free resources and run drop
                // handlers.
                self.drop_future();
                break;
            }

            state = actual;
        }
    }

    /// Notify the task
    pub fn notify(me: Arc<Task>, pool: &Arc<Pool>) {
        if me.schedule() {
            let _ = pool.submit(me, pool);
        }
    }

    /// Notify the task it has been allocated blocking capacity
    pub fn notify_blocking(me: Arc<Task>, pool: &Arc<Pool>) {
        BlockingState::notify_blocking(&me.blocking, AcqRel);
        Task::notify(me, pool);
    }

    /// Transition the task state to scheduled.
    ///
    /// Returns `true` if the caller is permitted to schedule the task.
    pub fn schedule(&self) -> bool {
        use self::State::*;

        loop {
            // Scheduling can only be done from the `Idle` state.
            let actual = self
                .state
                .compare_and_swap(Idle.into(), Scheduled.into(), AcqRel)
                .into();

            match actual {
                Idle => return true,
                Running => {
                    // The task is already running on another thread. Transition
                    // the state to `Notified`. If this CAS fails, then restart
                    // the logic again from `Idle`.
                    let actual = self
                        .state
                        .compare_and_swap(Running.into(), Notified.into(), AcqRel)
                        .into();

                    match actual {
                        Idle => continue,
                        _ => return false,
                    }
                }
                Complete | Aborted | Notified | Scheduled => return false,
            }
        }
    }

    /// Consumes any allocated capacity to block.
    ///
    /// Returns `true` if capacity was allocated, `false` otherwise.
    pub fn consume_blocking_allocation(&self) -> CanBlock {
        // This flag is the primary point of coordination. The queued flag
        // happens "around" setting the blocking capacity.
        BlockingState::consume_allocation(&self.blocking, AcqRel)
    }

    /// Drop the future
    ///
    /// This must only be called by the thread that successfully transitioned
    /// the future state to `Running`.
    fn drop_future(&self) {
        let _ = unsafe { (*self.future.get()).take() };
    }
}

impl fmt::Debug for Task {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Task")
            .field("state", &self.state)
            .field("future", &"Spawn<BoxFuture>")
            .finish()
    }
}