hi! i was trying to use the default dask task runn...
# prefect-community
d
hi! i was trying to use the default dask task runner on our self hosted agent and orion. im running into
Copy code
AttributeError: 'coroutine' object has no attribute 'type'
and the flow run crashes. any idea why this happened. adding detailed logs in the comment
Copy code
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 580, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "flows/calculate_usages_flows/usages_flow.py", line 44, in usages_flow
    usages_initializer(scheduler_output['body_array'])
  File "/usr/local/lib/python3.9/site-packages/prefect/flows.py", line 439, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 154, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 475, in create_and_begin_subflow_run
    terminal_state = await orchestrate_flow_run(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 627, in orchestrate_flow_run
    await wait_for_task_runs_and_report_crashes(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1254, in wait_for_task_runs_and_report_crashes
    if not state.type == StateType.CRASHED:
AttributeError: 'coroutine' object has no attribute 'type'
k
Can you paste an example of the code that causes this error?
j
Just to clarify, the default taskrunner is async, not Dask.
j
I got this same error running Prefect on a k8s cluster with
dask_kubernetes.KubeCluster
More specifically: • I'm using Prefect Cloud • prefect 2.6.6, dask 2022.10.2, dask-kubernetes 2022.10.1 • k8s 1.21 on EKS More logs:
Copy code
15:31:27.888 | INFO    | Flow run 'cornflower-manul' - Created task run 'download_reshape_upload-8c37f7f7-55' for task 'download_reshape_upload'
15:31:27.894 | INFO    | Flow run 'cornflower-manul' - Submitted task run 'download_reshape_upload-8c37f7f7-55' for execution.
2022-11-14 15:43:15,039 - distributed.deploy.adaptive - INFO - Retiring workers [1]
2022-11-14 15:43:32,041 - distributed.deploy.adaptive - INFO - Retiring workers [7]
2022-11-14 15:43:58,039 - distributed.deploy.adaptive - INFO - Retiring workers [0]
2022-11-14 15:44:11,039 - distributed.deploy.adaptive - INFO - Retiring workers [8, 10, 11]
2022-11-14 15:44:48,040 - distributed.deploy.adaptive - INFO - Retiring workers [6]
2022-11-14 15:44:57,039 - distributed.deploy.adaptive - INFO - Retiring workers [3]
2022-11-14 15:45:19,039 - distributed.deploy.adaptive - INFO - Retiring workers [4, 5]
2022-11-14 15:45:21,039 - distributed.deploy.adaptive - INFO - Retiring workers [11]
15:45:27.270 | ERROR   | Flow run 'cornflower-manul' - Encountered exception during execution:
Traceback (most recent call last):
  File "/code/.venv/lib/python3.10/site-packages/prefect/engine.py", line 612, in orchestrate_flow_run
    waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
  File "/code/.venv/lib/python3.10/site-packages/prefect/engine.py", line 1284, in wait_for_task_runs_and_report_crashes
    if not state.type == StateType.CRASHED:
AttributeError: 'coroutine' object has no attribute 'type'
2022-11-14 15:45:27,277 - distributed.deploy.adaptive_core - INFO - Adaptive stop
2022-11-14 15:45:27,410 - distributed.deploy.adaptive_core - INFO - Adaptive stop
15:45:27.411 | ERROR   | Flow run 'cornflower-manul' - Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'coroutine' object has no attribute 'type'

15:45:27.485 | ERROR   | prefect.engine - Engine execution of flow run '1c4aac07-e116-47fe-a8e7-31de3cb613f8' exited with unexpected exception
Traceback (most recent call last):
  File "/code/.venv/lib/python3.10/site-packages/prefect/engine.py", line 1634, in <module>
    enter_flow_run_engine_from_subprocess(flow_run_id)
  File "/code/.venv/lib/python3.10/site-packages/prefect/engine.py", line 181, in enter_flow_run_engine_from_subprocess
    return anyio.run(retrieve_flow_then_begin_flow_run, flow_run_id)
  File "/code/.venv/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/code/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/code/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/code/.venv/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/code/.venv/lib/python3.10/site-packages/prefect/engine.py", line 299, in retrieve_flow_then_begin_flow_run
    return await begin_flow_run(
  File "/code/.venv/lib/python3.10/site-packages/prefect/engine.py", line 361, in begin_flow_run
    terminal_state = await orchestrate_flow_run(
  File "/code/.venv/lib/python3.10/site-packages/prefect/engine.py", line 657, in orchestrate_flow_run
    await wait_for_task_runs_and_report_crashes(
  File "/code/.venv/lib/python3.10/site-packages/prefect/engine.py", line 1284, in wait_for_task_runs_and_report_crashes
    if not state.type == StateType.CRASHED:
AttributeError: 'coroutine' object has no attribute 'type'
There was a version mismatch earlier, but doesn't seem relevant:
Copy code
15:31:15.890 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `dask_kubernetes.classic.kubecluster.KubeCluster`
Creating scheduler pod on cluster. This may take some time.
2022-11-14 15:31:21,037 - distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=0 maximum=10
/code/.venv/lib/python3.10/site-packages/distributed/client.py:1348: VersionMismatchWarning: Mismatched versions found

+---------+----------------+----------------+---------+
| Package | Client         | Scheduler      | Workers |
+---------+----------------+----------------+---------+
| lz4     | None           | 4.0.2          | None    |
| python  | 3.10.7.final.0 | 3.8.13.final.0 | None    |
+---------+----------------+----------------+---------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
@Khuyen Tran @Jeff Hale any ideas here 🤞 ?
z
Looks like when we waited for the task Dask gave us a coroutine instead of the result (which we expect to be a Prefect state).
👀 1
j
That it does!
🙌 1
I have run into a pretty exhausting list of problems trying to run Prefect on k8s with Dask. It feels like I am way off the beaten track, because I haven't even managed to get a simple 3-task flow to complete successfully – and I know that there are serious developer experience problems waiting for us even if we manage to get the basic flow working. Is anyone actually running Prefect 2 in Kubernetes via
DaskTaskRunner
and
dask_kubernetes.KubeCluster
? If not, is there a different infrastructure setup that we should consider?
1
d
id love to get some better resources for dask task runners as well. for now the default concurrent runner is what im resorting to
j
@Deepanshu Aggarwal so all of your tasks run on the same box, right? That wouldn't work for us I'm afraid 😕 Need horizontal scaling.
d
all of the tasks yes but you can internally limit the number of task running in one flow and start a new flow after that task limit is reached
its not an ideal way and somewhat not concurrent but it serves us well for now
j
Sounds like something I'd want the framework to handle TBH. I'm re-checking other pipeline products to see if they're a better fit – Dagster have a cloud offering now, which is new to me!
d
true! ill also check it out thanks
a
@James Brady have you tried our Coiled and Fugue integration? with those, scale-out with Dask should be pretty seamless cc @Khuyen Tran (she's working on a Coiled tutorial IIRC) cc @Kevin Kho maintainer of Fugue
j
Nope. The fact Coiled isn't mentioned in your docs is kind of a red flag to me
Perhaps my mental model is off, but distributing work across multiple pods just seems like something I want supported as a first-class operation 🤷
a
you're absolutely right, sorry for adding confusion, looks like we don't have the Coiled integration fully formalized yet -- we started collaboration here let me cc @alex - he's our Integrations team lead and can say more how we plan to integrate with Coiled
definitely, distributed setup brings its own challenges with it, we have some experts in this area you could book personalized support with via cs@prefect.io totally understand the issue, and we will keep improving our integration with various task runner options and ways to integrate with Dask and Ray
a
There’s definitely a gap in our documentation for working with Coiled specifically, but here’s an example of how to configure the a
DaskTaskRunner
that works with a coiled cluster: https://github.com/PrefectHQ/prefect-dask/issues/26#issuecomment-1258755216. This portion of the
prefect-dask
documentation may also help with distributing work across workers: https://prefecthq.github.io/prefect-dask/#distributing-dask-collections-across-workers
gratitude thank you 2
🙌 2
j
m
Wait, but what is the point of k8s without dask actually then? Because you mention you cannot distribute things across pods without it at all @James Brady? I thought that was the whole point, that I could have agents on various pods that are picking up scheduled flows (and maybe even tasks?) independently?
j
Not an expert, but yes that's my understanding Michiel. Deepanshu suggests here that you can maybe get the ConcurrentTaskRunner to use more than one pod, but it seems like a bit of a bodge to me
z
You can definitely distribute flows across pods in k8s, you only need Dask to distribute tasks.
d
concurrent task runner doesnt distribute it across pods. it implements multithreading i guess . thus increasing the cpu and memory of one single job
even im looking for solution to distribute tasks across pods
m
Ah okay, pfew, got scared for a second since I am literally currently going through a prefect recipe about setting up Prefect on Azure AKS with Terraform 😅
But yeah definitely I could imagine wanting easier task distribution further down the line, so I can imagine your frustration
d
theres no clear documentation for dask . thats like the last blocker for me in my poc
m
The main thing I wonder actually now, in what way is k8s still better than for example Azure Container Instances (https://medium.com/the-prefect-blog/serverless-prefect-flows-with-azure-container-instances-f2442ebc9343)?
r
k8s gives you complete control over your infrastructure, but in many cases all you need is the ability to specify CPU, GPU, and memory like you get with ACI or Google Cloud Run. I believe @Oscar Björhn migrated from k8s to Azure Container Instances, so he can probably tell you more about the benefits and drawbacks of that next time he is online 😄
🙌 1
o
Benefits: Much easier to set up and maintain (checking how much cpu/ram a flow uses, running bash commands, checking logs), cheaper (don't need the overhead of constantly-running head nodes and such). Doing things like starting instances in specific vnets or using managed identities required very little effort. Drawbacks: Each container instance can currently only use up to 4 cores and I forgot the amount, but a relatively low amount of ram (16-64 GB or something). Not an issue for our workloads. We struggled for a week or two learning k8s and getting it up and running, then we had multiple intermittent errors running flows which would have required us to spend a lot more time learning k8s "properly" to fix. It was, by far, the most complicated component in our data stack. Just then, Prefect released ACI support, we had a quick look at it and migrated everything over to it within a couple of days.
🙌 3
🙏 3
d
@Oscar Björhn were you able to use dask runner on k8s ? if yes can you provide some sample code for a flow and how is it distributing the tasks ?
r
I don't think Oscar used Dask on k8s, but I might be mistaken.
o
Nope, we are experimenting with a Ray cluster but decided to run it outside of k8s
... So now that's the most complicated component of our data stack. Hoping to check out Modal when they open up an option to run it within the EU.
🙌 1
m
Awesome Oscar, thanks for the info! So I’m curious, as you mentioned 2 weeks for k8s/AKS with Prefect, in comparison (handwavy back-of-the hand estimations are completely fine) how long did it take to setup ACI in a production-worthy deployment strategy? And do you for example also have like a develop, staging, production type of environments which are done through different ACI resources? I think this would make for a very good prefect recipe as alternative to the currently available AKS recipe (https://github.com/PrefectHQ/prefect-recipes/tree/main/devops/infrastructure-as-code/azure). I do wonder how much it differs from the AVM recipe though, but certainly open to collaborate to create a new recipe
o
It took us 4 or 5 days in total, but most of it was due to some of our infrastructure requirements not being supported by prefect-azure (yet), which meant we had to modify the package ourselves as well as gain a deeper understanding regarding how blocks work (such as "what happens when we modify and redeploy a block created by the Prefect team?" and "What will happen once they push their own updates?"). We added 3 new abilities to the ACI block: Specifying subnets, dns servers as well as assigning an uami (User-Assigned Managed Identity) to the created container. I've had some discussions about this with Ryan and I believe he is adding support for most of it soon. If anything is missing we will create a PR. 🙂 If the above functionality had already been in place I doubt it would have taken us more than two days from idea to production, ACI is surprisingly easy to work with. We deploy to three environments currently (dev, test, prod) and we've set it up so that each flow can specify whether it needs to be run on a specific subnet/dns. Each flow can also specify its cpu and ram requirements. I've considered creating recipes for some of the things we've created, just haven't found the time yet. Most of the above was based on Ryan's article, so I haven't been sure if there's any value in it.
❤️ 2
🙌 2
gratitude thank you 2
a
there's always value in sharing your work - I definitely encourage that 🙌 thanks so much Oscar for helping us further improve the ACI block, so great to see it
👍 3
j
@James Brady did you ever figure out how to implement horizontal scaling?
my team is just migrating from prefect 1 to 2 and are evaluating our infrastructure. we are currently using ECS with fargate. i'm not familiar with ACI, but seems this is Azure's equivalent offering...? anyway, our ETLs with prefect are pretty inconsistent in the amount of processing we do. we have bursts of a lot of data, but most of the time it's not very much. Having the prefect agent as an ECS task doesn't really work for us, since we need to know the compute up front: https://docs.prefect.io/concepts/infrastructure/#ecstask:~:text=Make%20sure%20to%20allocate%20eno[…]r%20agent%2C%20and%20consider%20adding%20retries We have no way of knowing the amount of data we need to process in advance, but we do know how much compute any given flow will need. what's happening right now is we get a ton of data dumped on us, and we have to throttle the processing so the prefect agent as an ECS task can handle it. the ideal situation would be for ECS to autoscale up the prefect agent task, but seems that's not possible? anyone in this thread have thoughts what we might try?