How to develop an Activity Definition in Python
You can develop an Activity Definition by applying the @activity.defn decorator to a function.
from temporalio import activity

@activity.defn
async def your_activity(name: str) -> str:
    return f"Hello, {name}!"
You can register the function as an Activity with a custom name through a decorator argument, for example @activity.defn(name="your-activity").
@activity.defn(name="your-activity")
async def your_activity(name: str) -> str:
    return f"Hello, {name}!"
Types of Activities
The following sections describe the different types of Activity callables.
Regardless of type, Activities support only positional arguments.
Asynchronous Activities
Asynchronous Activities (recommended) are functions defined with async def. They require no additional Worker parameters.
Cancellation of asynchronous Activities is performed through the asyncio.Task.cancel operation. This means that asyncio.CancelledError is raised inside the Activity; it can be caught, but doing so is not recommended.
An Activity must Heartbeat to receive cancellation.
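To make the cancellation mechanism concrete, here is a minimal sketch that uses only the standard asyncio library and no Temporal calls. The names long_running_step and main are hypothetical, and the explicit task.cancel() call stands in for what the Worker would do on a cancellation request. It shows that the error surfaces at the coroutine's current await point, and that a coroutine that catches it for cleanup should re-raise it.

```python
import asyncio


async def long_running_step(log: list) -> str:
    """Hypothetical stand-in for the body of an asynchronous Activity."""
    try:
        await asyncio.sleep(60)  # cancellation is delivered at this await
        return "finished"
    except asyncio.CancelledError:
        log.append("cleanup")  # release resources here if needed
        raise  # re-raise so the task is actually marked as cancelled


async def main() -> list:
    log: list = []
    task = asyncio.create_task(long_running_step(log))
    await asyncio.sleep(0)  # yield once so the task reaches its first await
    task.cancel()  # stands in for the Worker cancelling the Activity task
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log
```

Running asyncio.run(main()) exercises the whole sequence. Swallowing the CancelledError instead of re-raising it would let the coroutine keep running as if nothing had happened, which is presumably why catching it is discouraged.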
Synchronous Activities
Synchronous Activities are functions defined with def rather than async def. To run them, the activity_executor Worker parameter must be set to the concurrent.futures.Executor instance that will execute the Activities.
Cancellation for synchronous Activities is done in the background and the Activity must choose to listen for it and react appropriately.
An Activity must Heartbeat to receive cancellation.
Multithreaded Activities are synchronous functions run with activity_executor set to an instance of concurrent.futures.ThreadPoolExecutor.
Besides activity_executor, no additional Worker parameters are required for synchronous multithreaded Activities.
If activity_executor is set to an instance of concurrent.futures.Executor that is not a concurrent.futures.ThreadPoolExecutor, the synchronous Activities are considered multiprocess/other Activities.
These require special primitives for heartbeating and cancellation. The shared_state_manager Worker parameter must be set to an instance of worker.SharedStateManager. The most common implementation can be created by passing a multiprocessing.managers.SyncManager (for example, the result of multiprocessing.Manager()) to worker.SharedStateManager.create_from_multiprocessing().