# ask-community
k
Hello Community. I have a situation here. As suggested by my mentor, I want to run a flow whenever we receive an event; the flow processes the event payload. Currently we are using a process work pool, and the problem is that we are getting high latencies.

```python
import json
from pathlib import Path

from prefect import flow
from prefect.events.schemas.automations import Posture
from prefect.events.schemas.deployment_triggers import DeploymentEventTrigger
from prefect.events.schemas.events import ResourceSpecification

# Create a basic flow with a deployment trigger so the flow runs after certain events.
# The following code uses the minimum options to create a DeploymentEventTrigger.
deployment_trigger = DeploymentEventTrigger(
    expect={"custom.event.*"},
    # Match resources to filter deployments
    match=ResourceSpecification({"prefect.resource.id": ["prefect.custom.event.*"]}),
    posture=Posture.Reactive,
    # Evaluate the trigger separately for each distinct value of these labels
    for_each={"prefect.resource.id"},
    # Set threshold to 1 and within to 0 to trigger the automation for every event
    threshold=1,
    within=0,
    # Send the event payload to the flow as the `payload` argument
    parameters={
        "payload": {
            "data": "{{ event.payload|tojson }}",
            "occured": "{{ event.occurred.timestamp() }}",
        }
    },
)


@flow(log_prints=True)
def data_pipeline(payload):
    data = json.loads(payload["data"])
    n_o_w = data["data"]["number_of_workers"]
    e_p_s = data["data"]["events_per_second"]
    n_o_m = data["data"]["number_of_minutes"]
    with open(f"some_{n_o_w}_{e_p_s}_{n_o_m}.csv", "a") as f:
        print(f"{data['event']}", file=f, flush=True)
    return 1


# Create the deployment from source
if __name__ == "__main__":
    data_pipeline.from_source(
        source=str(Path(__file__).parent),
        entrypoint="basic_flow.py:data_pipeline",
    ).deploy(
        name="ddata piplevm",
        work_pool_name="process-worker-pool",
        description="None",
        triggers=[deployment_trigger],
    )
```

Above is the flow definition. We are testing by manually emitting, say, 10 events per second; the latency data is attached as a CSV. For the attached run we emitted 5 events per second for 2 minutes while 10 workers were actively polling for flows.

Execution environment:
• OS: Windows 10
• Prefect version: 3.1.7 (local Prefect server with SQLite database)
• Processor: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40 GHz, 8 logical cores
• Memory: 16.0 GB (15.8 GB usable)
• Python: 3.12.8

How can we address this problem efficiently and reduce latency? Is there a solution?
n
hi @Krishna Suryawanshi - can you please place the code snippet in the thread? also im not clear on what latency you're describing, is it the amount of time after the event is created that the flow run gets triggered?
k
Thank you for replying. Yes sir, two latencies:
1. the amount of time until a flow run is created after the event occurs
2. the amount of time until the flow run starts after the flow run is created

basic_flow.py contains a minimal flow; event_emitter.py contains code that emits events based on user-defined numbers.
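(For context, a minimal sketch of what an emitter like event_emitter.py might look like, assuming the standard `prefect.events.emit_event` helper; the event name, resource id, and payload fields are illustrative guesses shaped to match the trigger and flow above, not the actual script.)

```python
import time

from prefect.events import emit_event


def emit_test_events(number_of_workers: int, events_per_second: int, number_of_minutes: int) -> None:
    """Illustrative emitter (assumed, not the original event_emitter.py):
    emit `events_per_second` events per second for `number_of_minutes` minutes."""
    total = events_per_second * number_of_minutes * 60
    for i in range(total):
        emit_event(
            event="custom.event.test",  # matches the trigger's expect={"custom.event.*"}
            resource={"prefect.resource.id": f"prefect.custom.event.{i}"},  # matches the trigger's match filter
            payload={
                "event": i,  # read by the flow as data["event"]
                "data": {
                    "number_of_workers": number_of_workers,
                    "events_per_second": events_per_second,
                    "number_of_minutes": number_of_minutes,
                },
            },
        )
        time.sleep(1 / events_per_second)


if __name__ == "__main__":
    emit_test_events(number_of_workers=10, events_per_second=5, number_of_minutes=2)
```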
Hello @Nate can you please help me with this?
n
with the latencies, what are your expectations and observations?
k
Well, my mentor asked if we can get as close to real time as possible, ideally less than 500 ms.
As for the observations, the minimum is at least 3-4 seconds.
Hello @Nate, even if it's not possible to get below 1 sec, is there a way to reduce the latencies in the above case?
n
are you using prefect cloud or prefect open source?
k
prefect open source
n
on open source, the events should be propagated quickly enough for that, but the worker doesn't check for scheduled runs that often. so you could set `PREFECT_WORKER_QUERY_SECONDS=0.5`, but note that will 20x how often the worker asks the server for work (it defaults to 10 seconds).

if you want full control, you can use the websocket client directly (no deployment, worker, or trigger) and call your flow functions when you see the prerequisite events, but that would be more work (since you'd re-implement triggers)
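(For future readers, a rough sketch of that second option, assuming `get_events_subscriber` and the `EventFilter`/`EventNameFilter` classes are available in your Prefect 3.x install; check the exact API against your version. A matching event simply results in the flow function being called in-process, so no deployment, worker, or trigger is involved.)

```python
import asyncio
import json

from prefect.events.clients import get_events_subscriber
from prefect.events.filters import EventFilter, EventNameFilter

from basic_flow import data_pipeline  # the flow function from the snippet above


async def main() -> None:
    # Subscribe to the server's event websocket and react to matching events directly.
    event_filter = EventFilter(event=EventNameFilter(prefix=["custom.event"]))
    async with get_events_subscriber(filter=event_filter) as subscriber:
        async for event in subscriber:
            # Build the same `payload` argument the trigger template above would have built.
            parameters = {
                "payload": {
                    "data": json.dumps(event.payload),
                    "occured": event.occurred.timestamp(),
                }
            }
            # Run the (sync) flow in a worker thread so the event loop keeps receiving events.
            await asyncio.to_thread(data_pipeline, **parameters)


if __name__ == "__main__":
    asyncio.run(main())
```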
k
Thank you, I will try working with this. If anything comes up, I will post in the community. Thank you very much for your help.
Hello @Nate, one more question. When just receiving raw events, latencies are very low, but when executing an automation in response to those events, latencies are very high. Can you help me understand why that is?
n
> the worker doesn't check for scheduled runs that often. so you could set `PREFECT_WORKER_QUERY_SECONDS=0.5`

i think this is likely the most relevant piece of info. the worker polls the server to find scheduled runs, but by default it only checks every 10 seconds. have you changed this setting?
k
Yes, I already did, but I didn't see any improvement.
Hello @Nate, I have a question based on the Prefect logs. Flow run ethereal-centipede: Completed in 1s. Flow: data-pipeline. Deployment: ddata piplevm. Work Pool: process-worker-pool. Work Queue: default. Automation: ddata piplevm__automation_1. This flow run did not generate any task or subflow runs.
```
Jan 1st, 2025
02:45:01 PM  INFO   prefect.flow_runs.worker  Worker 'ProcessWorker fe77d676-15dd-4c16-b3ab-5a70b78a4f22' submitting flow run '8a475663-4903-428b-8835-0aa421e956f2'
02:45:01 PM  INFO   prefect.flow_runs.worker  Opening process...
02:45:01 PM  DEBUG  prefect.flow_runs.worker  Process running command: "C:\Users\dv-Krishna\Desktop\Projects\Perfect_Getting_Started\env\Scripts\python.exe" -m prefect.engine in C:\Users\DV-KRI~1\AppData\Local\Temp\tmpi57dhtvxprefect
02:45:01 PM  INFO   prefect.flow_runs.worker  Completed submission of flow run '8a475663-4903-428b-8835-0aa421e956f2'
02:45:04 PM  DEBUG  prefect.flow_runs         Running 1 deployment pull step(s)
02:45:05 PM  DEBUG  prefect.flow_runs         Changing working directory to 'C:\\Users\\dv-Krishna\\Desktop\\Projects\\Perfect_Getting_Started'
02:45:05 PM  DEBUG  prefect.flow_runs         Importing flow code from 'basic_flow.py:data_pipeline'
02:45:05 PM  DEBUG  prefect.flow_runs         Executing flow 'data-pipeline' for flow run 'ethereal-centipede'...
02:45:06 PM  INFO   prefect.flow_runs         Finished in state Completed()
02:45:06 PM  INFO   prefect.flow_runs.worker  Process 13860 exited cleanly.
```
Between the 'Completed submission of flow run ...' entry at 02:45:01 PM and the 'Running 1 deployment pull step(s)' entry at 02:45:04 PM, can you please explain why there is a 3-second gap in the timestamps?
z
I also see this issue. I'm trying to debug. I have set `PREFECT_WORKER_QUERY_SECONDS=0.5` and confirmed it's set. What I see is that when I kick off the flow from the UI, it stays in the scheduling state for 6 seconds.
@Krishna Suryawanshi @Nate, I found that the PREFECT_RUNNER_POLL_FREQUENCY needs to be lowered. The default seems to be 10 seconds. You can bring it down to 1 second.
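(For anyone landing here later, a sketch of how both settings discussed in this thread could be applied to the active profile before restarting the worker; the values shown are the ones from this thread, not general recommendations, and the work pool name is the one from the deployment above.)

```
# Lower how often the worker polls the server for scheduled runs (defaults to 10s)
prefect config set PREFECT_WORKER_QUERY_SECONDS=0.5

# Lower how often the runner polls for scheduled flow runs (defaults to 10s)
prefect config set PREFECT_RUNNER_POLL_FREQUENCY=1

# Restart the worker so the new settings take effect
prefect worker start --pool process-worker-pool
```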
n
thank you! i shouldn't have recommended such a low value in retrospect - OP had mentioned

> Well, my mentor asked if we can get as close to real time as possible, ideally less than 500 ms.

and i thought it was a `float` - my bad, thanks for following up
k
Hello @Nate @Zarinah, can you help me reduce the latency between when the event occurred and when the corresponding flow run was created, i.e. the second-to-last column in https://prefect-community.slack.com/files/U086WDB0C81/F08637LQ7FZ/data_10_5_2.csv?origin_team=TL09B008Y&origin_channel=CKNSX5WG3, in the case of multiple events per second?
Those second-to-last column values seem to be increasing somewhat linearly. Is there a way we can parallelize event handling on the server?
n
unfortunately, to some extent there is a latency out of your control, which is "how long does it take for the event to travel through the message bus in prefect cloud?" if you use open source, you'll notice that events are propagated nearly instantly, but since cloud serves all users, there is a non-zero latency for events to come through (which we are always working on decreasing!)
to summarize, there are at least two sources of latency:
• a deployment is not triggered until the prerequisite events occur AND the event trickles through the message bus, firing the trigger
• the worker only checks for scheduled runs every N seconds

the setting only controls the latter
k
Hello @Nate @Zarinah, I attached a few more CSV files, this time with different numbers of workers. Can you please give them a try and help me? The settings are:

PREFECT_RUNNER_POLL_FREQUENCY='1'
PREFECT_WORKER_QUERY_SECONDS='0.5'

The three files are named data_{number_of_workers}_{events_per_second}_{number_of_minutes}.csv:
• First column = event number
• Second column = flow_run created - event_occurred
• Third column = flow run started - flow run created

The values in the third column are not linear, but they lie within some interval, as we can see from the attachments. The second column, however, is almost a linear function of the number of events emitted, even with different numbers of workers. Can we parallelize something on the server side so that the maximum value of the second column also lies within some known interval, as is the case for the third column?
All of this is running against the open-source local Prefect server.
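(For completeness, a rough sketch of how the two per-run deltas in these CSVs could be recomputed from the API; it assumes the flow run object exposes `created` and `start_time`, and it reads the event's `occured` timestamp back out of the run parameters that the trigger template set above. The flow run id is just the one from the log excerpt earlier in this thread.)

```python
import asyncio
from datetime import datetime, timezone
from uuid import UUID

from prefect.client.orchestration import get_client


async def latency_report(flow_run_id: UUID) -> None:
    """Hypothetical helper: print the two latencies tracked in the CSVs for one flow run."""
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)

        # The trigger template stored event.occurred.timestamp() under parameters["payload"]["occured"].
        occurred_ts = float(flow_run.parameters["payload"]["occured"])
        occurred = datetime.fromtimestamp(occurred_ts, tz=timezone.utc)

        created_delay = (flow_run.created - occurred).total_seconds()
        # start_time is None until the run actually starts.
        start_delay = (flow_run.start_time - flow_run.created).total_seconds()

        print(f"event occurred   -> flow run created: {created_delay:.3f}s")
        print(f"flow run created -> flow run started: {start_delay:.3f}s")


if __name__ == "__main__":
    asyncio.run(latency_report(UUID("8a475663-4903-428b-8835-0aa421e956f2")))
```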