Hi, Would you know by any chance a way to restart...
# ask-community
e
Hi, Would you know by any chance a way to restart a flow automatically Use case: We love prefect it handle many things on our end Tasks runned every hour obviously but also task that listen to streams of data for an hoour and restart (cron: 0 * * * *) unfortunatly it happened before that the flow crash or doesn't restart when we have a infra issue So we're essentially looking for a way to have a task constantly running in the background current code:
Copy code
@task(timeout_seconds=4*60*60 + 10)
async def run():
    """
    Runs the merged transactions flow for 4 hours, executing every 10 seconds.
    
    Input:
      None.
    Output:
      None.
    """
    run_start = datetime.now()
    run_end = run_start + timedelta(hours=4)
    print(f"[{datetime.now()}] Flow will run until: {run_end}")
    mongo_manager = mongo_connect("mongo-manager-instance")
    clickhouse_manager = clickhouse_connect("clickhouse-manager-instance")

    current_time = datetime.now()
    last_computed = current_time
    while current_time < run_end:
        process_and_insert(mongo_manager, clickhouse_manager)
        current_time = datetime.now()
        before_next = 2 - (current_time - last_computed).total_seconds() + 1
        if before_next > 0:
            if current_time + timedelta(seconds=before_next) > run_end:
                break
            print(f"[{datetime.now()}] Sleeping for {int(before_next)} seconds")
            await asyncio.sleep(before_next)
        last_computed = datetime.now()

    clickhouse_manager.remove()

@flow(name="Merged Transactions Flow", log_prints=True)
async def transaction_full_and_metadata():
    """
    Orchestrates the 4-hour merged transactions processing loop.
    
    Input:
      None.
    Output:
      None.
    """
    await run()

if __name__ == "__main__":
    path = Path(__file__)
    transaction_full_and_metadata.from_source(
        source=f"{path.parent}",
        entrypoint=f"{path}:transaction_full_and_metadata",
    ).deploy(
        name="Transactions Flow",
        parameters=dict(),
        work_pool_name="local",
        cron="0 */4 * * *",
        ignore_warnings=True
    )