Sébastien
12/10/2020, 12:53 AM
Sébastien
12/10/2020, 12:55 AM
schedule = CronSchedule("0 * * * *", start_date=datetime.now())
with Flow("Q", schedule=schedule, executor=LocalDaskExecutor(cluster_kwargs={"processes": True, "n_workers": 2, "threads_per_worker": 1})) as flow:
    r1 = task1()
    r2 = task2.map(r1)
    task3(flatten(r2))
flow.register(project_name="Q")
Zanie
LocalDaskExecutor(scheduler="processes", num_workers=8)
Sébastien
12/10/2020, 8:38 PM
Running with LocalDaskExecutor(scheduler='processes', num_workers=8)
Sébastien
12/10/2020, 8:41 PM
Sébastien
12/10/2020, 8:46 PM
CPU(s):              2
Thread(s) per core:  1
Sébastien
12/10/2020, 8:53 PM
$ sysctl hw.physicalcpu hw.logicalcpu
hw.physicalcpu: 6
hw.logicalcpu: 12
[2020-12-10 20:50:02,059] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-12-10 20:50:02,317] INFO - agent | Deploying flow run fd6ea055-0233-4aca-a0a0-8b2c4f3b02e1
Running with LocalDaskExecutor(scheduler='processes', num_workers=8)
Sébastien
12/10/2020, 9:13 PM
DaskExecutor()
prefect agent local start
Sébastien
12/10/2020, 9:14 PM
Sébastien
12/10/2020, 9:21 PM
Zanie
Sébastien
12/10/2020, 9:26 PM
Zanie
Sébastien
12/10/2020, 9:28 PM
Zanie
flow.run()
Zanie
Zanie
import random
import time

import prefect
from prefect import Flow, task
from prefect.engine.executors import LocalDaskExecutor, LocalExecutor

tasks = list(range(3))

@task
def display(i):
    # Sleep for a random 0-3 seconds so parallel runs can finish out of order
    sleep = random.randint(0, 3)
    time.sleep(sleep)
    prefect.context.logger.info(f"In {i} after {sleep} seconds")

# LocalExecutor runs the mapped children one at a time
with Flow("mapped-sequential", executor=LocalExecutor()) as flow:
    display.map(tasks)
flow.run()

# LocalDaskExecutor runs the mapped children concurrently (threads by default)
with Flow("mapped-parallel", executor=LocalDaskExecutor()) as flow:
    display.map(tasks)
flow.run()
Zanie
❯ python example-mapped-parallel.py
[2020-12-10 15:39:23] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped-sequential'
[2020-12-10 15:39:23] INFO - prefect.TaskRunner | Task 'display': Starting task run...
[2020-12-10 15:39:23] INFO - prefect.TaskRunner | Task 'display': Finished task run for task with final state: 'Mapped'
[2020-12-10 15:39:23] INFO - prefect.TaskRunner | Task 'display[0]': Starting task run...
[2020-12-10 15:39:25] INFO - prefect.display[0] | In 0 after 2 seconds
[2020-12-10 15:39:25] INFO - prefect.TaskRunner | Task 'display[0]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:25] INFO - prefect.TaskRunner | Task 'display[1]': Starting task run...
[2020-12-10 15:39:26] INFO - prefect.display[1] | In 1 after 1 seconds
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[1]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[2]': Starting task run...
[2020-12-10 15:39:27] INFO - prefect.display[2] | In 2 after 0 seconds
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[2]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:27] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-12-10 15:39:27] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped-parallel'
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display': Starting task run...
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display': Finished task run for task with final state: 'Mapped'
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[2]': Starting task run...
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[1]': Starting task run...
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[0]': Starting task run...
[2020-12-10 15:39:28] INFO - prefect.display[2] | In 2 after 1 seconds
[2020-12-10 15:39:28] INFO - prefect.TaskRunner | Task 'display[2]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:28] INFO - prefect.display[0] | In 0 after 1 seconds
[2020-12-10 15:39:28] INFO - prefect.TaskRunner | Task 'display[0]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:30] INFO - prefect.display[1] | In 1 after 3 seconds
[2020-12-10 15:39:30] INFO - prefect.TaskRunner | Task 'display[1]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:30] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Sébastien
12/10/2020, 9:40 PM
[2020-12-10 22:37:54+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:38:00+00:00
Sébastien
12/10/2020, 9:40 PM
Sébastien
12/10/2020, 9:49 PM
[2020-12-10 22:48:31+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T21:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Q'
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
Sébastien
12/10/2020, 9:49 PM
Sébastien
12/10/2020, 9:50 PM
schedule = CronSchedule("49 * * * *")
with Flow("Q", schedule=schedule, executor=LocalDaskExecutor(scheduler="processes", num_workers=8)) as flow:
    r1 = task1()
    # Each sublist is meant to be a task run
    r2 = task2.map(r1)
    # Last task takes all entries as a single list
    task3(flatten(r2))
flow.run()
Sébastien
12/10/2020, 9:53 PM
flow.run()
flow.schedule()
Zanie
Zanie
DaskExecutor()
Zanie
config.toml
DEBUG
[logging]
# The logging level: NOTSET, DEBUG, INFO, WARNING, ERROR, or CRITICAL
level = "DEBUG"
josh
12/10/2020, 10:31 PM
flow.executor
flow.run_config
LocalEnvironment
flow.executor
from prefect.run_configs import LocalRun
flow.executor = DaskExecutor(...) / LocalDaskExecutor(...)
flow.run_config = LocalRun()
from prefect.environments import LocalEnvironment
flow.environment = LocalEnvironment(executor=DaskExecutor(...) / LocalDaskExecutor(...))
Sébastien
12/10/2020, 10:50 PM
run_config
executor=X()
run_config
Sébastien
12/10/2020, 10:51 PM
run_config
Jim Crist-Harif
12/10/2020, 10:55 PM
Sébastien
12/10/2020, 10:55 PM
"RunConfig objects define where and how a flow run should be executed"
Then what's the difference with an executor? And when using Cloud, which has all known agents and picks a proper runner, why is this relevant? And why would the default, UniversalRun, not choose to honor the passed executor for that flow?
Jim Crist-Harif
12/10/2020, 10:56 PM
Sébastien
12/10/2020, 10:56 PM
Jim Crist-Harif
12/10/2020, 10:57 PM
"And why would the default, UniversalRun, not choose to honor the passed executor for that flow?"
This is one of those places where the docs are slightly ahead of the latest release (apologies). The docs describe the new system (which is currently available, but not on by default until the next release), and that mismatch is what's causing you issues.
Sébastien
12/10/2020, 10:58 PM
Jim Crist-Harif
12/10/2020, 10:59 PM
environment
executor
Jim Crist-Harif
12/10/2020, 11:00 PM
executor
Sébastien
12/10/2020, 11:00 PM
Jim Crist-Harif
12/10/2020, 11:01 PM
Sébastien
12/10/2020, 11:52 PM
flow.run()
flow.register()
Sébastien
12/10/2020, 11:52 PM
Sébastien
12/11/2020, 12:10 AM
Sébastien
12/11/2020, 1:00 AM
cluster_kwargs={
            # ...
            "scheduler_options": {"idle_timeout": "5s"},
            "worker_options": {"death_timeout": "5s"},
        }
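For anyone reading this thread without a Prefect install: the sequential versus parallel behaviour in Zanie's mapped-sequential and mapped-parallel logs earlier in the thread can be approximated with the standard library alone. This is only a minimal sketch, not how Prefect or Dask implement it. The names display, run_sequential, and run_parallel are illustrative, a thread pool stands in for LocalDaskExecutor's threaded scheduler, and fixed delays replace the random ones so the timing is predictable.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def display(i, delay):
    # Stand-in for the mapped task: sleep, then report what happened
    time.sleep(delay)
    return f"In {i} after {delay} seconds"


def run_sequential(delays):
    # One child at a time, like LocalExecutor: wall time is roughly sum(delays)
    start = time.perf_counter()
    results = [display(i, d) for i, d in enumerate(delays)]
    return results, time.perf_counter() - start


def run_parallel(delays, num_workers=3):
    # All children at once in a thread pool: wall time is roughly max(delays)
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = list(pool.map(display, range(len(delays)), delays))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    delays = [0.2, 0.1, 0.3]  # assumption: fixed delays instead of random ones
    seq_results, seq_elapsed = run_sequential(delays)
    par_results, par_elapsed = run_parallel(delays)
    print(f"sequential: {seq_elapsed:.2f}s, parallel: {par_elapsed:.2f}s")
```

If a registered flow's mapped tasks show the first timing pattern (each child starting only after the previous one finishes), that suggests the configured executor is not the one actually being used for the run.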