# ask-community
j
Hello! I need to implement a workflow where each task must execute one after the other and use the result of the previous task. The only way I have found so far is to loop in the flow code, but this solution doesn't work with flow parameters:
from prefect import Flow, task


@task()
def sample_task(some):
    return some


@task()
def process(current, previous):
    pass


with Flow("test") as flow:
    previous = None
    for n in range(180):
        item = sample_task(n)
        # Passing the previous item makes it an upstream dependency of this call
        process(item, previous)
        previous = item
Is there another way to do this (maybe inside a task, so that it can handle flow parameters)? Thank you for your help!
a
@JulienKWM To manage data dependencies, including those coming from Parameter tasks, you just need to pass them to the tasks inside the Flow context (the with Flow(...) block), e.g.:
from prefect import Flow, Parameter, task


@task(log_stdout=True)
def hello_world(user_input: str):
    result = f"hello {user_input}"
    print(result)
    return result


@task(log_stdout=True)
def demo_data_dependency(input_: str, parameter: str):
    print(f"Got input: {input_} with parameter value: {parameter}")


with Flow("basic-flow") as flow:
    param = Parameter("user_input", default="world")
    hw = hello_world(param)
    demo_data_dependency(hw, param)
Does this answer your question?
j
Thank you for your reply. The issue is that I have to do this dynamically (a task will take a flow input parameter and return a list of items to process).
a
This works exactly the same way. Have you heard about mapping in Prefect? I think it might be what you're looking for instead of a loop: https://docs.prefect.io/core/concepts/mapping.html
@JulienKWM And if you really need a loop to get the previous iteration's result in the current task, have a look at task looping: https://docs.prefect.io/core/advanced_tutorials/task-looping.html#dynamic-dags-task-looping
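To make the looping idea concrete, here is a plain-Python toy (no Prefect involved) that mimics the control flow described in the linked tutorial: the task body raises a signal carrying a payload, and the runner calls the body again with that payload available under the key task_loop_result. In Prefect 1 the real pieces are prefect.engine.signals.LOOP and prefect.context.get("task_loop_result"); LoopSignal, run_looping_task and process_hours below are hypothetical stand-ins, so treat this as a sketch of the mechanism, not of Prefect's implementation.

```python
"""Toy emulation of Prefect 1 task looping, in plain Python.

Hypothetical stand-ins: LoopSignal mimics prefect.engine.signals.LOOP and
run_looping_task mimics the runner that re-invokes a looping task. This is
NOT Prefect's implementation; it only illustrates the control flow.
"""
from typing import Any, Callable, Dict, List


class LoopSignal(Exception):
    """Raised by a task body to ask for another iteration, carrying a payload."""

    def __init__(self, result: Any) -> None:
        super().__init__("loop")
        self.result = result


def run_looping_task(body: Callable[..., Any], *args: Any) -> Any:
    """Call `body` repeatedly until it returns normally.

    The payload of the last LoopSignal is handed to the next call under the
    key "task_loop_result", the same key name Prefect 1 uses in its context.
    """
    context: Dict[str, Any] = {}
    while True:
        try:
            return body(context, *args)
        except LoopSignal as signal:
            context["task_loop_result"] = signal.result


def process_hours(context: Dict[str, Any], hours: List[int]) -> list:
    """Handle one hour per iteration, passing along the previous hour's value."""
    state = context.get("task_loop_result", {"index": 0, "previous": None, "log": []})
    index, previous, log = state["index"], state["previous"], state["log"]
    if index == len(hours):
        return log  # every hour handled: stop looping
    current = hours[index]
    log = log + [(current, previous)]  # stand-in for process(current, previous)
    raise LoopSignal({"index": index + 1, "previous": current, "log": log})


print(run_looping_task(process_hours, [0, 1, 2]))
# [(0, None), (1, 0), (2, 1)]
```

Because all iterations happen inside one task, the list of hours can come from a flow parameter (via an upstream task), which is what the loop in the flow code could not do.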
j
Yes, I know about mapping, but it does not solve my issue. The loop is there specifically to set an upstream dependency on each task to enforce sequential execution. In my case, I need to pass the previous task's result to the next task:
@task()
def process(current, previous):
    pass
a
If you need sequential, rather than parallel execution of the loop elements, then looping may be what you need.
j
Sorry, I didn't see your last reply. As I read it in the docs, looping only allows looping inside a single task, right? My example was maybe oversimplified. Here is a more complete example of what I want to do:
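from datetime import datetime

from prefect import Flow, task
from prefect.core.parameter import DateTimeParameter


@task()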
def generate_hours(date: datetime):
    return {
        0: range(120),
        6: range(180),
        12: range(120),
        18: range(180)
    }[date.hour]


@task()
def sample_task_a(some):
    return some


@task()
def sample_task_b(some):
    return some


@task()
def sample_task_c(some):
    return some


@task()
def process(date: datetime, hour, current, previous):
    pass


with Flow("test") as flow:
    target_date = DateTimeParameter('target_date')
    previous = None
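    # Intended pattern only: while the flow is being built, generate_hours(...)
    # returns a Task, not a list, so this loop cannot iterate over the hour values
    for h in generate_hours(target_date):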
        item_a = sample_task_a(h)
        item_b = sample_task_b(item_a)
        item_c = sample_task_c(item_b)
        process(target_date, h, item_c, previous)
        previous = item_c
I need to loop over a block of multiple tasks.
a
I see, thanks for sharing your use case. I think you can accomplish that in Prefect, but you may need to redesign it a bit. For example, you take a dynamic target date as a parameter. This will not work unless you only trigger the flow manually or via the API and pass the desired parameter value explicitly. Parameter defaults are evaluated at registration time, which means that if you schedule this flow, it will not behave as you expect because:
• you don't have any default value, so Prefect doesn't know which value to use. This would work, though: Parameter("target_date", default="2021-10-29")
• even with a default, target_date would be static, fixed to the value set at registration.
So a better approach would be to move this date generation into a task and give it logic for what to use by default when the parameter value is None, e.g.:
from prefect import Flow, Parameter, task
import pendulum


@task(log_stdout=True)
def hello_world(user_input: str):
    if user_input is None:
        default_val = pendulum.now("Europe/Berlin").add(days=-1).to_date_string()
        print(f"Hello {default_val}")
    else:
        print(f"hello {user_input}")


with Flow("test-flow") as flow:
    param = Parameter("user_input", default=None)
    hw = hello_world(param)
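Applied to the target_date case from the question, the same None-fallback idea might look like the sketch below. The default rule is an assumption for illustration only, not something from the thread: with no explicit value, the run targets the most recent 00/06/12/18 UTC cycle, since generate_hours only knows those four hours. resolve_target_date and hour_block are hypothetical names, and the clock is passed in so the logic is easy to check.

```python
"""Sketch: resolve a missing target_date at run time instead of registration time.

Assumption (hypothetical, not from the thread): when no value is given, the run
targets the most recent 00/06/12/18 UTC cycle, since generate_hours only knows
those four hours. The clock is passed in so the logic is easy to test.
"""
from datetime import datetime, timezone
from typing import Optional


def resolve_target_date(value: Optional[datetime], now: datetime) -> datetime:
    """Return the explicit parameter value, or floor `now` to a 6-hour cycle."""
    if value is not None:
        return value
    return now.replace(hour=now.hour - now.hour % 6, minute=0, second=0, microsecond=0)


def hour_block(target: datetime) -> range:
    """Same lookup table as generate_hours in the question."""
    return {0: range(120), 6: range(180), 12: range(120), 18: range(180)}[target.hour]


now = datetime(2021, 10, 29, 14, 30, tzinfo=timezone.utc)
target = resolve_target_date(None, now)
print(target.isoformat(), len(hour_block(target)))
# 2021-10-29T12:00:00+00:00 120
```

In a flow, resolve_target_date would presumably run inside a task fed by Parameter("target_date", default=None), with now=datetime.now(timezone.utc) taken at run time, so every scheduled run computes its own date.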
As for looping, I'm not quite sure, but you may not need to run it sequentially if you redesign it a bit and leverage iterated mapping.
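One possible reading of the iterated-mapping suggestion, sketched in plain Python (no Prefect involved): in the question's flow, previous is the output of sample_task_c for the preceding hour, not the output of process. So each process call only needs a (current, previous) pair, and those pairs can be built up front instead of in a sequential loop. The task bodies below are hypothetical stand-ins; in Prefect 1 each list comprehension would roughly correspond to one .map() call, plus a small task to build the shifted list, which is an untested assumption here.

```python
"""Plain-Python sketch of the "iterated mapping" redesign (no Prefect involved).

In the question's flow, `previous` is the output of sample_task_c for the
preceding hour, not the output of process, so every process call only needs a
(current, previous) pair. The task bodies are hypothetical stand-ins.
"""
from typing import Optional, Tuple


def sample_task_a(x: int) -> int:
    return x + 1


def sample_task_b(x: int) -> int:
    return x * 10


def sample_task_c(x: int) -> int:
    return x - 3


def process(hour: int, current: int, previous: Optional[int]) -> Tuple[int, int, Optional[int]]:
    return (hour, current, previous)


hours = list(range(4))
# Iterated mapping: each stage is applied element-wise to the previous stage's output
item_a = [sample_task_a(h) for h in hours]
item_b = [sample_task_b(a) for a in item_a]
item_c = [sample_task_c(b) for b in item_b]
# Shift item_c by one to get each hour's "previous" value (None for the first hour)
previous = [None] + item_c[:-1]
results = [process(h, cur, prev) for h, cur, prev in zip(hours, item_c, previous)]
print(results)
# [(0, 7, None), (1, 17, 7), (2, 27, 17), (3, 37, 27)]
```

Since no process call depends on another, these calls could run in parallel. If process must also run strictly in order for some other reason (for example side effects), this layout does not enforce that.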
j
Thank you for the hint regarding parameter handling! The pattern I need seems hard to implement without the ability to call tasks from other tasks, but I'll try to find an alternative. Do you know whether the Orion release will allow calling tasks from within tasks?
a
In Orion, your for-loop would work out of the box, perhaps with some small adjustments to wait for each task's completion and to access the future's result. If you need to call one task from another task in Prefect Core, this is possible, although it may backfire if you don't use it properly: you just need to call the .run() method of the other task. Note that .run() simply executes the task's function, so the inner call does not get its own task run, state, or retries.
from prefect import task, Flow, Parameter


@task(log_stdout=True)
def task_a():
    return "A"


@task(log_stdout=True)
def task_b(param: str):
    task_a_out = task_a.run()
    result = f"B with param {param} and output of A: {task_a_out}"
    print(result)
    return result


with Flow("task-in-task") as flow:
    user_input = Parameter("user_input", default="hi!")
    hw = task_b(user_input)
But really, not everything needs to be an individual task. You could put your entire for loop into one larger task; it depends on how much granular visibility into your tasks you need.
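A minimal sketch of the "one larger task" option, written as plain Python so it runs on its own: run_block executes the a, b, c chain for each hour in order and passes the previous c output along, mirroring the question's loop. run_block and the step lambdas are hypothetical stand-ins; in Prefect 1 you would presumably decorate run_block with @task and call it inside the Flow with the list of hours produced by an upstream task.

```python
"""Sketch: the whole per-hour block as one ordinary function (one larger task).

run_block and the step functions are hypothetical stand-ins for
sample_task_a/b/c and process from the question.
"""
from typing import Callable, List, Optional, Tuple

Step = Callable[[int], int]


def run_block(
    hours: range,
    steps: List[Step],
    process: Callable[[int, int, Optional[int]], Tuple[int, int, Optional[int]]],
) -> list:
    """Run a -> b -> c for each hour in order, passing the previous c output along."""
    previous: Optional[int] = None
    results = []
    for h in hours:
        item = h
        for step in steps:  # the a, b, c chain for this hour
            item = step(item)
        results.append(process(h, item, previous))
        previous = item  # mirrors `previous = item_c` in the question's loop
    return results


results = run_block(
    range(3),
    steps=[lambda x: x + 1, lambda x: x * 10, lambda x: x - 3],
    process=lambda h, cur, prev: (h, cur, prev),
)
print(results)
# [(0, 7, None), (1, 17, 7), (2, 27, 17)]
```

The trade-off is the one mentioned above: the whole block shows up as a single task, so individual steps no longer get their own state, logs, or retries.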
j
OK, thank you very much! So Orion will be more appropriate for this kind of workflow. I will give it a try 🙂
a
Yes, but note that it's released as a technical preview and is not production-ready yet. So if you want to deploy your flow to production, you should use Prefect Core rather than Orion.