Edmondo Porcu

    Edmondo Porcu

    4 months ago
    I am struggling a little with Prefect programming patterns. Take this example from this doc: https://docs.prefect.io/core/advanced_tutorials/task-looping.html
    with Flow("mapped-fibonacci") as mapped_flow:
        ms = Parameter("ms")
        fib_nums = compute_large_fibonacci.map(ms)
    
    flow_state = mapped_flow.run(ms=[10, 100, 1000, 1500])
    nice but what about non array parameters? Let's say I have two parameters, one that's an array, and the other which is a constant.
    with Flow("mapped-fibonacci") as mapped_flow:
        ms = Parameter("ms")
        ms2 = Parameter("ms2")
        fib_nums = compute_large_fibonacci.map(ms, ms2)
    
    flow_state = mapped_flow.run(ms=[10, 100, 1000, 1500])
    Kevin Kho

    Kevin Kho

    4 months ago
    You can use unmapped for that
    Edmondo Porcu

    Edmondo Porcu

    4 months ago
    Awesome. Two more questions: • Does the functional API support expressing task dependencies without explicitly adding edges to the flow? • How would you code this: run ten tasks, and when completed, ten more tasks
    Kevin Kho

    Kevin Kho

    4 months ago
    For the first . For the second, you have to split into two different tasks and then use
    upstream_tasks=[unmapped(task_one)]
    to force all the task_one to complete before task_two
    Edmondo Porcu

    Edmondo Porcu

    4 months ago
    Nice for question 1. Is upstream_tasks a kwarg args that can be used with functions annotated with @task ? The second question, seems like task composition is a daunting topic when learning prefect
    Kevin Kho

    Kevin Kho

    4 months ago
    It is used during the Flow so it’s not attached to the
    @task
    . For the second, are the first ten tasks and second tasks different?
    Edmondo Porcu

    Edmondo Porcu

    4 months ago
    yes
    Still not obvious for question 1, if the task is a function, how you use upstream_tasks
    Kevin Kho

    Kevin Kho

    4 months ago
    It has to be in the flow block:
    with Flow("example") as flow:
        a = first_task()
        b = second_task()
        c = third_task(c_inputs, upstream_tasks=[a,b])
    Edmondo Porcu

    Edmondo Porcu

    4 months ago
    Would that work if third task is defined like that?
    @task def third_task(c): 
             pass
    What I mean, is the upstream tasks automatically managed and it works even when using decorators?
    Kevin Kho

    Kevin Kho

    4 months ago
    Yes exactly. Upstream/downstream are always defined in the Flow, not at the task level
    Edmondo Porcu

    Edmondo Porcu

    4 months ago
    @Kevin Kho can parameters come from env variables?
    Kevin Kho

    Kevin Kho

    4 months ago
    You can use
    EnvVarSecret
    instead?
    Or you can make a task to read the env variable. That’ll work
    Edmondo Porcu

    Edmondo Porcu

    4 months ago
    make sense, thank you
    Now however I am confusef. Parameters vs tasks. I obviously think of parameters, and parameters read from env variables.
    Kevin Kho

    Kevin Kho

    4 months ago
    A parameter is a special task that is injected at run time. Parameters don’t read from env variables, but you can based on the default:
    @task
    def get_val(x):
        if x == "test:
            return os.environ["something"]
        else:
            return "something_else"
    with Flow(...) as flow:
        x = Parameter("x", "test")
        get_val(x)