Eliot Courtel
03/01/2025, 11:00 PM@task(timeout_seconds=4*60*60 + 10)
async def run():
"""
Runs the merged transactions flow for 4 hours, executing every 10 seconds.
Input:
None.
Output:
None.
"""
run_start = datetime.now()
run_end = run_start + timedelta(hours=4)
print(f"[{datetime.now()}] Flow will run until: {run_end}")
mongo_manager = mongo_connect("mongo-manager-instance")
clickhouse_manager = clickhouse_connect("clickhouse-manager-instance")
current_time = datetime.now()
last_computed = current_time
while current_time < run_end:
process_and_insert(mongo_manager, clickhouse_manager)
current_time = datetime.now()
before_next = 2 - (current_time - last_computed).total_seconds() + 1
if before_next > 0:
if current_time + timedelta(seconds=before_next) > run_end:
break
print(f"[{datetime.now()}] Sleeping for {int(before_next)} seconds")
await asyncio.sleep(before_next)
last_computed = datetime.now()
clickhouse_manager.remove()
@flow(name="Merged Transactions Flow", log_prints=True)
async def transaction_full_and_metadata():
"""
Orchestrates the 4-hour merged transactions processing loop.
Input:
None.
Output:
None.
"""
await run()
if __name__ == "__main__":
path = Path(__file__)
transaction_full_and_metadata.from_source(
source=f"{path.parent}",
entrypoint=f"{path}:transaction_full_and_metadata",
).deploy(
name="Transactions Flow",
parameters=dict(),
work_pool_name="local",
cron="0 */4 * * *",
ignore_warnings=True
)