diff options
Diffstat (limited to 'embassy-executor/src/raw/mod.rs')
-rw-r--r-- | embassy-executor/src/raw/mod.rs | 428 |
1 files changed, 428 insertions, 0 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs new file mode 100644 index 00000000..afe67dec --- /dev/null +++ b/embassy-executor/src/raw/mod.rs @@ -0,0 +1,428 @@ +//! Raw executor. +//! +//! This module exposes "raw" Executor and Task structs for more low level control. +//! +//! ## WARNING: here be dragons! +//! +//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe +//! executor wrappers in [`executor`](crate::executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe. + +mod run_queue; +#[cfg(feature = "integrated-timers")] +mod timer_queue; +pub(crate) mod util; +mod waker; + +use core::cell::Cell; +use core::future::Future; +use core::pin::Pin; +use core::ptr::NonNull; +use core::task::{Context, Poll}; +use core::{mem, ptr}; + +use atomic_polyfill::{AtomicU32, Ordering}; +use critical_section::CriticalSection; +#[cfg(feature = "integrated-timers")] +use embassy_time::driver::{self, AlarmHandle}; +#[cfg(feature = "integrated-timers")] +use embassy_time::Instant; + +use self::run_queue::{RunQueue, RunQueueItem}; +use self::util::UninitCell; +pub use self::waker::task_from_waker; +use super::SpawnToken; + +/// Task is spawned (has a future) +pub(crate) const STATE_SPAWNED: u32 = 1 << 0; +/// Task is in the executor run queue +pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; +/// Task is in the executor timer queue +#[cfg(feature = "integrated-timers")] +pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; + +/// Raw task header for use in task pointers. +/// +/// This is an opaque struct, used for raw pointers to tasks, for use +/// with funtions like [`wake_task`] and [`task_from_waker`]. +pub struct TaskHeader { + pub(crate) state: AtomicU32, + pub(crate) run_queue_item: RunQueueItem, + pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 + pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<TaskHeader>)>, // Valid if STATE_SPAWNED + + #[cfg(feature = "integrated-timers")] + pub(crate) expires_at: Cell<Instant>, + #[cfg(feature = "integrated-timers")] + pub(crate) timer_queue_item: timer_queue::TimerQueueItem, +} + +impl TaskHeader { + pub(crate) const fn new() -> Self { + Self { + state: AtomicU32::new(0), + run_queue_item: RunQueueItem::new(), + executor: Cell::new(ptr::null()), + poll_fn: UninitCell::uninit(), + + #[cfg(feature = "integrated-timers")] + expires_at: Cell::new(Instant::from_ticks(0)), + #[cfg(feature = "integrated-timers")] + timer_queue_item: timer_queue::TimerQueueItem::new(), + } + } +} + +/// Raw storage in which a task can be spawned. +/// +/// This struct holds the necessary memory to spawn one task whose future is `F`. +/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You +/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned. +/// +/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished +/// running. Hence the relevant methods require `&'static self`. It may be reused, however. +/// +/// Internally, the [embassy_executor::task](embassy_macros::task) macro allocates an array of `TaskStorage`s +/// in a `static`. The most common reason to use the raw `Task` is to have control of where +/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc. + +// repr(C) is needed to guarantee that the Task is located at offset 0 +// This makes it safe to cast between TaskHeader and TaskStorage pointers. +#[repr(C)] +pub struct TaskStorage<F: Future + 'static> { + raw: TaskHeader, + future: UninitCell<F>, // Valid if STATE_SPAWNED +} + +impl<F: Future + 'static> TaskStorage<F> { + const NEW: Self = Self::new(); + + /// Create a new TaskStorage, in not-spawned state. + pub const fn new() -> Self { + Self { + raw: TaskHeader::new(), + future: UninitCell::uninit(), + } + } + + /// Try to spawn the task. + /// + /// The `future` closure constructs the future. It's only called if spawning is + /// actually possible. It is a closure instead of a simple `future: F` param to ensure + /// the future is constructed in-place, avoiding a temporary copy in the stack thanks to + /// NRVO optimizations. + /// + /// This function will fail if the task is already spawned and has not finished running. + /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will + /// cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. + /// + /// Once the task has finished running, you may spawn it again. It is allowed to spawn it + /// on a different executor. + pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { + if self.spawn_mark_used() { + return unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) }; + } + + SpawnToken::<F>::new_failed() + } + + fn spawn_mark_used(&'static self) -> bool { + let state = STATE_SPAWNED | STATE_RUN_QUEUED; + self.raw + .state + .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + } + + unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> { + // Initialize the task + self.raw.poll_fn.write(Self::poll); + self.future.write(future()); + NonNull::new_unchecked(self as *const TaskStorage<F> as *const TaskHeader as *mut TaskHeader) + } + + unsafe fn poll(p: NonNull<TaskHeader>) { + let this = &*(p.as_ptr() as *const TaskStorage<F>); + + let future = Pin::new_unchecked(this.future.as_mut()); + let waker = waker::from_task(p); + let mut cx = Context::from_waker(&waker); + match future.poll(&mut cx) { + Poll::Ready(_) => { + this.future.drop_in_place(); + this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); + } + Poll::Pending => {} + } + + // the compiler is emitting a virtual call for waker drop, but we know + // it's a noop for our waker. + mem::forget(waker); + } +} + +unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} + +/// Raw storage that can hold up to N tasks of the same type. +/// +/// This is essentially a `[TaskStorage<F>; N]`. +pub struct TaskPool<F: Future + 'static, const N: usize> { + pool: [TaskStorage<F>; N], +} + +impl<F: Future + 'static, const N: usize> TaskPool<F, N> { + /// Create a new TaskPool, with all tasks in non-spawned state. + pub const fn new() -> Self { + Self { + pool: [TaskStorage::NEW; N], + } + } + + /// Try to spawn a task in the pool. + /// + /// See [`TaskStorage::spawn()`] for details. + /// + /// This will loop over the pool and spawn the task in the first storage that + /// is currently free. If none is free, a "poisoned" SpawnToken is returned, + /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. + pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { + for task in &self.pool { + if task.spawn_mark_used() { + return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) }; + } + } + + SpawnToken::<F>::new_failed() + } + + /// Like spawn(), but allows the task to be send-spawned if the args are Send even if + /// the future is !Send. + /// + /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used + /// by the Embassy macros ONLY. + /// + /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` + /// is an `async fn`, NOT a hand-written `Future`. + #[doc(hidden)] + pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized> + where + FutFn: FnOnce() -> F, + { + // When send-spawning a task, we construct the future in this thread, and effectively + // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory, + // send-spawning should require the future `F` to be `Send`. + // + // The problem is this is more restrictive than needed. Once the future is executing, + // it is never sent to another thread. It is only sent when spawning. It should be + // enough for the task's arguments to be Send. (and in practice it's super easy to + // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.) + // + // We can do it by sending the task args and constructing the future in the executor thread + // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy + // of the args. + // + // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the + // args are Send, it's OK to send a !Send future, as long as we do it before first polling it. + // + // (Note: this is how the generators are implemented today, it's not officially guaranteed yet, + // but it's possible it'll be guaranteed in the future. See zulip thread: + // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures ) + // + // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned. + // This is why we return `SpawnToken<FutFn>` below. + // + // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly + // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. + + for task in &self.pool { + if task.spawn_mark_used() { + return SpawnToken::<FutFn>::new(task.spawn_initialize(future)); + } + } + + SpawnToken::<FutFn>::new_failed() + } +} + +/// Raw executor. +/// +/// This is the core of the Embassy executor. It is low-level, requiring manual +/// handling of wakeups and task polling. If you can, prefer using one of the +/// higher level executors in [`crate::executor`]. +/// +/// The raw executor leaves it up to you to handle wakeups and scheduling: +/// +/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks +/// that "want to run"). +/// - You must supply a `signal_fn`. The executor will call it to notify you it has work +/// to do. You must arrange for `poll()` to be called as soon as possible. +/// +/// `signal_fn` can be called from *any* context: any thread, any interrupt priority +/// level, etc. It may be called synchronously from any `Executor` method call as well. +/// You must deal with this correctly. +/// +/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates +/// the requirement for `poll` to not be called reentrantly. +pub struct Executor { + run_queue: RunQueue, + signal_fn: fn(*mut ()), + signal_ctx: *mut (), + + #[cfg(feature = "integrated-timers")] + pub(crate) timer_queue: timer_queue::TimerQueue, + #[cfg(feature = "integrated-timers")] + alarm: AlarmHandle, +} + +impl Executor { + /// Create a new executor. + /// + /// When the executor has work to do, it will call `signal_fn` with + /// `signal_ctx` as argument. + /// + /// See [`Executor`] docs for details on `signal_fn`. + pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { + #[cfg(feature = "integrated-timers")] + let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; + #[cfg(feature = "integrated-timers")] + driver::set_alarm_callback(alarm, signal_fn, signal_ctx); + + Self { + run_queue: RunQueue::new(), + signal_fn, + signal_ctx, + + #[cfg(feature = "integrated-timers")] + timer_queue: timer_queue::TimerQueue::new(), + #[cfg(feature = "integrated-timers")] + alarm, + } + } + + /// Enqueue a task in the task queue + /// + /// # Safety + /// - `task` must be a valid pointer to a spawned task. + /// - `task` must be set up to run in this executor. + /// - `task` must NOT be already enqueued (in this executor or another one). + #[inline(always)] + unsafe fn enqueue(&self, cs: CriticalSection, task: NonNull<TaskHeader>) { + if self.run_queue.enqueue(cs, task) { + (self.signal_fn)(self.signal_ctx) + } + } + + /// Spawn a task in this executor. + /// + /// # Safety + /// + /// `task` must be a valid pointer to an initialized but not-already-spawned task. + /// + /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. + /// In this case, the task's Future must be Send. This is because this is effectively + /// sending the task to the executor thread. + pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) { + task.as_ref().executor.set(self); + + critical_section::with(|cs| { + self.enqueue(cs, task); + }) + } + + /// Poll all queued tasks in this executor. + /// + /// This loops over all tasks that are queued to be polled (i.e. they're + /// freshly spawned or they've been woken). Other tasks are not polled. + /// + /// You must call `poll` after receiving a call to `signal_fn`. It is OK + /// to call `poll` even when not requested by `signal_fn`, but it wastes + /// energy. + /// + /// # Safety + /// + /// You must NOT call `poll` reentrantly on the same executor. + /// + /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you + /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to + /// somehow schedule for `poll()` to be called later, at a time you know for sure there's + /// no `poll()` already running. + pub unsafe fn poll(&'static self) { + #[cfg(feature = "integrated-timers")] + self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); + + self.run_queue.dequeue_all(|p| { + let task = p.as_ref(); + + #[cfg(feature = "integrated-timers")] + task.expires_at.set(Instant::MAX); + + let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); + if state & STATE_SPAWNED == 0 { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + return; + } + + // Run the task + task.poll_fn.read()(p as _); + + // Enqueue or update into timer_queue + #[cfg(feature = "integrated-timers")] + self.timer_queue.update(p); + }); + + #[cfg(feature = "integrated-timers")] + { + // If this is already in the past, set_alarm will immediately trigger the alarm. + // This will cause `signal_fn` to be called, which will cause `poll()` to be called again, + // so we immediately do another poll loop iteration. + let next_expiration = self.timer_queue.next_expiration(); + driver::set_alarm(self.alarm, next_expiration.as_ticks()); + } + } + + /// Get a spawner that spawns tasks in this executor. + /// + /// It is OK to call this method multiple times to obtain multiple + /// `Spawner`s. You may also copy `Spawner`s. + pub fn spawner(&'static self) -> super::Spawner { + super::Spawner::new(self) + } +} + +/// Wake a task by raw pointer. +/// +/// You can obtain task pointers from `Waker`s using [`task_from_waker`]. +/// +/// # Safety +/// +/// `task` must be a valid task pointer obtained from [`task_from_waker`]. +pub unsafe fn wake_task(task: NonNull<TaskHeader>) { + critical_section::with(|cs| { + let header = task.as_ref(); + let state = header.state.load(Ordering::Relaxed); + + // If already scheduled, or if not started, + if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { + return; + } + + // Mark it as scheduled + header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed); + + // We have just marked the task as scheduled, so enqueue it. + let executor = &*header.executor.get(); + executor.enqueue(cs, task); + }) +} + +#[cfg(feature = "integrated-timers")] +#[no_mangle] +unsafe fn _embassy_time_schedule_wake(at: Instant, waker: &core::task::Waker) { + let task = waker::task_from_waker(waker); + let task = task.as_ref(); + let expires_at = task.expires_at.get(); + task.expires_at.set(expires_at.min(at)); +} |