Skip to content

threadshare: Task rework

Sorry for the long description. I think this MR requires details about its motivations and the proposed designed.

Current Design

The threadshare::runtime::Task design is inspired by GStreamer GstTask, adapted to the threadshare runtime:

  • It's a means to loop on an iteration Future operating in a Context.
  • User can control the state of the loop.

To keep track of its environment (the equivalent of the "user data" in the GstTask model), the iteration Future is actually returned by a closure. The same principle holds for prepare. In order to ease the explanations below, "iteration function" will refer to either the closure which returns the Future for the iteration or the resulting Future.

The following graph represents the state machine in current design:

Current runtime::Task state machine

During recent changes to the gst-plugin-threadshare implementation, some shortcomings were identified.

Pause / Resume

After the Task is requested to pause (the loop remains on hold after current iteration), the user can call start for the loop to resume iterating. Due to the start signature, the "iteration function" must be provided again. IMHO, the loop should resume iterating on the same "iteration function". The user should be able to call start again without the need to provide the "iteration function".

FlushStart / FlushStop

  • When a Pad receives a FlushStart Event, the Element must stop the data-flow as soon as possible.
  • The Element must flush any pending data-flow item and get ready to start a new segment.
  • When a Pad receives the FlushStop Event, the Element must start the data-flow again.

Implementation

The Task model uses the Cancel trigger to perform the "stop the flow as soon as possible" part. While the transition resulting from a pause trigger would only occur after current iteration completes (if it does), the Cancel transition interrupts the running iteration. Technically, the interruption happens on the next await point.

We could be tempted to implement FlushStart & FlushStop handling as follows:

fn flush_start(&self) {
    self.task.cancel();
}

fn flush_stop(&self) {
    self.task.stop();
    self.flow_stream.flush();
    self.src_pad_handler.set_need_segment();
    self.task.start();
}

Note that flush_stop can diverge from a typical ElementImpl::stop followed by an ElementImpl::start since we usually don't want FlushStop to push new StreamStart and Caps events.

The above implementation for flush_stop is racy: if a state transition occurs during its execution, we might end up in an unexpected state. We need to hold a lock on a synchronization primitive (e.g. a Mutex) to make sure the whole function is performed before another transition is handled.

Pause, FlushStart, Start

With the following sequence:

  1. Element Paused.
  2. Event FlushStart.
  3. Element Paused -> Playing (this executes task.start()).

The Task ends up Started, meaning that it handles dataflow items, when it should not because FlushStop has not been received yet.

We could add a check in ElementImpl::start and start the Task only when it is not in the Cancelled state. For this to be threadsafe, we would need to hold the Task state, but that would prevent us from calling Task::start, so we need a new state protected by a Mutex. This state would be controlled by the ElementImpl which would implement its own state machine. We would also need to differentiate from a loop that was cancelled because the iteration returned Continue(false) from a loop that was cancelled because FlushStart was received.

This MR proposes another solution so that ts-elements can still rely on the Task state machine for usual use cases. See § "Proposed State Machine" below.

Pause, FlushStart, FlushStop

With the following sequence:

  1. Element Paused.
  2. Event FlushStart.
  3. Event FlushStop.

The expected result is a paused flow but the Task ends up in the Started state.

The solution is similar to the one described above.

FlushStop // with Start

With the following sequence:

  1. Element Playing.
  2. Event FlushStart
  3. Concurrently:
    • Event FlushStop.
    • Element Playing -> Paused.

Depending on the actual sequence, the Task may end up either in the Started or Paused state, when the final state should always be Paused.

The solution is similar to the one described above.

Proposal

Requirements

This MR proposes an extension to the Task model which aims at complying with the following requirements:

  • A. After a FlushStop or Start trigger, the Task should end up either in the Started or Paused states, depending on the transition sequence.
  • B. The user shouldn't need to rely on an external synchronization primitive to prevent transition action race conditions.
  • C. The user should get mutable and lock-less access to the loop environment during transition actions and in the iteration function. This should reduce the need for synchronization primitives, especially in the hot path.
  • D. The user should be able to trigger start to resume a Paused loop without the need to provide the iteration Future again.

