Scarlett King08/03/2022, 1:18 PM
Taylor Curran08/03/2022, 2:19 PM
Scarlett King08/03/2022, 4:33 PM
So if you have a simple flow like in the example above which I copied from the docs for Mapping, when I tried (below codes), it doesn't seem to work. How do I access the tasks that are part of the mapped flow?
from prefect import Flow, task, case, apply_map from prefect.tasks.control_flow import merge @task def inc(x): return x + 1 @task def negate(x): return -x @task def is_even(x): return x % 2 == 0 def inc_or_negate(x): cond = is_even(x) # If x is even, increment it with case(cond, True): res1 = inc(x) # If x is odd, negate it with case(cond, False): res2 = negate(x) return merge(res1, res2) with Flow("apply-map example") as flow: result = apply_map(inc_or_negate, range(4))
assert cond in flow.tasks
Taylor Curran08/03/2022, 6:47 PM
Scarlett King08/03/2022, 9:52 PM
Scarlett King08/04/2022, 10:20 AM
What I want to test in this flow is that I expect the task e and x to happen before t can happen. I understand that we can set upstream_task(), but I want to make sure the flow composition is as we expect especially when we have more complicated flow than this one. I'm having problems accessing the e, x, t, l tasks in flow.tasks. I can see when I look into flow.tasks, I can see all of the tasks there including
def full_flow(table): e = extract(...) x = export(...) t = transform(...) l = load(...) with Flow("example") as flow: list_of_tables = get_tables(...) run = apply_map(full_flow, table=list_of_tables)
. However, if I try evaluate the following statements, only the first one return true
I think the issue is I can't access e, x, t, or l because they are part of a mapped flow even though I can see them in flow.tasks.
list_of_tables in flow.tasks e in flow.tasks x in flow.tasks t in flow.tasks l in flow.tasks
Scarlett King08/04/2022, 11:24 AM