# ask-community
m
Hey everyone. I'm running into some trouble running a Prefect flow. It's a very simple one (converting JSON files to parquet). The problem is that when I try to run the flow (flow.run(...)) it doesn't work. I get to see the "Saving status ..." log but the df.to_parquet never starts processing (looking at the Dask dashboard), and it finally finishes with the exception below. But when I run the task directly (to_parquet.run(...)), setting up the local Dask cluster by myself, it works perfectly. It's probably a rookie mistake, but do you have any ideas what I'm doing wrong here? I'm using prefect 0.14.7 and dask 2021.4.1.
j
Hi @Milton Tavares Neto - is there a reason the flow is inside the convert_to_parquet_flow() function?
Also, would you mind moving the code-block into the thread of this message? It makes it easier for us to scroll through if initial messages are kept short. Thanks!
m
no particular reason actually. Is this causing the issue?
k
Hey @Milton Tavares Neto, just a friendly reminder to have code blocks in threads next time. Something is a bit off here since you named the Flow as the `convert_flow` variable in the `with` statement: `flow.executor` should probably be `convert_flow.executor`, and you should probably `return convert_flow`.
m
Copy code
SCHEMA = {...}
@task(timeout=10800)
def to_parquet(dumps_path, status_table, blocksize="128MB"):
    json_files_path = os.path.join(dumps_path, 'dump', status_table, '*')
    bag = db.read_text(json_files_path, blocksize=blocksize).map(convert_to_json)
    df = bag.to_dataframe(meta=SCHEMA)
    parquet_path = os.path.join(dumps_path, 'prefect_parquet', status_table)
    logger.info(f"Saving status table as parquet @ {parquet_path}")
    df.to_parquet(parquet_path, compression='snappy', schema='infer', engine='pyarrow-dataset')
def convert_to_parquet_flow():
    with Flow('ConvertToParquet') as convert_flow:
        dumps_path = Parameter('dumps_path', default="s3://kdt-staging-dynamodbexport-test")
        status_table = Parameter('status_table', default='STATUS_TABLE')
        blocksize = Parameter('blocksize', default="256MB")
        to_parquet(dumps_path, status_table, blocksize)
    convert_flow.executor = DaskExecutor()
    convert_flow.run_config = UniversalRun()
    return convert_flow
k
What Jenny is saying may not be causing the issue, but in practice most of our users just use the Flow outside of a function.
Thanks for cleaning that up!
upvote 1
m
sure. This difference in name of the variable is not the problem. They are the same in our code. I'll fix it up 🙂 good catch
I'll try to move it outside of the function
k
Yeah, moving it outside the function might help here. I think the issue is that the `with Flow` block is deferred execution that happens at runtime. `task` is also deferred, but the function is not, so we may be running into problems.
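Something like this, roughly — a sketch assuming the `to_parquet` task from your snippet and the Prefect 0.14 import paths:
Copy code
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor
from prefect.run_configs import UniversalRun

# define the flow at module level instead of inside a function
with Flow('ConvertToParquet') as convert_flow:
    dumps_path = Parameter('dumps_path', default="s3://kdt-staging-dynamodbexport-test")
    status_table = Parameter('status_table', default='STATUS_TABLE')
    blocksize = Parameter('blocksize', default="256MB")
    to_parquet(dumps_path, status_table, blocksize)

convert_flow.executor = DaskExecutor()
convert_flow.run_config = UniversalRun()

if __name__ == "__main__":
    convert_flow.run()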
m
I'm still getting the same behavior unfortunately. The flow ends with the error below:
Copy code
Unexpected error: KilledWorker('to_parquet-f5821bf3359e41ac931067976aea0a97', <Worker 'tcp://172.32.0.48:43493', name: 0, memory: 0, processing: 1>)
Scheduler and worker logs don't help either
k
KilledWorker errors happen with Dask when there are package mismatches between the Client and the Workers. Do you have a Dask cluster you're connecting to? When you do `task.run`, is that running locally?
m
I tried both locally and using a cluster, and I get the same behavior on both.
task.run works just fine.
k
so you tried LocalExecutor() and it failed? or was that LocalDaskExecutor?
m
I tried DaskExecutor only
k
Can you try LocalDaskExecutor?
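Swapping the executor is a one-liner, something like this (assuming the module-level flow from before; the `num_workers` value is just illustrative):
Copy code
from prefect.executors import LocalDaskExecutor

# run tasks in a local Dask process pool instead of a distributed cluster
convert_flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)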
m
Sure. Just a minute
So it didn't work with LocalDaskExecutor. It consumed all of my memory and crashed. Then I noticed that when I run it with DaskExecutor it also consumes all of the worker memory and the scheduler kills the worker.
The data is not that big. We have around 1k partitions with 200MB each (each entry has around 6 fields so the final parquet metadata is not big).
k
1k partitions with 200MB each. Does that mean we have 200GB of data?
m
Yes. The funny thing is that this processing works when I run the task.run method (using a local cluster that I set up manually).
k
When you use the DaskExecutor, how do you connect to the cluster?
m
I would do something like:
Copy code
from dask.distributed import Client
c = Client()
to_parquet.run(...)
and let the client set up the local cluster
k
Wow, how much memory does your local cluster have? This looks right to me. When you use the DaskExecutor, does your worker memory look as big as you expect?
m
16GB in total. I'm typically letting 4 workers run so 4GB per worker.
When running the flow, only one worker seems to consume memory (and it's before the to_parquet starts running)
the other 3 are idle. I just tried using only 10 files out of the 1k and it worked.
But it seems to me that it's related to the number of files and not the size of the data.
And what bugs me is that it works running the task but not the flow. So I'm not sure how to go ahead here :(
(and I get the same behavior for the 1k files using an ECSCluster with each worker having 16GB of mem)
k
I am a bit confused. The size of the data is 200 GB and you have 16 GB of memory? How does this execute with just `task.run`? Anyway, I think that to utilize the other 3 workers you need a slight refactor so that you can `map` your function across some list. This will allow Prefect to distribute the task to the workers. Maybe apply your logic by breaking the list of files into smaller lists and passing those to a task that can be mapped.
The way this is written doesn't make it easily parallelizable into independent processes.
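A rough sketch of that refactor (`list_json_files`, `chunk_files` and `convert_chunk` are hypothetical placeholders, not existing code):
Copy code
import glob
import os
from prefect import Flow, Parameter, task, unmapped

@task
def list_json_files(dumps_path, status_table):
    # list the individual dump files (glob works locally; S3 would need fsspec/s3fs)
    return sorted(glob.glob(os.path.join(dumps_path, 'dump', status_table, '*')))

@task
def chunk_files(files, chunk_size=50):
    # split the file list into smaller lists, one per mapped task run
    return [files[i:i + chunk_size] for i in range(0, len(files), chunk_size)]

@task
def convert_chunk(file_chunk, dumps_path, status_table):
    # hypothetical: convert just this chunk of JSON files and write its parquet output
    ...

with Flow('ConvertToParquetMapped') as mapped_flow:
    dumps_path = Parameter('dumps_path', default="s3://kdt-staging-dynamodbexport-test")
    status_table = Parameter('status_table', default='STATUS_TABLE')
    files = list_json_files(dumps_path, status_table)
    chunks = chunk_files(files)
    # each chunk becomes its own task run, so Prefect can spread them across the Dask workers
    convert_chunk.map(chunks, unmapped(dumps_path), unmapped(status_table))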
m
But isn't Dask's to_parquet inherently parallelized? I think it works with 16 GB of memory because it reads the JSON in chunks of `blocksize` (256MB by default).
k
Let me double check my statements. Yes, to_parquet is parallelized when using Dask alone, but I suspect the Prefect-Dask relationship is different here. I'll get back to you.
Yes, I confirmed my thoughts. What happens here is that when users use the Prefect `map`, the task gets serialized and sent to the workers. Each Dask worker is already an independent process. When you run your code with Prefect, it already runs in one worker. This means that your Dask code is already running inside one worker as a local process, which is why it does not distribute the operation. You will either need to refactor into independent pieces to leverage the Prefect `map` and then combine the results in a later task, or you will need to use a worker client to resubmit the Dask operations back to the Dask scheduler to get your operations parallelized with Dask.
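For the worker client option, a sketch of what resubmitting from inside the task could look like (assuming SCHEMA and convert_to_json from your earlier snippet; worth confirming on the Dask dashboard that the work actually fans out):
Copy code
import os
import dask.bag as db
from dask.distributed import worker_client
from prefect import task

@task(timeout=10800)
def to_parquet(dumps_path, status_table, blocksize="128MB"):
    json_files_path = os.path.join(dumps_path, 'dump', status_table, '*')
    parquet_path = os.path.join(dumps_path, 'prefect_parquet', status_table)
    # secede from the worker's thread pool and hand the Bag/DataFrame work
    # back to the scheduler so it is distributed across the whole cluster
    with worker_client():
        bag = db.read_text(json_files_path, blocksize=blocksize).map(convert_to_json)
        df = bag.to_dataframe(meta=SCHEMA)
        df.to_parquet(parquet_path, compression='snappy', schema='infer',
                      engine='pyarrow-dataset')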
m
Ah! I got it. In this case I'm not using the Prefect `map` but the Dask Bag `map`. Is this the same situation?
That's definitely different from what I was thinking but it explains a lot. Thanks!
Just another quick question: is there an easy way to resubmit the jobs to dask from the worker?
k
Yes, so you want to refactor to use the Prefect `map` for parallelization, or use that worker client to resubmit to the scheduler.
🙌 1
m
Maybe it could be easier if we run the flow with a LocalExecutor and set up a client to the Dask cluster to parallelize the data processing. 🤔 Does that make sense?
k
You can orchestrate Prefect with Dask, and you can orchestrate Dask with Prefect. You're talking about the first one. I'm not exactly sure that will help you unless you find a way to partition your data beforehand so each Prefect thread handles a piece (which is the same requirement as using Prefect to orchestrate Dask: the job still needs to be split).
It might be harder to pull it off that way.
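For completeness, a sketch of that LocalExecutor idea (the scheduler address is a placeholder; SCHEMA and convert_to_json come from the earlier snippet):
Copy code
import os
import dask.bag as db
from dask.distributed import Client
from prefect import task
from prefect.executors import LocalExecutor

@task(timeout=10800)
def to_parquet(dumps_path, status_table, blocksize="128MB"):
    # connect to the existing Dask cluster from inside the task
    with Client("tcp://dask-scheduler:8786"):
        json_files_path = os.path.join(dumps_path, 'dump', status_table, '*')
        bag = db.read_text(json_files_path, blocksize=blocksize).map(convert_to_json)
        df = bag.to_dataframe(meta=SCHEMA)
        parquet_path = os.path.join(dumps_path, 'prefect_parquet', status_table)
        df.to_parquet(parquet_path, compression='snappy', schema='infer',
                      engine='pyarrow-dataset')

# Prefect only orchestrates here; the Dask cluster does the heavy lifting
convert_flow.executor = LocalExecutor()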
m
Ok, Kevin. Thank you. It was very helpful. I'll try a few solutions here. Have a nice weekend
k
Good luck! 🙂