# ask-community
Eliot Courtel:
Hi, I get the following error on one of my flows:
Reported flow run 'af02622e-c533-4c9c-8a2f-7fb1de64b4d9' as crashed: Flow run could not be submitted to infrastructure:
RuntimeError('Failed to start flow run process.')
02:17:51 PM
prefect.flow_runs.worker
Error
Failed to submit flow run 'af02622e-c533-4c9c-8a2f-7fb1de64b4d9' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/workers/base.py", line 1011, in _submit_run_and_capture_errors
    result = await self.run(
  File "/usr/local/lib/python3.9/site-packages/prefect/workers/process.py", line 216, in run
    raise RuntimeError("Failed to start flow run process.")
RuntimeError: Failed to start flow run process.
02:17:50 PM
prefect.flow_runs.worker
Info
Worker 'ProcessWorker 4fd201b6-c78e-4218-b7ea-c888cbecba29' submitting flow run 'af02622e-c533-4c9c-8a2f-7fb1de64b4d9'
Turns out I'm also getting a pretty explicit error on my worker:
prefect.runner - Flow run limit reached; 5 flow runs in progress. You can control this limit by passing a limit value to serve or adjusting the PREFECT_RUNNER_PROCESS_LIMIT setting
Would you have any hint on how to fix it? I'm running my worker with the Docker Compose service below, explicitly setting a high limit value:
worker:
    build: ./prefect/
    container_name: prefect-worker
    restart: always
    expose:
      - 8080
    command: prefect worker start --pool local --install-policy always --limit 100 --with-healthcheck 
    environment:
      PREFECT_API_URL: http://server:4200/api
      PYTHONPATH: "/usr/app"
    healthcheck:
      test: ["CMD", "curl", "-f", "<http://localhost:8080/health>"]
      interval: 5s
      timeout: 3s
      retries: 10
    depends_on:
      server:
        condition: service_healthy
    logging:
      options:
        max-size: "10m"
        max-file: "3"
Thanks a lot
Nate:
hi @Eliot Courtel - this might be a little confusing, so maybe we can make it clearer in the error message, but the runner and the worker are not the same thing. You'll actually want to set the environment variable PREFECT_RUNNER_PROCESS_LIMIT instead of passing --limit.
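in the compose file above that would look something like this (just a sketch - the "100" carries over the value from your old --limit flag, which you could then drop from the command):

    environment:
      PREFECT_API_URL: http://server:4200/api
      PYTHONPATH: "/usr/app"
      # assumed value, carried over from the old --limit 100 flag;
      # inherited by the flow run processes the worker spawns
      PREFECT_RUNNER_PROCESS_LIMIT: "100"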
Eliot Courtel:
Thanks
@Nate Would you know, by any chance, a way to restart a flow automatically? Use case: we love Prefect, it handles many things on our end. Tasks run every hour, obviously, but also tasks that listen to streams of data for an hour and restart (cron: 0 * * * *). Unfortunately, it has happened before that the flow crashes or doesn't restart when we have an infra issue, so we're essentially looking for a way to have a task constantly running in the background. Current code:
import asyncio
from datetime import datetime, timedelta
from pathlib import Path

from prefect import flow, task

# mongo_connect, clickhouse_connect and process_and_insert are project helpers
# defined elsewhere.

@task(timeout_seconds=4 * 60 * 60 + 10)
async def run():
    """
    Runs the merged transactions loop for 4 hours, executing roughly every three seconds.
    
    Input:
      None.
    Output:
      None.
    """
    run_start = datetime.now()
    run_end = run_start + timedelta(hours=4)
    print(f"[{datetime.now()}] Flow will run until: {run_end}")
    mongo_manager = mongo_connect("mongo-manager-instance")
    clickhouse_manager = clickhouse_connect("clickhouse-manager-instance")

    current_time = datetime.now()
    last_computed = current_time
    while current_time < run_end:
        process_and_insert(mongo_manager, clickhouse_manager)
        current_time = datetime.now()
        # aim for a ~3-second cadence: sleep whatever is left of it
        before_next = 2 - (current_time - last_computed).total_seconds() + 1
        if before_next > 0:
            if current_time + timedelta(seconds=before_next) > run_end:
                break
            print(f"[{datetime.now()}] Sleeping for {int(before_next)} seconds")
            await asyncio.sleep(before_next)
        last_computed = datetime.now()

    clickhouse_manager.remove()

@flow(name="Merged Transactions Flow", log_prints=True)
async def transaction_full_and_metadata():
    """
    Orchestrates the 4-hour merged transactions processing loop.
    
    Input:
      None.
    Output:
      None.
    """
    await run()

if __name__ == "__main__":
    path = Path(__file__)
    transaction_full_and_metadata.from_source(
        source=f"{path.parent}",
        entrypoint=f"{path}:transaction_full_and_metadata",
    ).deploy(
        name="Transactions Flow",
        parameters=dict(),
        work_pool_name="local",
        cron="0 */4 * * *",
        ignore_warnings=True
    )
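One way to get the automatic restart (a sketch, not something confirmed in this thread): Prefect flows accept retries and retry_delay_seconds, so a run that fails with an exception is retried in place, while the cron schedule keeps starting fresh runs:

from prefect import flow

# sketch: retry the whole flow run if the code inside it raises;
# the cron schedule still starts a fresh run every 4 hours
@flow(
    name="Merged Transactions Flow",
    log_prints=True,
    retries=3,               # assumed value
    retry_delay_seconds=60,  # assumed value
)
async def transaction_full_and_metadata():
    await run()

Note that retries only cover exceptions raised inside the flow process; a run that crashes at the infrastructure level (like the error at the top of this thread) would need something like a Prefect Automation that reacts to crashed flow runs.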