JulienKWM
10/29/2021, 8:17 AM
from prefect import Flow, task

@task()
def sample_task(some):
    return some

@task()
def process(current, previous):
    pass

with Flow("test") as flow:
    previous = None
    for n in range(180):
        item = sample_task(n)
        process(item, previous)
        previous = item
Is there another way to do this (maybe inside a task, to handle flow parameters)?
Thank you for your help!
Anna Geller
from prefect import Flow, Parameter, task

@task(log_stdout=True)
def hello_world(user_input: str):
    result = f"hello {user_input}"
    print(result)
    return result

@task(log_stdout=True)
def demo_data_dependency(input_: str, parameter: str):
    print(f"Got input: {input_} with parameter value: {parameter}")

with Flow("basic-flow") as flow:
    param = Parameter("user_input", default="world")
    hw = hello_world(param)
    demo_data_dependency(hw, param)
Does this answer your question?
JulienKWM
10/29/2021, 9:12 AM
Anna Geller
Anna Geller
JulienKWM
10/29/2021, 9:31 AM
@task()
def process(current, previous):
    pass
Anna Geller
JulienKWM
10/29/2021, 9:42 AM
from datetime import datetime
from prefect import Flow, task
from prefect.core.parameter import DateTimeParameter

@task()
def generate_hours(date: datetime):
    return {
        0: range(120),
        6: range(180),
        12: range(120),
        18: range(180)
    }[date.hour]

@task()
def sample_task_a(some):
    return some

@task()
def sample_task_b(some):
    return some

@task()
def sample_task_c(some):
    return some

@task()
def process(date: datetime, hour, current, previous):
    pass

with Flow("test") as flow:
    target_date = DateTimeParameter('target_date')
    previous = None
    for h in generate_hours(target_date):
        item_a = sample_task_a(h)
        item_b = sample_task_b(item_a)
        item_c = sample_task_c(item_b)
        process(target_date, h, item_c, previous)
        previous = item_c
I need to loop over a block of multiple tasks.
Anna Geller
from prefect import Flow, Parameter, task
import pendulum

@task(log_stdout=True)
def hello_world(user_input: str):
    if user_input is None:
        default_val = pendulum.now("Europe/Berlin").add(days=-1).to_date_string()
        print(f"Hello {default_val}")
    else:
        print(f"hello {user_input}")

with Flow("test-flow") as flow:
    param = Parameter("user_input", default=None)
    hw = hello_world(param)
And regarding looping, I’m not quite sure, but you may not need to do it sequentially if you redesign it a bit and leverage iterated mapping.
JulienKWM
10/29/2021, 10:18 AM
Anna Geller
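(Not from the original thread.) A minimal plain-Python sketch of the kind of redesign that iterated mapping would need. The loop only looks sequential because each `process` call wants the previous item; if the (current, previous) pairs are built up front, each pair can be handled on its own. `pair_with_previous` and the body of `process` are hypothetical placeholders. In Prefect 1.x the functions would presumably be decorated with `@task` and the list comprehension swapped for a mapped call such as `process.map(pairs)`; that wiring is an assumption and is left out so the sketch runs without Prefect.

```python
from typing import List, Optional, Tuple


def pair_with_previous(items: List[int]) -> List[Tuple[int, Optional[int]]]:
    """Return (current, previous) pairs; the first item has no predecessor."""
    previous_items = [None] + items[:-1]
    return list(zip(items, previous_items))


def process(pair: Tuple[int, Optional[int]]) -> Optional[int]:
    """Hypothetical per-pair work: the step from the previous item, if any."""
    current, previous = pair
    if previous is None:
        return None
    return current - previous


hours = list(range(4))
pairs = pair_with_previous(hours)

# Each pair is independent, so this comprehension is the part a mapped
# task could take over (assumption: one mapped run per pair).
results = [process(p) for p in pairs]
```

Whether this fits depends on `previous` being computable before `process` runs; if `previous` must be the output of an earlier `process` call, the work is inherently sequential and mapping will not help.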
from prefect import task, Flow, Parameter

@task(log_stdout=True)
def task_a():
    return "A"

@task(log_stdout=True)
def task_b(param: str):
    task_a_out = task_a.run()
    result = f"B with param {param} and output of A: {task_a_out}"
    print(result)
    return result

with Flow("task-in-task") as flow:
    user_input = Parameter("user_input", default="hi!")
    hw = task_b(user_input)
But really, not everything needs to be an individual task. You could put your entire for loop into one larger task; it depends on how much granular visibility into your tasks you need.
JulienKWM
10/29/2021, 11:35 AM
Anna Geller
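(Not from the original thread.) A rough sketch of the "one larger task" idea, written as plain Python so it runs without Prefect. The hour table and the pass-through steps mirror the snippets above; `run_all_hours` and the tuple it collects are hypothetical. In Prefect 1.x you would presumably decorate `run_all_hours` with `@task` and pass it the `DateTimeParameter`, so the date is resolved at run time inside the task rather than while the flow is being built.

```python
from datetime import datetime
from typing import List, Optional, Tuple

# Same lookup as generate_hours above, keyed by the hour of the target date.
HOURS_BY_RUN = {0: range(120), 6: range(180), 12: range(120), 18: range(180)}


def sample_a(some: int) -> int:
    return some


def sample_b(some: int) -> int:
    return some


def sample_c(some: int) -> int:
    return some


def run_all_hours(target_date: datetime) -> List[Tuple[int, int, Optional[int]]]:
    """Run the whole a -> b -> c chain for every hour inside one function.

    Collects (hour, current, previous) so the caller can inspect what a
    per-hour process step would have received.
    """
    previous: Optional[int] = None
    processed = []
    for h in HOURS_BY_RUN[target_date.hour]:
        item = sample_c(sample_b(sample_a(h)))
        processed.append((h, item, previous))
        previous = item
    return processed


out = run_all_hours(datetime(2021, 10, 29, 6))
```

The trade-off is the one mentioned above: the loop becomes a single unit, so you lose per-hour visibility and per-hour retries in exchange for a much simpler flow.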