Krishna Suryawanshi
12/26/2024, 7:46 AM
payload argument
    parameters={"payload": {"data": "{{ event.payload|tojson }}", "occured": "{{ event.occurred.timestamp() }}"}}
)

@flow(log_prints=True)
def data_pipeline(payload):
    # The trigger passes the event payload as a JSON string, so decode it first
    data = json.loads(payload["data"])
    n_o_w, e_p_s, n_o_m = (
        data["data"]["number_of_workers"],
        data["data"]["events_per_second"],
        data["data"]["number_of_minutes"],
    )
    # Append one line per flow run to a CSV named after the test configuration
    with open(f"some_{n_o_w}_{e_p_s}_{n_o_m}.csv", "a") as f:
        print(f"{data['event']}", file=f, flush=True)
    return 1

# Create Deployment from Source
if __name__ == "__main__":
    data_pipeline.from_source(
        source=str(Path(__file__).parent),
        entrypoint="basic_flow.py:data_pipeline",
    ).deploy(
        name="ddata piplevm",
        work_pool_name="process-worker-pool",
        description="None",
        triggers=[deployment_trigger],
    )
Above is the flow definition. We are testing by manually emitting, say, 10 events per second. The latency data is attached as a CSV.
We emitted 5 events per second for 2 minutes while 10 workers were actively polling for flows.
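For readers following along, here is a minimal sketch of how a per-event latency could be derived from the `occured` parameter that the trigger passes to the flow. It assumes the template renders to a string holding epoch seconds and that the emitter and the flow run share a clock; the helper name `event_to_run_latency` and the sample payload are hypothetical and not part of the original flow.

```python
import json
import time


def event_to_run_latency(payload, now=None):
    """Return seconds elapsed between the event's `occured` value and `now`."""
    # Assumption: "occured" is a rendered template string such as "1735200000.5"
    occurred = float(payload["occured"])
    if now is None:
        now = time.time()
    return now - occurred


# Hypothetical payload shaped like the trigger's `parameters` mapping
payload = {
    "data": json.dumps(
        {
            "event": "evt-1",
            "data": {
                "number_of_workers": 10,
                "events_per_second": 5,
                "number_of_minutes": 2,
            },
        }
    ),
    "occured": "1735200000.5",
}

# A fixed `now` keeps the example deterministic
print(event_to_run_latency(payload, now=1735200006.0))  # 5.5
```

Inside the flow, calling the helper without `now` would measure the delay up to the moment the flow body starts executing.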
Execution environment:
OS: Windows 10
Prefect version: 3.1.7 (local Prefect server with SQLite database)
Processor: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz 2.42 GHz, 8 logical cores
Memory: 16.0 GB (15.8 GB usable)
Python: 3.12.8
How can we address this problem efficiently to reduce latency? Is there a solution?
Nate
12/27/2024, 12:09 AM
Krishna Suryawanshi
12/27/2024, 4:35 AM
Krishna Suryawanshi
12/30/2024, 4:48 AM
Nate
12/30/2024, 4:49 AM
Krishna Suryawanshi
12/30/2024, 4:50 AM
Krishna Suryawanshi
12/30/2024, 4:51 AM
Krishna Suryawanshi
12/30/2024, 6:08 AM
Nate
12/30/2024, 7:28 AM
Krishna Suryawanshi
12/30/2024, 7:28 AM
Nate
12/30/2024, 7:42 AM
you could set PREFECT_WORKER_QUERY_SECONDS=0.5, but note that will 20x how often the worker asks the server for work (defaults to 10 seconds)
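The trade-off between the two values mentioned here (10 seconds and 0.5 seconds) can be sketched with some simple arithmetic. This is a simplified model, not a measurement: it assumes runs become ready at a uniformly random point within a polling interval and ignores server-side scheduling and process startup time. The function names are illustrative only.

```python
def polls_per_minute(query_seconds):
    """How many times per minute a worker asks the server for work."""
    return 60 / query_seconds


def expected_poll_wait(query_seconds):
    """Average wait until the next poll, assuming uniformly random arrival."""
    return query_seconds / 2


default_interval = 10   # seconds, the default mentioned in the thread
tuned_interval = 0.5    # seconds, the value suggested in the thread

# Ratio of request rates between the tuned and default intervals
print(polls_per_minute(tuned_interval) / polls_per_minute(default_interval))  # 20.0

# Average polling delay under this model, in seconds
print(expected_poll_wait(default_interval))  # 5.0
print(expected_poll_wait(tuned_interval))    # 0.25
```

Under this model the polling interval only bounds one component of the end-to-end latency; with several workers polling the same pool, the server load scales with the number of workers as well.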
if you want full control, you can use the websocket client directly (no deployment, worker or trigger) and call your flow functions when you see the prerequisite events, but that would be more work (since you'd re-implement triggers)
Krishna Suryawanshi
12/30/2024, 7:49 AM
Krishna Suryawanshi
12/30/2024, 9:32 AM
Krishna Suryawanshi
12/30/2024, 11:06 AM
Nate
12/30/2024, 3:36 PM
i think this is likely the most relevant piece of info: the worker polls the server to find scheduled runs, but by default it only checks every 10 seconds. have you changed this setting?
the worker doesn't check for scheduled runs that often, so you could set PREFECT_WORKER_QUERY_SECONDS=0.5
Krishna Suryawanshi
12/31/2024, 4:34 AM
Krishna Suryawanshi
01/01/2025, 9:54 AM
Jan 1st, 2025
Info
Worker 'ProcessWorker fe77d676-15dd-4c16-b3ab-5a70b78a4f22' submitting flow run '8a475663-4903-428b-8835-0aa421e956f2'
02:45:01 PM
prefect.flow_runs.worker
Info
Opening process...
02:45:01 PM
prefect.flow_runs.worker
Debug
Process running command: "C:\Users\dv-Krishna\Desktop\Projects\Perfect_Getting_Started\env\Scripts\python.exe" -m prefect.engine in C:\Users\DV-KRI~1\AppData\Local\Temp\tmpi57dhtvxprefect
02:45:01 PM
prefect.flow_runs.worker
Info
Completed submission of flow run '8a475663-4903-428b-8835-0aa421e956f2'
02:45:01 PM
prefect.flow_runs.worker
Debug
Running 1 deployment pull step(s)
02:45:04 PM
prefect.flow_runs
Debug
Changing working directory to 'C:\\Users\\dv-Krishna\\Desktop\\Projects\\Perfect_Getting_Started'
02:45:05 PM
prefect.flow_runs
Debug
Importing flow code from 'basic_flow.py:data_pipeline'
02:45:05 PM
prefect.flow_runs
Debug
Executing flow 'data-pipeline' for flow run 'ethereal-centipede'...
02:45:05 PM
prefect.flow_runs
Info
Finished in state Completed()
02:45:06 PM
prefect.flow_runs
Info
Process 13860 exited cleanly.
02:45:06 PM
prefect.flow_runs.worker
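To make the timing in the log above easier to read, here is a small sketch that computes the gap between consecutive entries. The timestamps and (shortened) messages are copied from the log; the helper name `gaps` is made up for this example. The log has one-second resolution, so each gap is only accurate to about a second.

```python
from datetime import datetime

# (timestamp, message) pairs copied from the log above, in order
entries = [
    ("02:45:01 PM", "Completed submission of flow run"),
    ("02:45:04 PM", "Running 1 deployment pull step(s)"),
    ("02:45:05 PM", "Changing working directory"),
    ("02:45:05 PM", "Importing flow code"),
    ("02:45:05 PM", "Executing flow"),
    ("02:45:06 PM", "Finished in state Completed()"),
]


def gaps(entries):
    """Return (message, seconds since the previous entry) for each entry after the first."""
    times = [datetime.strptime(ts, "%I:%M:%S %p") for ts, _ in entries]
    return [
        (entries[i][1], (times[i] - times[i - 1]).total_seconds())
        for i in range(1, len(entries))
    ]


for message, seconds in gaps(entries):
    print(f"{seconds:.0f}s before: {message}")
```

The largest gap falls between the last entry from the `prefect.flow_runs.worker` logger and the first entry from the `prefect.flow_runs` logger, which is the window in which the command shown in the "Process running command" entry is started.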
Can you please explain why there is a 3-second gap in timestamps between the second Debug block and the Info block above it (02:45:01 PM to 02:45:04 PM)?
Zarinah
01/02/2025, 10:13 PM
I also see this issue. I'm trying to debug. I have set PREFECT_WORKER_QUERY_SECONDS=0.5 and confirmed it's set. What I see is that when I kick off the flow from the UI, it stays in the scheduling state for 6 seconds.
Zarinah
01/04/2025, 2:18 AM
Nate
01/04/2025, 2:20 AM
Well, my mentor asked if we can get as close to real time as possible, possibly less than 500 ms.
and i thought it was a float - my bad
thanks for following up
Krishna Suryawanshi
01/08/2025, 4:31 PM
Krishna Suryawanshi
01/08/2025, 4:34 PM
Nate
01/08/2025, 5:05 PM
Nate
01/08/2025, 5:07 PM
Krishna Suryawanshi
01/09/2025, 6:50 AM
Krishna Suryawanshi
01/09/2025, 6:54 AM