    Sébastien

    1 year ago
    Current test:
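    from datetime import datetime
    
    from prefect import Flow, flatten
    from prefect.engine.executors import LocalDaskExecutor
    from prefect.schedules import CronSchedule
    
    # task1, task2 and task3 are the user's own @task functions (not shown)
    schedule = CronSchedule("0 * * * *", start_date=datetime.now())
    
    with Flow("Q", schedule=schedule, executor=LocalDaskExecutor(cluster_kwargs={"processes": True, "n_workers": 2, "threads_per_worker": 1})) as flow:
        r1 = task1()
        r2 = task2.map(r1)
        task3(flatten(r2))
    
    flow.register(project_name="Q")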
    Michael Adkins

    1 year ago
    Hi! Could you try creating your executor with LocalDaskExecutor(scheduler="processes", num_workers=8)?
    Sébastien

    1 year ago
    @Michael Adkins Using that, the logs reflect the change (Running with LocalDaskExecutor(scheduler='processes', num_workers=8)), but the mapped task is still executing one by one.
    I'm running this last test on a VPS intentionally provisioned with 2 cores, 1 thread per core. Once it can run on 2 cores instead of only 1, I'll scale it up.
    CPU(s):              2
    Thread(s) per core:  1
    I also tried running on a physical machine, a Mac with:
    $ sysctl hw.physicalcpu hw.logicalcpu
    hw.physicalcpu: 6
    hw.logicalcpu: 12
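As a sanity check on the hardware side: with scheduler="processes" and num_workers=8 on the 2-core VPS, at most as many worker processes as there are usable cores can run at the same instant, and the rest time-share. A minimal stdlib-only sketch to see what the Python process itself reports (usable_cpus is a hypothetical helper; os.sched_getaffinity only exists on Linux, so the sketch falls back to os.cpu_count() on the Mac):

```python
import os


def usable_cpus() -> int:
    """Number of CPUs this process is allowed to run on."""
    if hasattr(os, "sched_getaffinity"):
        # Linux: respects CPU affinity (e.g. taskset or container cpusets)
        return len(os.sched_getaffinity(0))
    # macOS/Windows: fall back to the logical CPU count (hw.logicalcpu on a Mac)
    return os.cpu_count() or 1


if __name__ == "__main__":
    print("logical CPUs:", os.cpu_count())
    print("usable by this process:", usable_cpus())
```

If the second number is 1, no executor setting will make mapped tasks overlap on that machine.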
    Mac picked it up:
    [2020-12-10 20:50:02,059] INFO - agent | Found 1 flow run(s) to submit for execution.
    [2020-12-10 20:50:02,317] INFO - agent | Deploying flow run fd6ea055-0233-4aca-a0a0-8b2c4f3b02e1
    Logs:
    Running with LocalDaskExecutor(scheduler='processes', num_workers=8)
    Exact same result: only one task runs at a time.
    I also tried a standard DaskExecutor() without args on the Mac with a LocalAgent (prefect agent local start). Same result: a single worker. The mapped task runs one at a time, and unmapped tasks without dependencies also ran one by one.
    If anyone has any ideas, I'm all ears. I'm about to have to replace prefect if I can't get this sorted.
    Michael Adkins

    1 year ago
    My apologies, I responded in the wrong thread.
    Sébastien

    1 year ago
    I figured, no worries
    Michael Adkins

    1 year ago
    Well, I’ve been off solving that logging issue 😄 Could you please paste a minimal example of your code so I can test it locally?
    Sébastien

    1 year ago
    Alright, I'll whip one up
    Michael Adkins

    1 year ago
    Also, can you confirm you’re seeing the same issue with flow.run() locally?
    For example:
    import time
    import random
    
    import prefect
    from prefect import Flow, task
    from prefect.engine.executors import LocalDaskExecutor, LocalExecutor
    
    tasks = [i for i in range(3)]
    
    @task
    def display(i):
        sleep = random.randint(0, 3)
        time.sleep(sleep)
        prefect.context.logger.info(f"In {i} after {sleep} seconds")
    
    
    with Flow("mapped-sequential", executor=LocalExecutor()) as flow:
        display.map(tasks)
    
    flow.run()
    
    
    with Flow("mapped-parallel", executor=LocalDaskExecutor()) as flow:
        display.map(tasks)
    
    flow.run()
    ❯ python example-mapped-parallel.py
    [2020-12-10 15:39:23] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped-sequential'
    [2020-12-10 15:39:23] INFO - prefect.TaskRunner | Task 'display': Starting task run...
    [2020-12-10 15:39:23] INFO - prefect.TaskRunner | Task 'display': Finished task run for task with final state: 'Mapped'
    [2020-12-10 15:39:23] INFO - prefect.TaskRunner | Task 'display[0]': Starting task run...
    [2020-12-10 15:39:25] INFO - prefect.display[0] | In 0 after 2 seconds
    [2020-12-10 15:39:25] INFO - prefect.TaskRunner | Task 'display[0]': Finished task run for task with final state: 'Success'
    [2020-12-10 15:39:25] INFO - prefect.TaskRunner | Task 'display[1]': Starting task run...
    [2020-12-10 15:39:26] INFO - prefect.display[1] | In 1 after 1 seconds
    [2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[1]': Finished task run for task with final state: 'Success'
    [2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[2]': Starting task run...
    [2020-12-10 15:39:27] INFO - prefect.display[2] | In 2 after 0 seconds
    [2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[2]': Finished task run for task with final state: 'Success'
    [2020-12-10 15:39:27] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-12-10 15:39:27] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped-parallel'
    [2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display': Starting task run...
    [2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display': Finished task run for task with final state: 'Mapped'
    [2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[2]': Starting task run...
    [2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[1]': Starting task run...
    [2020-12-10 15:39:27] INFO - prefect.TaskRunner | Task 'display[0]': Starting task run...
    [2020-12-10 15:39:28] INFO - prefect.display[2] | In 2 after 1 seconds
    [2020-12-10 15:39:28] INFO - prefect.TaskRunner | Task 'display[2]': Finished task run for task with final state: 'Success'
    [2020-12-10 15:39:28] INFO - prefect.display[0] | In 0 after 1 seconds
    [2020-12-10 15:39:28] INFO - prefect.TaskRunner | Task 'display[0]': Finished task run for task with final state: 'Success'
    [2020-12-10 15:39:30] INFO - prefect.display[1] | In 1 after 3 seconds
    [2020-12-10 15:39:30] INFO - prefect.TaskRunner | Task 'display[1]': Finished task run for task with final state: 'Success'
    [2020-12-10 15:39:30] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
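The difference between the two runs above is whether the display[i] windows overlap: in the sequential run each child starts only after the previous one finishes, while in the parallel run all three start at 15:39:27. The same check can be applied to any set of timings. Below is a stdlib-only sketch in which concurrent.futures.ThreadPoolExecutor stands in for the Prefect executor; run_mapped and max_concurrency are hypothetical helpers, not Prefect APIs.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_mapped(items, max_workers, work_seconds=0.1):
    """Run a sleepy stand-in task over items; return each call's (start, end) time."""

    def task(_item):
        start = time.perf_counter()
        time.sleep(work_seconds)  # stand-in for the real task body
        return start, time.perf_counter()

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(task, items))


def max_concurrency(intervals):
    """Largest number of (start, end) intervals that overlap at any instant."""
    # +1 when a run starts, -1 when it ends; at equal times the end (-1) sorts first
    events = [(start, 1) for start, _ in intervals] + [(end, -1) for _, end in intervals]
    running = peak = 0
    for _, delta in sorted(events):
        running += delta
        peak = max(peak, running)
    return peak


if __name__ == "__main__":
    print("sequential:", max_concurrency(run_mapped(range(3), max_workers=1)))
    print("parallel:  ", max_concurrency(run_mapped(range(3), max_workers=3)))
```

A peak of 1 for a mapped task means the children never overlapped, which is what the thread is reporting.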
    Sébastien

    1 year ago
    I kept the CronSchedule in, to make sure it isn't interfering somehow (it shouldn't, but you never know), and it only gave me this:
    [2020-12-10 22:37:54+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:38:00+00:00
    Oh wait, the timezone's messing up
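One plausible reading of that log line, which the thread does not confirm: the original snippet passed a naive start_date=datetime.now(), and if that naive 22:37:54 local (+0100) time is read as 22:37:54 UTC, the next run lands an hour later than intended. The stdlib sketch below only shows that arithmetic for a cron firing at minute 38; next_run_at_minute is a hypothetical helper, not Prefect's scheduler. Passing a timezone-aware start_date (e.g. datetime.now(timezone.utc)) removes the ambiguity.

```python
from datetime import datetime, timedelta, timezone

LOCAL = timezone(timedelta(hours=1))  # the +0100 offset shown in the logs


def next_run_at_minute(start: datetime, minute: int) -> datetime:
    """First time >= start whose minute equals `minute`, like cron "<minute> * * * *"."""
    candidate = start.replace(minute=minute, second=0, microsecond=0)
    if candidate < start:
        candidate += timedelta(hours=1)
    return candidate


naive_now = datetime(2020, 12, 10, 22, 37, 54)  # what datetime.now() returns: tzinfo is None

read_as_utc = naive_now.replace(tzinfo=timezone.utc)  # assumed scheduler interpretation
read_as_local = naive_now.replace(tzinfo=LOCAL)  # what the wall clock meant

print(next_run_at_minute(read_as_utc, 38).isoformat())  # 2020-12-10T22:38:00+00:00
print(next_run_at_minute(read_as_local, 38).astimezone(timezone.utc).isoformat())  # 2020-12-10T21:38:00+00:00
```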
    @Michael Adkins Now I keep getting this:
    [2020-12-10 22:48:31+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T21:49:00+00:00
    [2020-12-10 22:49:00+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Q'
    [2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
    [2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
    [2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
    [2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
    [2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
    [2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
    [2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
    [2020-12-10 22:49:00+0100] INFO - prefect.Q | Waiting for next scheduled run at 2020-12-10T22:49:00+00:00
    When the cron hits, it seems to spin up the 8 workers, then waits for the next cron instead of running the initial task.
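    from prefect import Flow, flatten
    from prefect.engine.executors import LocalDaskExecutor
    from prefect.schedules import CronSchedule
    
    # task1, task2 and task3 are the user's own @task functions (not shown)
    schedule = CronSchedule("49 * * * *")
    
    with Flow("Q", schedule=schedule, executor=LocalDaskExecutor(scheduler="processes", num_workers=8)) as flow:
        r1 = task1()
        # Each sublist is meant to be a task run
        r2 = task2.map(r1)
        # Last task takes all entries as a single list
        task3(flatten(r2))
    
    flow.run()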
    @Michael Adkins Using flow.run() on my original (real) code, without CronSchedule, does work with 8 workers. Using flow.schedule() with CronSchedule does not (even when the code runs on exactly the same Mac).
    Michael Adkins

    1 year ago
    I’m continuing to look into this!
    Can you try setting the flow executor to just DaskExecutor() without args?
    Can you also change the logging level in your config.toml file to DEBUG, e.g.:
    [logging]
    # The logging level: NOTSET, DEBUG, INFO, WARNING, ERROR, or CRITICAL
    level = "DEBUG"
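If editing config.toml is inconvenient, Prefect 0.x also lets environment variables of the form PREFECT__SECTION__KEY override config values, so PREFECT__LOGGING__LEVEL=DEBUG should have the same effect. A small sketch; prefect_env_var is a hypothetical helper, and setting the variable before prefect is imported rests on the assumption that the config is read at import time:

```python
import os


def prefect_env_var(config_key: str) -> str:
    """Turn a dotted config key like "logging.level" into its env-var override name."""
    return "PREFECT__" + "__".join(part.upper() for part in config_key.split("."))


# Set this before `import prefect` (or export it in the shell that launches the process).
os.environ[prefect_env_var("logging.level")] = "DEBUG"
print(prefect_env_var("logging.level"))  # PREFECT__LOGGING__LEVEL
```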
    josh

    1 year ago
    Hi @Sébastien, the flow.executor is only going to be used if the flow has a flow.run_config. Right now flows still have a default LocalEnvironment, and in that case it’s trying to use the executor in the environment. If you set a run config, then the flow.executor is used properly. This is part of the ongoing deprecation cycle for environments (and this mismatch will be resolved in the next release). So one of these two options should work for you:
    from prefect.engine.executors import DaskExecutor, LocalDaskExecutor
    from prefect.run_configs import LocalRun
    
    flow.executor = DaskExecutor(...)  # or LocalDaskExecutor(...)
    flow.run_config = LocalRun()
    Or, using the soon-to-be-deprecated environments:
    from prefect.engine.executors import DaskExecutor, LocalDaskExecutor
    from prefect.environments import LocalEnvironment
    
    flow.environment = LocalEnvironment(executor=DaskExecutor(...))  # or LocalDaskExecutor(...)
    For more information on flow run configuration check out this section in the docs!
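To make josh's rule concrete, here is a toy model of which executor a pre-0.14 flow run ends up with. It is a stdlib-only illustration, not Prefect's source: FlowConfig and executor_used are hypothetical, executors are plain class-name strings, and it assumes both the default LocalEnvironment and a bare run config fall back to LocalExecutor.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FlowConfig:
    """Hypothetical stand-in for the Flow attributes that matter here."""
    executor: Optional[str] = None  # e.g. Flow(..., executor=LocalDaskExecutor())
    run_config: Optional[str] = None  # e.g. flow.run_config = LocalRun()
    environment_executor: str = "LocalExecutor"  # assumed default of LocalEnvironment


def executor_used(flow: FlowConfig) -> str:
    """Toy version of the pre-0.14 rule: flow.executor only counts with a run_config."""
    if flow.run_config is not None:
        # New-style configuration: the flow's own executor wins (assumed default otherwise)
        return flow.executor or "LocalExecutor"
    # Legacy path: the environment's executor is used and flow.executor is ignored
    return flow.environment_executor


print(executor_used(FlowConfig(executor="LocalDaskExecutor")))  # LocalExecutor
print(executor_used(FlowConfig(executor="LocalDaskExecutor", run_config="LocalRun")))  # LocalDaskExecutor
```

The first call is the situation in this thread: an executor passed to Flow(...) with no run config, so the environment's executor runs the tasks one by one.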
    Sébastien

    1 year ago
    @josh Thanks for the info, but wow, that may need to be clarified in the docs. run_config is a vague name, and since we already pass executor=X() to the flow, you'd expect any environment to default to the defined executor rather than requiring run_config, which isn't mentioned in the tutorial or the Dask setup docs.
    I may have misunderstood, and it's open source so I'm not complaining, but I spent hours on the docs and on rewriting the code in several different ways, and run_config never once came up.
    Jim Crist-Harif

    1 year ago
    I'm sorry you're having a frustrating experience here. You've caught us right in the middle of a transition (the docs in some places reflect the dev branch, which will be released in 3-4 days). We've tried to keep things accurate with both the latest release and the upcoming one, but some stuff has slipped through (as you've found).
    Sébastien

    1 year ago
    Yeah, I still don't get it. Docs:
    RunConfig objects define where and how a flow run should be executed
    Then what's the difference from an executor? And when using Cloud, which knows all the agents and picks a proper runner, why is this relevant? And why wouldn't the default, UniversalRun, honor the executor passed to that flow?
    Jim Crist-Harif

    1 year ago
    That said, run-config stuff is mentioned in both the tutorial and several places throughout the docs: https://docs.prefect.io/orchestration/flow_config/overview.html.
    Sébastien

    1 year ago
    Jim Crist-Harif

    1 year ago
    "And why would the default, UniversalRun, not choose to honor the passed executor for that flow?"
    This is one of those places where the docs are slightly ahead of the latest release (apologies). The docs describe the new system (which is available now, but not on by default until the next release), and that mismatch is what's causing your issues.
    Sébastien

    1 year ago
    Gotcha. So for now, sit quiet, just define a LocalRun to make it use the executor, and wait until the update is official before reviewing/submitting docs PRs?
    Jim Crist-Harif

    1 year ago
    That'd work for now, although you're always welcome to submit PRs for changes you'd like to see. To help keep others from going down the same path, I'll add a warning check that would have caught this early on (mixing environment-based configuration with an executor on the flow).
    With the 0.14.0 release, you'll be using the new system by default, so you'd only need to configure an executor for things to work.
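The warning Jim mentions could look roughly like the sketch below. This is a hypothetical illustration of the idea (flag a flow that sets an executor while still relying on environment-based configuration), not the check that actually shipped in Prefect; warn_on_mixed_config is an invented name.

```python
import warnings
from typing import Optional


def warn_on_mixed_config(executor: Optional[object], run_config: Optional[object], environment: Optional[object]) -> bool:
    """Warn (and return True) if an executor is set but an environment, not a run_config, drives execution."""
    if executor is not None and environment is not None and run_config is None:
        warnings.warn(
            "This flow has an executor but uses environment-based configuration; "
            "the executor will be ignored. Set flow.run_config (e.g. LocalRun()) "
            "or pass the executor to the environment instead.",
            UserWarning,
            stacklevel=2,
        )
        return True
    return False
```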
    Sébastien

    1 year ago
    Thanks for jumping in. And in the name of anyone affected, early thanks for adding the warning.
    Jim Crist-Harif

    1 year ago
    No problem. Please feel free to reach out if you run into any other issues.
    Sébastien

    1 year ago
    @Jim Crist-Harif I set everything up (LocalRun + DaskExecutor + SSHCluster), and it worked perfectly with flow.run(). But when I used flow.register() instead, I got this error:
    Any tips to get better logging on what's going wrong? All agents are running on the cloud, labels are set properly, and the run worked when started manually (over multiple nodes via SSHCluster).
    I got it running; the port was still in use. Though the question stands: how would I add better logging for this particular case, so I'd know the real error is the address being in use? And is it possible to make the flow automatically shut down the SSHCluster, even if an error occurred (in this case a KeyboardInterrupt)? At the moment, the scheduled flow can't run more than once because DaskExecutor with SSHCluster doesn't shut down the cluster after use.
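For the "address in use" case, one low-tech way to get a clearer error is to check the scheduler port yourself before the cluster is started. A stdlib-only sketch, assuming the scheduler uses distributed's default port 8786; port_in_use and check_scheduler_port are hypothetical helpers, and they only test the machine they run on, so for an SSHCluster they would need to run on the scheduler host.

```python
import errno
import socket

DASK_SCHEDULER_PORT = 8786  # distributed's default scheduler port


def port_in_use(port: int, host: str = "0.0.0.0") -> bool:
    """Return True if binding (host, port) fails because something already holds it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError as exc:
            if exc.errno == errno.EADDRINUSE:
                return True
            raise  # other failures (e.g. permission denied) are real errors
    return False


def check_scheduler_port(port: int = DASK_SCHEDULER_PORT) -> None:
    """Fail early with a readable message instead of a deep traceback."""
    if port_in_use(port):
        raise RuntimeError(f"Port {port} is already in use; is an old Dask scheduler still running?")
```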
    Found the Dask config needed for it. For reference, since this is really useful for anyone using Prefect without Dask knowledge:
    cluster_kwargs={
        # ...
        "scheduler_options": {"idle_timeout": "5s"},
        "worker_options": {"death_timeout": "5s"},
    }
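Putting the pieces of this message together, the executor configuration might look roughly like the sketch below. idle_timeout is a Dask Scheduler option that shuts the scheduler down after that long without activity, and death_timeout is a Dask Worker option that makes workers exit if they cannot reach a scheduler in time, which should free the port for the next scheduled run. ssh_cluster_kwargs is a hypothetical helper and the host names are placeholders; the dict is meant for DaskExecutor(cluster_class="distributed.SSHCluster", cluster_kwargs=...), where SSHCluster uses the first host for the scheduler and the rest for workers.

```python
from typing import Dict, Sequence


def ssh_cluster_kwargs(hosts: Sequence[str], timeout: str = "5s") -> Dict[str, object]:
    """Build cluster_kwargs for a DaskExecutor backed by distributed.SSHCluster."""
    return {
        # First entry runs the scheduler, the remaining entries run workers
        "hosts": list(hosts),
        # Scheduler shuts down after `timeout` with no activity
        "scheduler_options": {"idle_timeout": timeout},
        # Workers exit if they cannot reach the scheduler within `timeout`
        "worker_options": {"death_timeout": timeout},
    }


if __name__ == "__main__":
    # Placeholder host names; usage with Prefect (not run here):
    #   DaskExecutor(cluster_class="distributed.SSHCluster", cluster_kwargs=kwargs)
    kwargs = ssh_cluster_kwargs(["scheduler.example", "worker-1.example", "worker-2.example"])
    print(kwargs)
```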