Hi! Maybe someone can help? <https://github.com/Pr...
# ask-community
k
t
I was following the same thread, turns out v2 ruled out signals so ended up using a custom implementation:
Copy code
import sys
import anyio

DEFAULT_TIMEOUT_SECONDS = 3600

async def main(timeout):
    condition_not_met=True
    async with anyio.move_on_after(timeout):
        while True:
            if condition_not_met:
                print("Waiting ")
                await anyio.sleep(10)  # sleep for 10 seconds
            else:
                print("Continue ")
                break

        if condition_not_met:
            raise RuntimeError("Timed out while attempting to check for upstream jobs")

if __name__ == "__main__":
    try:
        timeout = int(sys.argv[1]) if len(sys.argv) > 1 else DEFAULT_TIMEOUT_SECONDS
        anyio.run(main, timeout)
    except Exception as exc:
        print(exc)
        exit(1)
    else:
        exit(0)
p
I don't really know airflow all that well, but Prefect allows for while loops in your flows
Copy code
@task
async def my_task():
  return False

@flow
async def my_flow():
    while True:
      res = await my_task()
      if res:
          break
      await asyncio.sleep(10)

    task_two()
That is a perfectly valid pattern