public class Coroutine<I,O> extends org.obrel.core.RelatedObject implements org.obrel.core.FluentRelatable<Coroutine<I,O>>
To achieve this functionality, the implementation makes use of several
modern Java features of which some are available starting with Java 8. The
execution of coroutine steps is done with CompletableFuture
which in
turn runs the code by default in the common
thread pool
defined in the ForkJoinPool
class. But if necessary the
Executor
used for running coroutines can be changed.
To provide a fluent and readable declaration the API makes extensive use of the new functional programming features of Java and the (recommended) use of static imports. By using these, especially by applying lambda expressions and method references, a concise and easily understandable declaration of coroutines is possible.
From the outside a coroutine is a function that receives an input value,
processes it, and return an output value as the result of the execution. This
is similar to the Function
interface introduced with Java 8. If
invoked blocking
it will behave exactly
like a standard function, blocking the current thread until the processing
has finished and the result value has been produced. But if invoked asynchronously
the coroutine will be executed in
parallel to the current thread, suspending it's execution shortly between
processing steps or even pausing until data is available.
Besides this Coroutine class there are a few other classes that play an important role in the execution of coroutines:
CoroutineContext
: coroutines run in a certain context. The
context can either be provided explicitly or else the default context
is used. If different
coroutines need to communicate during their execution they need to run in
the same context. The context can also be used to provide configuration
for the coroutines running in it.CoroutineScope
: coroutines can only be launched inside a scope.
The scope provides the runtime environment for an arbitrary number of
coroutines. It also serves as a defined entry and exit-point into
coroutine executions: a scope will block execution of the launching
thread until all coroutines in it have finished execution (either
successfully, by cancelation, or with an error). This follows the pattern
of structured concurrency which prevents "forgotten" executions
running in the background or terminating silently with an error. The
scope also provides configuration and shared state for the coroutines in
it, overriding any configuration in the context.Continuation
: Every execution of a coroutine is associated with
a continuation object that contains the current execution state. It is
local to a single execution and thus not shared with other running
instances of the same or other coroutines.CoroutineStep
: The base for all steps that can be executed in a
coroutine. Like the coroutine itself it is basically a function that
receives an input value and produces a result. When executing
asynchronously a step implementation can suspend it's execution by
stopping the background execution completely until the condition that
caused the suspension no longer exists (e.g. a resource becomes
available). Several standard steps are defined in the 'step' sub-package
but the base class can also be extended to create new kinds of coroutine
steps.Suspension
: If a step signals to suspend it's asynchronous
execution a suspension object is created. The suspension contains the
suspended execution state, mainly by referencing the associated Continuation
. When the suspending condition is resolved the suspension
can be used to resume the asynchronous execution of the coroutine.Channel
: The previous classes are always involved when building
and executing coroutines. Channels are an optional but important feature
because they allow multiple coroutines to communicate without blocking
their threads. A coroutine will automatically suspend it's execution if a
channel that is used for receiving or sending has no data or capacity
available. As soon as the channel becomes available again the coroutine
will continue to run. Channels are managed by either the scope or the
context the coroutine runs in. If coroutines in different scopes need to
communicate through a channel their scopes need to have the same context
and the channel must be created in the context (not in the scope which is
the default).A coroutine can either be created by invoking the public constructor
with the first CoroutineStep
to execute or by invoking the factory method first(CoroutineStep)
.
The latter allows to declare a coroutine in a fluent way with better
readability. There is a slight limitation caused by the generic type system
of Java: if the result of first()
is assigned
to a variable with a specific input type it may be necessary to declare the
input type explicitly in the parameters of a lambda expression. For example,
the following example (using a static import of first() may cause a compiler
error (depending on the Java version used):
Coroutine<String, String> toUpper = first(apply(s ->
s.toUpperCase()));
To make the code compile, the type of the lambda argument needs to be declared explicitly:
Coroutine<String, String> toUpper = first(apply((String s) ->
s.toUpperCase()));
After a coroutine has been created it can be extended with additional
steps by invoking the instance method then(CoroutineStep)
on it.
This method takes the next step to be executed and returns a new coroutine
instance. This means that coroutines are immutable, i.e. they
cannot be modified after they have been created. Only new coroutines can be
created from them. This allows to declare coroutine templates which can be
extended by adding additional processing steps without the risk of changing
the original. Thus the first()
and then()
methods implement a builder pattern where each
invocation creates a new coroutine instance.
The immutability of coroutines initially only covers their code (i.e. the
step sequence). The Coroutine class also extends RelatedObject
and
therefore allows to set arbitrary relations on it which can be used to
configure step behavior or set default data, for example. To also make the
relations of an instance immutable (basically sealing the coroutine template)
just set the flag relation IMMUTABLE
on it which
will prevent the further modification of relations. This flag will not be
copied on started coroutines (which are always a copy of the original
coroutine). This allows to modify the instances of immutable coroutine
templates.
When a coroutine is executed a copy of it is created and then associated
with a new Continuation
instance. That prevents running code to
modify the coroutine (template) it has been started from but gives it access
to any of it's relations. The actual runtime state is stored in the
continuation object and may be modified freely by the coroutine code. It is
recommended that coroutine steps use the continuation if they need to share
data with other steps besides the standard input and output parameters.
Accessing state in the continuation can be done without further
synchronization because the steps in a coroutine are executed sequentially
and never concurrently (unless stated otherwise by some special step
implementations). But if steps access variables outside the continuation they
must apply the same caution like other multi-threaded code in Java because
access to such resource may (and will probably) need synchronization to avoid
concurrency issues (which are notoriously difficult to debug). This includes
CoroutineScope
and CoroutineContext
which are shared by
multiple running coroutine instances. There are no synchronization mechanisms
for access to the relations in these objects. If a step implementation wants
to modify relations in the scope or context it must perform the necessary
synchronization itself.
Attention: Any synchronization between coroutines should be applied with caution. Coroutines implement cooperative multi-tasking by executing their steps in a thread pool. These pools assume that the code running in a pool thread only occupies it as long as needed for the processing. Blocking such a thread in some way (like waiting for a lock, accessing a synchronized resource, or just sleeping) counteracts the purpose of the thread pool in particular and of cooperative multi-tasking in general.
Therefore it is strongly advised to not perform "classical" synchronizations from coroutine steps. Instead it should be checked whether it is possible to implement this in a cooperative way by suspending the coroutine execution while waiting for a resource. This can be achieved by using an asynchronous API for accessing a resource, like that provided by the java.nio package (see the sub-package 'step.nio' for examples).
Modifier and Type | Class and Description |
---|---|
static class |
Coroutine.Subroutine<I,T,O>
A coroutine subclass for the invocation of coroutines as subroutines in
the context of another execution.
|
Constructor and Description |
---|
Coroutine(CoroutineStep<I,O> rFirstStep)
Creates a new instance that starts execution with a certain step.
|
Modifier and Type | Method and Description |
---|---|
static <I,O> Coroutine<I,O> |
first(CoroutineStep<I,O> rStep)
A factory method that creates a new coroutine which starts with the
execution of a certain code function.
|
static <I,O> Coroutine<I,O> |
first(java.lang.String sStepName,
CoroutineStep<I,O> rStep)
A variant of
first(CoroutineStep) that also sets an explicit
step name. |
Continuation<O> |
runAsync(CoroutineScope rScope)
Runs a copy of this coroutine asynchronously in the given scope with an
input value of NULL.
|
Continuation<O> |
runAsync(CoroutineScope rScope,
I rInput)
Runs a copy of this coroutine asynchronously in a certain scope.
|
Continuation<O> |
runBlocking(CoroutineScope rScope)
Runs a copy of this coroutine in the given scope with an input value of
NULL and blocks the current thread until the coroutine finished.
|
Continuation<O> |
runBlocking(CoroutineScope rScope,
I rInput)
Runs a copy of this coroutine in a certain scope and blocks the current
thread until the coroutine finished.
|
<T> Coroutine<I,T> |
then(CoroutineStep<O,T> rStep)
Returns a new coroutine that executes additional code after that of this
instance.
|
<T> Coroutine<I,T> |
then(java.lang.String sStepName,
CoroutineStep<O,T> rStep)
A variant of
then(CoroutineStep) that also sets an explicit step
name. |
java.lang.String |
toString() |
deleteRelation, get, getRelation, getRelations, notifyRelationListeners, readRelations, relationsEqual, relationsHashCode, relationsString, set, set, transform, writeRelations
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
public Coroutine(CoroutineStep<I,O> rFirstStep)
rFirstStep
- The first step to executepublic static <I,O> Coroutine<I,O> first(CoroutineStep<I,O> rStep)
rStep
- fCode The function containing the starting code of the
coroutinepublic static <I,O> Coroutine<I,O> first(java.lang.String sStepName, CoroutineStep<I,O> rStep)
first(CoroutineStep)
that also sets an explicit
step name. Naming steps can help debugging coroutines.sStepName
- A name that identifies this step in this coroutinerStep
- The step to executepublic Continuation<O> runAsync(CoroutineScope rScope)
rScope
- The scope to run inContinuation
that provides access to the execution
resultrunAsync(CoroutineScope, Object)
public Continuation<O> runAsync(CoroutineScope rScope, I rInput)
Continuation
that contains the execution state
and provides access to the coroutine result AFTER it finishes. Because
the execution happens asynchronously (i.e. in another thread) the
receiving code must always use the corresponding continuation methods to
check for completion before accessing the continuation state.
Because a copy of this coroutine is executed, the continuation also references the copy and not this instance. If the running code tries to modify state of the coroutine it will only modify the copy, not the original instance.
If multiple coroutines need to communicate through channels
they must run in the same context because channels are managed
by the context based on the channel ID.
rScope
- The scope to run inrInput
- The input valueContinuation
that provides access to the execution
resultpublic Continuation<O> runBlocking(CoroutineScope rScope)
rScope
- The scope to run inContinuation
that provides access to the execution
resultrunBlocking(CoroutineScope, Object)
public Continuation<O> runBlocking(CoroutineScope rScope, I rInput)
Continuation
that
contains the execution state and provides access to the coroutine result.
Because a copy of this coroutine is executed, the continuation also references the copy and not this instance. If the running code tries to modify state of the coroutine it will only modify the copy, not the original instance.
rScope
- The scope to run inrInput
- The input valueContinuation
that provides access to the execution
resultpublic <T> Coroutine<I,T> then(CoroutineStep<O,T> rStep)
first(CoroutineStep)
or
with the public constructor. Invoking a builder method creates a new
coroutine with the combined code while the original coroutine remains
unchanged (or is discarded in the case of a builder chain).
Each invocation of a builder method creates a coroutine suspension point at which the execution will be interrupted to allow other code to run on the current thread (e.g. another coroutine). Some steps may suspend the execution until values from another coroutine or some external source become available.
An extended coroutine re-uses the original code of the coroutine it is
derived from. Therefore it is necessary to ensure that the code in shared
(base) coroutines contains no external dependencies that could change the
behavior of all derived coroutines if modified (unless desired, but
beware of side-effects). The best way to achieve this is by using
correctly defined closures when declaring step. If steps need to share
information during execution that can be achieved by setting relations on
the Continuation
which is always local to the respective
execution.
rStep
- The step to executepublic <T> Coroutine<I,T> then(java.lang.String sStepName, CoroutineStep<O,T> rStep)
then(CoroutineStep)
that also sets an explicit step
name. Naming steps can help debugging coroutines.sStepName
- A name that identifies this step in this coroutinerStep
- The step to executepublic java.lang.String toString()
toString
in class org.obrel.core.RelatedObject