Design

State Machine

Proposed runtime::Task state machine

The main differences with current design are:

  • A dedicated async action is executed when a transition occurs.
  • In the event of a transition action failure, TaskImpl::handle_action_error is invoked to determine the appropriate behaviour. The default is to log the error and to switch to the Error state.
  • Iteration returns a Result<(), gst::FlowError> instead of glib::Continue. By default, the Task state machine switches to a different state if the result is Error, Eos or Flushing. This behaviour is defined in the TaskImpl::handle_iterate_error default implementation and can be over-ridden by the implementer.
  • In its Flushing use cases, the Cancelled state is replaced with two states: Flushing and PausedFlushing.
  • After preparation, users can choose to switch to Paused or Stopped in addition to Started.

TaskImpl trait

The user provides a struct that implements the trait TaskImpl. This trait defines (optional) async actions which will be called when a transition occurs and the (mandatory) async iteration to loop on in the Started state. The struct also contains the environment which can be mutability accessed within the transition actions and the iteration.

State Machine

When triggering a Task transition, we need to:

  1. Lock the Mutex<TaskInner>.
  2. Prepare the transition.
  3. Execute the async action on the Task Context.

The problem with (3) is that we can't hold the lock on the Mutex<TaskInner> while executing the action:

  • If the action directly or indirectly triggers an event resulting in a transition on its own Task, this would deadlock.
  • If the transition is triggered from a Context, we can't block_on the async hook because that would block all the Futures executing on the same Context. The solution for this is to add a substask to the Context and the caller (e.g. a Pad{Src,Sink}) is responsible for draining the subtask in the proper async environment when appropriate. This happens out of the scope of the Task::_trigger_ function, so the lock on the Mutex is released.

Still, we don't want concurrent transitions to race with current transition, which would result in an inconsistent state.

The solution in this proposal is to post the triggering event to a channel. The state machine runs on a dedicated thread (actually a dedicated runtime common to all the Tasks state machines), awaiting for transition triggering events. When required, a simple mechanism allows holding the triggering event requester until the transition is complete, in conformity with the sync or async nature of the execution environment.

  • The state machine serializes transition handling. This ensures requirement (B).
  • The state machine owns the instance of TaskImpl, so it can mutate it without the need for an Arc<Mutex<_>>. This ensures requirement (C).

Edit: the following paragraph is obsolete: the state machines are now spawned on a ThreadPool instance from the futures crate.

