Joshua Greenhalgh
06/11/2023, 4:22 PM
Joshua Greenhalgh
06/11/2023, 4:23 PM
16:10:45.323 | ERROR | prefect._internal.concurrency.services - Service 'EventsWorker' failed to process item (Event(<LOTS OF STUFF>), <Context object at 0x7f9cabb6f100>)
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/services.py", line 151, in _main_loop
await self._handle(item)
File "/usr/local/lib/python3.9/site-packages/prefect/events/worker.py", line 41, in _handle
await self._client.emit(event)
File "/usr/local/lib/python3.9/site-packages/prefect/events/clients.py", line 22, in emit
return await self._emit(event)
File "/usr/local/lib/python3.9/site-packages/prefect/events/clients.py", line 172, in _emit
assert self
Joshua Greenhalgh
06/12/2023, 7:44 AM
07:40:56.798 | ERROR | prefect._internal.concurrency.services - Service 'EventsWorker' failed to process item (Event(occurred=DateTime(2023, 6, 12, 7, 38, 14, 746768, tzinfo=Timezone('+00:00')), event='prefect.task-run.Completed', resource=Resource(__root__={'prefect.resource.id': 'prefect.task-run.83c5ba9c-a1c0-464c-a973-4c8f23088598', 'prefect.resource.name': 'get_raw_collection-0', 'prefect.state-message': '', 'prefect.state-name': 'Completed', 'prefect.state-timestamp': '2023-06-12T07:38:14.746768+00:00', 'prefect.state-type': 'COMPLETED'}), related=[RelatedResource(__root__={'prefect.resource.id': 'prefect.flow-run.ef11746f-37e3-4d08-9b19-e8dc67e9ba22', 'prefect.resource.role': 'flow-run', 'prefect.resource.name': 'white-mastiff'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.flow.b824c832-982e-4d9b-b8eb-bcce9a8aab0e', 'prefect.resource.role': 'flow', 'prefect.resource.name': 'cms'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.deployment.95a06a70-c47f-44b0-9190-6ee3e6c54a8a', 'prefect.resource.role': 'deployment', 'prefect.resource.name': 'creatives_loki'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.work-queue.3c646f52-4d3a-41fe-bef1-d91396ad752a', 'prefect.resource.role': 'work-queue', 'prefect.resource.name': 'kubernetes'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.work-pool.326989fc-2dd6-4f88-9d0e-2261448ec7e2', 'prefect.resource.role': 'work-pool', 'prefect.resource.name': 'default-agent-pool'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.tag.cms', 'prefect.resource.role': 'tag'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.tag.loki', 'prefect.resource.role': 'tag'})], payload={'intended': {'from': 'RUNNING', 'to': 'COMPLETED'}, 'initial_state': {'type': 'RUNNING', 'name': 'Running', 'message': ''}, 'validated_state': {'type': 'COMPLETED', 'name': 'Completed', 'message': ''}}, id=UUID('d0b10c03-451a-4006-9da6-5bd4562f5499'), follows=UUID('d2cfd3a8-ae69-4aad-b8ed-20b8eba01ce7')), <Context object at 0x7ff91a1eee00>)
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/websockets/legacy/client.py", line 655, in __await_impl_timeout__
return await self.__await_impl__()
File "/usr/local/lib/python3.9/site-packages/websockets/legacy/client.py", line 659, in __await_impl__
_transport, _protocol = await self._create_connection()
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 1026, in create_connection
infos = await self._ensure_resolved(
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 1405, in _ensure_resolved
return await loop.getaddrinfo(host, port, family=family, type=type,
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 861, in getaddrinfo
return await self.run_in_executor(
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/services.py", line 151, in _main_loop
await self._handle(item)
File "/usr/local/lib/python3.9/site-packages/prefect/events/worker.py", line 41, in _handle
await self._client.emit(event)
File "/usr/local/lib/python3.9/site-packages/prefect/events/clients.py", line 22, in emit
return await self._emit(event)
File "/usr/local/lib/python3.9/site-packages/prefect/events/clients.py", line 180, in _emit
await self._reconnect()
File "/usr/local/lib/python3.9/site-packages/prefect/events/clients.py", line 141, in _reconnect
self._websocket = await self._connect.__aenter__()
File "/usr/local/lib/python3.9/site-packages/websockets/legacy/client.py", line 637, in __aenter__
return await self
File "/usr/local/lib/python3.9/site-packages/websockets/legacy/client.py", line 655, in __await_impl_timeout__
return await self.__await_impl__()
File "/usr/local/lib/python3.9/site-packages/websockets/legacy/async_timeout.py", line 169, in __aexit__
self._do_exit(exc_type)
File "/usr/local/lib/python3.9/site-packages/websockets/legacy/async_timeout.py", line 252, in _do_exit
raise asyncio.TimeoutError
asyncio.exceptions.TimeoutError
Joshua Greenhalgh
06/12/2023, 7:44 AM
Joshua Greenhalgh
06/12/2023, 9:15 AM
Joshua Greenhalgh
06/12/2023, 9:22 AM
GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed to process item
and same in the agent...
Joshua Greenhalgh
06/12/2023, 11:20 AM
11:12:16.270 | INFO | Task run 'get_raw_collection-0' - Finished in state Completed()
... ERROR AS ABOVE
11:15:01.282 | INFO | Flow run 'cocky-chachalaca' - Created task run 'store_raw-0' for task 'store_raw'
Joshua Greenhalgh
06/12/2023, 11:20 AM
Joshua Greenhalgh
06/12/2023, 11:21 AM
Jake Kaplan
06/12/2023, 1:34 PM
Jake Kaplan
06/12/2023, 1:37 PM
Joshua Greenhalgh
06/12/2023, 1:55 PM
Joshua Greenhalgh
06/12/2023, 1:56 PM
Joshua Greenhalgh
06/12/2023, 1:57 PM
Jake Kaplan
06/12/2023, 2:02 PM
Joshua Greenhalgh
06/12/2023, 2:05 PM
Joshua Greenhalgh
06/12/2023, 2:06 PM
Joshua Greenhalgh
06/12/2023, 2:08 PM
Jake Kaplan
06/12/2023, 3:42 PM
Jake Kaplan
06/12/2023, 3:45 PM
PREFECT_EXPERIMENTAL_ENABLE_EVENTS_CLIENT=False and see what happens?
Joshua Greenhalgh
06/12/2023, 3:46 PM
Joshua Greenhalgh
06/12/2023, 3:47 PM
Jake Kaplan
06/12/2023, 3:47 PM
Joshua Greenhalgh
06/12/2023, 3:47 PM
Jake Kaplan
06/12/2023, 3:47 PM
Joshua Greenhalgh
06/12/2023, 3:47 PM
Joshua Greenhalgh
06/12/2023, 3:48 PM
Jake Kaplan
06/12/2023, 3:50 PM
EventsWorker blocking something it shouldn't be
Joshua Greenhalgh
06/12/2023, 3:51 PM
Jake Kaplan
06/12/2023, 3:54 PM
Jake Kaplan
06/12/2023, 3:56 PM
Joshua Greenhalgh
06/12/2023, 4:02 PM
Jake Kaplan
06/12/2023, 4:03 PM
Joshua Greenhalgh
06/12/2023, 4:03 PM
15:59:23.783 | INFO | Task run 'get_raw_collection-0' - Finished in state Completed()
16:00:47.111 | INFO | Flow run 'victorious-lizard' - Created task run 'store_raw-0' for task 'store_raw'
16:04:25.144 | INFO | Task run 'store_raw-0' - Finished in state Completed()
16:05:47.383 | INFO | Flow run 'victorious-lizard' - Created task run 'clean_collection-0' for task 'clean_collection'
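For reference, the `PREFECT_EXPERIMENTAL_ENABLE_EVENTS_CLIENT=False` suggestion above has to be visible to the process that runs the flow. The following is a minimal sketch of one way to do that from Python. It assumes that Prefect 2.x picks the setting up from the process environment when `prefect` is imported; the `SETTING` name is only a local helper for illustration, and in a Kubernetes deployment the variable would more likely be set on the job's environment instead.

```python
import os

# Assumption: Prefect 2.x reads settings from the process environment, so the
# variable has to be set before `prefect` is imported in the flow-run process.
SETTING = "PREFECT_EXPERIMENTAL_ENABLE_EVENTS_CLIENT"
os.environ[SETTING] = "False"

# import prefect  # import only after the environment has been prepared

print(f"{SETTING}={os.environ[SETTING]}")
```

The same pattern would apply to any other `PREFECT_*` setting mentioned in the thread.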
Joshua Greenhalgh
06/12/2023, 4:03 PM
task run events? (other events are generated server side)
Oh maybe not no...I see the logs from the tasks though
Joshua Greenhalgh
06/12/2023, 4:05 PM
Joshua Greenhalgh
06/12/2023, 4:09 PM
16:08:23.833 | INFO | Task run 'clean_collection-0' - Finished in state Completed()
16:08:24.135 | INFO | Flow run 'victorious-lizard' - Created task run 'remove_existing-0' for task 'remove_existing'
1 second in between
Joshua Greenhalgh
06/12/2023, 4:10 PM
Joshua Greenhalgh
06/12/2023, 4:10 PM
Jake Kaplan
06/12/2023, 6:55 PM
Joshua Greenhalgh
06/12/2023, 6:57 PM
Joshua Greenhalgh
06/12/2023, 6:58 PM
Jake Kaplan
06/12/2023, 6:58 PM
Joshua Greenhalgh
06/12/2023, 6:58 PM
Joshua Greenhalgh
06/12/2023, 6:58 PM
Joshua Greenhalgh
06/12/2023, 6:59 PM
Joshua Greenhalgh
06/12/2023, 6:59 PM
Jake Kaplan
06/12/2023, 7:00 PM
Jake Kaplan
06/12/2023, 7:00 PM
Joshua Greenhalgh
06/12/2023, 7:00 PM
Joshua Greenhalgh
06/12/2023, 7:01 PM
Joshua Greenhalgh
06/12/2023, 7:01 PM
Joshua Greenhalgh
06/12/2023, 7:17 PM
Jake Kaplan
06/12/2023, 7:30 PM
Jake Kaplan
06/12/2023, 7:31 PM
PREFECT_DEBUG_MODE=True? The size of the parameter seems to matter but it's really hard to say any more than that without an MRE I can run.
Jake Kaplan
06/12/2023, 7:32 PM
Joshua Greenhalgh
06/12/2023, 7:35 PM
Joshua Greenhalgh
06/12/2023, 7:35 PM
Jake Kaplan
06/12/2023, 7:36 PM
Zanie
Zanie
prefect.utilities.annotations.quote?
Zanie
Joshua Greenhalgh
06/12/2023, 7:53 PM
if __name__ == '__main__' block this doesn't happen
Joshua Greenhalgh
06/12/2023, 7:53 PM
prefect.utilities.annotations.quote?
Zanie
Joshua Greenhalgh
06/12/2023, 7:55 PM
@flow
def foo():
    out = quote(bar())
    baz(out)
Joshua Greenhalgh
06/12/2023, 7:55 PM
Zanie
baz(quote(out)) instead
Joshua Greenhalgh
06/12/2023, 8:05 PM
Joshua Greenhalgh
06/12/2023, 8:05 PM
Joshua Greenhalgh
06/12/2023, 8:05 PM
Joshua Greenhalgh
06/13/2023, 10:13 AM
10:01:07.309 | INFO | Task run 'get_raw_collection-0' - Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`'))
10:01:07.311 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fed233e8e20> added item {'name': 'prefect.task_runs', 'level': 20, 'message': "Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`'))", 'timestamp': '2023-06-13T10:01:07.309470+00:00', 'flow_run_id': 'f76582f4-5c8b-4d58-97b4-abf50e2dbbd4', 'task_run_id': '115a83ec-2ed7-4fda-a5d8-eeb20995d7bb'} to batch (size 401/3000000)
10:01:07.311 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - <function _get_state_result at 0x7fed2b818040> --> return coroutine for internal await
10:01:07.311 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - <function UnpersistedResult.get at 0x7fed2b8180d0> --> return coroutine for internal await
10:01:07.311 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - <AsyncCancelScope, name='get_task_call_return_value' COMPLETED, runtime=91.85> exited
10:01:07.312 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Finished async call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fed23377610>, flow_run_context=FlowRunContext(start_time=DateT...)
10:01:07.317 | DEBUG | MainThread | prefect._internal.concurrency - Waiter <SyncWaiter call=get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fecf0efe190>, flow_run_context=FlowRunContext(start_time=DateT...), owner='MainThread'> watching for callbacks
10:01:07.481 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Running call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fecf0efe190>, flow_run_context=FlowRunContext(start_time=DateT...) in thread 'GlobalEventLoopThread' with timeout None
10:01:11.351 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - <WatcherThreadCancelScope, name='get_task_call_return_value' RUNNING, runtime=0.00> entered
10:01:11.351 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - <WatcherThreadCancelScope, name='get_task_call_return_value' COMPLETED, runtime=0.00> exited
10:01:11.351 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Scheduling coroutine for call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fecf0efe190>, flow_run_context=FlowRunContext(start_time=DateT...) in running loop <_UnixSelectorEventLoop running=True closed=False debug=False>
10:01:13.279 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - <AsyncCancelScope, name='get_task_call_return_value' RUNNING, runtime=0.00> entered
10:01:13.280 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fed233e8e20> processing batch of size 401
10:02:32.295 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fed233e8e20> enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 'store_raw-0' for task 'store_raw'", 'timestamp': '2023-06-13T10:02:32.295053+00:00', 'flow_run_id': 'f76582f4-5c8b-4d58-97b4-abf50e2dbbd4', 'task_run_id': None, '__payload_size__': 233}
So we have the task finishing at "10:01:07.309", with debug logs following on quite quickly up until this log:
10:01:13.280 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fed233e8e20> processing batch of size 401
The next log doesn't start until "10:02:32.295", which is the next task starting here:
10:02:32.295 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fed233e8e20> enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 'store_raw-0' for task 'store_raw'", 'timestamp': '2023-06-13T10:02:32.295053+00:00', 'flow_run_id': 'f76582f4-5c8b-4d58-97b4-abf50e2dbbd4', 'task_run_id': None, '__payload_size__': 233}
10:02:32.295 | INFO | Flow run 'daring-bullmastiff' - Created task run 'store_raw-0' for task 'store_raw'
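The gap described above can be measured directly from the log timestamps. The following is a minimal sketch, not part of Prefect: `parse_timestamp` and `find_gaps` are hypothetical helper names, the 10-second threshold is an arbitrary choice, and the log messages are abridged to the parts that matter. It assumes every line starts with an `HH:MM:SS.mmm` field and that the log does not cross midnight.

```python
from datetime import datetime


def parse_timestamp(line: str) -> datetime:
    """Parse the leading HH:MM:SS.mmm field of a console log line."""
    stamp = line.split("|", 1)[0].strip()
    return datetime.strptime(stamp, "%H:%M:%S.%f")


def find_gaps(lines, threshold_seconds=10.0):
    """Return (gap_seconds, earlier_line, later_line) for each gap above the threshold."""
    gaps = []
    for earlier, later in zip(lines, lines[1:]):
        delta = (parse_timestamp(later) - parse_timestamp(earlier)).total_seconds()
        if delta > threshold_seconds:
            gaps.append((delta, earlier, later))
    return gaps


# Timestamps taken from the logs above; messages abridged.
log_lines = [
    "10:01:07.309 | INFO | Task run 'get_raw_collection-0' - Finished in state Completed(...)",
    "10:01:13.280 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service <...> processing batch of size 401",
    "10:02:32.295 | INFO | Flow run 'daring-bullmastiff' - Created task run 'store_raw-0' for task 'store_raw'",
]

for gap, earlier, later in find_gaps(log_lines):
    print(f"{gap:.3f}s between {earlier[:12]} and {later[:12]}")
```

On these three lines the only gap above the threshold is the one between the "processing batch of size 401" log and the creation of `store_raw-0`, roughly 79 seconds; measured from the task finishing, the delay is roughly 85 seconds.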