    Benjamin

    2 years ago
    Hello everyone. I'm trying to run a simple task using Prefect Server and I'm having some trouble with it. The idea is to read a parquet dataset, standardize the feature columns and write the result to a CSV file. I'm using dask and dask-ml to process the data, and the code looks like this:
    from prefect import Flow

    with Flow("standardization") as flow:
        df = read_feats(bucket, file_pattern)  # task that returns a dask dataframe built with dd.read_parquet(...)
        scaled_df = scale(df)  # Prefect task that standardizes the features with dask-ml's StandardScaler
        write_csv_to_s3(scaled_df, bucket, output_file)  # writes the dask dataframe as CSV to an S3 bucket
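For reference, the standardization that the scale task delegates to dask-ml's StandardScaler amounts to a per-column z-score, (x - mean) / std. The stdlib sketch below only illustrates that arithmetic on plain row dicts; it is not dask-ml's implementation, and the function name standardize_columns is hypothetical. It assumes, as scikit-learn's StandardScaler does (and, as far as I know, dask-ml's), the population standard deviation (ddof=0) and a scale of 1 for constant columns.

```python
from __future__ import annotations

from statistics import fmean, pstdev


def standardize_columns(rows: list[dict], columns: list[str]) -> list[dict]:
    """Return copies of `rows` with each listed column scaled to mean 0 and std 1."""
    params = {}
    for col in columns:
        values = [row[col] for row in rows]
        std = pstdev(values)  # population std (ddof=0)
        # A constant column would divide by zero; use a scale of 1 instead.
        params[col] = (fmean(values), std if std > 0 else 1.0)

    scaled = []
    for row in rows:
        new_row = dict(row)  # non-feature columns (e.g. "id") are left untouched
        for col, (mean, std) in params.items():
            new_row[col] = (row[col] - mean) / std
        scaled.append(new_row)
    return scaled


rows = [{"id": 1, "x": 1.0}, {"id": 2, "x": 2.0}, {"id": 3, "x": 3.0}]
print(standardize_columns(rows, ["x"]))
```

The means and standard deviations are computed in a first pass and applied in a second, which mirrors the fit/transform split of StandardScaler.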
    This is a very simple POC, just to get things running and see Prefect in action. I'm using version 0.12. Everything runs smoothly if I run the flow locally with flow.run and a remote DaskExecutor backed by a FargateCluster (from dask-cloudprovider):
    from prefect.engine.executors import DaskExecutor

    executor = DaskExecutor(...)  # cluster parameters: FargateCluster from dask-cloudprovider plus cluster_kwargs
    flow.run(executor=executor)
    I start having problems when I run it with a local Prefect agent connected to a local Prefect Server, after registering the flow:
    from prefect.environments import LocalEnvironment

    executor = DaskExecutor(...)  # same parameters as before
    flow.environment = LocalEnvironment(executor=executor)
    flow.register()
    The agent deploys the flow and creates the FargateCluster normally, and we can see the tasks registered in the Dask UI task stream, but no processing actually happens. It deserializes the first task and does nothing, then does the same with the second and third tasks. Any idea what I am doing wrong here?
    This is the Dask task stream when running everything locally (with flow.run(...)):
    And the next one is the same view when running with the Prefect Server and agent (both local). The grey bars are deserialization activity:
    Thanks in advance 🙂

    Jim Crist-Harif

    2 years ago
    Sorry, having a bit of trouble parsing your question. You're saying things run fine when you do:
    flow.run(executor=executor)
    but don't when you do:
    flow.environment = LocalEnvironment(executor=executor)
    flow.register()
    with the same executor configuration?

    Benjamin

    2 years ago
    Exactly

    Jim Crist-Harif

    2 years ago
    Hmmm, the tasks are arriving, so you have workers and things are connected, but you're saying the tasks do nothing? Looking at the task plot above from the Dask UI, some of your tasks do complete. How do you know they're doing nothing?

    Benjamin

    2 years ago
    They run too fast and the final file is not written

    Jim Crist-Harif

    2 years ago
    Is your flow successful, or is the task failing?

    Benjamin

    2 years ago
    Flow just keeps running forever

    Jim Crist-Harif

    2 years ago
    Hmmm, that is odd. Can you get the logs from your dask workers?

    Benjamin

    2 years ago
    From what I see all tasks are "pending"

    Jim Crist-Harif

    2 years ago
    I suspect this is an environment error

    Benjamin

    2 years ago
    I just tried that, but the scheduler and worker were killed before I could get the logs.
    I'll try it again.

    Jim Crist-Harif

    2 years ago
    If the cluster shuts down, that means the FlowRunner did complete.
    You might also try using a local cluster (DaskExecutor with no args) to see if things work in that case.

    Benjamin

    2 years ago
    Let me try that.
    Indeed. Running with a LocalDaskExecutor does work

    Jim Crist-Harif

    2 years ago
    Sorry, try using a DaskExecutor() (runs a local distributed cluster); the LocalDaskExecutor uses the local threaded scheduler, which is different. I suspect things will work the same, but this is a better proxy for what you're doing.

    Benjamin

    2 years ago
    It works just fine with the DaskExecutor() as well

    Jim Crist-Harif

    2 years ago
    Ok. Hmmm. So a few thoughts on what might be happening:
    • The images your cloud provider cluster is running with don't have all your dependencies available, so something weird is happening.
    • The configuration to talk to Prefect Cloud isn't getting properly set in the cloud provider cluster, so calls to the Prefect API are failing, resulting in weird behavior.
    I'd expect more informative (and less weird) errors in both of these cases, but without logs it's hard to check.
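To illustrate the second bullet: Prefect 0.x lets environment variables named like PREFECT__SECTION__KEY (for example PREFECT__CLOUD__API) override its configuration, so a process that never receives those variables falls back to whatever defaults it has. The sketch below is a simplified, hypothetical stand-in for that mapping (config_from_env is not a Prefect function, and the real loader also interpolates and casts values); the variable values are illustrative only.

```python
from __future__ import annotations

PREFIX = "PREFECT__"


def config_from_env(environ: dict[str, str]) -> dict:
    """Build a nested dict from PREFECT__SECTION__KEY-style variables (simplified)."""
    config: dict = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue  # unrelated variables are ignored
        # "PREFECT__CLOUD__API" -> ["cloud", "api"]
        path = name[len(PREFIX):].lower().split("__")
        node = config
        for key in path[:-1]:
            node = node.setdefault(key, {})
        node[path[-1]] = value
    return config


# Illustrative environments: the agent side vs. a worker that received nothing.
agent_env = {"PREFECT__CLOUD__API": "http://localhost:4200", "HOME": "/root"}
worker_env = {"HOME": "/root"}  # hypothetical Fargate worker environment
print(config_from_env(agent_env))
print(config_from_env(worker_env))
```

If the remote workers end up with the second result, any Prefect API calls they make would go to a default endpoint rather than the local server, which is the kind of mismatch the bullet describes.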
    If you're not using some service to store the logs from your Fargate tasks, perhaps create an external FargateCluster and configure the DaskExecutor with the appropriate address? That way you'd have access to the logs, since the flow runner won't shut down the cluster after it completes.

    Benjamin

    2 years ago
    I don't think it's an image problem, since we built a Docker image with all the dependencies. We can also run on the FargateCluster when calling flow.run(executor=executor).
    That's a good idea, creating an external cluster to get the logs. I'll do that and come back to you if I have any news. Thanks for the help.
    Hey Jim. I just looked at both the Dask scheduler and worker logs, and they say nothing. It seems nothing is happening.
    I used the same cluster with flow.run(executor=DaskExecutor(address)) and it worked again 😕

    Jim Crist-Harif

    2 years ago
    They say nothing as in nothing unusual? Or they say nothing at all? There should be logs, especially since you're seeing tasks run.

    Benjamin

    2 years ago
    I see nothing unusual and nothing about tasks in general. Maybe because they are at info level?

    Jim Crist-Harif

    2 years ago
    Hmmm. Ok. Are you running with prefect cloud or with prefect server?

    Benjamin

    2 years ago
    I'm trying it with prefect server now

    Jim Crist-Harif

    2 years ago
    Ok. You might try to get your prefect logs for this flow run using the CLI.
    prefect get logs --id FLOW_RUN_ID
    Wait, sorry: when looking at the logs on the Dask cluster, did you see logs from Prefect, or just logs from Dask? If the Dask logs are fine, that at least tells us there are no executor-level failures. I'm trying to get the logs from Prefect now.

    Benjamin

    2 years ago
    Just logs from dask
    Logs for flow run:
    TIMESTAMP                         LEVEL    MESSAGE
    2020-06-30T22:32:15.05836+00:00   INFO     Submitted for execution: PID: 6305
    2020-06-30T22:32:16.557885+00:00  INFO     Beginning Flow run for 'standardize_feature_flow'
    2020-06-30T22:32:16.611991+00:00  INFO     Starting flow run.
    2020-06-30T22:32:16.612358+00:00  DEBUG    Flow 'standardize_feature_flow': Handling state change from Scheduled to Running
    2020-06-30T22:32:18.769612+00:00  INFO     Flow run RUNNING: terminal tasks are incomplete
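The last log line comes from the flow runner's end-of-run check. As I understand Prefect 0.x, a run stays Running while any terminal task is unfinished, and otherwise becomes Failed or Success depending on its reference tasks (which default to the terminal tasks). A simplified sketch of that decision, using hypothetical names (TaskState, final_flow_state) rather than Prefect's own state classes:

```python
from __future__ import annotations

from enum import Enum


class TaskState(Enum):
    PENDING = "Pending"
    RUNNING = "Running"
    SUCCESS = "Success"
    FAILED = "Failed"

    @property
    def is_finished(self) -> bool:
        return self in (TaskState.SUCCESS, TaskState.FAILED)


def final_flow_state(terminal: list[TaskState], reference: list[TaskState]) -> str:
    """Simplified stand-in for how a flow run's final state is chosen."""
    if not all(s.is_finished for s in terminal):
        # Corresponds to "Flow run RUNNING: terminal tasks are incomplete".
        return "Running"
    if any(s is TaskState.FAILED for s in reference):
        return "Failed"
    return "Success"


# write_csv_to_s3 is the only terminal task in the flow; if it never leaves
# Pending, the run is reported as still Running.
print(final_flow_state([TaskState.PENDING], [TaskState.PENDING]))
```

This is consistent with what Benjamin reports: all tasks stuck in "pending" and a flow that "keeps running forever".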

    Jim Crist-Harif

    2 years ago
    And that's it? Hmmmm

    Benjamin

    2 years ago
    That's all.

    Jim Crist-Harif

    2 years ago
    I'm a bit stuck on this. If you can provide a reproducible example (perhaps using some dummy tasks), I'd be happy to take a look in more detail. As is, this isn't much to go on.
    Sorry I can't be more helpful here.

    Benjamin

    2 years ago
    No problem. I'll try to create a reproducible example with dummy data and get back to you.
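For a reproducer like that, it helps if the dummy data is deterministic so others get exactly the same input. A minimal stdlib sketch, assuming hypothetical feature columns named f0, f1, ... and writing the CSV to an in-memory buffer rather than S3; the Prefect tasks and Dask pieces would still need to be added around it:

```python
from __future__ import annotations

import csv
import io
import random


def make_dummy_rows(n_rows: int, n_features: int, seed: int = 0) -> list[dict]:
    """Generate rows with an id and normally distributed dummy features."""
    rng = random.Random(seed)  # fixed seed -> identical data on every run
    return [
        {"id": i, **{f"f{j}": rng.gauss(10.0, 2.0) for j in range(n_features)}}
        for i in range(n_rows)
    ]


def rows_to_csv(rows: list[dict]) -> str:
    """Serialize rows to CSV text, with a header taken from the first row's keys."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(rows[0]))
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


csv_text = rows_to_csv(make_dummy_rows(5, 3))
print(csv_text.splitlines()[0])
```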
    Thank you for your time.