Thread
#prefect-community
    Darshan

    1 year ago
    Hello, I am trying to evaluate Prefect for my work and am running some toy flows to understand various aspects of it. I am struggling to find a good example of how to pass parameters to a task function using the imperative API. Basically I am trying to do something like the code below; how can I pass parameters to task_1 and task_3 using the imperative API?
    from prefect import Flow, task
    
    flow = Flow("example")
    
    @task
    def task_1(some_param):
        pass  # Do something
    
    @task
    def task_2():
        pass  # Do something
    
    @task
    def task_3(some_other_param):
        pass  # Do something
    
    
    flow.set_dependencies(task=task_1)
    flow.set_dependencies(task=task_2, upstream_tasks=[task_1])
    flow.set_dependencies(task=task_3, upstream_tasks=[task_2])
    Michael Adkins

    1 year ago
    I wouldn’t recommend starting with the imperative API, but here you want to pass keyword_tasks, e.g.
    flow.set_dependencies(task=task_1, keyword_tasks={"some_param": Parameter("example_param")})
    Parameter("example_param") in my example can also be a task or a constant value, i.e. 10
    Darshan

    1 year ago
    Thanks @Michael Adkins, will check this.
    Michael Adkins

    1 year ago
    I really would encourage you to use the functional API though; it’s much more intuitive. If you find yourself needing the imperative API for a specific thing, then I’d switch to it in places 🤷 best of luck either way!
    Darshan

    1 year ago
    I understand the functional API should be used as much as possible, but one of the real-life workflows we have consists of tasks that don't return any data. For example, each task executes a SQL statement that creates a table in the database, and the subsequent task uses the table created by the previous one. I was struggling to build a flow with the functional API where the dependencies between tasks are maintained even though the tasks themselves don't return any direct results. It would be great to see whether the functional API can be used for this kind of workflow, since, as you said, it is more intuitive for any developer.
    Michael Adkins

    1 year ago
    I would just write it out functionally, then use downstream_task.set_upstream(table_creation) where needed:
    from prefect import task, Flow
    
    
    @task(log_stdout=True)
    def display(value):
        print(value)
    
    
    @task
    def create_table():
        pass
    
    
    @task
    def insert_data():
        pass
    
    
    @task
    def query_data():
        return "fake-data-from-table"
    
    
    with Flow("non-data-dependencies") as flow:
        create = create_table()
        insert = insert_data()
        insert.set_upstream(create)
        data = query_data()
        data.set_upstream(insert)
        display(data)
    
    
    flow.run()
    Sorry, that’s actually poorly written; it’ll run things more than once. Fixed now.
    Slightly more succinct:
    with Flow("non-data-dependencies") as flow:
        create = create_table()
        insert = insert_data().set_upstream(create)
        data = query_data().set_upstream(insert)
        display(data)
    Darshan

    1 year ago
    Very helpful, thanks!
    So here is the viz of a toy workflow I was trying to implement, and here is the code I was able to come up with to design it.
    from prefect import Flow, task
    
    @task(name="create db_table_1")
    def task_1():
        # Create db_table_1
        print("Create db_table_1")
    
    @task(name="create db_table_2 using db_table_1")
    def task_2():
        # Create db_table_2
        print("Create db_table_2")
    
    
    @task(name="create db_table_3 using db_table_2")
    def task_3():
        # Create db_table_3
        print("Create db_table_3")
    
    @task(name="create db_table_4 using db_table_2")
    def task_4():
        # Create db_table_4
        print("Create db_table_4")
    
    @task(name="create db_table_5 using db_table_3 and db_table_4")
    def task_5():
        # Create db_table_5
        print("Create db_table_5")
    
    
    @task(name="query data from db_table_5 and display")
    def query_data():
        # Query data and print
        print("result of query")
    
    with Flow('ETL') as flow:
    
        task_2.set_upstream(task_1)
        task_3.set_upstream(task_2)
        task_4.set_upstream(task_2)
        
        task_5.set_dependencies(upstream_tasks=[task_3, task_4])
        
        query_data.set_upstream(task_5)
    
    flow.run()
    flow.visualize()  # needs the graphviz package installed