Hello, I am trying to evaluate prefect for my work...
# prefect-community
d
Hello, I am trying to evaluate prefect for my work and trying to run some toy flows to understand various aspects of Prefect. I am struggling to find a good example of how to pass parameters to a task function using imperative API. Basically I am trying to do something like below, how can I pass parameter to task_1 and task_3 using imperative API ?
Copy code
@task
def task_1(some_param):
	// Do something

@task
def task_2():
	// Do something

@task
def task_3(some_other_param):
	// Do something


flow.set_dependencies(task = task_1)
flow.set_dependencies(task = task_2, upstream_tasks=[task_1])
flow.set_dependencies(task = task_3, upstream_tasks=[task_2])
z
I wouldn’t recommend starting with the imperative API but here you want to pass
keyword_tasks
e.g.
flow.set_dependencies(task=task_1, keyword_tasks={"some_param": Parameter("example_param")})
Parameter("example_param")
in my example can also be a task or a constant value ie
10
d
Thanks @Zanie will check this.
z
I really would encourage you to use the functional API though — it’s much more intuitive. If you find yourself needing the imperative API for a specific thing then I’d switch to it in places 🤷 best of luck either way!
d
I understand functional API should be used as much as possible but one of the real life workflow we have has tasks which does not return any data back. for example, each task executes a SQL which creates a table in database and subsequent task uses the table cerated by previous task. I was struggling to make a flow with functional APIs where dependency of tasks needs to be maintained but tasks themselves does not return any direct results. Would be great to see if functional API can be used for this kind of workflow since as you said it is more intuitive for any developer.
z
I would just write it out functionally then use
downstream_task.set_upstream(table_creation)
where needed
Copy code
from prefect import task, Flow


@task(log_stdout=True)
def display(value):
    print(value)


@task
def create_table():
    pass


@task
def insert_data():
    pass


@task
def query_data():
    return "fake-data-from-table"


with Flow("non-data-dependencies") as flow:
    create = create_table()
    insert = insert_data()
    insert.set_upstream(create)
    data = query_data()
    data.set_upstream(insert)
    display(data)


flow.run()
Sorry that’s actually poorly written, it’ll run things more than once. Fixed now.
Slightly more succinct:
Copy code
with Flow("non-data-dependencies") as flow:
    create = create_table()
    insert = insert_data().set_upstream(create)
    data = query_data().set_upstream(insert)
    display(data)
d
Very helpful, thanks
So here is the viz of a toy workflow I was trying to implement
and here is the code I was able to come up with to design this.
Copy code
from prefect import Flow, task
import prefect

@task(name="create db_table_1")
def task_1():
    # Create db_table_1
    print("Create db_table_1")

@task(name="create db_table_2 using db_table_1")
def task_2():
    # Create db_table_2
    print("Create db_table_2")


@task(name="create db_table_3 using db_table_2")
def task_3():
    # Create db_table_3
    print("Create db_table_3")

@task(name="create db_table_4 using db_table_2")
def task_4():
    # Create db_table_4
    print("Create db_table_3")

@task(name="create db_table_5 using db_table_3 and db_table_4")
def task_5():
    # Create db_table_5
    print("Create db_table_3")


@task(name="query data from db_table_5 and display")
def query_data():
    # Query data and print
    print("result of query")

with Flow('ETL') as flow:

    task_2.set_upstream(task_1)
    task_3.set_upstream(task_2)
    task_4.set_upstream(task_2)
    
    task_5.set_dependencies(upstream_tasks=[task_3, task_4])
    
    query_data.set_upstream(task_5)

flow.run()
flow.visualize()