
Chris Goddard

09/01/2020, 1:40 AM
Hey folks! Any idea why I'm able to get parallelism in my flow when I pass LocalDaskExecutor() to the flow.run(...) method, but when I pass it to LocalEnvironment(executor=LocalDaskExecutor()) and then pass that to the flow constructor like this:
with Flow("test", schedule=schedule, environment=LocalEnvironment(executor=LocalDaskExecutor())) as flow:
I only get one task running at once? Here's my test code:
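# Imports assume the Prefect 0.13-era API used in this thread
# (environments and prefect.engine.executors).
import time
from datetime import timedelta

import pendulum
import prefect
from prefect import Flow, task
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment
from prefect.schedules import Schedule
from prefect.schedules.clocks import DatesClock

@task
def generate():
    return list(range(40))

@task
def log_sleep(x):
    logger = prefect.context.get('logger')
    time.sleep(5 + x)
    logger.info(x)
    return x * x

@task
def collect(lst):
    logger = prefect.context.get('logger')
    logger.info(lst)

# Run once, five seconds from now
schedule = Schedule(clocks=[DatesClock([pendulum.now() + timedelta(seconds=5)])])

with Flow("test", schedule=schedule, environment=LocalEnvironment(executor=LocalDaskExecutor())) as flow:
    nums = generate()
    results = log_sleep.map(nums)  # one log_sleep task per element of nums
    x = collect(results)

# Local run: here only one mapped log_sleep task runs at a time
flow.run()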
this configuration results in:

Chris White

09/01/2020, 1:42 AM
Hey Chris! Yeah, this is a common confusion that we hope to fix up soon: the environment that you configure for your Flow is only used when running against a Prefect backend, so when you call flow.run the default LocalExecutor is used.
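As a rough, Prefect-free illustration of why the executor matters for mapped tasks, here is a sketch that uses only the Python standard library: a plain loop stands in for the default LocalExecutor (one mapped task at a time), and a concurrent.futures.ThreadPoolExecutor stands in for LocalDaskExecutor. This is an analogy under those assumptions, not Prefect's implementation; run_sequential, run_threaded and timed are hypothetical helper names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def log_sleep(x: int) -> int:
    # Stand-in for the mapped task: sleep briefly, then return a value.
    time.sleep(0.1)
    return x * x


def run_sequential(nums):
    # Analogy for the default LocalExecutor: one mapped task at a time.
    return [log_sleep(x) for x in nums]


def run_threaded(nums, workers=4):
    # Analogy for a parallel executor: mapped tasks overlap on a thread pool.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(log_sleep, nums))


def timed(fn, *args):
    # Return the function's result together with its wall-clock duration.
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


nums = list(range(4))
seq_result, seq_time = timed(run_sequential, nums)
par_result, par_time = timed(run_threaded, nums)
print(seq_result == par_result)  # True: same results, different scheduling
print(f"sequential: {seq_time:.2f}s, threaded: {par_time:.2f}s")
```

In Prefect itself, the local equivalent of the threaded case is passing the executor directly, as in flow.run(executor=LocalDaskExecutor()), which is the variant the original question reports as running in parallel.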

Chris Goddard

09/01/2020, 1:44 AM
Ah ok, what would be the easiest way to test this locally? I'm trying to figure out why my flows are running slowly on my server (everything is in a Docker container on GCP). The flows get registered and then run from Prefect Cloud.

Chris White

09/01/2020, 1:45 AM
What sort of slowdown are you seeing?

Chris Goddard

09/01/2020, 1:46 AM
mapped tasks running one by one instead of in parallel
working on finding a good example

Chris White

09/01/2020, 1:51 AM
Interesting, are they extremely fast-running tasks?
c

Chris Goddard

09/01/2020, 1:56 AM
Not particularly; it's a multi-level mapped flow.
But here's what I see when it's running on Cloud:
the VAN[AR] is followed by all VAN[AR] mapped tasks
sorry one second - my local task isn't running
here we go
You see how there are multiple "Gathering targets" processes running at once, which spawn multiple exports?
my flow is constructed like this:

Chris White

09/01/2020, 2:01 AM
OK yup this all looks good to me
and you’re finding that you’re getting reduced parallelism on your server?
c

Chris Goddard

09/01/2020, 2:01 AM
None as far as I can tell.
Is there something I'm missing when I register the task? Or something about running within a single Docker container (not flow storage, but local storage within a Docker container)?
c

Chris White

09/01/2020, 2:03 AM
No, the code that I can see looks good and standard to me. I suspect this isn't a Prefect-specific issue but rather a Dask issue. Can you SSH into your server and perform the same local run that you did above to see what happens?
c

Chris Goddard

09/01/2020, 2:04 AM
Yeah, let me try.
Easier said than done; I have a bunch of secrets in Prefect Cloud.
c

Chris White

09/01/2020, 2:09 AM
So if you set the following two environment variables, that should make it easier:
PREFECT__CLOUD__AUTH_TOKEN=YYYY
PREFECT__CLOUD__USE_LOCAL_SECRETS=false

Chris Goddard

09/01/2020, 2:10 AM
OK, I think I might know what's going on: it might be using an older image version on the server. Thanks, I'll keep digging in and let you know.
c

Chris White

09/01/2020, 2:10 AM
ok great, sounds good!

Chris Goddard

09/01/2020, 2:18 AM
Yup, that's what it was. The machine had booted and pulled an older version of the image, which wasn't configured. Nothing like spending an hour on a "turn it off and on again" problem 😛

Chris White

09/01/2020, 2:19 AM
Haha classic, glad you figured it out
c

Chris Goddard

09/01/2020, 2:21 AM
Thanks again for your help; understanding the environment vs. runtime executor distinction was definitely helpful.
c

Chris White

09/01/2020, 2:21 AM
Anytime!