Eliot Courtel
02/28/2025, 1:42 PM
Reported flow run 'af02622e-c533-4c9c-8a2f-7fb1de64b4d9' as crashed: Flow run could not be submitted to infrastructure:
RuntimeError('Failed to start flow run process.')
02:17:51 PM | prefect.flow_runs.worker | Error | Failed to submit flow run 'af02622e-c533-4c9c-8a2f-7fb1de64b4d9' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/workers/base.py", line 1011, in _submit_run_and_capture_errors
    result = await self.run(
  File "/usr/local/lib/python3.9/site-packages/prefect/workers/process.py", line 216, in run
    raise RuntimeError("Failed to start flow run process.")
RuntimeError: Failed to start flow run process.
02:17:50 PM | prefect.flow_runs.worker | Info | Worker 'ProcessWorker 4fd201b6-c78e-4218-b7ea-c888cbecba29' submitting flow run 'af02622e-c533-4c9c-8a2f-7fb1de64b4d9'
Turns out I'm also getting a pretty explicit error on my worker:
prefect.runner - Flow run limit reached; 5 flow runs in progress. You can control this limit by passing a limit value to serve or adjusting the PREFECT_RUNNER_PROCESS_LIMIT setting
Would you have any hint on how to fix it?
I'm running my worker using this docker compose service, explicitly setting a high limit value:
worker:
  build: ./prefect/
  container_name: prefect-worker
  restart: always
  expose:
    - 8080
  command: prefect worker start --pool local --install-policy always --limit 100 --with-healthcheck
  environment:
    PREFECT_API_URL: http://server:4200/api
    PYTHONPATH: "/usr/app"
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
    interval: 5s
    timeout: 3s
    retries: 10
  depends_on:
    server:
      condition: service_healthy
  logging:
    options:
      max-size: "10m"
      max-file: "3"
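A note on the two limits involved: the worker's --limit caps how many runs the worker itself will submit, while the "Flow run limit reached; 5 flow runs in progress" message is logged by prefect.runner inside the flow run process, whose effective limit here is 5. A minimal sketch of raising that runner cap via the PREFECT_RUNNER_PROCESS_LIMIT setting named in the error, added to this service's environment block (the value 100 is illustrative, not a recommendation):

  environment:
    PREFECT_API_URL: http://server:4200/api
    PYTHONPATH: "/usr/app"
    PREFECT_RUNNER_PROCESS_LIMIT: "100"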
Thanks a lot
Nate
02/28/2025, 5:28 PM
--limit
Eliot Courtel
02/28/2025, 5:57 PM
Eliot Courtel
02/28/2025, 6:04 PM
import asyncio
from datetime import datetime, timedelta
from pathlib import Path

from prefect import flow, task

# mongo_connect, clickhouse_connect and process_and_insert are project
# helpers (not shown in the thread).


@task(timeout_seconds=4 * 60 * 60 + 10)
async def run():
    """
    Runs the merged transactions loop for 4 hours, executing roughly every
    3 seconds.

    Input:
        None.
    Output:
        None.
    """
    run_start = datetime.now()
    run_end = run_start + timedelta(hours=4)
    print(f"[{datetime.now()}] Flow will run until: {run_end}")
    mongo_manager = mongo_connect("mongo-manager-instance")
    clickhouse_manager = clickhouse_connect("clickhouse-manager-instance")
    current_time = datetime.now()
    last_computed = current_time
    while current_time < run_end:
        process_and_insert(mongo_manager, clickhouse_manager)
        current_time = datetime.now()
        # Time left until the next ~3-second tick.
        before_next = 2 - (current_time - last_computed).total_seconds() + 1
        if before_next > 0:
            # Stop early if the next tick would land past the 4-hour window.
            if current_time + timedelta(seconds=before_next) > run_end:
                break
            print(f"[{datetime.now()}] Sleeping for {int(before_next)} seconds")
            await asyncio.sleep(before_next)
        last_computed = datetime.now()
    clickhouse_manager.remove()


@flow(name="Merged Transactions Flow", log_prints=True)
async def transaction_full_and_metadata():
    """
    Orchestrates the 4-hour merged transactions processing loop.

    Input:
        None.
    Output:
        None.
    """
    await run()


if __name__ == "__main__":
    path = Path(__file__)
    transaction_full_and_metadata.from_source(
        source=f"{path.parent}",
        entrypoint=f"{path}:transaction_full_and_metadata",
    ).deploy(
        name="Transactions Flow",
        parameters=dict(),
        work_pool_name="local",
        cron="0 */4 * * *",
        ignore_warnings=True,
    )
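For completeness, the other knob the runner error mentions is the limit argument to serve. A minimal sketch, assuming Prefect 2.14+ and serving the flow directly instead of deploying it to the work pool (the limit value is illustrative):

if __name__ == "__main__":
    # serve() runs the flow via an in-process Runner; `limit` is the cap the
    # "Flow run limit reached" message refers to (it was 5 in the error above).
    transaction_full_and_metadata.serve(
        name="Transactions Flow",
        cron="0 */4 * * *",
        limit=100,
    )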