Adam

    Adam

    2 years ago
    Another question for all you lovely people. Part of my flow requires that I make ~10k HTTP requests (one per user) and then store each result in the same database table. My plan is to have something like:
    • Task: get the list of users
    • Task: make an API request to the third party (map over this task with the list of users)
    • Task: store the results of the API requests with a single SQL insert
    My question for you: is it wise to have 10k tasks running? I would assume so from the previous threads and docs. I also plan on limiting the task concurrency so I don't exceed my rate limit with the external party. How does this look from an infra point of view? I'm currently running a KubernetesAgent. Will the single Kubernetes Job be running those 10k API requests?
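    A minimal sketch of what that flow shape could look like in Prefect's functional API (the task names and bodies are illustrative placeholders, not taken from this thread):
    from datetime import timedelta

    from prefect import Flow, task

    @task
    def get_users():
        # placeholder: fetch the ~10k user ids from your own database
        return ["user-1", "user-2"]

    @task(max_retries=3, retry_delay=timedelta(seconds=10))
    def call_third_party(user_id):
        # placeholder: one HTTP request per user; retries soften rate-limit hiccups
        return {"user_id": user_id}

    @task
    def store_results(results):
        # placeholder: a single bulk SQL insert of every mapped result
        pass

    with Flow("user-enrichment") as flow:
        users = get_users()
        responses = call_third_party.map(users)  # one mapped task run per user
        store_results(responses)                 # reduce step receives the full list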
    josh

    2 years ago
    Hi @Adam, that sounds like a great use of Prefect! To sum up, I think you should take advantage of Prefect's native Dask integrations:
    https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html
    https://docs.prefect.io/orchestration/execution/local_environment.html#examples
    This will give you a great interface for running many tasks in parallel. From an infra point of view (in conjunction with the Kubernetes agent) this could look like a few things, depending on your setup:
    • Using a LocalDaskExecutor in your flow's environment if you're not running that many tasks in parallel
    • Standing up a static Dask cluster (either externally or on the k8s cluster) and then pointing the Dask executor's address to that cluster to execute your tasks on its workers (sketched below)
    • Using something like the DaskKubernetesEnvironment to dynamically spin up a Dask cluster for each run on k8s
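    A rough sketch of the first two options, assuming the 0.x-era executor/environment API from the linked docs and taking flow to be the Flow object from the sketch above; the scheduler address is made up:
    from prefect.engine.executors import DaskExecutor, LocalDaskExecutor
    from prefect.environments import LocalEnvironment

    # Option 1: parallelism inside the single flow-run pod
    flow.environment = LocalEnvironment(executor=LocalDaskExecutor(scheduler="threads"))

    # Option 2: ship work to an existing (static) Dask cluster
    # (the address below is illustrative)
    flow.environment = LocalEnvironment(
        executor=DaskExecutor(address="tcp://dask-scheduler:8786")
    )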
    Adam

    2 years ago
    Thanks @josh. A follow-up on the Dask Kubernetes environment documentation. If I already have the KubernetesAgent set up (with RBAC etc.), do I need to provision my cluster with anything else? The docs seem to imply that the Dask cluster will be created dynamically. Am I right in saying I just need to do
    environment=DaskKubernetesEnvironment(min_workers=1, max_workers=3)
    in my flow, and then the Dask workers and scheduler will be auto-provisioned on my cluster?
    josh

    2 years ago
    Yep, as long as your cluster has the RBAC applied and there is enough node availability, it should spin right up
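    For completeness, a hedged sketch of what that could look like end to end; the registry, image, and project names below are placeholders:
    from prefect import Flow, task
    from prefect.environments import DaskKubernetesEnvironment
    from prefect.environments.storage import Docker

    @task
    def say_hello():
        print("hello from a Dask worker pod")

    with Flow("dask-k8s-demo") as flow:
        say_hello()

    # Docker storage builds and pushes the flow image; DaskKubernetesEnvironment then
    # spins up a Dask scheduler and workers for each flow run.
    flow.storage = Docker(registry_url="my-registry.example.com", image_name="dask-k8s-demo")
    flow.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=3)
    flow.register(project_name="my-project")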
    Adam

    2 years ago
    Thanks! Am I correct in understanding that Dask will run its own image for the workers rather than the one built in the Storage?
    josh

    2 years ago
    If you are using the DaskKubernetesEnvironment it will automatically use the same image your flow is stored in, but if you stand up a static Dask cluster you would have to control that image yourself 🙂
    Adam

    2 years ago
    Excellent. Going to give it a try now 🙂
    Works 🙂 Thank you!
    I take that back 😉 Seems I'm having a problem with Dask (though it works locally without Dask). I'm getting this error:
    Unexpected error: RecursionError('maximum recursion depth exceeded')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 491, in get_flow_run_state
        upstream_states = executor.wait(
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/executors/dask.py", line 375, in wait
        return self.client.gather(futures)
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1982, in gather
        return self.sync(
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 832, in sync
        return sync(
      File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 339, in sync
        raise exc.with_traceback(tb)
      File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 323, in f
        result[0] = yield future
      File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 735, in run
        value = future.result()
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1876, in _gather
        response = await future
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1927, in _gather_remote
        response = await retry_operation(self.scheduler.gather, keys=keys)
      File "/usr/local/lib/python3.8/site-packages/distributed/utils_comm.py", line 385, in retry_operation
        return await retry(
      File "/usr/local/lib/python3.8/site-packages/distributed/utils_comm.py", line 370, in retry
        return await coro()
      File "/usr/local/lib/python3.8/site-packages/distributed/core.py", line 861, in send_recv_from_rpc
        result = await send_recv(comm=comm, op=key, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/distributed/core.py", line 644, in send_recv
        response = await comm.read(deserializers=deserializers)
      File "/usr/local/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
        msg = await from_frames(
      File "/usr/local/lib/python3.8/site-packages/distributed/comm/utils.py", line 87, in from_frames
        res = _from_frames()
      File "/usr/local/lib/python3.8/site-packages/distributed/comm/utils.py", line 65, in _from_frames
        return protocol.loads(
      File "/usr/local/lib/python3.8/site-packages/distributed/protocol/core.py", line 130, in loads
        value = _deserialize(head, fs, deserializers=deserializers)
      File "/usr/local/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 302, in deserialize
        return loads(header, frames)
      File "/usr/local/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 64, in pickle_loads
        return pickle.loads(x, buffers=buffers)
      File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
        return pickle.loads(x)
      File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
        value = self._xxx_field_to_index.get(name)
      File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
        value = self._xxx_field_to_index.get(name)
      File "/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 1264, in __getattr__
        value = self._xxx_field_to_index.get(name)
      [Previous line repeated 974 more times]
    RecursionError: maximum recursion depth exceeded
    FWIW, I'm mapping 100 records into a task
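    The recursion bottoms out while unpickling google.cloud.bigquery Row objects on the way back from the Dask workers. One common workaround (an assumption here, not something settled in this thread) is to return plain built-in types from the task so Dask never has to pickle the Row objects:
    from google.cloud import bigquery
    from prefect import task

    @task
    def fetch_rows(query):
        # Hypothetical helper: materialize BigQuery Row objects as plain dicts
        # before they leave the task, so only built-in types cross the
        # Dask serialization boundary.
        client = bigquery.Client()
        return [dict(row) for row in client.query(query).result()]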
    josh

    2 years ago
    Interesting 🤔 could you open an issue on the repo for this with an example of your flow code? It's easier to track there
    Adam

    2 years ago
    Sure