Darshan
01/20/2021, 11:37 PM
@task
def task_1(some_param):
    """First task of the chain; receives ``some_param`` as its input.

    Placeholder body: replace the comment below with the real work.
    """
    # Do something
@task
def task_2():
    """Second task of the chain; takes no inputs.

    Placeholder body: replace the comment below with the real work.
    """
    # Do something
@task
def task_3(some_other_param):
    """Third task of the chain; receives ``some_other_param`` as its input.

    Placeholder body: replace the comment below with the real work.
    """
    # Do something
# Imperative wiring of the linear chain task_1 -> task_2 -> task_3.
# NOTE(review): `flow` is not created in this snippet, and the parameters
# declared by task_1 (`some_param`) and task_3 (`some_other_param`) are not
# bound by any keyword_tasks here. Confirm they are supplied elsewhere.
flow.set_dependencies(task=task_1)
flow.set_dependencies(
    task=task_2,
    upstream_tasks=[task_1],
)
flow.set_dependencies(
    task=task_3,
    upstream_tasks=[task_2],
)
Zanie
You can pass those arguments with `keyword_tasks`,
e.g. `flow.set_dependencies(task=task_1, keyword_tasks={"some_param": Parameter("example_param")})`,
or use `bind`
as shown in https://docs.prefect.io/core/getting_started/first-steps.html#imperative-api
`Parameter("example_param")` in my example can also be another task or a constant value, e.g. `10`.
Darshan
01/20/2021, 11:44 PM
Zanie
Darshan
01/20/2021, 11:46 PM
Zanie
Call `downstream_task.set_upstream(table_creation)`
where needed, for example:
from prefect import task, Flow
@task(log_stdout=True)
def display(value):
    """Print ``value``.

    ``log_stdout=True`` asks Prefect to capture this task's stdout in its
    logger, so the printed value shows up in the run logs.
    """
    print(value)
@task
def create_table():
    """Stand-in for the table-creation step; does no work and returns None."""
@task
def insert_data():
    """Stand-in for the data-insertion step; does no work and returns None."""
@task
def query_data():
    """Return a fixed placeholder string in place of a real table query."""
    fake_rows = "fake-data-from-table"
    return fake_rows
with Flow("non-data-dependencies") as flow:
    # Pipeline order: create_table -> insert_data -> query_data -> display.
    create = create_table()
    insert = insert_data()
    # insert_data receives nothing from create_table, so the ordering edge
    # is declared explicitly rather than implied by an argument.
    insert.set_upstream(create)
    data = query_data()
    data.set_upstream(insert)
    # display takes query_data's result as an argument, which links them.
    display(data)

flow.run()
with Flow("non-data-dependencies") as flow:
    # Chained form of the create_table -> insert_data -> query_data ->
    # display pipeline: each ordering edge is attached right at the call.
    # NOTE(review): `x = t().set_upstream(u)` only works if set_upstream
    # returns the task itself. Confirm this for the installed Prefect
    # version; otherwise `insert` and `data` end up as None.
    create = create_table()
    insert = insert_data().set_upstream(create)
    data = query_data().set_upstream(insert)
    display(data)
Darshan
01/21/2021, 12:15 AM
from prefect import Flow, task
import prefect
@task(name="create db_table_1")
def task_1():
    """Create db_table_1 (placeholder: only reports the step on stdout)."""
    step = "Create db_table_1"
    print(step)
@task(name="create db_table_2 using db_table_1")
def task_2():
    """Create db_table_2 from db_table_1 (placeholder: only reports the step)."""
    step = "Create db_table_2"
    print(step)
@task(name="create db_table_3 using db_table_2")
def task_3():
    """Create db_table_3 from db_table_2 (placeholder: only reports the step)."""
    step = "Create db_table_3"
    print(step)
@task(name="create db_table_4 using db_table_2")
def task_4():
    """Create db_table_4 from db_table_2 (placeholder: only reports the step)."""
    # Create db_table_4. The message must name table 4 to match the task
    # name; it previously said "db_table_3" (copy-paste slip).
    print("Create db_table_4")
@task(name="create db_table_5 using db_table_3 and db_table_4")
def task_5():
    """Create db_table_5 from db_table_3 and db_table_4 (placeholder: only reports the step)."""
    # Create db_table_5. The message must name table 5 to match the task
    # name; it previously said "db_table_3" (copy-paste slip).
    print("Create db_table_5")
@task(name="query data from db_table_5 and display")
def query_data():
    """Query db_table_5 and show the result (placeholder: prints a fixed line)."""
    result_line = "result of query"
    print(result_line)
with Flow('ETL') as flow:
    # Ordering-only edges (no data is passed between these tasks):
    #   task_1 -> task_2 -> {task_3, task_4} -> task_5 -> query_data
    # No explicit flow argument is given, so the calls are made inside the
    # Flow context.
    task_2.set_upstream(task_1)
    task_3.set_upstream(task_2)
    task_4.set_upstream(task_2)
    # task_5 waits on both branches before running.
    task_5.set_dependencies(upstream_tasks=[task_3, task_4])
    query_data.set_upstream(task_5)

flow.run()
flow.visualize()