
Benjamin

06/30/2020, 9:50 PM
Hello everyone. I'm trying to run a simple task using prefect server and I'm having some trouble with it. The idea is to read a parquet dataset, standardize the feature columns and write the result to a csv file. I'm using dask and dask-ml to process the data, and the code looks like:
with Flow("standardization") as flow:
    df = read_feats(bucket, file_pattern)  # a task that returns a dask dataframe using dd.read_parquet(...)
    scaled_df = scale(df)  # prefect task that does the standardization using dask-ml StandardScaler
    write_csv_to_s3(scaled_df, bucket, output_file)  # this will write the dask dataframe as a csv to an s3 bucket
This is a very simple POC just to get things running and get a feel for prefect in action. I'm using version 0.12. Everything runs smoothly if I run the flow locally using flow.run with a remote DaskExecutor in a FargateCluster (using dask-cloudprovider):
executor = DaskExecutor(...) # cluster parameters setting fargatecluster from cloudprovider and cluster_kwargs
flow.run(executor=executor)
I start having problems if I try to run it using a local prefect agent connected to a local prefect server after registering it:
executor = DaskExecutor(...) # same parameters as before
flow.environment = LocalEnvironment(executor=executor)
flow.register()
The agent deploys the flow and creates the FargateCluster normally, and we can see the tasks registered in the Dask UI task stream, but no processing actually happens. It deserializes the first task and does nothing, then does the same with the second and third tasks. Any idea what I'm doing wrong here?
This is the dask task stream when running everything locally (with flow.run(...)):
And next one is the same but when running with prefect server and agent (both local). The grey bars are deserialization activities:
Thanks in advance 🙂

Jim Crist-Harif

06/30/2020, 9:56 PM
Sorry, having a bit of trouble parsing your question. You're saying things run fine when you do:
flow.run(executor=executor)
but don't when you do:
flow.environment = LocalEnvironment(executor=executor)
flow.register()
with the same executor configuration?

Benjamin

06/30/2020, 9:56 PM
Exactly

Jim Crist-Harif

06/30/2020, 9:57 PM
Hmmm, the tasks are arriving, so you have workers and things are connected, but you're saying the tasks do nothing? From looking at the above task plot in the dask UI, you're having tasks that do complete. How do you know they're doing nothing?

Benjamin

06/30/2020, 9:58 PM
They run too fast and the final file is not written

Jim Crist-Harif

06/30/2020, 9:58 PM
Is your flow successful, or is the task failing?

Benjamin

06/30/2020, 9:59 PM
Flow just keeps running forever

Jim Crist-Harif

06/30/2020, 10:00 PM
Hmmm, that is odd. Can you get the logs from your dask workers?

Benjamin

06/30/2020, 10:00 PM
From what I see all tasks are "pending"

Jim Crist-Harif

06/30/2020, 10:00 PM
I suspect this is an environment error

Benjamin

06/30/2020, 10:01 PM
I just tried doing that but the scheduler and worker were killed before I could do it
I'll try it again.

Jim Crist-Harif

06/30/2020, 10:02 PM
If the cluster shuts down, that means the FlowRunner did complete.
You might also try using a local cluster (DaskExecutor with no args) to see if things work in that case.

Benjamin

06/30/2020, 10:03 PM
Let me try that.
Indeed. Running with a LocalDaskExecutor does work

Jim Crist-Harif

06/30/2020, 10:06 PM
Sorry, try using a DaskExecutor() (runs a local distributed cluster) - the LocalDaskExecutor uses the local threaded scheduler which is different. I suspect things will work the same, but this is a better proxy for what you're doing.

Benjamin

06/30/2020, 10:10 PM
It works just fine with the DaskExecutor() as well

Jim Crist-Harif

06/30/2020, 10:13 PM
Ok. Hmmm. So a few thoughts on what might be happening:
• The images your cloud provider cluster is running with don't have all your dependencies available, so something weird is happening.
• The configuration to talk to prefect cloud isn't getting properly set in the cloudprovider cluster, so calls to the prefect api are failing, resulting in weird behavior.
I'd expect more informative and less weird errors in both these cases, but without logs it's hard to check.
If you're not using some service to store the logs from your fargate tasks, perhaps create an external FargateCluster, and configure the DaskExecutor with the appropriate address? That way you'd have access to the logs, since the flow runner won't shut down the cluster after it completes.

Benjamin

06/30/2020, 10:17 PM
I don't believe it's an image problem, as we built a docker image with all dependencies. And the flow also runs on the FargateCluster when calling flow.run(executor=executor)
That's a good idea to create an external cluster to get the logs. I'll do that and come back to you if I have any news. Thanks for the help.
Hey Jim. I just looked at both dask scheduler and worker logs. And they say nothing. It seems nothing is happening.
I used the same cluster with flow.run(executor=DaskExecutor(address)) and it worked again 😕

Jim Crist-Harif

06/30/2020, 10:51 PM
They say nothing as in nothing unusual? Or they say nothing at all? There should be logs, especially since you're seeing tasks run.

Benjamin

06/30/2020, 10:52 PM
I see nothing unusual and nothing about tasks in general. Maybe because they are at info level?

Jim Crist-Harif

06/30/2020, 10:53 PM
Hmmm. Ok. Are you running with prefect cloud or with prefect server?

Benjamin

06/30/2020, 10:55 PM
I'm trying it with prefect server now

Jim Crist-Harif

06/30/2020, 10:56 PM
Ok. You might try to get your prefect logs for this flow run using the CLI.
prefect get logs --id FLOW_RUN_ID
Wait, sorry, when looking at the logs on the dask cluster did you see logs from prefect, or just logs from dask? If the logs from dask are fine that at least lets us know we have no executor-level failures. I'm trying to get the logs from prefect now.

Benjamin

06/30/2020, 10:59 PM
Just logs from dask
Logs for flow run:
TIMESTAMP                         LEVEL    MESSAGE
2020-06-30T22:32:15.05836+00:00   INFO     Submitted for execution: PID: 6305
2020-06-30T22:32:16.557885+00:00  INFO     Beginning Flow run for 'standardize_feature_flow'
2020-06-30T22:32:16.611991+00:00  INFO     Starting flow run.
2020-06-30T22:32:16.612358+00:00  DEBUG    Flow 'standardize_feature_flow': Handling state change from Scheduled to Running
2020-06-30T22:32:18.769612+00:00  INFO     Flow run RUNNING: terminal tasks are incomplete

Jim Crist-Harif

06/30/2020, 11:00 PM
And that's it? Hmmmm

Benjamin

06/30/2020, 11:01 PM
That's all.

Jim Crist-Harif

06/30/2020, 11:02 PM
I'm a bit stuck on this. If you can provide a reproducible example (perhaps using some dummy tasks) I'd be happy to take a look in more detail. As is this isn't much to go on.
Sorry to not be more helpful here.
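One possible shape for such a reproducible example, as a sketch only: the dummy functions below stand in for read_feats and scale from the original flow and use plain Python lists instead of dask dataframes. The names make_rows, standardize, and build_flow are hypothetical, and the Prefect import is assumed to match version 0.12.

```python
import statistics


def make_rows(n=5):
    """Dummy stand-in for read_feats: returns a small list of floats."""
    return [float(i) for i in range(n)]


def standardize(values):
    """Dummy stand-in for scale: shift to zero mean, scale to unit std.

    Uses the population standard deviation and assumes the values are
    not all identical (otherwise the division would fail).
    """
    mean = statistics.mean(values)
    std = statistics.pstdev(values)
    return [(v - mean) / std for v in values]


def build_flow():
    """Wrap the dummy functions as Prefect tasks in a two-step flow."""
    # Imported lazily so the helpers above can be checked without Prefect.
    from prefect import Flow, task

    with Flow("standardization-repro") as flow:
        rows = task(make_rows)()
        task(standardize)(rows)
    return flow
```

Running build_flow() once with flow.run(executor=...) and once through flow.register() with the same executor would show whether the hang depends on the real tasks at all.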

Benjamin

06/30/2020, 11:03 PM
No problem. I'll try to create a reproducible example with dummy data and will get back to you.
👍 1
Thank you for your time.