Another question for all you lovely people. Part of my flow requires that I make ~10k HTTP requests (one per user) and then store each result in the same database table. My plan is to have something like:
• Task: get list of users
• Task: make API request to third party (map over this task with list of users)
• Task: store results of API requests with a single SQL insert
My question for you: is it wise to have 10k tasks running? I would assume so from the previous threads and docs. I also plan on limiting the Task Concurrency so that I do not exceed my rate limit with the external party. How does this look from an infra point of view? I'm currently running a KubernetesAgent. Will the single Kubernetes Job be running those 10k API requests?
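The three-step plan above (get users, map a request over them with a concurrency cap, write everything in one batch) can be sketched in plain Python. This is only an illustration of the shape, not Prefect code: it uses the standard library's `ThreadPoolExecutor` and `sqlite3` as stand-ins, and `get_users`, `fetch_result`, `store_results`, `run_pipeline`, and the `results` table are all hypothetical names. The HTTP call is replaced by a local stub so the sketch runs without network access.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor


def get_users():
    # Hypothetical stand-in for "get list of users".
    return list(range(1, 21))


def fetch_result(user_id):
    # Hypothetical stand-in for the third-party API request (no network here).
    return (user_id, f"result-for-{user_id}")


def store_results(conn, rows):
    # One batched write for all rows instead of one write per user.
    conn.executemany("INSERT INTO results (user_id, payload) VALUES (?, ?)", rows)
    conn.commit()


def run_pipeline(max_concurrency=4):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE results (user_id INTEGER PRIMARY KEY, payload TEXT)")
    users = get_users()
    # max_workers caps how many requests are in flight at once,
    # which is one simple way to stay under an external rate limit.
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        rows = list(pool.map(fetch_result, users))
    store_results(conn, rows)
    return conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]


if __name__ == "__main__":
    print(run_pipeline())
```

In a real flow the mapping and the concurrency limit would be handled by the orchestrator and its executor rather than by a thread pool in a single process.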
Hi @Adam, that sounds like a great use of Prefect! To sum up the questions: I think you should take advantage of Prefect's native Dask integrations. This will give you a great interface for running many tasks in parallel. From an infra point of view (in conjunction with the Kubernetes agent) this could look like a few things, depending on your setup:
• Using a LocalDaskExecutor in your flow's environment if you are not running that many tasks in parallel
• Standing up a static Dask cluster (either externally or on the k8s cluster) and then pointing the Dask executor to that Dask cluster to execute your tasks on its workers
• Using something like the DaskKubernetesEnvironment to dynamically spin up a Dask cluster for each run on k8s
Thanks @josh. A follow-up on the Dask Kubernetes environment documentation. If I already have the KubernetesAgent set up (with RBAC etc.), do I need to provision my cluster with anything else? The docs seem to imply that the Dask cluster will be created dynamically. Am I right in saying I just need to do
environment=DaskKubernetesEnvironment(min_workers=1, max_workers=3)
in my flow and then the Dask workers and scheduler will be auto provisioned on my cluster?
Yep, as long as your cluster has the RBAC applied and there is enough node availability, it should spin right up.
Thanks! Am I correct in understanding that Dask will run its own image for the workers rather than the one built in the
If you are using the DaskKubernetesEnvironment, it will automatically use the same image your flow is stored in, but if you stand up a static Dask cluster you would have to control that image yourself 🙂
Excellent. Going to give it a try now 🙂
Works 🙂 Thank you!
I take that back 😉 Seems I'm having a problem with Dask (but it works locally without Dask). I'm getting this error:
Unexpected error: RecursionError('maximum recursion depth exceeded')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/", line 491, in get_flow_run_state
    upstream_states = executor.wait(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/executors/", line 375, in wait
    return self.client.gather(futures)
  File "/usr/local/lib/python3.8/site-packages/distributed/", line 1982, in gather
    return self.sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/", line 832, in sync
    return sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/", line 339, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/site-packages/distributed/", line 323, in f
    result[0] = yield future
  File "/usr/local/lib/python3.8/site-packages/tornado/", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.8/site-packages/distributed/", line 1876, in _gather
    response = await future
  File "/usr/local/lib/python3.8/site-packages/distributed/", line 1927, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/usr/local/lib/python3.8/site-packages/distributed/", line 385, in retry_operation
    return await retry(
  File "/usr/local/lib/python3.8/site-packages/distributed/", line 370, in retry
    return await coro()
  File "/usr/local/lib/python3.8/site-packages/distributed/", line 861, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/", line 644, in send_recv
    response = await
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/", line 202, in read
    msg = await from_frames(
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/", line 87, in from_frames
    res = _from_frames()
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/", line 65, in _from_frames
    return protocol.loads(
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/", line 130, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/", line 302, in deserialize
    return loads(header, frames)
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/", line 64, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/", line 75, in loads
    return pickle.loads(x)
FWIW, I'm mapping 100 records into a task
Interesting 🤔 could you open an issue on the repo for this with an example of your flow code? Easier to track on there
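The traceback above ends inside `pickle.loads` while the executor gathers task results, so one way to narrow things down before filing the issue might be to round-trip each task's return value through pickle locally. Below is a minimal sketch under stated assumptions: it uses only the standard library `pickle` as a stand-in for Dask's serialization (which may behave differently), and `check_roundtrip` and `Proxy` are hypothetical names. `Proxy` only illustrates one kind of object that can fail on load with a `RecursionError`; it is not a claim about the cause in this thread.

```python
import pickle


def check_roundtrip(value):
    """Return None if value survives pickle dumps + loads, else the exception raised."""
    try:
        pickle.loads(pickle.dumps(value))
    except Exception as exc:  # RecursionError is a subclass of Exception
        return exc
    return None


class Proxy:
    """Hypothetical wrapper that forwards unknown attributes to a wrapped object."""

    def __init__(self, inner):
        self._inner = inner

    def __getattr__(self, name):
        # During unpickling the instance exists before _inner is restored, so
        # this lookup triggers __getattr__ again and recurses until the
        # interpreter's recursion limit is reached.
        return getattr(self._inner, name)


if __name__ == "__main__":
    # A plain dict of primitives round-trips without error.
    print(check_roundtrip({"user_id": 1, "status": 200}))
    # The proxy pickles, but loading it back raises.
    print(type(check_roundtrip(Proxy([1, 2, 3]))).__name__)
```

Running a check like this over the values a mapped task returns could help produce a small reproducible example for the issue, independent of the cluster.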