Most of the time, the state machine (runtime) Context is idle, awaiting for the next triggering event. The transition action or loop is spawned on the Task configured Context (let's call it the "group" Context). Since its load is rather low, the state machine Context is common to all the Tasks in the process, but we could decide to easily balance the load by spawning a different state machine Context for each "group" Context.

New Runtime Feature

When starting a pipeline, the pipeline iterates on each element to change their states. This is a serial process, meaning that the overall start up time is the sum of each element start up time.

A ts-element usually uses a non-null "max. throttling" duration for its runtime Context. The benefit of this approach is that we can group I/O polling and reduce CPU usage when processing typical multimedia data-flows. See this page for more details.

Executing an asynchronous process on a Context can incur a short delay of "max. throttling" (typically 20ms). When the pipeline changes state, each ts-element transition action might be subject to the "max. throttling" delay, resulting in long start-up durations for pipelines with many ts-elements.

In current Task model, this problem was addressed as follows:

  • When a prepare future is defined, don't wait for the future to complete before returning to the caller.
  • When Task::start is triggered, don't wait for the loop to actually reach Started. The spawned loop awaits for the preparation to complete and then loops on the iteration future.

In both cases, the runtime scheduler can group the futures since we don't wait for each to complete before spawning the next one.

With the proposed design however, we want to wait for the async transition actions to complete before returning. In order to avoid incurring the throttling duration, the runtime scheduler was modified so that it can be awaken in case it was throttling. The new Context::awake_and_spawn takes advantage of this new feature by awaking the scheduler when spawning a transition hook.

See this commit for the implementation of this feature. Edit: had to add a fix to allow compiling with rustc 1.40.0.

(Former) Open Question

Transition actions and the iteration Future can directly or indirectly spawn subtasks, which would require calling Context::drain_sub_tasks for the processing to complete. This also involves handling the potential errors. When the prepare hook completes, it is followed by a Context::drain_sub_tasks. I added a handle_prepare_error hook which is called when an error occurs. The user is supposed to take the necessary measures to preventi subsequent state transitions.

The other transitions could also automatically call Context::drain_sub_tasks after completion, but we would need error handlers. I see several options for this:

  • Use a single handle_error hook for every transitions and pass the trigger as an argument so that the handler can act accordingly when necessary. This would be convenient if most of the time the error handling is the same whatever the transition.
  • Use a handle_xxx_error per transition: this is the fine grain variant. For many TaskImpl implementers, there would be none or few of them to implement, but it could result verbose when several of them are required.
  • Let the implementer call Context::drain_sub_tasks and handle the error when necessary. This leaves the Task implementation and TaskImpl declaration simpler, but requires the implementer to be aware of details of the threadshare model.

Edit: see this answer regarding the preferred approach.

Metrics

The plots below compare the following versions:

  • TaskImpl mono uses monorphization for TaskImpl in the StateMachine. This causes larger lib size and higher memory usage. CPU usage & throughput are better than those observed with current master.
  • TaskImpl dyn uses dynamic dispatch for TaskImpl in the StateMachine. We get almost the same CPU usage & throughput as with TaskImpl mono and the same lib size and memory usage as current master.
  • master corresponds to the master repository the 2020/05/16 at 12:00 UTC, head was https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/tree/d130b291461dc1134f0c3420ffa31257e8031f5c.
CPU Usage

CPU Usage

Throughput

Throughput

Memory Usage

Memory Usage

Initial MR Description

This is the initial description for this MR. It is kept here for tracking.

This MR is the continuation of previous MRs and discussions.

  1. Since the introduction of the Task::pause transition (see !277 (merged)), the behaviour of Task::start was not consistent when resuming from the Paused state (see !277 (comment 442981)).

    The MR defines the iteration function once and for all in Task::prepare and leaves start without additional arguments.

  2. Getting flush start/stop handling right when writting a ts element can result challenging (see !240 (comment 385778), !271 (comment 430807) & !277 (comment 439529)). One of the problems is the potential concurrency between state changes and flush events handling. Current model doesn't allow performing all the changes in the flush handlers in a "transactional" way. The workaround for this implies using a dedicated Mutex with a flag reflecting the state of the element with regard to item handling.

    The MR introduces new functions Task::flush_{start,stop} and hooks that the element implementer can define to execute the corresponding operations for these state transitions.

  3. Since the element model relies on a shared pointer to the underlying object, we can't take advantage of the compiler's borrow checker to verify the access to the ts elements attributes from the element hooks. The solution for this is to wrap the attributes in Mutexes. Depending on how an attribute is used, we may or may nor be able to use async-friendly Mutexes. Most of the time, this is not an issue because there is little concurrency on these attributes. But still, when a lock is called from a Context and the underlying resource is in use, all the tasks in the same Context are suspended. Edit: see this thread !298 (comment 458654) for a discussion about this statement.

    The MR was an opportunity to define a solution to allow a ts element implementer to define async Task transitions and make use of async-friendly types most of the time. Moreover, the specific task implementation gets immediate mutable access to its attributes including in the Task iteration function, which IMHO allows for more Rust idiomatic implementations.

Edited by François Laignel

Merge request reports