# ask-community
j
Can anyone help me understand why I am seeing such huge gaps between tasks?
I am also seeing lots of logs on my agent like the following:
16:10:45.323 | ERROR   | prefect._internal.concurrency.services - Service 'EventsWorker' failed to process item (Event(<LOTS OF STUFF>), <Context object at 0x7f9cabb6f100>)
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/services.py", line 151, in _main_loop
    await self._handle(item)
  File "/usr/local/lib/python3.9/site-packages/prefect/events/worker.py", line 41, in _handle
    await self._client.emit(event)
  File "/usr/local/lib/python3.9/site-packages/prefect/events/clients.py", line 22, in emit
    return await self._emit(event)
  File "/usr/local/lib/python3.9/site-packages/prefect/events/clients.py", line 172, in _emit
    assert self
Log from inside the flow when this happens:
07:40:56.798 | ERROR   | prefect._internal.concurrency.services - Service 'EventsWorker' failed to process item (Event(occurred=DateTime(2023, 6, 12, 7, 38, 14, 746768, tzinfo=Timezone('+00:00')), event='prefect.task-run.Completed', resource=Resource(__root__={'prefect.resource.id': 'prefect.task-run.83c5ba9c-a1c0-464c-a973-4c8f23088598', 'prefect.resource.name': 'get_raw_collection-0', 'prefect.state-message': '', 'prefect.state-name': 'Completed', 'prefect.state-timestamp': '2023-06-12T07:38:14.746768+00:00', 'prefect.state-type': 'COMPLETED'}), related=[RelatedResource(__root__={'prefect.resource.id': 'prefect.flow-run.ef11746f-37e3-4d08-9b19-e8dc67e9ba22', 'prefect.resource.role': 'flow-run', 'prefect.resource.name': 'white-mastiff'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.flow.b824c832-982e-4d9b-b8eb-bcce9a8aab0e', 'prefect.resource.role': 'flow', 'prefect.resource.name': 'cms'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.deployment.95a06a70-c47f-44b0-9190-6ee3e6c54a8a', 'prefect.resource.role': 'deployment', 'prefect.resource.name': 'creatives_loki'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.work-queue.3c646f52-4d3a-41fe-bef1-d91396ad752a', 'prefect.resource.role': 'work-queue', 'prefect.resource.name': 'kubernetes'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.work-pool.326989fc-2dd6-4f88-9d0e-2261448ec7e2', 'prefect.resource.role': 'work-pool', 'prefect.resource.name': 'default-agent-pool'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.tag.cms', 'prefect.resource.role': 'tag'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.tag.loki', 'prefect.resource.role': 'tag'})], payload={'intended': {'from': 'RUNNING', 'to': 'COMPLETED'}, 'initial_state': {'type': 'RUNNING', 'name': 'Running', 'message': ''}, 'validated_state': {'type': 'COMPLETED', 'name': 'Completed', 'message': ''}}, id=UUID('d0b10c03-451a-4006-9da6-5bd4562f5499'), follows=UUID('d2cfd3a8-ae69-4aad-b8ed-20b8eba01ce7')), <Context object at 0x7ff91a1eee00>)
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/websockets/legacy/client.py", line 655, in __await_impl_timeout__
    return await self.__await_impl__()
  File "/usr/local/lib/python3.9/site-packages/websockets/legacy/client.py", line 659, in __await_impl__
    _transport, _protocol = await self._create_connection()
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 1026, in create_connection
    infos = await self._ensure_resolved(
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 1405, in _ensure_resolved
    return await loop.getaddrinfo(host, port, family=family, type=type,
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 861, in getaddrinfo
    return await self.run_in_executor(
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/services.py", line 151, in _main_loop
    await self._handle(item)
  File "/usr/local/lib/python3.9/site-packages/prefect/events/worker.py", line 41, in _handle
    await self._client.emit(event)
  File "/usr/local/lib/python3.9/site-packages/prefect/events/clients.py", line 22, in emit
    return await self._emit(event)
  File "/usr/local/lib/python3.9/site-packages/prefect/events/clients.py", line 180, in _emit
    await self._reconnect()
  File "/usr/local/lib/python3.9/site-packages/prefect/events/clients.py", line 141, in _reconnect
    self._websocket = await self._connect.__aenter__()
  File "/usr/local/lib/python3.9/site-packages/websockets/legacy/client.py", line 637, in __aenter__
    return await self
  File "/usr/local/lib/python3.9/site-packages/websockets/legacy/client.py", line 655, in __await_impl_timeout__
    return await self.__await_impl__()
  File "/usr/local/lib/python3.9/site-packages/websockets/legacy/async_timeout.py", line 169, in __aexit__
    self._do_exit(exc_type)
  File "/usr/local/lib/python3.9/site-packages/websockets/legacy/async_timeout.py", line 252, in _do_exit
    raise asyncio.TimeoutError
asyncio.exceptions.TimeoutError
As an FYI this same flow running locally takes about 2 mins
I was on .12 and have bumped to .13. The exceptions seem to have disappeared, but I'm still seeing multi-minute gaps between tasks. This turns 2 mins of CPU usage into 4-5x of that - serious cost implications for me!
Spoke too soon - got 2 tasks in and then got this from the pod:
GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed to process item
and same in the agent...
3 minutes between the end of one task and the start of the next?
11:12:16.270 | INFO    | Task run 'get_raw_collection-0' - Finished in state Completed()
... ERROR AS ABOVE
11:15:01.282 | INFO    | Flow run 'cocky-chachalaca' - Created task run 'store_raw-0' for task 'store_raw'
I really need some help here!
Exactly the same flow with different params has only 10s between tasks?
j
Hey! Would you be able to open an issue with a reproducible example of what you're seeing? It would really help with triage/diagnosis of what's going on.
The events worker should be non-blocking and run in the background, so any errors/failures in theory shouldn't impact your runs; you may only see missing task events. So it's unclear to me initially whether that's related to the delay in task runs or not
j
Yeah, absolutely I can try to put an issue together - but it's a pretty complex infra setup, right - a reproducible one is going to be a challenge? Just a code sample is not really going to help?
Any pointers as to how to make it as reproducible as possible?
Minikube perhaps?
j
Any information about your setup could help reproduce 😄 It could be helpful to rule out the flow code. Can you try running a simple example (flow with 1 or 2 tasks that print) in your env and see if it gives the same behavior?
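For reference, a minimal sketch of the kind of trivial flow being suggested here (names are made up):
```python
from prefect import flow, task

@task
def say(msg: str):
    # Trivial task that only prints, to rule out the flow code itself
    print(msg)

@flow
def tiny_flow():
    say("first task")
    say("second task")

if __name__ == "__main__":
    tiny_flow()
```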
j
• GKE Autopilot
• Agent - manifests as those that come from the cli command
• Flows installed as a python module in the jobs container and then pointing the deployment path to "site-packages"
Anything else that would be useful?
I am not using the worker since the way it looks up the flow src behaves differently from the agent and I cannot get it to find the flow in "site-packages"
The flow itself works completely successfully (no huge gaps between tasks) if I log out from Prefect Cloud and then run it locally - in fact they all succeed and do exactly what I expect - they just take a lot longer than exactly the same underlying flow code in v1
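For reference, a rough sketch of the kind of deployment described above (the module and flow names are hypothetical), pointing the deployment path at the installed package rather than uploading code:
```python
from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob

# Hypothetical sketch: the flow is importable from a package installed in the
# job image, so the deployment just points at site-packages
deployment = Deployment(
    name="creatives_loki",
    flow_name="cms",
    work_queue_name="kubernetes",
    infrastructure=KubernetesJob(),
    path="/usr/local/lib/python3.9/site-packages",
    entrypoint="my_package/flows.py:cms",  # hypothetical module path
)
deployment.apply()
```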
j
I suspect something in your network setup is not allowing a websocket connection to be created, which is what events use. It's a little unclear, but maybe it's blocking? So your tasks won't execute until it gets control of the event loop back, leading to the delay. I'm just guessing though, and am a little surprised - the events worker shouldn't have an impact on your code, even with issues
Can you try setting
PREFECT_EXPERIMENTAL_ENABLE_EVENTS_CLIENT=False
and see what happens?
j
This would be on both the job pod and the agent?
so in the manifest for the KubernetesJob infra block and the manifest for the agent deployment?
j
Yepyep for both
j
ok will do
j
The important one is probably the job pod (Related to task run delay)
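For reference, a hedged sketch of one way to set this on the job side via the env field of the KubernetesJob infrastructure block (the block name is illustrative); for the agent it would go into the env section of the agent's Deployment manifest:
```python
from prefect.infrastructure import KubernetesJob

# Illustrative only: add the setting to the environment of flow-run pods so
# the client-side events machinery is disabled there
k8s_job = KubernetesJob(
    env={"PREFECT_EXPERIMENTAL_ENABLE_EVENTS_CLIENT": "False"},
)
k8s_job.save("my-k8s-job", overwrite=True)  # hypothetical block name
```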
j
I don't understand why there is any waiting for a response from cloud api before starting the next task - this confuses me a lot...
Will also take a look at cluster settings and see if there is anything re websockets and gke perhaps?
j
sounds good, let me know what you find. 👍 If you're able to open the connection OR disable events and things start working, I think it's a pretty good indicator of the
EventsWorker
blocking something it shouldn't be
j
What is this flag gonna do other than maybe solve the issue?
j
it disables client side events from being sent. For example, task run state changes are client side events. Those wouldn't show up in the event feed.
Ideally you'll be able to open websocket connections on your cluster, but if you can't, this may give you a workaround (at the cost of client side events not being sent) while we look into a fix. If nothing changes, it's another issue though 🙂
j
But the events are getting through, right? I see them in the UI - so it's not that websockets are blocked completely or anything
j
task run events? (other events are generated server side)
j
So no exceptions re the EventsWorker - but still seeing >1 min between tasks:
15:59:23.783 | INFO    | Task run 'get_raw_collection-0' - Finished in state Completed()
16:00:47.111 | INFO    | Flow run 'victorious-lizard' - Created task run 'store_raw-0' for task 'store_raw'

16:04:25.144 | INFO    | Task run 'store_raw-0' - Finished in state Completed()
16:05:47.383 | INFO    | Flow run 'victorious-lizard' - Created task run 'clean_collection-0' for task 'clean_collection'
task run events? (other events are generated server side)
Oh maybe not no...I see the logs from the tasks though
but then this pair of tasks:
16:08:23.833 | INFO    | Task run 'clean_collection-0' - Finished in state Completed()
16:08:24.135 | INFO    | Flow run 'victorious-lizard' - Created task run 'remove_existing-0' for task 'remove_existing'
1 second in between
I have bumped the pod resources to ridiculous levels just in case - 16GB and 4 CPU - so I hope it's not resource related...
for context all this is doing is like pulling maybe 2mb of json from an API
j
hmm that's interesting. Could you test with a small json? I wonder if it's from the parameters and then latency back/forth to the api
j
the parameters? One of them is quite large - it's a GQL query?
large...15 lines of code
j
ah I just meant if the 2mb json was being passed around to some tasks
j
but not just a single short string
yeah it defo is being passed around
I intend to pass around much larger things than that though? This is why Prefect and not old Airflow...
I will make it smaller and see what happens
j
just a hypothesis that what you're seeing is network latency
one change from v1 to v2 is that a lot of orchestration happens server side vs client side
j
I hope very much that the results coming out of a task are not being sent back to the prefect API though right?
I need to pass GBs of stuff between tasks...
and its private
Ok yeah - with smaller data passed between tasks there's basically no waiting at all... I don't really know what to do from here? Is this constraint documented? Do I have to completely restructure all my v1 flows to not pass data between tasks? So basically just write flows with a single long-running task?
j
Results/parameters for tasks aren't stored, but a representation of the inputs is passed to the API - not the data itself though
Would you be able to open an issue with an MRE/logs from a run where you see this, with
PREFECT_DEBUG_MODE=True
? The size of the parameter seems to matter, but it's really hard to say any more than that without an MRE I can run 🙂
It's not a known constraint that big task inputs cannot be used
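A rough sketch of the kind of MRE being asked for (task names and payload are made up), with a return value of roughly the size described above passed between two tasks; run it with PREFECT_DEBUG_MODE=True set in the environment:
```python
from prefect import flow, task

@task
def get_raw():
    # Build a return value of roughly a couple of MB, similar in size to the
    # JSON payload described above (contents are made up)
    return [{"id": i, "blob": "x" * 100} for i in range(20_000)]

@task
def store_raw(records):
    return len(records)

@flow
def mre_flow():
    records = get_raw()
    # The large return value is passed directly as the next task's input,
    # which is where the multi-minute gap was observed
    store_raw(records)

if __name__ == "__main__":
    mre_flow()
```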
j
Yeah I will work on this tomorrow - thanks for your help
it's defo the size though - just ran 10 with small / 10 with large, same outcome: big gaps when big, not when small
j
sounds good, let me know!
z
If you want to debug to determine if this is an issue with events, then you can run against a local server which makes all event emission a no-op.
We traverse task run parameters locally to look for upstream futures, I suspect that’s the issue. What happens if you wrap your task run input data with
prefect.utilities.annotations.quote
?
That will disable inspection of the data
j
So if I disconnect from the API altogether and just call the flow in an
if __name__ == '__main__'
block, this doesn't happen
Can you give me an example of what you mean by using this
prefect.utilities.annotations.quote
?
j
like:
@flow
def foo():
    out = quote(bar())
    baz(out)
?
z
That would work, although I’d say
baz(quote(out))
instead
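Put together, a minimal runnable sketch of the suggested change (bar and baz are placeholder tasks standing in for the real ones):
```python
from prefect import flow, task
from prefect.utilities.annotations import quote

@task
def bar():
    # Stand-in for a task returning a large payload
    return [{"i": i} for i in range(100_000)]

@task
def baz(data):
    # Stand-in for the downstream task that consumes the payload
    print(f"received {type(data).__name__}")

@flow
def foo():
    out = bar()
    # quote() marks the input so Prefect skips inspecting the data when the
    # downstream task is called
    baz(quote(out))

if __name__ == "__main__":
    foo()
```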
j
Sadly not - still >1 min gaps
I will work on a repro and try to make an issue
thanks for the help both
Still working on a repro - however I have added debug logging - these are the logs in the long wait between tasks:
10:01:07.309 | INFO    | Task run 'get_raw_collection-0' - Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`'))
10:01:07.311 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fed233e8e20> added item {'name': 'prefect.task_runs', 'level': 20, 'message': "Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`'))", 'timestamp': '2023-06-13T10:01:07.309470+00:00', 'flow_run_id': 'f76582f4-5c8b-4d58-97b4-abf50e2dbbd4', 'task_run_id': '115a83ec-2ed7-4fda-a5d8-eeb20995d7bb'} to batch (size 401/3000000)
10:01:07.311 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - <function _get_state_result at 0x7fed2b818040> --> return coroutine for internal await
10:01:07.311 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - <function UnpersistedResult.get at 0x7fed2b8180d0> --> return coroutine for internal await
10:01:07.311 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - <AsyncCancelScope, name='get_task_call_return_value' COMPLETED, runtime=91.85> exited
10:01:07.312 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Finished async call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fed23377610>, flow_run_context=FlowRunContext(start_time=DateT...)
10:01:07.317 | DEBUG   | MainThread   | prefect._internal.concurrency - Waiter <SyncWaiter call=get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fecf0efe190>, flow_run_context=FlowRunContext(start_time=DateT...), owner='MainThread'> watching for callbacks
10:01:07.481 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Running call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fecf0efe190>, flow_run_context=FlowRunContext(start_time=DateT...) in thread 'GlobalEventLoopThread' with timeout None
10:01:11.351 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - <WatcherThreadCancelScope, name='get_task_call_return_value' RUNNING, runtime=0.00> entered
10:01:11.351 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - <WatcherThreadCancelScope, name='get_task_call_return_value' COMPLETED, runtime=0.00> exited
10:01:11.351 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Scheduling coroutine for call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fecf0efe190>, flow_run_context=FlowRunContext(start_time=DateT...) in running loop <_UnixSelectorEventLoop running=True closed=False debug=False>
10:01:13.279 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - <AsyncCancelScope, name='get_task_call_return_value' RUNNING, runtime=0.00> entered
10:01:13.280 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fed233e8e20> processing batch of size 401
10:02:32.295 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fed233e8e20> enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 'store_raw-0' for task 'store_raw'", 'timestamp': '2023-06-13T10:02:32.295053+00:00', 'flow_run_id': 'f76582f4-5c8b-4d58-97b4-abf50e2dbbd4', 'task_run_id': None, '__payload_size__': 233}
So we have the task finishing at "10:01:07.309" - debug logs follow on quite quickly up until this log:
10:01:13.280 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fed233e8e20> processing batch of size 401
The start of the next log isn't until "10:02:32.295" - the next task starting here:
10:02:32.295 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fed233e8e20> enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 'store_raw-0' for task 'store_raw'", 'timestamp': '2023-06-13T10:02:32.295053+00:00', 'flow_run_id': 'f76582f4-5c8b-4d58-97b4-abf50e2dbbd4', 'task_run_id': None, '__payload_size__': 233}
10:02:32.295 | INFO    | Flow run 'daring-bullmastiff' - Created task run 'store_raw-0' for task 'store_raw'