Task runners enable you to engage specific executors for Prefect tasks, such as for concurrent, parallel, or distributed execution of tasks.
Task runners are not required for task execution. If you call a task function directly, the task executes as a regular Python function, without a task runner, and produces whatever result is returned by the function.
Calling a task function from within a flow, using the default task settings, executes the function sequentially. Execution of the task function blocks execution of the flow until the task completes. This means, by default, calling multiple tasks in a flow causes them to run in order.
However, that's not the only way to run tasks!
You can use the .submit() method on a task function to submit the task to a task runner. Using a task runner enables you to control whether tasks run sequentially or concurrently, or to take advantage of a parallel or distributed execution library such as Dask or Ray.
Using the .submit() method to submit a task also causes the task run to return a PrefectFuture, a Prefect object that contains both any data returned by the task function and a State, a Prefect object indicating the state of the task run.
Prefect currently provides the following built-in task runners:
ConcurrentTaskRunner can run tasks concurrently, allowing tasks to switch when blocking on IO. Tasks will be submitted to a thread pool maintained by anyio.
SequentialTaskRunner can run tasks sequentially.
In addition, the following Prefect-developed task runners for parallel or distributed task execution may be installed as Prefect Integrations.
DaskTaskRunner can run tasks requiring parallel execution using dask.distributed.
RayTaskRunner can run tasks requiring parallel execution using Ray.
Concurrency versus parallelism
The words "concurrency" and "parallelism" may sound the same, but they mean different things in computing.
Concurrency refers to a system that can make progress on more than one thing over the same period, but not necessarily at the exact same time. It may be more accurate to think of concurrent execution as non-blocking: within the restrictions of resources available in the execution environment and data dependencies between tasks, execution of one task does not block execution of other tasks in a flow.
Parallelism refers to a system that can do more than one thing at the exact same time. Again, within the restrictions of resources available, parallel execution can run tasks at the same time, such as for operations mapped across a dataset.
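The distinction can be illustrated outside of Prefect. The following is a minimal sketch using only Python's standard library, not Prefect's task runner API: blocking_io is a hypothetical stand-in for an I/O-bound task, and the delays are arbitrary. Run sequentially, the waits add up; run concurrently on a thread pool, they overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAYS = [0.2, 0.2, 0.2]  # arbitrary wait times, in seconds


def blocking_io(seconds):
    """Hypothetical stand-in for a task that blocks on I/O."""
    time.sleep(seconds)
    return seconds


# Sequential: each call blocks the next one, so the waits add up.
start = time.perf_counter()
sequential_results = [blocking_io(d) for d in DELAYS]
sequential_elapsed = time.perf_counter() - start

# Concurrent: each call waits in its own thread, so the waits overlap.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(DELAYS)) as pool:
    concurrent_results = list(pool.map(blocking_io, DELAYS))
concurrent_elapsed = time.perf_counter() - start

print(f"sequential: {sequential_elapsed:.2f}s, concurrent: {concurrent_elapsed:.2f}s")
```

Threads overlap waiting time, which is why concurrent execution suits I/O-bound tasks. Running CPU-bound work at the exact same time generally calls for multiple processes or a library such as Dask or Ray.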
You do not need to specify a task runner for a flow unless your tasks require a specific type of execution.
To configure your flow to use a specific task runner, import a task runner and assign it as an argument for the flow when the flow is defined.
Remember to call .submit() when using a task runner
Make sure you use .submit() to run your task with a task runner. Calling the task directly, without .submit(), from within a flow will run the task sequentially instead of using a specified task runner.
For example, you can use ConcurrentTaskRunner to allow tasks to switch when they would block.
    from prefect import flow, task
    from prefect.task_runners import ConcurrentTaskRunner
    import time

    @task
    def stop_at_floor(floor):
        print(f"elevator moving to floor {floor}")
        time.sleep(floor)
        print(f"elevator stops on floor {floor}")

    @flow(task_runner=ConcurrentTaskRunner())
    def elevator():
        for floor in range(10, 0, -1):
            stop_at_floor.submit(floor)
If you specify an uninitialized task runner class, a task runner instance of that type is created with the default settings. You can also pass additional configuration parameters for task runners that accept parameters, such as DaskTaskRunner and RayTaskRunner.
Default task runner
If you don't specify a task runner for a flow and you call a task with .submit() within the flow, Prefect uses the default ConcurrentTaskRunner.
Sometimes, it's useful to force tasks to run sequentially to make it easier to reason about the behavior of your program. Switching to the SequentialTaskRunner will force submitted tasks to run sequentially rather than concurrently.
Synchronous and asynchronous tasks
The SequentialTaskRunner works with both synchronous and asynchronous task functions. Asynchronous tasks are Python functions defined using async def rather than def.
The following example demonstrates using the SequentialTaskRunner to ensure that tasks run sequentially. In the example, the flow glass_tower runs the task stop_at_floor for floors one through 38, in that order.
    from prefect import flow, task
    from prefect.task_runners import SequentialTaskRunner
    import random

    @task
    def stop_at_floor(floor):
        situation = random.choice(["on fire", "clear"])
        print(f"elevator stops on {floor} which is {situation}")

    @flow(
        task_runner=SequentialTaskRunner(),
        name="towering-infernflow",
    )
    def glass_tower():
        for floor in range(1, 39):
            stop_at_floor.submit(floor)

    glass_tower()
Each flow can only have a single task runner, but sometimes you may want a subset of your tasks to run using a specific task runner. In this case, you can create subflows for tasks that need to use a different task runner.
For example, you can have a flow (in the example below called sequential_flow) that runs its tasks locally using the SequentialTaskRunner. If you have some tasks that can run more efficiently in parallel on a Dask cluster, you could create a subflow (such as dask_subflow) to run those tasks using the DaskTaskRunner.
    from prefect import flow, task
    from prefect.task_runners import SequentialTaskRunner
    from prefect_dask.task_runners import DaskTaskRunner

    @task
    def hello_local():
        print("Hello!")

    @task
    def hello_dask():
        print("Hello from Dask!")

    @flow(task_runner=SequentialTaskRunner())
    def sequential_flow():
        hello_local.submit()
        dask_subflow()
        hello_local.submit()

    @flow(task_runner=DaskTaskRunner())
    def dask_subflow():
        hello_dask.submit()

    if __name__ == "__main__":
        sequential_flow()
Guarding main
Note that you should guard the main function by using if __name__ == "__main__" to avoid issues with parallel processing.
This script outputs the following logs demonstrating the use of the Dask task runner:
    20:14:29.785 | INFO | prefect.engine - Created flow run 'ivory-caiman' for flow 'sequential-flow'
    20:14:29.785 | INFO | Flow run 'ivory-caiman' - Starting 'SequentialTaskRunner'; submitted tasks will be run sequentially...
    20:14:29.880 | INFO | Flow run 'ivory-caiman' - Created task run 'hello_local-7633879f-0' for task 'hello_local'
    20:14:29.881 | INFO | Flow run 'ivory-caiman' - Executing 'hello_local-7633879f-0' immediately...
    Hello!
    20:14:29.904 | INFO | Task run 'hello_local-7633879f-0' - Finished in state Completed()
    20:14:29.952 | INFO | Flow run 'ivory-caiman' - Created subflow run 'nimble-sparrow' for flow 'dask-subflow'
    20:14:29.953 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
    20:14:31.862 | INFO | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
    20:14:31.901 | INFO | Flow run 'nimble-sparrow' - Created task run 'hello_dask-2b96d711-0' for task 'hello_dask'
    20:14:32.370 | INFO | Flow run 'nimble-sparrow' - Submitted task run 'hello_dask-2b96d711-0' for execution.
    Hello from Dask!
    20:14:33.358 | INFO | Flow run 'nimble-sparrow' - Finished in state Completed('All states completed.')
    20:14:33.368 | INFO | Flow run 'ivory-caiman' - Created task run 'hello_local-7633879f-1' for task 'hello_local'
    20:14:33.368 | INFO | Flow run 'ivory-caiman' - Executing 'hello_local-7633879f-1' immediately...
    Hello!
    20:14:33.386 | INFO | Task run 'hello_local-7633879f-1' - Finished in state Completed()
    20:14:33.399 | INFO | Flow run 'ivory-caiman' - Finished in state Completed('All states completed.')
When you use .submit() to submit a task to a task runner, the task runner creates a PrefectFuture for access to the state and result of the task.
A PrefectFuture is an object that provides access to a computation happening in a task runner — even if that computation is happening on a remote system.
In the following example, we save the return value of calling .submit() on the task say_hello to the variable future, and then we print the type of the variable:
    from prefect import flow, task

    @task
    def say_hello(name):
        return f"Hello {name}!"

    @flow
    def hello_world():
        future = say_hello.submit("Marvin")
        print(f"variable 'future' is type {type(future)}")

    hello_world()
When you run this code, you'll see that the variable future is a PrefectFuture.
When you pass a future into a task, Prefect waits for the "upstream" task — the one that the future references — to reach a final state before starting the downstream task.
This means that the downstream task won't receive the PrefectFuture you passed as an argument. Instead, the downstream task will receive the value that the upstream task returned.
Take a look at how this works in the following example
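A rough way to picture this behavior, using only the standard library rather than Prefect itself: the hypothetical helper submit_resolved below swaps any future among the arguments for its result before the downstream function runs. Prefect's own implementation is not shown here and may differ; the sketch only demonstrates that the downstream function sees a plain value, not a future.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def resolve(value):
    """Return a future's result, or the value itself if it is not a future."""
    return value.result() if isinstance(value, Future) else value


def submit_resolved(pool, fn, *args):
    """Hypothetical helper: wait for upstream futures, then call fn with their values."""
    return pool.submit(lambda: fn(*(resolve(arg) for arg in args)))


def say_hello(name):
    return f"Hello {name}!"


def describe(value):
    # Reports the type the downstream function actually received.
    return f"{type(value).__name__}: {value}"


with ThreadPoolExecutor(max_workers=2) as pool:
    upstream = pool.submit(say_hello, "Marvin")
    downstream = submit_resolved(pool, describe, upstream)
    output = downstream.result()

print(output)  # str: Hello Marvin!
```

The downstream function receives a str, even though a future was passed when it was submitted.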
The .result() method will wait for the task to complete before returning the result to the caller. If the task run fails, .result() will raise the task run's exception. You may disable this behavior with the raise_on_failure option:
    from prefect import flow, task

    @task
    def my_task():
        return "I'm a task!"

    @flow
    def my_flow():
        future = my_task.submit()
        result = future.result(raise_on_failure=False)
        if future.get_state().is_failed():
            # `result` is an exception! handle accordingly
            ...
        else:
            # `result` is the expected return value of our task
            ...
You can retrieve the current state of the task run associated with the PrefectFuture using .get_state():
To wait for a task run to finish, call .wait() on the future. You can include a timeout in the .wait() call to perform logic if the task has not finished in a given amount of time:
    @flow
    def my_flow():
        future = my_task.submit()
        final_state = future.wait(1)  # Wait one second max
        if final_state:
            # Take action if the task is done
            result = final_state.result()
        else:
            ...  # Take action if the task is still running
You may also use the wait_for=[] parameter when calling a task, specifying upstream task dependencies. This enables you to control task execution order for tasks that do not share data dependencies.
    from prefect import flow, task

    @task
    def task_a():
        pass

    @task
    def task_b():
        pass

    @task
    def task_c():
        pass

    @task
    def task_d():
        pass

    @flow
    def my_flow():
        a = task_a.submit()
        b = task_b.submit()
        # Wait for task_a and task_b to complete
        c = task_c.submit(wait_for=[a, b])
        # task_d will wait for task_c to complete
        # Note: If waiting for one task it must still be in a list.
        d = task_d(wait_for=[c])
The simplest pattern for writing a flow is either only using tasks or only using pure Python functions. When you need to mix the two, use .result().
Using only tasks:
    from prefect import flow, task

    @task
    def say_hello(name):
        return f"Hello {name}!"

    @task
    def say_nice_to_meet_you(hello_greeting):
        return f"{hello_greeting} Nice to meet you :)"

    @flow
    def hello_world():
        hello = say_hello.submit("Marvin")
        nice_to_meet_you = say_nice_to_meet_you.submit(hello)

    hello_world()
Using only Python functions:
    from prefect import flow

    def say_hello(name):
        return f"Hello {name}!"

    def say_nice_to_meet_you(hello_greeting):
        return f"{hello_greeting} Nice to meet you :)"

    @flow
    def hello_world():
        # because this is just a Python function, calls will not be tracked
        hello = say_hello("Marvin")
        nice_to_meet_you = say_nice_to_meet_you(hello)

    hello_world()
Mixing tasks and Python functions:
    from prefect import flow, task

    def say_hello_extra_nicely_to_marvin(hello):  # not a task or flow!
        if hello == "Hello Marvin!":
            return "HI MARVIN!"
        return hello

    @task
    def say_hello(name):
        return f"Hello {name}!"

    @task
    def say_nice_to_meet_you(hello_greeting):
        return f"{hello_greeting} Nice to meet you :)"

    @flow
    def hello_world():
        # run a task and get the result
        hello = say_hello.submit("Marvin").result()

        # not calling a task or flow
        special_greeting = say_hello_extra_nicely_to_marvin(hello)

        # pass our modified greeting back into a task
        nice_to_meet_you = say_nice_to_meet_you.submit(special_greeting)

        print(nice_to_meet_you.result())

    hello_world()
Note that .result() also limits Prefect's ability to track task dependencies. In the "mixed" example above, Prefect will not be aware that say_hello is upstream of nice_to_meet_you.
Calling .result() is blocking
When calling .result(), be mindful that your flow function will have to wait until the task run is completed before continuing.
    from prefect import flow, task

    @task
    def say_hello(name):
        return f"Hello {name}!"

    @task
    def do_important_stuff():
        print("Doing lots of important stuff!")

    @flow
    def hello_world():
        # blocks until `say_hello` has finished
        result = say_hello.submit("Marvin").result()
        do_important_stuff.submit()

    hello_world()
The DaskTaskRunner is a parallel task runner that submits tasks to the dask.distributed scheduler. By default, a temporary Dask cluster is created for the duration of the flow run. If you already have a Dask cluster running, either local or cloud hosted, you can provide the connection URL via the address kwarg.
To configure your flow to use the DaskTaskRunner:
1. Make sure the prefect-dask collection is installed: pip install prefect-dask.
2. In your flow code, import DaskTaskRunner from prefect_dask.task_runners.
3. Assign it as the task runner when the flow is defined using the task_runner=DaskTaskRunner argument.
For example, this flow uses the DaskTaskRunner configured to access an existing Dask cluster at http://my-dask-cluster.
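A minimal sketch of such a flow might look like the following. The names my_flow, build_flow, and RUNNER_KWARGS are illustrative and not part of Prefect; the imports are deferred into the helper so the configuration can be inspected even where prefect-dask is not installed.

```python
# Keyword arguments for DaskTaskRunner; the address is the placeholder from the text.
RUNNER_KWARGS = {"address": "http://my-dask-cluster"}


def build_flow():
    """Define the flow. Requires prefect and prefect-dask to be installed."""
    from prefect import flow
    from prefect_dask.task_runners import DaskTaskRunner

    @flow(task_runner=DaskTaskRunner(**RUNNER_KWARGS))
    def my_flow():
        ...  # submit tasks here with .submit()

    return my_flow
```

Calling build_flow() returns the flow, and calling that flow would run its submitted tasks on the existing cluster.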
DaskTaskRunner accepts the following optional parameters:
| Parameter | Description |
| --- | --- |
| address | Address of a currently running Dask scheduler. |
| cluster_class | The cluster class to use when creating a temporary Dask cluster. It can be either the full class name (for example, "distributed.LocalCluster"), or the class itself. |
| cluster_kwargs | Additional kwargs to pass to the cluster_class when creating a temporary Dask cluster. |
| adapt_kwargs | Additional kwargs to pass to cluster.adapt when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if adapt_kwargs are provided. |
Note that, because the DaskTaskRunner uses multiprocessing, calls to flows
in scripts must be guarded with if __name__ == "__main__": or you will encounter
warnings and errors.
If you don't provide the address of a Dask scheduler, Prefect creates a temporary local cluster automatically. The number of workers used is based on the number of cores on your machine. The default provides a mix of processes and threads that should work well for
most workloads. If you want to specify this explicitly, you can pass values for n_workers or threads_per_worker to cluster_kwargs.
    # Use 4 worker processes, each with 2 threads
    DaskTaskRunner(
        cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
    )
The DaskTaskRunner is capable of creating a temporary cluster using any of Dask's cluster-manager options. This can be useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling.
To configure, you need to provide a cluster_class. This can be:
A string specifying the import path to the cluster class (for example, "dask_cloudprovider.aws.FargateCluster").
The cluster class itself.
A function for creating a custom cluster.
You can also configure cluster_kwargs, which takes a dictionary of keyword arguments to pass to cluster_class when starting the flow run.
For example, to configure a flow to use a temporary dask_cloudprovider.aws.FargateCluster with 4 workers running with an image named my-prefect-image:
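One way this configuration might be written is sketched below. FARGATE_RUNNER_KWARGS and make_fargate_runner are hypothetical names used for illustration, and the "image" keyword is an assumption about what FargateCluster accepts; the import is deferred so the settings can be read without prefect-dask installed.

```python
# Settings for a temporary Fargate cluster, taken from the description above.
# The "image" key is assumed to be a keyword argument of FargateCluster.
FARGATE_RUNNER_KWARGS = {
    "cluster_class": "dask_cloudprovider.aws.FargateCluster",
    "cluster_kwargs": {"n_workers": 4, "image": "my-prefect-image"},
}


def make_fargate_runner():
    """Build the task runner. Requires prefect-dask and dask-cloudprovider."""
    from prefect_dask.task_runners import DaskTaskRunner

    return DaskTaskRunner(**FARGATE_RUNNER_KWARGS)
```

The resulting runner would be passed to the flow as task_runner=make_fargate_runner().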
Multiple Prefect flow runs can all use the same existing Dask cluster. You
might manage a single long-running Dask cluster (maybe using the Dask
Helm Chart) and
configure flows to connect to it during execution. This has a few downsides
when compared to using a temporary cluster (as described above):
All workers in the cluster must have dependencies installed for all flows you
intend to run.
Multiple flow runs may compete for resources. Dask tries to do a good job
sharing resources between tasks, but you may still run into issues.
That said, you may prefer managing a single long-running cluster.
To configure a DaskTaskRunner to connect to an existing cluster, pass in the address of the
scheduler to the address argument:
    # Connect to an existing cluster running at a specified address
    DaskTaskRunner(address="tcp://...")
One nice feature of using a DaskTaskRunner is the ability to scale adaptively
to the workload. Instead of specifying n_workers as a fixed number, this lets
you specify a minimum and maximum number of workers to use, and the Dask
cluster will scale up and down as needed.
To do this, you can pass adapt_kwargs to DaskTaskRunner. This takes the
following fields:
maximum (int or None, optional): the maximum number of workers to scale
to. Set to None for no maximum.
minimum (int or None, optional): the minimum number of workers to scale
to. Set to None for no minimum.
For example, here we configure a flow to run on a FargateCluster scaling up
to at most 10 workers.
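A sketch of that configuration, under the same caveats as any illustration: ADAPTIVE_RUNNER_KWARGS and make_adaptive_runner are hypothetical names, and the import is deferred so the settings can be read without prefect-dask installed.

```python
# Adaptive scaling on a temporary Fargate cluster, capped at 10 workers.
# No "minimum" is given, so the lower bound is left at the cluster's default.
ADAPTIVE_RUNNER_KWARGS = {
    "cluster_class": "dask_cloudprovider.aws.FargateCluster",
    "adapt_kwargs": {"maximum": 10},
}


def make_adaptive_runner():
    """Build the task runner. Requires prefect-dask and dask-cloudprovider."""
    from prefect_dask.task_runners import DaskTaskRunner

    return DaskTaskRunner(**ADAPTIVE_RUNNER_KWARGS)
```

Because adapt_kwargs is provided, adaptive scaling is enabled for the temporary cluster.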
You can also use Dask annotations, such as resource requirements, to control how submitted tasks are scheduled on workers:

    import dask
    from prefect import flow, task
    from prefect_dask.task_runners import DaskTaskRunner

    @task
    def show(x):
        print(x)

    # Create a `LocalCluster` with some resource annotations
    # Annotations are abstract in dask and not inferred from your system.
    # Here, we claim that our system has 1 GPU and 1 process available per worker
    @flow(
        task_runner=DaskTaskRunner(
            cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
        )
    )
    def my_flow():
        with dask.annotate(resources={'GPU': 1}):
            future = show.submit(0)  # this task requires 1 GPU resource on a worker

        with dask.annotate(resources={'process': 1}):
            # These tasks each require 1 process on a worker; because we've
            # specified that our cluster has 1 process per worker and 1 worker,
            # these tasks will run sequentially
            future = show.submit(1)
            future = show.submit(2)
            future = show.submit(3)

    if __name__ == "__main__":
        my_flow()
The RayTaskRunner — installed separately as a Prefect Collection — is a parallel task runner that submits tasks to Ray. By default, a temporary Ray instance is created for the duration of the flow run. If you already have a Ray instance running, you can provide the connection URL via an address argument.
Remote storage and Ray tasks
We recommend configuring remote storage for task execution with the RayTaskRunner. This ensures tasks executing in Ray have access to task result storage, particularly when accessing a Ray instance outside of your execution environment.
To configure your flow to use the RayTaskRunner:
1. Make sure the prefect-ray collection is installed: pip install prefect-ray.
2. In your flow code, import RayTaskRunner from prefect_ray.task_runners.
3. Assign it as the task runner when the flow is defined using the task_runner=RayTaskRunner argument.
For example, this flow uses the RayTaskRunner configured to access an existing Ray instance at ray://192.0.2.255:8786.
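A minimal sketch of such a flow follows. The names my_flow, build_ray_flow, and RAY_RUNNER_KWARGS are illustrative and not part of Prefect; the imports are deferred so the configuration can be inspected even where prefect-ray is not installed.

```python
# Keyword arguments for RayTaskRunner; the address is the example from the text.
RAY_RUNNER_KWARGS = {"address": "ray://192.0.2.255:8786"}


def build_ray_flow():
    """Define the flow. Requires prefect and prefect-ray to be installed."""
    from prefect import flow
    from prefect_ray.task_runners import RayTaskRunner

    @flow(task_runner=RayTaskRunner(**RAY_RUNNER_KWARGS))
    def my_flow():
        ...  # submit tasks here with .submit()

    return my_flow
```

Calling build_ray_flow() returns the flow, and calling that flow would run its submitted tasks on the existing Ray instance.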
RayTaskRunner accepts the following optional parameters:
| Parameter | Description |
| --- | --- |
| address | Address of a currently running Ray instance, starting with the ray:// URI. |
| init_kwargs | Additional kwargs to use when calling ray.init. |
Note that Ray Client uses the ray:// URI to indicate the address of a Ray instance. If you don't provide the address of a Ray instance, Prefect creates a temporary instance automatically.
Ray environment limitations
While we're excited about adding support for parallel task execution via Ray to Prefect, there are some inherent limitations with Ray you should be aware of:
Ray support for non-x86/64 architectures such as ARM/M1 processors is not available with installation from pip alone, and Ray will be skipped during installation of Prefect on those platforms. It is possible to manually install the blocking component with conda. See the Ray documentation for instructions.