Sébastien
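12/10/2020, 12:53 AM
from datetime import datetime
from prefect import Flow, flatten
from prefect.engine.executors import LocalDaskExecutor
from prefect.schedules import CronSchedule
# task1, task2 and task3 are @task functions defined elsewhere
schedule = CronSchedule("0 * * * *", start_date=datetime.now())
with Flow("Q", schedule=schedule, executor=LocalDaskExecutor(cluster_kwargs={"processes": True, "n_workers": 2, "threads_per_worker": 1})) as flow:
    r1 = task1()
    r2 = task2.map(r1)
    task3(flatten(r2))
flow.register(project_name="Q")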
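The cron expression "0 * * * *" fires at minute 0 of every hour, and "49 * * * *" at minute 49. A minimal stdlib sketch of how the next run time follows from such a minute-only expression; next_run_for_minute is a hypothetical helper, not Prefect's CronSchedule implementation, and it ignores time zones and start_date.

```python
from datetime import datetime, timedelta


def next_run_for_minute(now: datetime, minute: int) -> datetime:
    """Next datetime strictly after `now` whose minute equals `minute`.

    Only models cron expressions of the form "<minute> * * * *".
    """
    if not 0 <= minute <= 59:
        raise ValueError("minute must be in 0..59")
    candidate = now.replace(minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # This hour's slot has already passed, so move to the next hour
        # (timedelta handles rolling over into the next day).
        candidate += timedelta(hours=1)
    return candidate


now = datetime(2020, 12, 10, 22, 37, 54)
print(next_run_for_minute(now, 0))   # schedule "0 * * * *"
print(next_run_for_minute(now, 49))  # schedule "49 * * * *"
```

With the first schedule, a run registered at 22:37 would wait until 23:00; with the second, until 22:49.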
Zanie
LocalDaskExecutor(scheduler="processes", num_workers=8)
Sébastien
12/10/2020, 8:38 PM
Running with LocalDaskExecutor(scheduler='processes', num_workers=8), but the mapped task is still executing one by one.
CPU(s): 2
Thread(s) per core: 1
$ sysctl hw.physicalcpu hw.logicalcpu
hw.physicalcpu: 6
hw.logicalcpu: 12
Mac picked it up:
[2020-12-10 20:50:02,059] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-12-10 20:50:02,317] INFO - agent | Deploying flow run fd6ea055-0233-4aca-a0a0-8b2c4f3b02e1
Logs:
Running with LocalDaskExecutor(scheduler='processes', num_workers=8)
Exact same result, just one task is running.
Also tried DaskExecutor() without args on the Mac with a LocalAgent (prefect agent local start): same result, a single worker. The mapped task ran one at a time, and unmapped tasks without dependencies also ran one by one.
Zanie
Sébastien
12/10/2020, 9:26 PM
Zanie
Sébastien
12/10/2020, 9:28 PM
Zanie
flow.run() locally?
import time
import random

import prefect
from prefect import Flow, task
from prefect.engine.executors import LocalDaskExecutor, LocalExecutor

tasks = [i for i in range(3)]

@task
def display(i):
    sleep = random.randint(0, 3)
    time.sleep(sleep)
    prefect.context.logger.info(f"In {i} after {sleep} seconds")

with Flow("mapped-sequential", executor=LocalExecutor()) as flow:
    display.map(tasks)
flow.run()

with Flow("mapped-parallel", executor=LocalDaskExecutor()) as flow:
    display.map(tasks)
flow.run()
❯ python example-mapped-parallel.py
[2020-12-10 15:39:23] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped-sequential'
[2020-12-10 15:39:23] INFO - prefect.TaskRunner | Task 'display': Starting task run...
[2020-12-10 15:39:23] INFO - prefect.TaskRunner | Task 'display': Finished task run for task with final state: 'Mapped'
[2020-12-10 15:39:23] INFO - prefect.TaskRunner | Task 'display[0]': Starting task run...
[2020-12-10 15:39:25] INFO - prefect.display[0] | In 0 after 2 seconds
[2020-12-10 15:39:25] INFO - prefect.TaskRunner | Task 'display[0]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:25] INFO - prefect.TaskRunner | Task 'display[1]': Starting task run...
[2020-12-10 15:39:26] INFO - prefect.display[1] | In 1 after 1 seconds
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[1]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[2]': Starting task run...
[2020-12-10 15:39:27] INFO - prefect.display[2] | In 2 after 0 seconds
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[2]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:27] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-12-10 15:39:27] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped-parallel'
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display': Starting task run...
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display': Finished task run for task with final state: 'Mapped'
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[2]': Starting task run...
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[1]': Starting task run...
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[0]': Starting task run...
[2020-12-10 15:39:28] INFO - prefect.display[2] | In 2 after 1 seconds
[2020-12-10 15:39:28] INFO - prefect.TaskRunner | Task 'display[2]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:28] INFO - prefect.display[0] | In 0 after 1 seconds
[2020-12-10 15:39:28] INFO - prefect.TaskRunner | Task 'display[0]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:30] INFO - prefect.display[1] | In 1 after 3 seconds
[2020-12-10 15:39:30] INFO - prefect.TaskRunner | Task 'display[1]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:30] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
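In the log above, the sequential run takes roughly the sum of the three sleeps, while the parallel run takes roughly the longest one. A stdlib-only sketch that reproduces the same difference, assuming threads from concurrent.futures as a stand-in for Dask workers; display, run_sequential and run_parallel here are hypothetical illustrations, not Prefect or Dask code.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Fixed sleep times (seconds) instead of random.randint, so runs are repeatable.
DELAYS = [0.2, 0.1, 0.3]


def display(i: int) -> str:
    """Stand-in for the mapped `display` task: sleep, then report."""
    time.sleep(DELAYS[i])
    return f"In {i} after {DELAYS[i]} seconds"


def run_sequential(items):
    # Analogous to LocalExecutor: each mapped child finishes before the next starts.
    return [display(i) for i in items]


def run_parallel(items, num_workers=3):
    # Analogous to a local worker pool: up to num_workers children overlap.
    # pool.map returns results in input order, even if they finish out of order.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(pool.map(display, items))


for runner in (run_sequential, run_parallel):
    start = time.perf_counter()
    runner(range(3))
    print(f"{runner.__name__}: {time.perf_counter() - start:.2f}s")
```

If the parallel timing matches the sequential one, the work is effectively being executed by a single worker, which is the symptom described in this thread.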
Sébastien
12/10/2020, 9:40 PM
[2020-12-10 22:37:54+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:38:00+00:00
[2020-12-10 22:48:31+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T21:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Q'
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
schedule = CronSchedule("49 * * * *")
with Flow("Q", schedule=schedule, executor=LocalDaskExecutor(scheduler="processes", num_workers=8)) as flow:
    r1 = task1()
    # Each sublist is meant to be a task run
    r2 = task2.map(r1)
    # Last task takes all entries as a single list
    task3(flatten(r2))
flow.run()
flow.run() on my original (real) code, without CronSchedule, does work with 8 workers.
Using flow.schedule() with CronSchedule does not (even when the code runs on exactly the same Mac).
Zanie
DaskExecutor() without args?
Set the logging level in your config.toml file to DEBUG, e.g.:
[logging]
# The logging level: NOTSET, DEBUG, INFO, WARNING, ERROR, or CRITICAL
level = "DEBUG"
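Prefect 0.x also lets a config.toml key be overridden by an environment variable prefixed with PREFECT__, with double underscores between the key parts, so this setting corresponds to PREFECT__LOGGING__LEVEL=DEBUG. A minimal sketch of that naming rule, assuming it holds for the installed Prefect version; config_key_to_env_var is a hypothetical helper, not a Prefect API.

```python
import os


def config_key_to_env_var(dotted_key: str) -> str:
    """Map a dotted config.toml key such as "logging.level" to its env var name."""
    parts = dotted_key.split(".")
    return "PREFECT__" + "__".join(part.upper() for part in parts)


env_name = config_key_to_env_var("logging.level")
print(env_name)  # PREFECT__LOGGING__LEVEL

# Changing os.environ only affects this process and child processes started
# afterwards. Assumption: Prefect reads its config at import time, so this must
# happen before prefect is imported or before the agent process is launched.
os.environ[env_name] = "DEBUG"
```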
josh
12/10/2020, 10:31 PM
flow.executor is only going to be used if the flow has a flow.run_config. Right now flows still have a default LocalEnvironment, and in that case it’s trying to use the executor in the environment. If you set a run config, then flow.executor is used properly. This is part of the ongoing deprecation cycle for environments (and this mismatch issue will be resolved in the next release). So one of these two options should work for you:
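from prefect.engine.executors import DaskExecutor, LocalDaskExecutor
from prefect.run_configs import LocalRun
# Pick one executor; its arguments are elided here
flow.executor = LocalDaskExecutor(...)  # or DaskExecutor(...)
flow.run_config = LocalRun()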
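Or, using the soon-to-be-deprecated environments:
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor
from prefect.environments import LocalEnvironment
# Pick one executor; its arguments are elided here
flow.environment = LocalEnvironment(executor=LocalDaskExecutor(...))  # or DaskExecutor(...)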
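The explanation above boils down to one precedence rule for agent-run flows in this release. A toy model of that rule, using hypothetical stand-in classes (Environment, FlowConfig and resolve_executor are not Prefect's code) and assuming the default environment executor runs tasks one at a time, which would match the sequential runs reported earlier:

```python
from dataclasses import dataclass, field
from typing import Optional


# Hypothetical stand-ins; these are NOT Prefect's classes.
@dataclass
class Environment:
    executor: str = "LocalExecutor"  # assumed default: runs tasks one at a time


@dataclass
class FlowConfig:
    executor: Optional[str] = None    # like Flow(..., executor=...)
    run_config: Optional[str] = None  # like flow.run_config = LocalRun()
    environment: Environment = field(default_factory=Environment)


def resolve_executor(flow: FlowConfig) -> str:
    """Toy model of which executor an agent-run flow ends up using."""
    if flow.run_config is not None and flow.executor is not None:
        return flow.executor
    # No run_config: the environment's executor wins and flow.executor is ignored.
    return flow.environment.executor


# Executor passed to the flow, no run_config: silently ignored.
print(resolve_executor(FlowConfig(executor="LocalDaskExecutor")))
# Option 1: add a run_config.
print(resolve_executor(FlowConfig(executor="LocalDaskExecutor", run_config="LocalRun")))
# Option 2: attach the executor to the environment instead.
print(resolve_executor(FlowConfig(environment=Environment(executor="LocalDaskExecutor"))))
```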
For more information on flow run configuration, check out this section in the docs!
Sébastien
12/10/2020, 10:50 PM
run_config is a vague name, and since we already pass executor=X() to the flow, you'd expect any environment to default to using the defined executor rather than requiring run_config, which isn't mentioned in the tutorial or the Dask setup. run_config never once came up.
Jim Crist-Harif
12/10/2020, 10:55 PM
Sébastien
12/10/2020, 10:55 PM
"RunConfig objects define where and how a flow run should be executed"
Then what's the difference with an executor? And when using Cloud, which has all known agents and picks a proper runner, why's this relevant? And why would the default, UniversalRun, not choose to honor the passed executor for that flow?
Jim Crist-Harif
12/10/2020, 10:56 PM
Sébastien
12/10/2020, 10:56 PM
Jim Crist-Harif
12/10/2020, 10:57 PM
"And why would the default, UniversalRun, not choose to honor the passed executor for that flow?"
This is one of those places where the docs are slightly leading the latest release (apologies). The docs are for the new system (which is currently available, but not on by default until the next release), which is what's causing you issues.
Sébastien
12/10/2020, 10:58 PM
Jim Crist-Harif
12/10/2020, 10:59 PM
… environment-based configuration with an executor on the flow).
… executor for things to work.
Sébastien
12/10/2020, 11:00 PM
Jim Crist-Harif
12/10/2020, 11:01 PM
Sébastien
12/10/2020, 11:52 PM
… flow.run(). But when I used flow.register() instead, I'm getting this error:
cluster_kwargs={
    # ...
    "scheduler_options": {"idle_timeout": "5s"},
    "worker_options": {"death_timeout": "5s"},
}