threadshare: Task rework
Sorry for the long description. I think this MR requires details about its motivations and the proposed design.
Current Design
The `threadshare::runtime::Task` design is inspired by GStreamer `GstTask`, adapted to the threadshare runtime:
- It's a means to loop on an iteration `Future` operating in a `Context`.
- User can control the state of the loop.
To keep track of its environment (the equivalent of the "user data" in the `GstTask` model), the iteration `Future` is actually returned by a closure. The same principle holds for `prepare`. In order to ease the explanations below, "iteration function" will refer to either the closure which returns the `Future` for the iteration or the resulting `Future`.
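For illustration, the current usage pattern looks roughly like the sketch below; the signature is simplified and `state` is a purely illustrative shared environment, but the closure-returning-a-`Future` shape is the point:

```rust
// Rough sketch (simplified signature): the "iteration function" is the
// closure passed to `start`, which returns the `Future` for one iteration.
// `state` is an illustrative Arc<Mutex<_>> acting as the "user data".
task.start(move || {
    let state = state.clone();
    async move {
        // Handle one item of the data-flow using `state`...
        // ...and decide whether the loop keeps iterating.
        glib::Continue(true)
    }
});
```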
The following graph represents the state machine in the current design:
During recent changes to the `gst-plugin-threadshare` implementation, some shortcomings were identified.
Pause / Resume
After the `Task` is requested to pause (the loop remains on hold after the current iteration), the user can call `start` for the loop to resume iterating. Due to the `start` signature, the "iteration function" must be provided again.
IMHO, the loop should resume iterating on the same "iteration function". The user should be able to call `start` again without the need to provide the "iteration function".
FlushStart / FlushStop
- When a `Pad` receives a `FlushStart` `Event`, the `Element` must stop the data-flow as soon as possible.
- The `Element` must flush any pending data-flow item and get ready to start a new segment.
- When a `Pad` receives the `FlushStop` `Event`, the `Element` must start the data-flow again.
Implementation
The `Task` model uses the `Cancel` trigger to perform the "stop the flow as soon as possible" part. While the transition resulting from a `pause` trigger would only occur after the current iteration completes (if it does), the `Cancel` transition interrupts the running iteration. Technically, the interruption happens at the next `await` point.
We could be tempted to implement `FlushStart` & `FlushStop` handling as follows:
```rust
fn flush_start(&self) {
    // Interrupt the running iteration as soon as possible.
    self.task.cancel();
}

fn flush_stop(&self) {
    // Stop the loop, flush pending items, prepare a new segment,
    // then restart the loop.
    self.task.stop();
    self.flow_stream.flush();
    self.src_pad_handler.set_need_segment();
    self.task.start();
}
```
Note that `flush_stop` can diverge from a typical `ElementImpl::stop` followed by an `ElementImpl::start` since we usually don't want `FlushStop` to push new `StreamStart` and `Caps` events.
The above implementation for `flush_stop` is racy: if a state transition occurs during its execution, we might end up in an unexpected state. We need to hold a lock on a synchronization primitive (e.g. a `Mutex`) to make sure the whole function is performed before another transition is handled.
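For instance, a workaround along these lines would be required; `state_lock` is a hypothetical field added for the sketch, not something the current model provides:

```rust
// Illustrative workaround, not part of this MR: serialize flush handling
// and state transitions behind a dedicated element-level Mutex.
fn flush_stop(&self) {
    // Hypothetical std::sync::Mutex<()> held for the whole sequence so that
    // no other transition can interleave with it.
    let _guard = self.state_lock.lock().unwrap();

    self.task.stop();
    self.flow_stream.flush();
    self.src_pad_handler.set_need_segment();
    self.task.start();
}
```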
Pause, FlushStart, Start
With the following sequence:
- Element Paused.
- Event FlushStart.
- Element Paused -> Playing (this executes `task.start()`).
The `Task` ends up `Started`, meaning that it handles dataflow items, when it should not because `FlushStop` has not been received yet.
We could add a check in `ElementImpl::start` and start the `Task` only when it is not in the `Cancelled` state. For this to be threadsafe, we would need to hold a lock on the `Task` state, but that would prevent us from calling `Task::start`, so we need a new state protected by a `Mutex`. This state would be controlled by the `ElementImpl`, which would implement its own state machine. We would also need to differentiate a loop that was cancelled because the iteration returned `Continue(false)` from a loop that was cancelled because `FlushStart` was received.
This MR proposes another solution so that ts-elements can still rely on the `Task` state machine for usual use cases. See § "Proposed State Machine" below.
Pause, FlushStart, FlushStop
With the following sequence:
- Element Paused.
- Event FlushStart.
- Event FlushStop.
The expected result is a paused flow, but the `Task` ends up in the `Started` state.
The solution is similar to the one described above.
FlushStop // with Start
With the following sequence:
- Element Playing.
- Event FlushStart.
- Concurrently:
  - Event FlushStop.
  - Element Playing -> Paused.
Depending on the actual sequence, the `Task` may end up either in the `Started` or `Paused` state, when the final state should always be `Paused`.
The solution is similar to the one described above.
Proposal
Requirements
This MR proposes an extension to the `Task` model which aims at complying with the following requirements:
- A. After a `FlushStop` or `Start` trigger, the `Task` should end up either in the `Started` or `Paused` state, depending on the transition sequence.
- B. The user shouldn't need to rely on an external synchronization primitive to prevent transition action race conditions.
- C. The user should get mutable and lock-less access to the loop environment during transition actions and in the iteration function. This should reduce the need for synchronization primitives, especially in the hot path.
- D. The user should be able to trigger `start` to resume a `Paused` loop without the need to provide the iteration `Future` again.
Design
State Machine
The main differences with the current design are:
- A dedicated `async` action is executed when a transition occurs.
- In the event of a transition action failure, `TaskImpl::handle_action_error` is invoked to determine the appropriate behaviour. The default is to log the error and to switch to the `Error` state.
- Iteration returns a `Result<(), gst::FlowError>` instead of `glib::Continue`. By default, the `Task` state machine switches to a different state if the result is `Error`, `Eos` or `Flushing`. This behaviour is defined in the `TaskImpl::handle_iterate_error` default implementation and can be overridden by the implementer.
- In its `Flushing` use cases, the `Cancelled` state is replaced with two states: `Flushing` and `PausedFlushing`.
- After preparation, users can choose to switch to `Paused` or `Stopped` in addition to `Started`.
TaskImpl trait
The user provides a struct that implements the trait `TaskImpl`. This trait defines (optional) `async` actions which will be called when a transition occurs and the (mandatory) `async` iteration to loop on in the `Started` state. The struct also contains the environment, which can be mutably accessed within the transition actions and the iteration.
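As an illustration, an implementer could look roughly like the sketch below. The hook names follow this description, but the signatures are simplified (the real hooks likely return boxed futures rather than using `async fn`), and `MyTaskImpl` and its field are purely illustrative:

```rust
// Hypothetical sketch of a TaskImpl implementer; names and signatures are
// simplified for illustration.
struct MyTaskImpl {
    // The environment ("user data"): owned by the state machine, so the
    // transition actions and the iteration get `&mut self` without a Mutex.
    counter: u64,
}

impl TaskImpl for MyTaskImpl {
    // Optional transition action, executed when the task starts.
    async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
        self.counter = 0;
        Ok(())
    }

    // Mandatory iteration, looped on while the task is Started.
    async fn iterate(&mut self) -> Result<(), gst::FlowError> {
        self.counter += 1;
        // Returning Err(gst::FlowError::Eos), Err(gst::FlowError::Flushing)
        // or another error lets the state machine switch state via
        // TaskImpl::handle_iterate_error.
        Ok(())
    }
}
```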
State Machine
When triggering a `Task` transition, we need to:
1. Lock the `Mutex<TaskInner>`.
2. Prepare the transition.
3. Execute the `async` action on the `Task` `Context`.
The problem with (3) is that we can't hold the lock on the `Mutex<TaskInner>` while executing the action:
- If the action directly or indirectly triggers an event resulting in a transition on its own `Task`, this would deadlock.
- If the transition is triggered from a `Context`, we can't `block_on` the `async` hook because that would block all the `Future`s executing on the same `Context`. The solution for this is to add a subtask to the `Context`, and the caller (e.g. a `Pad{Src,Sink}`) is responsible for draining the subtask in the proper async environment when appropriate. This happens out of the scope of the `Task::_trigger_` function, so the lock on the `Mutex` is released.
Still, we don't want concurrent transitions to race with the current transition, which would result in an inconsistent state.
The solution in this proposal is to post the triggering event to a channel. The state machine runs on a dedicated thread (actually a dedicated runtime common to all the `Task`s' state machines), awaiting transition triggering events. When required, a simple mechanism allows holding the triggering event requester until the transition is complete, in conformity with the sync or async nature of the execution environment.
- The state machine serializes transition handling. This ensures requirement (B).
- The state machine owns the instance of `TaskImpl`, so it can mutate it without the need for an `Arc<Mutex<_>>`. This ensures requirement (C).
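The mechanism can be pictured with the following sketch; the types (`Trigger`, `TriggeringEvent`, the channel layout) are simplified stand-ins for the actual implementation:

```rust
// Illustrative sketch only: the actual types and channel layout differ.
use futures::channel::{mpsc, oneshot};
use futures::prelude::*;

enum Trigger {
    Start,
    Pause,
    FlushStart,
    FlushStop,
    Stop,
}

// Each request carries a oneshot sender so the requester can be held until
// the transition completes, by blocking or awaiting depending on whether it
// runs on a Context.
struct TriggeringEvent {
    trigger: Trigger,
    ack: oneshot::Sender<Result<(), gst::ErrorMessage>>,
}

async fn state_machine(mut triggers: mpsc::Receiver<TriggeringEvent>) {
    // Transitions are handled one at a time: no race between concurrent
    // triggers (requirement B). The TaskImpl instance owned by this task
    // (omitted here) can be mutated without an Arc<Mutex<_>> (requirement C).
    while let Some(event) = triggers.next().await {
        let res = execute_transition(event.trigger).await;
        let _ = event.ack.send(res);
    }
}

async fn execute_transition(trigger: Trigger) -> Result<(), gst::ErrorMessage> {
    // In the real implementation, the async transition action is spawned on
    // the Task's Context and awaited from here.
    let _ = trigger;
    Ok(())
}
```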
Edit: the following paragraph is obsolete: the state machines are now spawned on a `ThreadPool` instance from the `futures` crate.
Most of the time, the state machine (runtime) `Context` is idle, awaiting the next triggering event. The transition action or loop is spawned on the `Context` configured for the `Task` (let's call it the "group" `Context`). Since its load is rather low, the state machine `Context` is common to all the `Task`s in the process, but we could easily balance the load by spawning a different state machine `Context` for each "group" `Context`.
New Runtime Feature
When starting a pipeline, the pipeline iterates on each element to change their states. This is a serial process, meaning that the overall start-up time is the sum of each element's start-up time.
A ts-element usually uses a non-zero "max. throttling" duration for its runtime `Context`. The benefit of this approach is that we can group I/O polling and reduce CPU usage when processing typical multimedia data-flows. See this page for more details.
Executing an asynchronous process on a `Context` can incur a short delay of "max. throttling" (typically 20ms). When the pipeline changes state, each ts-element transition action might be subject to the "max. throttling" delay, resulting in long start-up durations for pipelines with many ts-elements.
In the current `Task` model, this problem was addressed as follows:
- When a `prepare` future is defined, don't wait for the future to complete before returning to the caller.
- When `Task::start` is triggered, don't wait for the loop to actually reach `Started`. The spawned loop waits for the preparation to complete and then loops on the iteration future.
In both cases, the runtime scheduler can group the futures since we don't wait for each to complete before spawning the next one.
With the proposed design, however, we want to wait for the `async` transition actions to complete before returning. In order to avoid incurring the throttling duration, the runtime scheduler was modified so that it can be awakened in case it is throttling. The new `Context::awake_and_spawn` takes advantage of this new feature by waking the scheduler when spawning a transition hook.
See this commit for the implementation of this feature. Edit: had to add a fix to allow compiling with rustc 1.40.0.
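For illustration, the intended usage might look roughly like this; the exact signature is an assumption (only the name `Context::awake_and_spawn` comes from this MR):

```rust
// Hypothetical usage sketch: assuming awake_and_spawn behaves like
// Context::spawn, but additionally wakes the scheduler if it is throttling,
// so the spawned transition action starts without the throttling delay.
context.awake_and_spawn(async move {
    // async transition action
});
```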
(Former) Open Question
Transition actions and the iteration `Future` can directly or indirectly spawn subtasks, which would require calling `Context::drain_sub_tasks` for the processing to complete. This also involves handling the potential errors.
When the `prepare` hook completes, it is followed by a `Context::drain_sub_tasks`. I added a `handle_prepare_error` hook which is called when an error occurs. The user is supposed to take the necessary measures to prevent subsequent state transitions.
The other transitions could also automatically call `Context::drain_sub_tasks` after completion, but we would need error handlers. I see several options for this:
- Use a single `handle_error` hook for every transition and pass the trigger as an argument so that the handler can act accordingly when necessary. This would be convenient if, most of the time, the error handling is the same whatever the transition. (A rough sketch of this option is shown after this list.)
- Use a `handle_xxx_error` hook per transition: this is the fine-grained variant. For many `TaskImpl` implementers, there would be none or few of them to implement, but it could get verbose when several of them are required.
- Let the implementer call `Context::drain_sub_tasks` and handle the error when necessary. This leaves the `Task` implementation and `TaskImpl` declaration simpler, but requires the implementer to be aware of details of the threadshare model.
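To make the first option more concrete, here is a rough sketch of what such a single hook could look like; `Trigger` and the signature are assumptions made for the sketch, not the final API:

```rust
// Hypothetical sketch of a single error hook receiving the failed trigger.
// `Trigger` and the signature are illustrative only.
enum Trigger {
    Prepare,
    Start,
    Pause,
    Stop,
    FlushStart,
    FlushStop,
}

async fn handle_error(trigger: Trigger, err: gst::FlowError) {
    match trigger {
        // Most transitions can share the same handling: log the error and
        // let the state machine switch to the Error state.
        Trigger::Prepare | Trigger::Start | Trigger::Pause | Trigger::Stop => {
            eprintln!("transition failed: {:?}", err);
        }
        // Specific triggers can still be special-cased, e.g. ignoring
        // Flushing errors raised while flushing.
        Trigger::FlushStart | Trigger::FlushStop => (),
    }
}
```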
Edit: see this answer regarding the preferred approach.
Metrics
The plots below compare the following versions:
- TaskImpl mono uses monomorphization for `TaskImpl` in the `StateMachine`. This causes a larger lib size and higher memory usage. CPU usage & throughput are better than those observed with current master.
- TaskImpl dyn uses dynamic dispatch for `TaskImpl` in the `StateMachine`. We get almost the same CPU usage & throughput as with TaskImpl mono and the same lib size and memory usage as current master.
- master corresponds to the master repository on 2020/05/16 at 12:00 UTC; head was https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/tree/d130b291461dc1134f0c3420ffa31257e8031f5c.
CPU Usage
Throughput
Memory Usage
Initial MR Description
This is the initial description for this MR. It is kept here for tracking.
This MR is the continuation of previous MRs and discussions.
- Since the introduction of the `Task::pause` transition (see !277 (merged)), the behaviour of `Task::start` was not consistent when resuming from the `Paused` state (see !277 (comment 442981)). The MR defines the iteration function once and for all in `Task::prepare` and leaves `start` without additional arguments.
- Getting flush start/stop handling right when writing a ts element can prove challenging (see !240 (comment 385778), !271 (comment 430807) & !277 (comment 439529)). One of the problems is the potential concurrency between state changes and flush event handling. The current model doesn't allow performing all the changes in the flush handlers in a "transactional" way. The workaround for this implies using a dedicated `Mutex` with a flag reflecting the state of the element with regard to item handling. The MR introduces new functions `Task::flush_{start,stop}` and hooks that the element implementer can define to execute the corresponding operations for these state transitions.
- Since the element model relies on a shared pointer to the underlying object, we can't take advantage of the compiler's borrow checker to verify the access to the ts element's attributes from the element hooks. The solution for this is to wrap the attributes in `Mutex`es. Depending on how an attribute is used, we may or may not be able to use async-friendly `Mutex`es. Most of the time, this is not an issue because there is little concurrency on these attributes. But still, when a lock is called from a `Context` and the underlying resource is in use, all the tasks in the same `Context` are suspended. Edit: see this thread !298 (comment 458654) for a discussion about this statement. The MR was an opportunity to define a solution allowing a ts element implementer to define async `Task` transitions and make use of async-friendly types most of the time. Moreover, the specific task implementation gets immediate mutable access to its attributes, including in the `Task` iteration function, which IMHO allows for more Rust idiomatic implementations.