Chris Goddard
09/01/2020, 1:40 AM
[...] LocalDaskExecutor() to the flow.run(.. method, but when I pass it to LocalEnvironment(executor=LocalDaskExecutor()) and then pass that to the flow constructor like this:
with Flow("test", schedule=schedule, environment=LocalEnvironment(executor=LocalDaskExecutor())) as flow:
I only get one task running at once.
Here's my test code:
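# Imports (assumed; Prefect 0.x module paths)
import time
from datetime import timedelta

import pendulum
import prefect
from prefect import Flow, task
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment
from prefect.schedules import Schedule
from prefect.schedules.clocks import DatesClock

@task
def generate():
    return [x for x in range(0, 40)]

@task
def log_sleep(x):
    logger = prefect.context.get('logger')
    time.sleep(5 + x)
    logger.info(x)
    return x * x

@task
def collect(lst):
    logger = prefect.context.get('logger')
    logger.info(lst)

schedule = Schedule(clocks=[DatesClock([pendulum.now() + timedelta(seconds=5)])])
with Flow("test", schedule=schedule, environment=LocalEnvironment(executor=LocalDaskExecutor())) as flow:
    nums = generate()
    results = log_sleep.map(nums)
    x = collect(results)
flow.run()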
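The symptom described here, only one mapped task running at once, is what a sequential executor would produce, while a pooled executor keeps several tasks in flight. The sketch below is not Prefect code: it is a standard-library stand-in that uses concurrent.futures to show the timing difference. The helper names (run_sequential, run_threaded, timed) and the shortened sleep are assumptions made for illustration only.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def log_sleep(x):
    # Stand-in for the mapped task above, with a much shorter sleep.
    time.sleep(0.05)
    return x * x


def run_sequential(nums):
    # One task at a time, like the behaviour described in the question.
    return [log_sleep(n) for n in nums]


def run_threaded(nums, workers=8):
    # Several tasks in flight at once via a thread pool.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(log_sleep, nums))


def timed(fn, nums):
    # Return the function's result together with its wall-clock duration.
    start = time.perf_counter()
    result = fn(nums)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    nums = list(range(8))
    seq_result, seq_time = timed(run_sequential, nums)
    par_result, par_time = timed(run_threaded, nums)
    print(seq_result == par_result, round(seq_time, 2), round(par_time, 2))
```

Both variants return the same results; only the elapsed time differs, which is a quick way to check from the logs whether a given run is actually parallel.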
Chris White
09/01/2020, 1:42 AM
[...] environment that you configure for your Flow is only used when running against a Prefect backend, so when you call flow.run the default LocalExecutor is used
Chris Goddard
09/01/2020, 1:44 AM
Chris White
09/01/2020, 1:45 AM
Chris Goddard
09/01/2020, 1:46 AM
Chris White
09/01/2020, 1:51 AM
Chris Goddard
09/01/2020, 1:56 AM
Chris White
09/01/2020, 2:01 AM
Chris Goddard
09/01/2020, 2:01 AM
Chris White
09/01/2020, 2:03 AM
Chris Goddard
09/01/2020, 2:04 AM
Chris White
09/01/2020, 2:09 AM
PREFECT__CLOUD__AUTH_TOKEN=YYYY
PREFECT__CLOUD__USE_LOCAL_SECRETS=false
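Names of the form PREFECT__SECTION__KEY follow the Prefect configuration convention in which double underscores separate nesting levels, so the two settings above would target cloud.auth_token and cloud.use_local_secrets. A rough sketch of that naming scheme, using a hypothetical helper parse_prefect_env that is not Prefect's own loader; the boolean cast is likewise only an assumption about how "false" would be interpreted:

```python
def parse_prefect_env(environ, prefix="PREFECT__"):
    """Turn PREFECT__SECTION__KEY=value pairs into a nested dict."""
    config = {}
    for name, raw in environ.items():
        if not name.startswith(prefix):
            continue  # ignore unrelated environment variables
        # "CLOUD__AUTH_TOKEN" -> sections ["cloud"], key "auth_token"
        *sections, key = name[len(prefix):].lower().split("__")
        node = config
        for section in sections:
            node = node.setdefault(section, {})
        # Illustrative cast: treat "true"/"false" as booleans, keep the rest.
        node[key] = {"true": True, "false": False}.get(raw.lower(), raw)
    return config


env = {
    "PREFECT__CLOUD__AUTH_TOKEN": "YYYY",
    "PREFECT__CLOUD__USE_LOCAL_SECRETS": "false",
    "HOME": "/home/user",
}
config = parse_prefect_env(env)
print(config)
```

In a real shell these would be exported before the process starts; the dict here just stands in for os.environ.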
Chris Goddard
09/01/2020, 2:10 AM
Chris White
09/01/2020, 2:10 AM
Chris Goddard
09/01/2020, 2:18 AM
[...] turn it off and on again problem 😛
Chris White
09/01/2020, 2:19 AM
Chris Goddard
09/01/2020, 2:21 AM
Chris White
09/01/2020, 2:21 AM