Ben Ayers-Glassey
04/09/2023, 4:41 AM
from time import sleep
from prefect import task, flow

@task
def t(n):
    print(f"Starting: {n}")
    sleep(3)
    print(f"...done: {n}")
    return n * 10

@flow
def f():
    # These are run serially:
    t(1)
    t(2)
    # These are run in parallel (including t(6)!):
    t.map([3, 4, 5])
    t(6)
^ It makes sense to me that t.map([3, 4, 5]) would run in parallel, but why is t(6) run in parallel with them?..
Does the use of .map cause the flow to switch from a serial to parallel "execution mode" of some kind?..
Here's the output of f(), in case it's useful:
In [11]: f()
21:37:20.615 | INFO | prefect.engine - Created flow run 'illegal-ostrich' for flow 'f'
21:37:21.470 | INFO | Flow run 'illegal-ostrich' - Created task run 't-41644de6-0' for task 't'
21:37:21.472 | INFO | Flow run 'illegal-ostrich' - Executing 't-41644de6-0' immediately...
Starting: 1
...done: 1
21:37:24.856 | INFO | Task run 't-41644de6-0' - Finished in state Completed()
21:37:24.966 | INFO | Flow run 'illegal-ostrich' - Created task run 't-41644de6-1' for task 't'
21:37:24.968 | INFO | Flow run 'illegal-ostrich' - Executing 't-41644de6-1' immediately...
Starting: 2
...done: 2
21:37:28.362 | INFO | Task run 't-41644de6-1' - Finished in state Completed()
21:37:28.534 | INFO | Flow run 'illegal-ostrich' - Created task run 't-41644de6-3' for task 't'
21:37:28.536 | INFO | Flow run 'illegal-ostrich' - Submitted task run 't-41644de6-3' for execution.
21:37:28.731 | INFO | Flow run 'illegal-ostrich' - Created task run 't-41644de6-4' for task 't'
21:37:28.733 | INFO | Flow run 'illegal-ostrich' - Submitted task run 't-41644de6-4' for execution.
21:37:28.738 | INFO | Flow run 'illegal-ostrich' - Created task run 't-41644de6-5' for task 't'
21:37:28.740 | INFO | Flow run 'illegal-ostrich' - Executing 't-41644de6-5' immediately...
21:37:28.751 | INFO | Flow run 'illegal-ostrich' - Created task run 't-41644de6-2' for task 't'
21:37:28.753 | INFO | Flow run 'illegal-ostrich' - Submitted task run 't-41644de6-2' for execution.
Starting: 4
Starting: 6
Starting: 5
Starting: 3
...done: 4
...done: 6
...done: 5
21:37:32.041 | INFO | Task run 't-41644de6-3' - Finished in state Completed()
...done: 3
21:37:32.181 | INFO | Task run 't-41644de6-4' - Finished in state Completed()
21:37:32.184 | INFO | Task run 't-41644de6-5' - Finished in state Completed()
21:37:32.370 | INFO | Task run 't-41644de6-2' - Finished in state Completed()
21:37:32.631 | INFO | Flow run 'illegal-ostrich' - Finished in state Completed('All states completed.')
Out[11]:
[Completed(message=None, type=COMPLETED, result=10),
Completed(message=None, type=COMPLETED, result=20),
Completed(message=None, type=COMPLETED, result=30),
Completed(message=None, type=COMPLETED, result=40),
Completed(message=None, type=COMPLETED, result=50),
Completed(message=None, type=COMPLETED, result=60)]
For t(1) and t(2), which it ran serially, the logs say e.g. "Executing 't-41644de6-0' *immediately*".
Whereas for the mapped ones, it says e.g. "*Submitted* task run 't-41644de6-3' for execution".
But for t(6), it says "executing immediately", just like t(1) and t(2).
So I assume what's going on is that the non-mapped ones are all being run in one thread, and the mapped ones are being run in separate threads.
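One way to check an assumption like this is to record threading.current_thread() inside the function. A minimal sketch using only the standard library, not Prefect (work and the "pool" thread name prefix are illustrative names, and a plain ThreadPoolExecutor stands in for whatever Prefect actually does):

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def work(n):
    # Record which thread this call runs on.
    name = threading.current_thread().name
    sleep(0.1)
    return n, name


# Direct calls run on the calling thread.
direct = [work(n) for n in (1, 2)]

# Calls submitted to a pool run on the pool's worker threads.
with ThreadPoolExecutor(max_workers=3, thread_name_prefix="pool") as pool:
    submitted = list(pool.map(work, [3, 4, 5]))

for n, name in direct + submitted:
    print(f"work({n}) ran on {name}")
```

Whether Prefect's task runs map onto threads this neatly is a separate question; the sketch only shows the checking technique.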
I verified that if I add a t(7) after the t(6), those two tasks are run serially with respect to each other.
Checking threading.current_thread and threading.enumerate showed that I'm not quite correct that the non-mapped ones are run in the same thread.
But anyway, it certainly seems true that non-mapped task runs are run serially with respect to other non-mapped task runs.
Looking at help(f.task_runner), I see:
class ConcurrentTaskRunner(BaseTaskRunner)
| A concurrent task runner that allows tasks to switch when blocking on IO.
| Synchronous tasks will be submitted to a thread pool maintained by `anyio`.
So that's kind of weird. It "allows tasks to switch when blocking on IO", but decided not to allow the non-mapped tasks to switch. 🤷
Anyway, sounds like I probably shouldn't rely on this "non-mapped tasks are always run serially" behaviour; it might be fixed/changed in a future Prefect version?
Using SequentialTaskRunner makes t(6) wait for the mapped tasks to finish; and it also still runs the mapped tasks in parallel. Perfect!
from time import sleep
from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner

@task
def t(n):
    print(f"Starting: {n}")
    sleep(3)
    print(f"...done: {n}")
    return n * 10

@flow(task_runner=SequentialTaskRunner)
def f():
    # These are run serially:
    t(1)
    t(2)
    # These are run in parallel!
    t.map([3, 4, 5])
    # These are run serially (*after* the mapped tasks finish):
    t(6)
    t(7)

f()
Ryan Peden
04/09/2023, 5:25 AM
ConcurrentTaskRunner is the default, but SequentialTaskRunner is there when you need it.
Ben Ayers-Glassey
04/11/2023, 2:01 AM
Is there a way to use Task.map while using SequentialTaskRunner and somehow specify how many parallel tasks to run at any given time?..
Like, I assume a ThreadPoolExecutor is involved somewhere, and I want to tell it how many max workers to use. 🤔
Looking at begin_task_map, it says:
# Maintain the order of the task runs when using the sequential task runner
runner = task_runner if task_runner else flow_run_context.task_runner
if runner.concurrency_type == TaskConcurrencyType.SEQUENTIAL:
    return [await task_run() for task_run in task_runs]
...where the entries of task_runs are defined like this:
task_runs.append(
    partial(
        get_task_call_return_value,
        task=task,
        flow_run_context=flow_run_context,
        parameters=call_parameters,
        wait_for=wait_for,
        return_type=return_type,
        task_runner=task_runner,
        extra_task_inputs=task_inputs,
    )
)
So each entry calls get_task_call_return_value, which calls create_task_run_future, which creates a background task:
# Create and submit the task run in the background
flow_run_context.background_tasks.start_soon(
    partial(
        create_task_run_then_submit,
        task=task,
        ...etc
...and then waits for it:
# Track the task run future in the flow run context
flow_run_context.task_run_futures.append(future)
if task_runner.concurrency_type == TaskConcurrencyType.SEQUENTIAL:
    await future._wait()
That start_soon looks like GatherTaskGroup.start_soon, which in turn calls anyio.abc.TaskGroup.start_soon.
What's passed to start_soon is a partial of create_task_run_then_submit... which calls create_task_run and then submit_task_run...
Ah ha! And submit_task_run has:
if task_runner.concurrency_type == TaskConcurrencyType.SEQUENTIAL:
    logger.info(f"Executing {task_run.name!r} immediately...")
...which I've seen in logging output.
And then it calls `task_runner.submit`; and SequentialTaskRunner.submit immediately calls result = await call().
...anyio is using one under the hood.
@Ryan Peden do you think it's worth opening a ticket about this, like asking whether an option could be added to SequentialTaskRunner for "maximum number of mapped tasks to run in parallel"?
In other words, if I said:
@flow(task_runner=SequentialTaskRunner(max_workers=5))
def my_flow():
    some_task.map(range(10))
...then it would immediately start 5 mapped runs of some_task, and queue another 5. So all 10 would eventually run, but only 5 at any given time.
I can get that behaviour with a threading.Semaphore, but that seems a bit hacky:
from time import sleep
from threading import Semaphore
from prefect import task, flow, unmapped

@task
def t(n, semaphore: Semaphore):
    with semaphore:
        print(f"Starting: {n}")
        sleep(4)
        print(f" ...done: {n}")
        return n * 100

@flow
def f():
    # Run up to 5 tasks in parallel, the rest are queued until other
    # tasks finish
    semaphore = Semaphore(5)
    t.map(range(10), unmapped(semaphore))

f()
The logs show the 10 tasks running in bursts of 5.
Ryan Peden
04/12/2023, 6:15 PM
Ben Ayers-Glassey
04/12/2023, 8:30 PM
In [1]: from anyio import CapacityLimiter
In [2]: CapacityLimiter(5)
---------------------------------------------------------------------------
...stack trace...
AsyncLibraryNotFoundError: unknown async library, or not in async context
If you don't mind using tags, then Prefect's task concurrency limit is the easiest way to achieve your goal.
Gotcha 👍
Ryan Peden
04/12/2023, 8:32 PM
I've used CapacityLimiter a fair amount but I must never have tried it outside of async code.
Ben Ayers-Glassey
04/12/2023, 8:34 PM
...run_in_executor so... I don't know, it seems to me like it wouldn't be conceptually unreasonable to have some kind of mapped_task_runner argument to flow, similar to task_runner, which allows some configuration of how mapped tasks are run.
It's already the case that SequentialTaskRunner doesn't actually cause mapped tasks to run sequentially, so it seems like there's some design space to explore there.
But if nobody wants it but me, then it's probably not worth asking for. 🙂
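For reference, the "start 5, queue the rest" behaviour asked about above is what a bounded thread pool gives in plain Python. A minimal sketch with the standard library only; this is not Prefect API, and the state dict and lock exist just to measure how many calls overlap:

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep

lock = threading.Lock()
state = {"running": 0, "peak": 0}  # bookkeeping for the demo only


def t(n):
    # Count how many calls are in flight at once.
    with lock:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
    sleep(0.2)
    with lock:
        state["running"] -= 1
    return n * 100


# At most 5 calls run at a time; the other 5 wait for a free worker.
with ThreadPoolExecutor(max_workers=5) as pool:
    results = list(pool.map(t, range(10)))

print(results)
print(f"peak concurrency: {state['peak']}")
```

All 10 calls complete and results come back in input order, with never more than 5 in flight, which is roughly what the hypothetical max_workers option would do for mapped tasks.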