
Sébastien

12/10/2020, 12:53 AM
Current test:
from datetime import datetime

from prefect import Flow, flatten
from prefect.engine.executors import LocalDaskExecutor
from prefect.schedules import CronSchedule

schedule = CronSchedule("0 * * * *", start_date=datetime.now())

# task1/task2/task3 are defined elsewhere in the project
with Flow("Q", schedule=schedule, executor=LocalDaskExecutor(cluster_kwargs={"processes": True, "n_workers": 2, "threads_per_worker": 1})) as flow:
    r1 = task1()
    r2 = task2.map(r1)
    task3(flatten(r2))

flow.register(project_name="Q")

Zanie

12/10/2020, 3:34 PM
Hi! Could you try creating your executor with
LocalDaskExecutor(scheduler="processes", num_workers=8)
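If I'm remembering the executor options right, cluster_kwargs is only used by the distributed DaskExecutor; LocalDaskExecutor runs on dask's local schedulers and is configured with scheduler / num_workers instead.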

Sébastien

12/10/2020, 8:38 PM
@Zanie Using that, it reflects in the logging (
Running with LocalDaskExecutor(scheduler='processes', num_workers=8)
), but the mapped task runs are still executing one by one.
I'm running this last test on a VPS intentionally provisioned with 2 cores, 1 thread per core. Once it can run on 2 cores instead of only 1, I'll scale it up.
CPU(s):              2
Thread(s) per core:  1
Tried running on a physical machine as well, a Mac with
$ sysctl hw.physicalcpu hw.logicalcpu
hw.physicalcpu: 6
hw.logicalcpu: 12
The Mac's agent picked it up:
[2020-12-10 20:50:02,059] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-12-10 20:50:02,317] INFO - agent | Deploying flow run fd6ea055-0233-4aca-a0a0-8b2c4f3b02e1
Logs:
Running with LocalDaskExecutor(scheduler='processes', num_workers=8)
Exact same result: only one task runs at a time.
Tried a standard
DaskExecutor()
without args on the Mac with a LocalAgent (
prefect agent local start
), same result: a single worker, with mapped tasks running one at a time. Unmapped tasks without dependencies also ran one by one.
If anyone has any ideas, I'm all ears. I'm about to have to replace prefect if I can't get this sorted.

Zanie

12/10/2020, 9:26 PM
My apologies — I’ve responded in the wrong thread.

Sébastien

12/10/2020, 9:26 PM
I figured, no worries

Zanie

12/10/2020, 9:28 PM
Well I’ve been off solving that logging issue 😄 could you please paste a minimal example of your code and I’ll test it locally?

Sébastien

12/10/2020, 9:28 PM
Alright, I'll whip one up

Zanie

12/10/2020, 9:34 PM
Also, can you confirm you’re seeing this same issue with
flow.run()
locally?
For example
import time
import random

import prefect
from prefect import Flow, task
from prefect.engine.executors import LocalDaskExecutor, LocalExecutor

tasks = [i for i in range(3)]

@task
def display(i):
    sleep = random.randint(0, 3)
    time.sleep(sleep)
    prefect.context.logger.info(f"In {i} after {sleep} seconds")


with Flow("mapped-sequential", executor=LocalExecutor()) as flow:
    display.map(tasks)

flow.run()


with Flow("mapped-parallel", executor=LocalDaskExecutor()) as flow:
    display.map(tasks)

flow.run()
❯ python example-mapped-parallel.py
[2020-12-10 15:39:23] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped-sequential'
[2020-12-10 15:39:23] INFO - prefect.TaskRunner | Task 'display': Starting task run...
[2020-12-10 15:39:23] INFO - prefect.TaskRunner | Task 'display': Finished task run for task with final state: 'Mapped'
[2020-12-10 15:39:23] INFO - prefect.TaskRunner | Task 'display[0]': Starting task run...
[2020-12-10 15:39:25] INFO - prefect.display[0] | In 0 after 2 seconds
[2020-12-10 15:39:25] INFO - prefect.TaskRunner | Task 'display[0]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:25] INFO - prefect.TaskRunner | Task 'display[1]': Starting task run...
[2020-12-10 15:39:26] INFO - prefect.display[1] | In 1 after 1 seconds
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[1]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[2]': Starting task run...
[2020-12-10 15:39:27] INFO - prefect.display[2] | In 2 after 0 seconds
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[2]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:27] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-12-10 15:39:27] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped-parallel'
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display': Starting task run...
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display': Finished task run for task with final state: 'Mapped'
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[2]': Starting task run...
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[1]': Starting task run...
[2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[0]': Starting task run...
[2020-12-10 15:39:28] INFO - prefect.display[2] | In 2 after 1 seconds
[2020-12-10 15:39:28] INFO - prefect.TaskRunner | Task 'display[2]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:28] INFO - prefect.display[0] | In 0 after 1 seconds
[2020-12-10 15:39:28] INFO - prefect.TaskRunner | Task 'display[0]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:30] INFO - prefect.display[1] | In 1 after 3 seconds
[2020-12-10 15:39:30] INFO - prefect.TaskRunner | Task 'display[1]': Finished task run for task with final state: 'Success'
[2020-12-10 15:39:30] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
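(Note how in the parallel flow all three mapped runs start before any of them finishes; that interleaving is what you should see when mapping is actually running in parallel.)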

Sébastien

12/10/2020, 9:40 PM
I did keep the CronSchedule in there to make sure it's not interfering somehow (it shouldn't, but you never know), and it only gave me this:
[2020-12-10 22:37:54+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:38:00+00:00
Oh wait, the timezone's messing things up
@Zanie Now I keep getting this
[2020-12-10 22:48:31+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T21:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Q'
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
[2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
When the cron hits, it seems to spin up the 8 workers, and then each of them waits for the next cron instead of running the initial task.
schedule = CronSchedule("49 * * * *")

with Flow("Q", schedule=schedule, executor=LocalDaskExecutor(scheduler="processes", num_workers=8)) as flow:
    r1 = task1()
    # Each sublist is meant to be a task run
    r2 = task2.map(r1)
    # Last task takes all entries as a single list
    task3(flatten(r2))

flow.run()
@Zanie Using
flow.run()
on my original (real) code, without CronSchedule, does work with 8 workers. Using
flow.schedule()
with CronSchedule does not (even when the code runs on exactly the same Mac).

Zanie

12/10/2020, 10:16 PM
I’m continuing to look into this!
Can you try setting the flow executor to just
DaskExecutor()
without args?
Can you also change the logging in your
config.toml
file to
DEBUG
e.g.
[logging]
# The logging level: NOTSET, DEBUG, INFO, WARNING, ERROR, or CRITICAL
level = "DEBUG"

josh

12/10/2020, 10:31 PM
Hi @Sébastien the 
flow.executor
 is only going to be used if the flow has a 
flow.run_config
. Right now flows still have a default 
LocalEnvironment
 and in that case it’s trying to use the executor in the environment. If you set a run config then the 
flow.executor
 is used properly. This is part of the ongoing deprecation cycle for environments (and this mismatch issue will be resolved in the next release). So one of these two options should work for you:
from prefect.run_configs import LocalRun

flow.executor = DaskExecutor(...)  # or LocalDaskExecutor(...)
flow.run_config = LocalRun()
or using the soon to be deprecated environments
from prefect.environments import LocalEnvironment

flow.environment = LocalEnvironment(executor=DaskExecutor(...))  # or LocalDaskExecutor(...)
For more information on flow run configuration check out this section in the docs!
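Here's a rough, self-contained sketch of the first option applied to a flow shaped like the one above (the task bodies are hypothetical stand-ins for your real tasks; LocalRun() with no arguments just runs on the agent's machine):

from prefect import Flow, task, flatten
from prefect.engine.executors import LocalDaskExecutor
from prefect.run_configs import LocalRun

@task
def task1():
    # hypothetical stand-in: produce a list of sublists
    return [[1, 2], [3, 4]]

@task
def task2(sublist):
    # hypothetical stand-in: one mapped run per sublist
    return [x * 2 for x in sublist]

@task
def task3(entries):
    # hypothetical stand-in: receives all entries as a single list
    print(entries)

with Flow("Q") as flow:
    r1 = task1()
    r2 = task2.map(r1)
    task3(flatten(r2))

flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
flow.run_config = LocalRun()
flow.register(project_name="Q")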

Sébastien

12/10/2020, 10:50 PM
@josh Thanks for the info, but wow that may need to be clarified in the docs.
run_config
is a vague name, and since we already pass
executor=X()
to the flow, you'd expect any environment to default to using the defined executor rather than requiring
run_config
which isn't mentioned in the tutorial or the Dask setup docs.
I may have misunderstood, and it's open source so I'm not complaining, but I spent hours reading docs and rewriting the code in different ways, and
run_config
never once came up.

Jim Crist-Harif

12/10/2020, 10:55 PM
I'm sorry you're having a frustrating experience here. You've caught us right in the middle of a transition (the docs in some places reflect the dev branch, which will be released in 3-4 days). We've tried to keep things accurate with both the latest release and the upcoming one, but some stuff has slipped through (as you've found).

Sébastien

12/10/2020, 10:55 PM
Yeah, I still don't get it. Docs:
RunConfig objects define where and how a flow run should be executed
Then how is that different from an executor? And when using Cloud, which knows about all the agents and picks a proper runner, why is this relevant? And why would the default,
UniversalRun
, not choose to honor the passed
executor
for that flow?

Jim Crist-Harif

12/10/2020, 10:56 PM
That said, run-config stuff is mentioned in both the tutorial and several places throughout the docs: https://docs.prefect.io/orchestration/flow_config/overview.html.

Jim Crist-Harif

12/10/2020, 10:57 PM
And why would the default,
UniversalRun
, not choose to honor the passed
executor
for that flow?
This is one of those places where the docs are slightly ahead of the latest release (apologies). The docs describe the new system (which is currently available, but not on by default until the next release), and that mismatch is what's causing you issues.

Sébastien

12/10/2020, 10:58 PM
Gotcha. So for now, sit quiet, just define a LocalRun to make it use the executor, and wait until the update is official before reviewing/submitting docs PRs?
j

Jim Crist-Harif

12/10/2020, 10:59 PM
That'd work for now, although you're always welcome to submit PRs for changes you'd like to see. To help prevent others from going down the same path, I'll add a warning check that would have helped catch this early on (mixing
environment
-based configuration with an
executor
on the flow).
With the 0.14.0 release, you'll be using the new system by default, so you'd only need to configure an
executor
for things to work.
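i.e. at that point, setting
flow.executor = LocalDaskExecutor(...)
on its own should be enough.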

Sébastien

12/10/2020, 11:00 PM
Thanks for jumping in. And on behalf of anyone affected, thanks in advance for adding the warning.

Jim Crist-Harif

12/10/2020, 11:01 PM
No problem. Please feel free to reach out if you run into any other issues.

Sébastien

12/10/2020, 11:52 PM
@Jim Crist-Harif I set everything up: LocalRun + DaskExecutor + SSHCluster, and it worked perfectly with
flow.run()
. But when I used
flow.register()
instead, I'm getting this error:
Any tips for getting better logging on what's going wrong? All agents are running on Cloud, labels are set properly, and the run worked when triggered manually (over multiple nodes via SSHCluster).
I got it running; the port was still in use. The question stands, though: how would I add better logging for this particular case, so I'd know the real error is the address being in use? And is it possible to make the flow auto-shutdown the SSHCluster even if an error occurred (in this case it was a KeyboardInterrupt)? At the moment, the scheduled flow can't run more than once, because DaskExecutor with SSHCluster doesn't auto-shutdown the cluster after use.
Found the Dask config needed for it. Posting it for reference, since it's really useful for anyone using Prefect without Dask knowledge:
cluster_kwargs={
    # ...
    "scheduler_options": {"idle_timeout": "5s"},
    "worker_options": {"death_timeout": "5s"},
}
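A rough sketch of where those options plug in, in case it helps anyone later (the hostnames are placeholders for your own nodes, not my exact setup):

from prefect import Flow
from prefect.engine.executors import DaskExecutor

with Flow("Q") as flow:
    ...  # tasks as above

flow.executor = DaskExecutor(
    cluster_class="dask.distributed.SSHCluster",
    cluster_kwargs={
        # first host runs the scheduler, the rest run workers
        "hosts": ["scheduler-host", "worker-1", "worker-2"],
        # shut the cluster down shortly after the flow stops using it,
        # even if the run was interrupted
        "scheduler_options": {"idle_timeout": "5s"},
        "worker_options": {"death_timeout": "5s"},
    },
)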