Christopher
02/04/2022, 7:40 AM
Anna Geller
from prefect import task, Flow

@task
def custom_business_logic(x):
    return x + 1

with Flow("hello") as flow:
    data_result = custom_business_logic(42)
    custom_business_logic(data_result)
b) If you need to build this cyclic workflow as a while-loop pattern, you can use task looping.
c) My recommendation, and the best approach I see for your use case, is to use mapping, which lets you call the same task multiple times over different inputs and can even run those calls in parallel.
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def generate_random_numbers():
    # Despite the name, this returns the fixed sequence 1..199
    return list(range(1, 200))

@task
def add_one(x):
    return x + 1

@task(log_stdout=True)
def print_results(res):
    print(res)

with Flow("mapping", executor=LocalDaskExecutor()) as flow:
    numbers = generate_random_numbers()
    # Each .map() call runs add_one once per list element
    result = add_one.map(numbers)
    result_2 = add_one.map(result)
    # Not mapped, so it receives the full list of results
    print_results(result_2)
Regarding #2 - check out this Discourse topic - it’s 100% possible and you can even implement it in various ways.
Christopher
02/04/2022, 10:26 AM