Hi all, I have a flow that kicks off 2 dbt cloud j...
# ask-community
z
Hi all, I have a flow that kicks off 2 dbt Cloud jobs. Every so often (maybe once every few weeks), we get a random asyncio.exceptions.CancelledError (full traceback in thread). The dbt Cloud jobs run fine, but Prefect thinks the run failed and fails the flow.
Prefect version: 2.8.7
Parent flow id: 276154ba-8d12-4ce2-983e-0b571f2d7e8a
I don't want to set retries because dbt snapshots are involved (i.e. I don't want to unnecessarily take 2 snapshots). According to ChatGPT, this CAN be due to network connectivity issues. I'd like to ignore these errors but keep checking the state of the dbt Cloud jobs until they complete. ChatGPT recommends something like this:
import asyncio

from prefect import task


@task
def your_task():
    try:
        # Your task's code here
        pass
    except asyncio.CancelledError:
        # Catch and ignore CancelledError
        pass
But I don't want to mark the flow as "complete" before it's ACTUALLY completed in dbt Cloud. Is there some way I can tell Prefect to continue monitoring the task, even after encountering an asyncio.exceptions.CancelledError? I have a Tableau flow that is dependent on this one, but I'm tempted to just set it on a cron. Thanks in advance!!
Downloading flow code from storage at ''
04:54:00 AM prefect.flow_runs INFO Created subflow run 'romantic-toucanet' for flow ' prd - dbt - flow'
04:54:03 AM prefect.flow_runs ERROR Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 34, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/usr/local/lib/python3.8/site-packages/anyio/streams/tls.py", line 195, in receive
    data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
  File "/usr/local/lib/python3.8/site-packages/anyio/streams/tls.py", line 137, in _call_sslobject_method
    data = await self.transport_stream.receive()
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/usr/local/lib/python3.8/asyncio/locks.py", line 309, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
    yield
  File "/usr/local/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 36, in read
    return b""
  File "/usr/local/lib/python3.8/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 112, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 91, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 155, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 191, in _receive_event
    data = await self._network_stream.read(
  File "/usr/local/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 36, in read
    return b""
  File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc)
httpcore.ReadTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 673, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "containers/prefect/src/flows/flow_data_pipeline_primary.py", line 18, in data_pipeline_primary
    trigger_dbt_cloud_job_run_flow()
  File "/usr/local/lib/python3.8/site-packages/prefect/flows.py", line 468, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 182, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/usr/local/lib/python3.8/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 571, in create_and_begin_subflow_run
    return await terminal_state.result(fetch=True)
  File "/usr/local/lib/python3.8/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1545, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/tmp/tmph6kz3ftoprefect/containers/prefect/src/flows/flow_dbt_2.py", line 44, in run_dbt_cloud_job
    dbt_cloud_job_run.wait_for_completion()
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 260, in coroutine_wrapper
    return call()
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 245, in __call__
    return self.result()
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/usr/local/lib/python3.8/site-packages/prefect_dbt/cloud/jobs.py", line 731, in wait_for_completion
    await self._wait_until_state(
  File "/usr/local/lib/python3.8/site-packages/prefect_dbt/cloud/jobs.py", line 679, in _wait_until_state
    run_state = await get_state_fn()
  File "/usr/local/lib/python3.8/site-packages/prefect_dbt/cloud/jobs.py", line 722, in get_status_code
    run_data = await self.get_run()
  File "/usr/local/lib/python3.8/site-packages/prefect_dbt/cloud/jobs.py", line 708, in get_run
    response = await client.get_run(self.run_id)
  File "/usr/local/lib/python3.8/site-packages/prefect_dbt/cloud/clients.py", line 128, in get_run
    return await self.call_endpoint(
  File "/usr/local/lib/python3.8/site-packages/prefect_dbt/cloud/clients.py", line 55, in call_endpoint
    response = await self._admin_client.request(
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout
05:00:22 AM prefect.flow_runs ERROR Finished in state Failed('Flow run encountered an exception.
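Reading the traceback bottom-up, the exception that actually fails the task appears to be httpx.ReadTimeout, raised while wait_for_completion() polls dbt Cloud for the run status; the CancelledError at the top looks like the internal cancellation of the timed-out read. One way to tolerate that without retriggering the job (and so without a second snapshot) is to trigger once and then poll in a loop that swallows a bounded number of transient errors. The sketch below is plain Python and makes assumptions: poll_until_terminal, the status strings, and the get_status callable are hypothetical placeholders, not part of Prefect or prefect-dbt. In a real task, get_status would wrap whatever call fetches the run status, and transient_errors would include httpx.ReadTimeout.

```python
import time
from typing import Callable, Tuple, Type

# Hypothetical status labels for this sketch; map them to what the real API returns.
TERMINAL_STATUSES = {"success", "error", "cancelled"}


def poll_until_terminal(
    get_status: Callable[[], str],
    transient_errors: Tuple[Type[BaseException], ...] = (TimeoutError,),
    poll_seconds: float = 10.0,
    max_consecutive_errors: int = 5,
    max_wait_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns a terminal status.

    Transient errors are ignored unless more than max_consecutive_errors
    happen in a row, so one dropped connection does not fail the caller,
    but a real outage still surfaces.
    """
    deadline = time.monotonic() + max_wait_seconds
    consecutive_errors = 0
    while time.monotonic() < deadline:
        try:
            status = get_status()
        except transient_errors:
            consecutive_errors += 1
            if consecutive_errors > max_consecutive_errors:
                raise  # too many failures in a row: give up and re-raise
        else:
            consecutive_errors = 0  # a successful poll resets the counter
            if status in TERMINAL_STATUSES:
                return status
        sleep(poll_seconds)
    raise TimeoutError(
        f"job did not reach a terminal status within {max_wait_seconds} seconds"
    )
```

Because only the status check is repeated, the job itself is started exactly once. The caller can then fail the task only when the returned status is not the success value.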
@Marvin any ideas on this?
m
I'm sorry, but your question is a bit too vague for my vast, yet underutilized, computational capabilities. Could you please provide more details or context? It would give me something to do, at least.
z
@Marvin how to continue to check the status of a task after encountering an error without stopping the task and finishing the flow, and without retries
m
Well, it seems you're dealing with a task in a flow that may encounter an error. In Prefect, a task failing doesn't necessarily mean the flow has to stop. The behavior of the flow in response to a task failure is determined by the task's trigger. By default, a task's trigger is all_successful, which means if any of its upstream tasks fail, it won't run. However, you can change the trigger to all_finished, which means the task will run as long as all its upstream tasks have finished, regardless of whether they succeeded or failed. Here's how you can do it:
from prefect import Flow, Task, triggers

class MyTask(Task):
    def run(self):
        # Your task logic here
        pass

with Flow("My Flow") as flow:
    task1 = MyTask(trigger=triggers.all_finished)()
    task2 = MyTask(trigger=triggers.all_finished)()
    task1.set_upstream(task2)

# Now even if task2 fails, task1 will still run
This way, your flow can continue even if a task encounters an error. But remember, life... er, I mean, error handling... is full of trade-offs. You'll need to decide what's best based on your specific use case.
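For readers on Prefect 2.x (the thread mentions 2.8.7): the snippet above is written in the Prefect 1.x style (a `with Flow(...)` block and the triggers module), and a trigger only decides whether a downstream task runs. It does not keep a failed task monitoring its job. To make the all_successful versus all_finished distinction concrete, here is a toy model in plain Python. It is not Prefect's implementation; run_in_order, the state strings, and the task tuples are hypothetical names made up for this sketch.

```python
from typing import Callable, Dict, List, Sequence, Tuple

# State labels used only by this toy model.
SUCCESS = "Success"
FAILED = "Failed"
TRIGGER_FAILED = "TriggerFailed"
FINISHED = {SUCCESS, FAILED, TRIGGER_FAILED}

Trigger = Callable[[Sequence[str]], bool]


def all_successful(upstream_states: Sequence[str]) -> bool:
    """Run only if every upstream task succeeded."""
    return all(state == SUCCESS for state in upstream_states)


def all_finished(upstream_states: Sequence[str]) -> bool:
    """Run once every upstream task has finished, whatever the outcome."""
    return all(state in FINISHED for state in upstream_states)


# (name, function, upstream task names, trigger)
TaskSpec = Tuple[str, Callable[[], None], List[str], Trigger]


def run_in_order(tasks: List[TaskSpec]) -> Dict[str, str]:
    """Run tasks in the given order, consulting each trigger first."""
    states: Dict[str, str] = {}
    for name, fn, upstream, trigger in tasks:
        if not trigger([states[u] for u in upstream]):
            states[name] = TRIGGER_FAILED  # trigger refused: task never runs
            continue
        try:
            fn()
            states[name] = SUCCESS
        except Exception:
            states[name] = FAILED
    return states


def boom() -> None:
    raise RuntimeError("upstream failed")


def ok() -> None:
    return None
```

With task2 running boom and task1 running ok downstream of it, task1 ends in "Success" under all_finished and in "TriggerFailed" under all_successful. In both cases task2 stays "Failed", which is why a trigger alone does not answer the original question.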