Thread
#prefect-community

    JulienKWM

    10 months ago
    Hello! I need to implement a workflow where each task must execute one after the other and use the result of the previous task. The only way to do this that I have found so far is to loop in the flow code, but this solution doesn't work with flow parameters:
    from prefect import Flow, task
    
    
    @task()
    def sample_task(some):
        return some
    
    
    @task()
    def process(current, previous):
        pass
    
    
    with Flow("test") as flow:
        previous = None
        for n in range(180):
            item = sample_task(n)
            process(item, previous)
            previous = item
    Is there another way to do this (maybe inside a task, so that it can handle flow parameters)? Thank you for your help!
    Anna Geller

    10 months ago
    @JulienKWM To manage data dependencies, including those coming from Parameter tasks, you just need to pass them to the tasks inside the with Flow(...) block, e.g.:
    from prefect import Flow, Parameter, task
    
    
    @task(log_stdout=True)
    def hello_world(user_input: str):
        result = f"hello {user_input}"
        print(result)
        return result
    
    
    @task(log_stdout=True)
    def demo_data_dependency(input_: str, parameter: str):
        print(f"Got input: {input_} with parameter value: {parameter}")
    
    
    with Flow("basic-flow") as flow:
        param = Parameter("user_input", default="world")
        hw = hello_world(param)
        demo_data_dependency(hw, param)
    Does this answer your question?

    JulienKWM

    10 months ago
    Thank you for your reply. The issue is that I have to do this dynamically (a task will take a flow input parameter and return a list of items to process).
    Anna Geller

    10 months ago
    This works exactly the same way. Have you heard about mapping in Prefect? I think it might be what you're looking for instead of a loop: https://docs.prefect.io/core/concepts/mapping.html
    @JulienKWM And if you really need a loop to get the previous iteration's result in the current task, you may look at looping: https://docs.prefect.io/core/advanced_tutorials/task-looping.html#dynamic-dags-task-looping

    JulienKWM

    10 months ago
    Yes, I know about mapping, but it does not solve my issue. The loop is there specifically to set an upstream dependency on each task to enforce sequential execution. In my case, I need to provide the previous task's result to the next task:
    @task()
    def process(current, previous):
        pass
    Anna Geller

    10 months ago
    If you need sequential, rather than parallel execution of the loop elements, then looping may be what you need.

    JulienKWM

    10 months ago
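    Sorry, I didn't see your last reply. As I read it in the docs, looping only allows looping inside a single task, right? My example was maybe oversimplified. Here is a more complete example of what I want to do:
    from datetime import datetime
    
    from prefect import Flow, task
    from prefect.core.parameter import DateTimeParameter
    
    
    @task()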
    def generate_hours(date: datetime):
        return {
            0: range(120),
            6: range(180),
            12: range(120),
            18: range(180)
        }[date.hour]
    
    
    @task()
    def sample_task_a(some):
        return some
    
    
    @task()
    def sample_task_b(some):
        return some
    
    
    @task()
    def sample_task_c(some):
        return some
    
    
    @task()
    def process(date: datetime, hour, current, previous):
        pass
    
    
    with Flow("test") as flow:
        target_date = DateTimeParameter('target_date')
        previous = None
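        # Intended logic: inside the Flow block, generate_hours(...) returns a Task,
        # not its runtime list, so Prefect 1.x cannot iterate over it at build time.
        for h in generate_hours(target_date):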
            item_a = sample_task_a(h)
            item_b = sample_task_b(item_a)
            item_c = sample_task_c(item_b)
            process(target_date, h, item_c, previous)
            previous = item_c
    I need to loop over a block of multiple tasks.
    Anna Geller

    10 months ago
    I see, thanks for sharing your use case. I think you can accomplish that in Prefect, but you may need to redesign it a bit. For example, you take a dynamic parameter for the target date. This will not work unless you only trigger this flow manually or via the API, passing your desired parameter value explicitly. Parameter defaults are evaluated at registration time, which means that if you schedule this flow, it will not behave as you expect because:
    • you don't have any default value, so Prefect doesn't know what value to use. This would work: Parameter("target_date", default="2021-10-29")
    • but then target_date would be static, based on the value from registration.
    So a better approach is to move the date generation into a task and provide logic for what to use by default if the parameter value is None, e.g.:
    from prefect import Flow, Parameter, task
    import pendulum
    
    
    @task(log_stdout=True)
    def hello_world(user_input: str):
        if user_input is None:
            default_val = pendulum.now("Europe/Berlin").add(days=-1).to_date_string()
            print(f"Hello {default_val}")
        else:
            print(f"hello {user_input}")
    
    
    with Flow("test-flow") as flow:
        param = Parameter("user_input", default=None)
        hw = hello_world(param)
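The fallback in hello_world can also be factored into a small pure function, which makes the "None means yesterday" rule easy to test and reuse in other tasks. This is a minimal sketch under stated assumptions: resolve_target_date is a hypothetical helper, it uses the standard library instead of pendulum, it takes the local system date rather than Europe/Berlin, and the injectable today argument exists only for testability.

```python
from datetime import date, timedelta
from typing import Optional


def resolve_target_date(user_input: Optional[str],
                        today: Optional[date] = None) -> date:
    """Return the parsed parameter value, or yesterday if none was passed.

    `today` can be injected for deterministic tests; by default the local
    system date is used (no explicit time zone, unlike the pendulum example).
    """
    if user_input is not None:
        return date.fromisoformat(user_input)  # expects "YYYY-MM-DD"
    if today is None:
        today = date.today()
    return today - timedelta(days=1)


print(resolve_target_date("2021-10-29"))                   # 2021-10-29
print(resolve_target_date(None, today=date(2021, 11, 1)))  # 2021-10-31
```

Called from inside a task on the Parameter value received at run time, the fallback is computed per flow run rather than frozen at registration.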
    And regarding looping, I'm not quite sure, but you may not need to run it sequentially if you redesign it a bit and leverage iterated mapping.
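A minimal sketch of what such a redesign might look like, assuming process only needs the previous iteration's item_c (as in the flow posted above, where previous = item_c) and not the previous output of process. In that case the sample_task_a/b/c chains do not depend on one another, and the (current, previous) pairs can be built as data once all items exist. previous_items is a hypothetical helper, not a Prefect API, and the Prefect 1.x wiring in the comment is an untested outline.

```python
from typing import List, Optional, TypeVar

T = TypeVar("T")


def previous_items(items: List[T]) -> List[Optional[T]]:
    """Return each item's predecessor: element i is items[i - 1], None for i == 0.

    Zipping `items` with this list rebuilds the (current, previous) pairs that
    the original for-loop passed to `process`, but as plain data, so each pair
    can be handled independently (for example by a mapped task).
    """
    if not items:
        return []
    return [None, *items[:-1]]


# Hypothetical Prefect 1.x wiring (untested outline), with previous_items
# decorated as a @task and `unmapped` imported from prefect:
#   hours = generate_hours(target_date)
#   items_c = sample_task_c.map(sample_task_b.map(sample_task_a.map(hours)))
#   process.map(unmapped(target_date), hours, items_c, previous_items(items_c))

items_c = ["c0", "c1", "c2"]
print(list(zip(items_c, previous_items(items_c))))
# [('c0', None), ('c1', 'c0'), ('c2', 'c1')]
```

Unlike the original loop, nothing here forces sequential execution. If process really needs the previous process output, this redesign does not apply, and task looping or a single larger task is the better fit.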

    JulienKWM

    10 months ago
    Thank you for the hint regarding parameter handling! The pattern I need seems hard to implement without the ability to call tasks from other tasks, but I'll try to find an alternative. Do you know if the Orion release will allow calling tasks from within tasks?
    Anna Geller

    10 months ago
    In Orion, your for-loop would work out of the box, maybe with some tiny adjustments to wait for each task's completion and to access the future's result. If you need to call one task from another task, this is possible, although it may backfire if you don't use it properly: you just need to call the .run() method of the other task.
    from prefect import task, Flow, Parameter
    
    
    @task(log_stdout=True)
    def task_a():
        return "A"
    
    
    @task(log_stdout=True)
    def task_b(param: str):
        task_a_out = task_a.run()
        result = f"B with param {param} and output of A: {task_a_out}"
        print(result)
        return result
    
    
    with Flow("task-in-task") as flow:
        user_input = Parameter("user_input", default="hi!")
        hw = task_b(user_input)
    But really, not everything needs to be an individual task. You could put your entire for-loop into one larger task; it depends on how much granular visibility into your tasks you need.
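A minimal sketch of that single-task approach, following the structure of the more complete example above. step_a, step_b, step_c, process, HOURS_BY_RUN and run_all are hypothetical plain-Python stand-ins, not Prefect APIs. In Prefect 1.x one would decorate run_all with @task and call it inside the Flow block with the target_date parameter. The trade-off is that the per-hour steps no longer appear as separate tasks, so they get no individual states, retries or logs.

```python
from datetime import datetime
from typing import List, Optional, Tuple


# Hypothetical stand-ins for sample_task_a/b/c and process from the thread.
def step_a(h: int) -> int:
    return h


def step_b(x: int) -> int:
    return x


def step_c(x: int) -> int:
    return x


def process(target_date: datetime, hour: int, current: int,
            previous: Optional[int]) -> Tuple[int, int, Optional[int]]:
    # Placeholder body: just echo what it received.
    return (hour, current, previous)


# Same lookup as generate_hours in the thread, keyed by the run's hour.
HOURS_BY_RUN = {0: range(120), 6: range(180), 12: range(120), 18: range(180)}


def run_all(target_date: datetime) -> List[Tuple[int, int, Optional[int]]]:
    """Run the whole per-hour chain sequentially inside one function (one task)."""
    results = []
    previous: Optional[int] = None
    for h in HOURS_BY_RUN[target_date.hour]:
        item_c = step_c(step_b(step_a(h)))
        results.append(process(target_date, h, item_c, previous))
        previous = item_c  # carried into the next iteration, as in the original loop
    return results


out = run_all(datetime(2021, 10, 29, 6))
print(len(out), out[:2])  # 180 [(0, 0, None), (1, 1, 0)]
```

Because the loop runs at task run time, it iterates over real values, which is exactly what the build-time for-loop in the flow could not do.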

    JulienKWM

    10 months ago
    OK, thank you very much! So Orion will be more appropriate for this kind of workflow. I will give it a try 🙂
    Anna Geller

    10 months ago
    Yes, but note that it's released as a technical preview and is not production-ready yet. So if you want to deploy your flow to production, you should use Prefect Core rather than Orion.