# prefect-community
b
Trying to understand when/why Prefect 2 is deciding to run my tasks in parallel here:
Copy code
from time import sleep
from prefect import task, flow

@task
def t(n):
    print(f"Starting: {n}")
    sleep(3)
    print(f"...done: {n}")
    return n * 10

@flow
def f():
    # These are run serially:
    t(1)
    t(2)

    # These are run in parallel (including t(6)!):
    t.map([3, 4, 5])
    t(6)
^ It makes sense to me that `t.map([3, 4, 5])` would run in parallel, but why is `t(6)` run in parallel with them?.. Does the use of `.map` cause the flow to switch from a serial to parallel "execution mode" of some kind?..
Here is the output of running the flow `f()`, in case it's useful:
Copy code
In [11]: f()
21:37:20.615 | INFO    | prefect.engine - Created flow run 'illegal-ostrich' for flow 'f'
21:37:21.470 | INFO    | Flow run 'illegal-ostrich' - Created task run 't-41644de6-0' for task 't'
21:37:21.472 | INFO    | Flow run 'illegal-ostrich' - Executing 't-41644de6-0' immediately...
Starting: 1
...done: 1
21:37:24.856 | INFO    | Task run 't-41644de6-0' - Finished in state Completed()
21:37:24.966 | INFO    | Flow run 'illegal-ostrich' - Created task run 't-41644de6-1' for task 't'
21:37:24.968 | INFO    | Flow run 'illegal-ostrich' - Executing 't-41644de6-1' immediately...
Starting: 2
...done: 2
21:37:28.362 | INFO    | Task run 't-41644de6-1' - Finished in state Completed()
21:37:28.534 | INFO    | Flow run 'illegal-ostrich' - Created task run 't-41644de6-3' for task 't'
21:37:28.536 | INFO    | Flow run 'illegal-ostrich' - Submitted task run 't-41644de6-3' for execution.
21:37:28.731 | INFO    | Flow run 'illegal-ostrich' - Created task run 't-41644de6-4' for task 't'
21:37:28.733 | INFO    | Flow run 'illegal-ostrich' - Submitted task run 't-41644de6-4' for execution.
21:37:28.738 | INFO    | Flow run 'illegal-ostrich' - Created task run 't-41644de6-5' for task 't'
21:37:28.740 | INFO    | Flow run 'illegal-ostrich' - Executing 't-41644de6-5' immediately...
21:37:28.751 | INFO    | Flow run 'illegal-ostrich' - Created task run 't-41644de6-2' for task 't'
21:37:28.753 | INFO    | Flow run 'illegal-ostrich' - Submitted task run 't-41644de6-2' for execution.
Starting: 4
Starting: 6
Starting: 5
Starting: 3
...done: 4
...done: 6
...done: 5
21:37:32.041 | INFO    | Task run 't-41644de6-3' - Finished in state Completed()
...done: 3
21:37:32.181 | INFO    | Task run 't-41644de6-4' - Finished in state Completed()
21:37:32.184 | INFO    | Task run 't-41644de6-5' - Finished in state Completed()
21:37:32.370 | INFO    | Task run 't-41644de6-2' - Finished in state Completed()
21:37:32.631 | INFO    | Flow run 'illegal-ostrich' - Finished in state Completed('All states completed.')
Out[11]: 
[Completed(message=None, type=COMPLETED, result=10),
 Completed(message=None, type=COMPLETED, result=20),
 Completed(message=None, type=COMPLETED, result=30),
 Completed(message=None, type=COMPLETED, result=40),
 Completed(message=None, type=COMPLETED, result=50),
 Completed(message=None, type=COMPLETED, result=60)]
^ That's not very readable without colour...
Ah, I may have figured it out, actually. I notice that for the `t(1)` and `t(2)` which it ran serially, the logs say e.g. "Executing 't-41644de6-0' *immediately*". Whereas for the mapped ones, it says e.g. "*Submitted* task run 't-41644de6-3' for execution". But for `t(6)`, it says "executing immediately", just like `t(1)` and `t(2)`. So I assume what's going on is that the non-mapped ones are all being run in one thread, and the mapped ones are being run in separate threads. I verified that if I add a `t(7)` after the `t(6)`, those two tasks are run serially with respect to each other.
...hmm, `threading.current_thread` and `threading.enumerate` showed that I'm not quite correct that the non-mapped ones are run in the same thread. But anyway, it certainly seems true that non-mapped task runs are run serially with respect to other non-mapped task runs.
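(For reference, the check was roughly this - a variant of the task that just prints which thread it's running in:)
Copy code
import threading
from time import sleep
from prefect import task, flow

@task
def t(n):
    # Print which thread each task run executes in
    print(f"Starting: {n} on {threading.current_thread().name}")
    sleep(3)
    print(f"...done: {n} ({len(threading.enumerate())} threads alive)")
    return n * 10

@flow
def f():
    t(1)
    t(2)
    t.map([3, 4, 5])
    t(6)

f()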
Looking at `help(f.task_runner)`, I see:
Copy code
class ConcurrentTaskRunner(BaseTaskRunner)
 |  A concurrent task runner that allows tasks to switch when blocking on IO.
 |  Synchronous tasks will be submitted to a thread pool maintained by `anyio`.
So that's kind of weird. It "allows tasks to switch when blocking on IO", but decided not to allow the non-mapped tasks to switch. 🤷 Anyway, it sounds like I probably shouldn't rely on this "non-mapped tasks are always run serially" behaviour; it might be fixed/changed in a future Prefect version?
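(Side note for future me: if I'm reading the current docs right, the way to hand a non-mapped task to the ConcurrentTaskRunner is `t.submit(6)` rather than `t(6)` - calling it directly seems to be what forces the "executing immediately" path. I haven't verified this on the exact Prefect version I'm running, though.)
Copy code
# Unverified guess: .submit() should hand the run to the flow's task runner,
# so it would run concurrently with the mapped runs instead of blocking here.
t.submit(6)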
Ooh! `SequentialTaskRunner` does exactly what I want. It makes `t(6)` wait for the mapped tasks to finish; and it also still runs the mapped tasks in parallel. Perfect!
Copy code
from time import sleep
from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner

@task
def t(n):
    print(f"Starting: {n}")
    sleep(3)
    print(f"...done: {n}")
    return n * 10

@flow(task_runner=SequentialTaskRunner)
def f():
    # These are run serially:
    t(1)
    t(2)

    # These are run in parallel!
    t.map([3, 4, 5])

    # These are run serially (*after* the mapped tasks finish):
    t(6)
    t(7)

f()
r
Sounds like you worked it out. 🙂 `ConcurrentTaskRunner` is the default, but `SequentialTaskRunner` is there when you need it.
b
Yeah, almost! But when I ran some actual code with this, it turns out it ran all the mapped tasks in parallel immediately. And there were 2000 or so of them, so the flow ran out of memory and crashed. 😅 Is there a way to call `Task.map` while using `SequentialTaskRunner` and somehow specify how many parallel tasks to run at any given time?.. Like, I assume a `ThreadPoolExecutor` is involved somewhere, and I want to tell it how many max workers to use. 🤔
Looking at the source, in `src/prefect/engine.py`, in `begin_task_map`, it says:
Copy code
# Maintain the order of the task runs when using the sequential task runner
runner = task_runner if task_runner else flow_run_context.task_runner
if runner.concurrency_type == TaskConcurrencyType.SEQUENTIAL:
    return [await task_run() for task_run in task_runs]
...where the entries of `task_runs` are defined like this:
Copy code
task_runs.append(
    partial(
        get_task_call_return_value,
        task=task,
        flow_run_context=flow_run_context,
        parameters=call_parameters,
        wait_for=wait_for,
        return_type=return_type,
        task_runner=task_runner,
        extra_task_inputs=task_inputs,
    )
)
...so ok, that calls `get_task_call_return_value`, which calls `create_task_run_future`, which creates a background task:
Copy code
# Create and submit the task run in the background
flow_run_context.background_tasks.start_soon(
    partial(
        create_task_run_then_submit,
        task=task,
        ...etc
...and then waits for it:
Copy code
# Track the task run future in the flow run context
flow_run_context.task_run_futures.append(future)

if task_runner.concurrency_type == TaskConcurrencyType.SEQUENTIAL:
    await future._wait()
...and that `start_soon` looks like `GatherTaskGroup.start_soon`, which in turn calls `anyio.abc.TaskGroup.start_soon`.
Meanwhile, the thing being passed to `start_soon` is a partial of `create_task_run_then_submit`... which calls `create_task_run` and then `submit_task_run`... Ah ha! And `submit_task_run` has:
Copy code
if task_runner.concurrency_type == TaskConcurrencyType.SEQUENTIAL:
    logger.info(f"Executing {task_run.name!r} immediately...")
...which I've seen in logging output. And then it calls `task_runner.submit`; and `SequentialTaskRunner.submit` immediately calls `result = await call()`.
So okay, that makes sense. With `SequentialTaskRunner`, it will always run all mapped tasks immediately, in parallel. So there is no `ThreadPoolExecutor`, unless `anyio` is using one under the hood. @Ryan Peden do you think it's worth opening a ticket about this, like asking whether an option could be added to `SequentialTaskRunner` for "maximum number of mapped tasks to run in parallel"? In other words, if I said:
Copy code
@flow(task_runner=SequentialTaskRunner(max_workers=5))
def my_flow():
    some_task.map(range(10))
...then it would immediately start 5 mapped runs of `some_task`, and queue another 5. So all 10 would eventually run, but only 5 at any given time.
I'm working around this with `threading.Semaphore`, but that seems a bit hacky:
Copy code
from time import sleep
from threading import Semaphore
from prefect import task, flow, unmapped


@task
def t(n, semaphore: Semaphore):
    with semaphore:
        print(f"Starting: {n}")
        sleep(4)
        print(f"  ...done: {n}")
        return n * 100


@flow
def f():

    # Run up to 5 tasks in parallel, the rest are queued until other
    # tasks finish
    semaphore = Semaphore(5)
    t.map(range(10), unmapped(semaphore))


f()
The logs show the 10 tasks running in bursts of 5.
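(The `unmapped(semaphore)` wrapper is what tells Prefect to pass the same Semaphore object to every mapped run, rather than trying to map over it as another iterable.)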
...oh, maybe this is the intended way to control this? 🤔 https://docs.prefect.io/latest/concepts/tasks/?h=conc#task-run-concurrency-limits
r
If you don't mind using tags, then Prefect's task concurrency limit is the easiest way to achieve your goal. And if you need more control, you might like anyio capacity limiters: https://anyio.readthedocs.io/en/stable/synchronization.html#capacity-limiters Since Prefect uses anyio internally you won't need to pip install it.
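For example, something like this (a sketch - the tag name and limit value are just placeholders):
Copy code
from time import sleep
from prefect import task, flow

# Tag the task; the tag name is arbitrary
@task(tags=["limit-5"])
def t(n):
    sleep(3)
    return n * 10

@flow
def f():
    t.map(range(10))

# Then create a concurrency limit for that tag once, e.g. from the CLI:
#   prefect concurrency-limit create limit-5 5
# At most 5 runs carrying the "limit-5" tag will execute at a time;
# the rest wait until a slot frees up.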
👀 1
b
Hmmm, seems like CapacityLimiter can only be instantiated inside an async context:
Copy code
In [1]: from anyio import CapacityLimiter

In [2]: CapacityLimiter(5)
---------------------------------------------------------------------------
 ...stack trace...
AsyncLibraryNotFoundError: unknown async library, or not in async context
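It does work once you're actually inside async code, though - e.g. this plain anyio sketch (no Prefect involved) caps the workers at 5 at a time:
Copy code
import anyio

async def worker(n, limiter):
    # Only `limiter.total_tokens` workers can be inside this block at once
    async with limiter:
        await anyio.sleep(1)
        print(f"done: {n}")

async def main():
    limiter = anyio.CapacityLimiter(5)  # fine here, since we're in an async context
    async with anyio.create_task_group() as tg:
        for n in range(10):
            tg.start_soon(worker, n, limiter)

anyio.run(main)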
> If you don't mind using tags, then Prefect's task concurrency limit is the easiest way to achieve your goal.
Gotcha 👍
The task concurrency thing is definitely cool, because it applies across runs, and even across tasks, I believe. Not quite what I needed here, but good to know about.
r
Ahh - sorry about that. I've used `CapacityLimiter` a fair amount but I must never have tried it outside of async code.
b
That's ok. It seems like Semaphore is probably the best available solution here, and it seems like the core of the engine is designed more around async than threads, so maybe it would be too big of an ask for me to open a ticket or feature request about some kind of "max workers" option to apply to mapped tasks + SequentialTaskRunner. 🤔
Although asyncio does have `run_in_executor` (sketch of what I mean below), so... I don't know, it seems to me like it wouldn't be conceptually unreasonable to have some kind of `mapped_task_runner` argument to `flow`, similar to `task_runner`, which allows some configuration of how mapped tasks are run. It's already the case that `SequentialTaskRunner` doesn't actually cause mapped tasks to run sequentially, so it seems like there's some design space to explore there. But if nobody wants it but me, then it's probably not worth asking for. 🙂
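For what it's worth, this is the kind of bounded pattern I'm picturing (plain asyncio, nothing Prefect-specific):
Copy code
import asyncio
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def blocking_work(n):
    sleep(1)
    return n * 10

async def main():
    loop = asyncio.get_running_loop()
    # Bounded executor: only 5 threads, so at most 5 blocking calls in flight
    with ThreadPoolExecutor(max_workers=5) as pool:
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_work, n) for n in range(10))
        )
    print(results)

asyncio.run(main())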