# prefect-community
a
Another question for all you lovely people. Part of my flow requires that I make ~10k HTTP requests (one per user) and then take each result and store it into the same database table. My plan is to have something like:
• Task: get list of users
• Task: make an API request to the third party (map over this task with the list of users)
• Task: store the results of the API requests with a single SQL insert
My question for you: is it wise to have 10k tasks running? I would assume so from the previous threads and docs. I also plan on limiting the task concurrency so I don't exceed my rate limit with the external party. How does this look from an infra point of view? I'm currently running a KubernetesAgent. Will the single Kubernetes Job be running those 10k API requests?
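For reference, a minimal sketch of the flow shape described above against the Prefect 0.x API of the time. The task bodies, flow name, tag, and retry settings are illustrative assumptions, not taken from this thread:

```python
from datetime import timedelta

from prefect import Flow, task


@task
def get_users():
    # Placeholder: fetch the ~10k users from your own database.
    return ["user-1", "user-2"]


# The "third-party-api" tag is illustrative: Prefect Cloud task concurrency
# limits are applied per tag, which is how the external rate limit would be
# respected.
@task(tags=["third-party-api"], max_retries=3, retry_delay=timedelta(seconds=10))
def call_third_party_api(user):
    # Placeholder: one HTTP request per user.
    return {"user": user, "status": "ok"}


@task
def store_results(results):
    # Placeholder: a single bulk SQL insert of all mapped results.
    print(f"storing {len(results)} rows")


with Flow("user-api-sync") as flow:
    users = get_users()
    responses = call_third_party_api.map(users)
    store_results(responses)
```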
j
Hi @Adam that sounds like a great use of Prefect! To sum up, I think you should take advantage of Prefect's native Dask integrations:
https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html
https://docs.prefect.io/orchestration/execution/local_environment.html#examples
This will give you a great interface for running many tasks in parallel. From an infra point of view (in conjunction with the Kubernetes agent) this could look like a few things (depending on your setup):
• Using a LocalDaskExecutor in your flow's environment if not running that many tasks in parallel
• Standing up a static Dask cluster (either externally or on the k8s cluster) and then pointing the DaskExecutor's address to that Dask cluster to execute your tasks on its workers
• Using something like the DaskKubernetesEnvironment for dynamically spinning up a Dask cluster for each run on k8s
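Hedged sketches of the first two options, assuming the Prefect 0.x executor/environment API; the scheduler address is a made-up placeholder:

```python
from prefect import Flow
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor
from prefect.environments import LocalEnvironment

flow = Flow("user-api-sync")  # stand-in for the flow defined above

# Option 1: thread/process parallelism inside the single flow-run job
flow.environment = LocalEnvironment(executor=LocalDaskExecutor())

# Option 2: run tasks on an existing (static) Dask cluster
# (the scheduler address is a placeholder)
flow.environment = LocalEnvironment(
    executor=DaskExecutor(address="tcp://dask-scheduler.default:8786")
)
```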
a
Thanks @josh. A follow-up on the Dask Kubernetes environment documentation. If I already have the KubernetesAgent set up (with RBAC etc.), do I need to provision my cluster with anything else? The docs seem to imply that the Dask cluster will be created dynamically. Am I right in saying I just need to do
environment=DaskKubernetesEnvironment(min_workers=1, max_workers=3)
in my flow and then the Dask workers and scheduler will be auto-provisioned on my cluster?
j
Yep as long as your cluster has the RBAC applied and there is enough node availability it should spin right up
a
Thanks! Am I correct in understanding that Dask will run its own image for the workers rather than the one built in the Storage?
j
If you are using the DaskKubernetesEnvironment it will automatically use the same image your flow is stored in, but if you stand up a static Dask cluster you would have to control that image yourself 🙂
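Roughly, under the Prefect 0.x API, the dynamic setup reuses the flow's Docker storage image for the per-run Dask pods; the registry URL and image name below are placeholder assumptions:

```python
from prefect import Flow
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker

flow = Flow("user-api-sync")  # stand-in for the flow defined above

# The Dask scheduler/worker pods spun up for each run reuse this image.
flow.storage = Docker(
    registry_url="gcr.io/my-project",  # placeholder registry
    image_name="user-api-sync",        # placeholder image name
)
flow.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=3)
```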
a
Excellent. Going to give it a try now 🙂
Works 🙂 Thank you!
I take that back 😉 Seems I'm having a problem with Dask (but it works locally without Dask). I'm getting this error:
Unexpected error: RecursionError('maximum recursion depth exceeded')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 491, in get_flow_run_state
    upstream_states = executor.wait(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/executors/dask.py", line 375, in wait
    return self.client.gather(futures)
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1982, in gather
    return self.sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 832, in sync
    return sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1876, in _gather
    response = await future
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1927, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils_comm.py", line 385, in retry_operation
    return await retry(
  File "/usr/local/lib/python3.8/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/usr/local/lib/python3.8/site-packages/distributed/core.py", line 861, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/core.py", line 644, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    msg = await from_frames(
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/utils.py", line 87, in from_frames
    res = _from_frames()
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/utils.py", line 65, in _from_frames
    return protocol.loads(
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/core.py", line 130, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 302, in deserialize
    return loads(header, frames)
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 64, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
    value = self._xxx_field_to_index.get(name)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
    value = self._xxx_field_to_index.get(name)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
    value = self._xxx_field_to_index.get(name)
  [Previous line repeated 974 more times]
RecursionError: maximum recursion depth exceeded
FWIW, I'm mapping 100 records into a task
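Reading the traceback, the recursion happens while Dask unpickles google.cloud.bigquery Row objects returned by an upstream task. One possible workaround (an assumption on my part, not something confirmed in this thread) is to convert the rows to plain dicts inside the task so only built-in types cross the Dask serialization boundary:

```python
from google.cloud import bigquery
from prefect import task


@task
def get_records():
    # Hypothetical upstream task: materialize BigQuery rows as plain dicts
    # before returning, so Dask only has to (de)serialize built-in types
    # instead of google.cloud.bigquery Row objects.
    client = bigquery.Client()
    rows = client.query("SELECT user_id FROM `my_dataset.users`").result()  # placeholder query
    return [dict(row.items()) for row in rows]
```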
j
Interesting 🤔 Could you open an issue on the repo for this with an example of your flow code? It's easier to track on there
a
Sure