Benjamin
06/30/2020, 9:50 PM
from prefect import Flow

with Flow("standardization") as flow:
    df = read_feats(bucket, file_pattern)  # task that returns a dask dataframe via dd.read_parquet(...)
    scaled_df = scale(df)  # Prefect task that standardizes the data with dask-ml's StandardScaler
    write_csv_to_s3(scaled_df, bucket, output_file)  # writes the dask dataframe as CSV to an S3 bucket
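The `scale` task above standardizes each feature column: z = (x - mean) / std. As a point of reference only, here is a stdlib sketch of that arithmetic on a single in-memory column. `standardize` is a hypothetical helper, not dask-ml's StandardScaler (which fits on dask collections); it follows scikit-learn's StandardScaler conventions of using the population standard deviation and leaving zero-variance columns unscaled, and it is an assumption that dask-ml behaves the same way.

```python
from statistics import fmean, pstdev


def standardize(values: list[float]) -> list[float]:
    """Rescale one column to zero mean and unit (population) variance.

    Hypothetical helper that only illustrates the arithmetic; it is not
    dask-ml's StandardScaler and does not work on dask collections.
    """
    mean = fmean(values)
    std = pstdev(values, mu=mean)  # population std (ddof=0), scikit-learn's convention
    if std == 0:
        std = 1.0  # constant column: avoid dividing by zero, values just get centered
    return [(x - mean) / std for x in values]


if __name__ == "__main__":
    print(standardize([1.0, 2.0, 3.0, 4.0]))
```

On `[1.0, 2.0, 3.0, 4.0]` the result sums to zero and has unit population variance; a constant column comes back as all zeros.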
This is a very simple POC, just to get things running and get a feel for Prefect in action. I'm using version 0.12.
Everything runs smoothly if I run the flow locally with flow.run, using a DaskExecutor backed by a remote FargateCluster (from dask-cloudprovider):
from prefect.engine.executors import DaskExecutor

executor = DaskExecutor(...)  # cluster parameters: FargateCluster from dask-cloudprovider, plus cluster_kwargs
flow.run(executor=executor)
I start having problems when I try to run it with a local Prefect agent connected to a local Prefect server, after registering the flow:
executor = DaskExecutor(...) # same parameters as before
flow.environment = LocalEnvironment(executor=executor)
flow.register()
The agent deploys the flow and creates the FargateCluster normally, and we can see the tasks registered in the Dask UI's Task Stream, but no processing actually happens. It deserializes the first task and does nothing, then does the same with the second and third tasks.
Any idea what I am doing wrong here?
Jim Crist-Harif
06/30/2020, 9:56 PM
flow.run(executor=executor)
but don't when you do:
flow.environment = LocalEnvironment(executor=executor)
flow.register()
with the same executor configuration?
Benjamin
06/30/2020, 9:56 PM
Jim Crist-Harif
06/30/2020, 9:57 PM
Benjamin
06/30/2020, 9:58 PM
Jim Crist-Harif
06/30/2020, 9:58 PM
Benjamin
06/30/2020, 9:59 PM
Jim Crist-Harif
06/30/2020, 10:00 PM
Benjamin
06/30/2020, 10:00 PM
Jim Crist-Harif
06/30/2020, 10:00 PM
Benjamin
06/30/2020, 10:01 PM
Jim Crist-Harif
06/30/2020, 10:02 PM
FlowRunner did complete.
DaskExecutor with no args) to see if things work in that case.
Benjamin
06/30/2020, 10:03 PM
Jim Crist-Harif
06/30/2020, 10:06 PM
DaskExecutor() (runs a local distributed cluster); the LocalDaskExecutor uses the local threaded scheduler, which is different. I suspect things will work the same, but this is a better proxy for what you're doing.
Benjamin
06/30/2020, 10:10 PM
Jim Crist-Harif
06/30/2020, 10:13 PM
FargateCluster, and configure the DaskExecutor with the appropriate address? That way you'd have access to the logs, since the flow runner won't shut down the cluster after it completes.
Benjamin
06/30/2020, 10:17 PM
Jim Crist-Harif
06/30/2020, 10:51 PM
Benjamin
06/30/2020, 10:52 PM
Jim Crist-Harif
06/30/2020, 10:53 PM
Benjamin
06/30/2020, 10:55 PM
Jim Crist-Harif
06/30/2020, 10:56 PM
prefect get logs --id FLOW_RUN_ID
Benjamin
06/30/2020, 10:59 PM
TIMESTAMP LEVEL MESSAGE
2020-06-30T22:32:15.05836+00:00 INFO Submitted for execution: PID: 6305
2020-06-30T22:32:16.557885+00:00 INFO Beginning Flow run for 'standardize_feature_flow'
2020-06-30T22:32:16.611991+00:00 INFO Starting flow run.
2020-06-30T22:32:16.612358+00:00 DEBUG Flow 'standardize_feature_flow': Handling state change from Scheduled to Running
2020-06-30T22:32:18.769612+00:00 INFO Flow run RUNNING: terminal tasks are incomplete
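The log above ends at "Flow run RUNNING: terminal tasks are incomplete", with no task-level entries. When comparing several runs, a small parser can pull out the last flow-run state message. This is a minimal sketch assuming the whitespace-separated TIMESTAMP / LEVEL / MESSAGE layout shown in that output; `LogRecord`, `parse_logs`, and `last_flow_state` are hypothetical helpers, not part of the Prefect CLI or API.

```python
from typing import NamedTuple, Optional


class LogRecord(NamedTuple):
    timestamp: str
    level: str
    message: str


def parse_logs(text: str) -> list[LogRecord]:
    """Split `prefect get logs` table output into records.

    Assumes the layout seen in this thread: a header row, then one
    TIMESTAMP LEVEL MESSAGE row per line (messages may contain spaces).
    """
    records = []
    for line in text.strip().splitlines():
        parts = line.split(maxsplit=2)
        if len(parts) < 3 or parts[0] == "TIMESTAMP":
            continue  # skip the header row and blank or truncated lines
        records.append(LogRecord(*parts))
    return records


def last_flow_state(records: list[LogRecord]) -> Optional[str]:
    """Return the state named in the last 'Flow run <STATE>: ...' message, if any."""
    prefix = "Flow run "
    for record in reversed(records):
        if record.message.startswith(prefix) and ":" in record.message:
            return record.message[len(prefix):].split(":", 1)[0]
    return None


# The output posted in the thread, used as sample input.
SAMPLE = """\
TIMESTAMP LEVEL MESSAGE
2020-06-30T22:32:15.05836+00:00 INFO Submitted for execution: PID: 6305
2020-06-30T22:32:16.557885+00:00 INFO Beginning Flow run for 'standardize_feature_flow'
2020-06-30T22:32:16.611991+00:00 INFO Starting flow run.
2020-06-30T22:32:16.612358+00:00 DEBUG Flow 'standardize_feature_flow': Handling state change from Scheduled to Running
2020-06-30T22:32:18.769612+00:00 INFO Flow run RUNNING: terminal tasks are incomplete
"""

if __name__ == "__main__":
    print(last_flow_state(parse_logs(SAMPLE)))  # prints RUNNING
```

Matching on the "Flow run " prefix skips lines such as "Beginning Flow run for ..." and "Starting flow run.", so only the final state summary is picked up.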
Jim Crist-Harif
06/30/2020, 11:00 PM
Benjamin
06/30/2020, 11:01 PM
Jim Crist-Harif
06/30/2020, 11:02 PM
Benjamin
06/30/2020, 11:03 PM