https://prefect.io logo
a

Aditya

07/07/2023, 4:18 AM
Hello, I am migrating from prefect v1 to v2.10.* I am not sure how to use this can someone help me out? from prefect.executors import LocalDaskExecutor Should I use prefect_dask imports for this?
1
j

Jeff Hale

07/07/2023, 11:47 AM
Yep. Check out the prefect-dask docs and the Prefect docs guide on Dask and Ray for info and examples. 🙂
a

Aditya

07/07/2023, 2:52 PM
I am still confused
@flow(name="newscatcher_stories") def newscatcher_stories_flow(): if WORKER_COUNT > 1: flow.task_runner = DaskTaskRunner(scheduler="threads", num_workers=WORKER_COUNT) How do I put in the if clause for worker count the documentation says to modify the code like this @flow(name="newscatcher_stories", task_runner = DaskTaskRunner(scheduler="threads", num_workers=WORKER_COUNT)) def newscatcher_stories_flow(): ...
j

Jeff Hale

07/07/2023, 4:45 PM
I think you need to specify the number of workers in an argument to the DaskTaskRunner, like in the example here:
Copy code
DaskTaskRunner(
    cluster_class="dask_cloudprovider.FargateCluster",
    cluster_kwargs={
        "image": "prefecthq/prefect:latest",
        "n_workers": 5,
    },
)
a

Aditya

08/01/2023, 2:33 PM
Hello just to confirm what is the use of this image tag in the cluster?
j

Jeff Hale

08/01/2023, 4:10 PM
That’s the Docker image that will pull the latest Prefect image in this case.
a

Aditya

08/01/2023, 6:29 PM
Here's an example of the weird internal error Dask (I think) is throwing on production: "sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked [SQL: INSERT INTO task_run (task_key, dynamic_key, cache_key, cache_expiration, task_version, flow_run_run_count, empirical_policy, task_inputs, tags, flow_run_id, name, run_count, total_run_time, id, created, updated) VALUES (:task_key, :dynamic_key, :cache_key, :cache_expiration, :task_version, :flow_run_run_count, :empirical_policy, :task_inputs, :tags, :flow_run_id, :name, :run_count, :total_run_time, :id, :created, :updated) ON CONFLICT (flow_run_id, task_key, dynamic_key) DO NOTHING]" I am facing such an error have any suggestions for me
I am assuming it is caused by dask
j

Jeff Hale

08/01/2023, 6:38 PM
Looks like a sqlite error. You'll want to use Postgres or Prefect Cloud in production.
a

Aditya

08/01/2023, 6:40 PM
If it's not too much to ask for can you guide me on what changes would be required I am new to this library and I don't have much production experience.
j

Jeff Hale

08/01/2023, 7:28 PM
Using Prefect Cloud as the API server backend would be the simplest way to get going. Here’s the quick start.
2 Views