
Kirill Ulich

01/27/2023, 2:55 PM

Tony Alfonse

01/27/2023, 3:11 PM
I was following the same thread. Turns out v2 ruled out signals, so I ended up using a custom implementation:
import sys
import anyio

DEFAULT_TIMEOUT_SECONDS = 3600

async def main(timeout):
    condition_not_met = True  # placeholder: replace with the real upstream check
    # Cancel scopes are plain context managers in current anyio, so use `with`, not `async with`
    with anyio.move_on_after(timeout):
        while True:
            if condition_not_met:
                print("Waiting")
                await anyio.sleep(10)  # sleep for 10 seconds
            else:
                print("Continue")
                break

    # Checked outside the scope so it also runs when the timeout cancels the loop
    if condition_not_met:
        raise RuntimeError("Timed out while attempting to check for upstream jobs")

if __name__ == "__main__":
    try:
        timeout = int(sys.argv[1]) if len(sys.argv) > 1 else DEFAULT_TIMEOUT_SECONDS
        anyio.run(main, timeout)
    except Exception as exc:
        print(exc)
        sys.exit(1)
    else:
        sys.exit(0)

Peyton Runyan

01/28/2023, 12:26 AM
I don't really know Airflow all that well, but Prefect allows for while loops in your flows:
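import asyncio
from prefect import flow, task

@task
async def my_task():
    return False

@flow
async def my_flow():
    # poll the task every 10 seconds until it returns a truthy result
    while True:
        res = await my_task()
        if res:
            break
        await asyncio.sleep(10)

    task_two()  # assumed to be a sync task defined elsewhere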
That is a perfectly valid pattern.
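
For reference, the two ideas in this thread (poll in a loop, give up after a timeout) can also be combined using only the standard library. This is a minimal sketch, not anything Prefect-specific: `wait_for_condition` and `make_check` are hypothetical names, and `make_check` just stands in for whatever real upstream check you have.

```python
import asyncio

async def wait_for_condition(check, timeout, interval=10.0):
    """Poll `check` until it returns True; raises asyncio.TimeoutError after `timeout` seconds."""
    async def poll():
        while not await check():
            await asyncio.sleep(interval)
    # wait_for cancels poll() and raises a timeout error once the deadline passes
    await asyncio.wait_for(poll(), timeout)

def make_check(succeed_after):
    """Hypothetical stand-in for an upstream check: returns True from the Nth call on."""
    calls = 0
    async def check():
        nonlocal calls
        calls += 1
        return calls >= succeed_after
    return check

async def demo():
    # First wait succeeds on the third poll, well inside the timeout
    await wait_for_condition(make_check(3), timeout=1, interval=0.01)
    # Second wait never succeeds, so the timeout fires
    try:
        await wait_for_condition(make_check(10**9), timeout=0.05, interval=0.01)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Inside a flow, the body of `check` could presumably await a task like `my_task()` above, though that part is untested here.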