Rio McMahon
08/06/2022, 11:53 PM
from prefect import flow, task
@task
def print_input(i):
    """Print ``i`` to standard output; declared as a Prefect task via ``@task``."""
    print(i)
@flow()
def print_recursively(i):
    """Print ``i`` through ``print_input``, then recurse while the next value is <= 10.

    Args:
        i: Value to print; it is incremented by one before the recursion check.

    Returns:
        The incremented value (the argument plus one).

    NOTE: this is the reproduction from the question. The flow calls itself
    from inside its own run, and the run was reported to fail with
    ``RuntimeError: The task runner is already started!``.
    """
    print_input(i)
    i += 1
    if i <= 10:
        # The flow invokes itself here; the result of the nested call is discarded.
        print_recursively(i)
    return i
if __name__ == '__main__':
    # Start the recursion at 1 when the file is executed as a script.
    print_recursively(1)
RuntimeError: The task runner is already started!
error
Nico Neumann
08/07/2022, 1:49 AM
.fn
This works for me:
from prefect import flow, task
@task
def print_input(i):
    """Print ``i`` to standard output.

    Declared as a Prefect task via ``@task``; in this snippet it is invoked
    through its ``.fn`` attribute rather than called as a task.
    """
    print(i)
@task()
def print_recursively(i):
    """Print ``i`` and recurse while the next value is <= 10, using ``.fn`` calls.

    Both the print and the recursive call go through the ``.fn`` attribute of
    the decorated objects (presumably the undecorated function -- confirm
    against the Prefect ``Task`` API), so nothing is called as a task from
    inside this task.

    Args:
        i: Value to print; it is incremented by one before the recursion check.

    Returns:
        The incremented value (the argument plus one).
    """
    print_input.fn(i)
    i += 1
    if i <= 10:
        # Recurse through ``.fn``; the nested return value is discarded.
        print_recursively.fn(i)
    return i
@flow()
def run_flow(i):
    """Flow entry point that calls the ``print_recursively`` task once with ``i``.

    Args:
        i: Starting value forwarded to ``print_recursively``.
    """
    print_recursively(i)
if __name__ == '__main__':
    # Run the flow starting from 1 when the file is executed as a script.
    run_flow(1)
client (needs async) to interact with your flows/deployments:
import asyncio
from prefect import flow
from prefect.client import get_client
@flow
async def run_flow():
    """Create ten flow runs from one deployment, passing ``i`` = 0..9 as the parameter.

    Opens a client with ``get_client()``, reads the deployments, then calls
    ``create_flow_run_from_deployment`` once per value of ``i``.
    ``deployment_id`` is a placeholder string that must be replaced with a
    real deployment id before running.
    """
    async with get_client() as client:
        # print your deployment id here or using prefect deployment ls
        deployments = await client.read_deployments()
        deployment_id = "..."  # put your deployment id here
        for i in range(10):
            parameters = {"i": i}
            # One flow run is requested per iteration; the result is not used further.
            flow_run = await client.create_flow_run_from_deployment(
                deployment_id=deployment_id,
                parameters=parameters,
            )
if __name__ == "__main__":
    # ``run_flow`` is a coroutine function, so it is driven with ``asyncio.run``.
    asyncio.run(run_flow())
Rio McMahon
08/08/2022, 2:30 PM