https://prefect.io logo
Title
r

Rio McMahon

08/06/2022, 11:53 PM
Hi there - I am trying to implement a recursive flow pattern in prefect 2.0; any tips on how to do this? I’ve seen some methods that leverage async in the code contest submissions but am curious if it is possible to implement it without async? I’ve included my attempts in the comments.
from prefect import flow, task

@task
def print_input(i):
   print(i)


@flow()
def print_recursively(i):
   print_input(i)
   i += 1
   if i <= 10:
      print_recursively(i)
   return i

if __name__ == '__main__':
   print_recursively(1)
However when I try this I get a
RuntimeError: The task runner is already started!
error
Second (related) question: if I want to schedule a flow from python is that possible? I think an alternative to running flows recursively in the manner described about would be to have the flow “push itself” to a task queue. Is this possible to do? If not is it possible to access the scheduler from python code rather than the UI? Thanks.
n

Nico Neumann

08/07/2022, 1:49 AM
First question: It seems flows cannot be recursively called but tasks can. As you cannot call tasks within task directly, I added
.fn
This works for me:
from prefect import flow, task


@task
def print_input(i):
   print(i)


@task()
def print_recursively(i):
   print_input.fn(i)
   i += 1
   if i <= 10:
      print_recursively.fn(i)
   return i

@flow()
def run_flow(i):
   print_recursively(i)

if __name__ == '__main__':
   run_flow(1)
:thank-you: 1
Second question: You can use the
client
(needs async) to interact with your flows/deployments:
import asyncio

from prefect import flow
from prefect.client import get_client


@flow
async def run_flow():
    async with get_client() as client:
        # print your deployment id here or using prefect deployment ls
        deployments = await client.read_deployments()
        deployment_id = "..." # put your deployment id here

        for i in range(10):
            parameters = {"i": i}
            flow_run = await client.create_flow_run_from_deployment(deployment_id=deployment_id, parameters=parameters)


if __name__ == "__main__":
    asyncio.run(run_flow())
🙌 2
r

Rio McMahon

08/08/2022, 2:30 PM
@Nico Neumann Thanks for the feedback/help - I think that these will work nicely
🙌 1