# ask-community
b
Hello everyone. I'm trying to run a simple task using prefect server and I'm having some trouble with it. The idea is to read a parquet dataset, standardize the feature columns and write the result to a csv file. I'm using dask and dask-ml to process the data, and the code looks like:
with Flow("standardization") as flow:
    df = read_feats(bucket, file_pattern)  # a task that returns a dask dataframe using dd.read_parquet(...)
    scaled_df = scale(df)  # prefect task that does the standardization using dask-ml StandardScaler
    write_csv_to_s3(scaled_df, bucket, output_file)  # this will write the dask dataframe as a csv to an s3 bucket
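For context, the standardization step rescales each feature column to zero mean and unit variance. A minimal sketch in plain Python, assuming the population standard deviation is used; the `standardize` helper is hypothetical and not from the thread, and dask-ml's StandardScaler may differ in details:

```python
import statistics


def standardize(column):
    """Rescale a list of numbers to zero mean and unit variance."""
    mean = statistics.fmean(column)
    std = statistics.pstdev(column)  # assumption: population standard deviation
    if std == 0:
        # A constant column carries no spread; map it to zeros.
        return [0.0 for _ in column]
    return [(x - mean) / std for x in column]


scaled = standardize([1.0, 2.0, 3.0, 4.0])
print(scaled)
```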
This is a very simple POC just to get things running and get a feel for prefect in action. I'm using version 0.12. Everything runs smoothly if I run the flow locally using flow.run with a remote DaskExecutor in a FargateCluster (using dask-cloudprovider):
executor = DaskExecutor(...) # cluster parameters setting fargatecluster from cloudprovider and cluster_kwargs
flow.run(executor=executor)
I start having problems when I try to run it using a local prefect agent connected to a local prefect server after registering it:
executor = DaskExecutor(...) # same parameters as before
flow.environment = LocalEnvironment(executor=executor)
flow.register()
The agent deploys the flow, the FargateCluster is created normally, and we can see the tasks registered in the Dask UI task stream, but no processing actually happens. It deserializes the first task and does nothing, then does the same with the second and third tasks. Any idea what I am doing wrong here?
This is the dask task stream when running everything locally (with flow.run(...)):
And next one is the same but when running with prefect server and agent (both local). The grey bars are deserialization activities:
Thanks in advance 🙂
j
Sorry, having a bit of trouble parsing your question. You're saying things run fine when you do:
flow.run(executor=executor)
but don't when you do:
flow.environment = LocalEnvironment(executor=executor)
flow.register()
with the same executor configuration?
b
Exactly
j
Hmmm, the tasks are arriving, so you have workers and things are connected, but you're saying the tasks do nothing? From looking at the above task plot in the dask UI, you're having tasks that do complete. How do you know they're doing nothing?
b
They run too fast and the final file is not written
j
Is your flow successful, or is the task failing?
b
Flow just keeps running forever
j
Hmmm, that is odd. Can you get the logs from your dask workers?
b
From what I see all tasks are "pending"
j
I suspect this is an environment error
b
I just tried doing that but the scheduler and worker were killed before I could do it
I'll try it again.
j
If the cluster shuts down, that means the FlowRunner did complete.
You might also try using a local cluster (DaskExecutor with no args) to see if things work in that case.
b
Let me try that.
Indeed. Running with a LocalDaskExecutor does work
j
Sorry, try using a DaskExecutor() (runs a local distributed cluster) - the LocalDaskExecutor uses the local threaded scheduler which is different. I suspect things will work the same, but this is a better proxy for what you're doing.
b
It works just fine with the DaskExecutor() as well
j
Ok. Hmmm. So a few thoughts on what might be happening:
• The images your cloud provider cluster is running with don't have all your dependencies available, so something weird is happening.
• The configuration to talk to prefect cloud isn't getting properly set in the cloudprovider cluster, so calls to the prefect api are failing, resulting in weird behavior.
I'd expect more informative and less weird errors in both these cases, but without logs it's hard to check.
If you're not using some service to store the logs from your fargate tasks, perhaps create an external FargateCluster, and configure the DaskExecutor with the appropriate address? That way you'd have access to the logs, since the flow runner won't shut down the cluster after it completes.
b
I believe it's not an image problem, as we generated a docker image with all dependencies. And we are also able to run on the fargatecluster when calling flow.run(executor=executor)
That's a good idea to create an external cluster to get the logs. I'll do that and I'll come back to you if I have any news. Thanks for the help.
Hey Jim. I just looked at both dask scheduler and worker logs. And they say nothing. It seems nothing is happening.
I used the same cluster with flow.run(executor=DaskExecutor(address)) and it worked again 😕
j
They say nothing as in nothing unusual? Or they say nothing at all? There should be logs, especially since you're seeing tasks run.
b
I see nothing unusual and nothing about tasks in general. Maybe because they are at info level?
j
Hmmm. Ok. Are you running with prefect cloud or with prefect server?
b
I'm trying it with prefect server now
j
Ok. You might try to get your prefect logs for this flow run using the CLI.
prefect get logs --id FLOW_RUN_ID
Wait, sorry, when looking at the logs on the dask cluster did you see logs from prefect, or just logs from dask? If the logs from dask are fine that at least lets us know we have no executor-level failures. I'm trying to get the logs from prefect now.
b
Just logs from dask
Logs for flow run:
TIMESTAMP                         LEVEL    MESSAGE
2020-06-30T22:32:15.05836+00:00   INFO     Submitted for execution: PID: 6305
2020-06-30T22:32:16.557885+00:00  INFO     Beginning Flow run for 'standardize_feature_flow'
2020-06-30T22:32:16.611991+00:00  INFO     Starting flow run.
2020-06-30T22:32:16.612358+00:00  DEBUG    Flow 'standardize_feature_flow': Handling state change from Scheduled to Running
2020-06-30T22:32:18.769612+00:00  INFO     Flow run RUNNING: terminal tasks are incomplete
j
And that's it? Hmmmm
b
That's all.
j
I'm a bit stuck on this. If you can provide a reproducible example (perhaps using some dummy tasks) I'd be happy to take a look in more detail. As is this isn't much to go on.
Sorry to not be more helpful here.
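A reproducible example along these lines could start from something like the sketch below. It is only a starting point under stated assumptions: the helper names (`make_numbers`, `add_one`, `total`, `build_flow`) and the flow name are hypothetical, the dummy tasks stand in for the read, scale and write steps, and the Prefect import is kept inside `build_flow` so the plain functions can be checked without Prefect installed.

```python
def make_numbers(n=3):
    """Dummy source: stands in for reading the parquet dataset."""
    return list(range(n))


def add_one(values):
    """Dummy transform: stands in for the scaling step."""
    return [v + 1 for v in values]


def total(values):
    """Dummy sink: stands in for writing the result."""
    return sum(values)


def build_flow():
    """Wrap the dummy functions as Prefect tasks and chain them in a flow."""
    # Imported lazily; assumes the Prefect 0.12-era `Flow` and `task` API.
    from prefect import Flow, task

    with Flow("dummy-standardization") as flow:
        numbers = task(make_numbers)()
        shifted = task(add_one)(numbers)
        task(total)(shifted)
    return flow


# The plain functions can be sanity-checked on their own first.
print(total(add_one(make_numbers())))
```

The flow returned by build_flow() could then be run both ways described earlier in the thread, with flow.run(executor=executor) and with flow.environment = LocalEnvironment(executor=executor) followed by flow.register(), to see whether the hang still shows up without dask-ml or S3 involved.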
b
No problem. I'll try to create a reproducible example with dummy data and will get back to you.
šŸ‘ 1
Thank you for your time.