Scarlett King
08/03/2022, 1:18 PMTaylor Curran
08/03/2022, 2:19 PMScarlett King
08/03/2022, 4:33 PMfrom prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge
@task
def inc(x):
return x + 1
@task
def negate(x):
return -x
@task
def is_even(x):
return x % 2 == 0
def inc_or_negate(x):
cond = is_even(x)
# If x is even, increment it
with case(cond, True):
res1 = inc(x)
# If x is odd, negate it
with case(cond, False):
res2 = negate(x)
return merge(res1, res2)
with Flow("apply-map example") as flow:
result = apply_map(inc_or_negate, range(4))
So if you have a simple flow like in the example above which I copied from the docs for Mapping, when I tried (below codes), it doesn't seem to work. How do I access the tasks that are part of the mapped flow?
assert cond in flow.tasks
Taylor Curran
08/03/2022, 6:47 PMAnna Geller
08/03/2022, 8:28 PMScarlett King
08/03/2022, 9:52 PMAnna Geller
08/04/2022, 9:48 AMScarlett King
08/04/2022, 10:20 AMdef full_flow(table):
e = extract(...)
x = export(...)
t = transform(...)
l = load(...)
with Flow("example") as flow:
list_of_tables = get_tables(...)
run = apply_map(full_flow, table=list_of_tables)
What I want to test in this flow is that I expect the task e and x to happen before t can happen. I understand that we can set upstream_task(), but I want to make sure the flow composition is as we expect especially when we have more complicated flow than this one. I'm having problems accessing the e, x, t, l tasks in flow.tasks. I can see when I look into flow.tasks, I can see all of the tasks there including list_of_tables
. However, if I try evaluate the following statements, only the first one return true
list_of_tables in flow.tasks
e in flow.tasks
x in flow.tasks
t in flow.tasks
l in flow.tasks
I think the issue is I can't access e, x, t, or l because they are part of a mapped flow even though I can see them in flow.tasks.Anna Geller
08/04/2022, 11:13 AMScarlett King
08/04/2022, 11:24 AM