Hui Zheng
10/27/2020, 10:44 PM
I have a task_A, which returns a list_A. I want to create one task_B_i for each item i in list_A. However, there is a catch: those task_Bs need to be executed in sequence, that is, task_B_0 executes first, then task_B_1, then task_B_2, and so on. If a task_B_i fails, all subsequent task_Bs shall be skipped. Because the task_Bs can NOT be executed in parallel, it seems I cannot use map().
Each task_B_i returns an output (a string), item_b. So, after all task_Bs have executed, I have a list_B of task_B_i outputs.
I then need a task_c, which takes list_B, the results of all task_Bs, and does some work with it, which is like a reduce operation over the upstream task_Bs.
I tried something like the code below, but it obviously fails, and I have no better ideas for now.
with Flow("dynamic tasks in sequence") as flow:
    list_A = task_A()
    for item_a in list_A:
        item_b = task_B(item_a)
It failed with this error message:
  File "/app/flow.py", line 736, in <module>
    for item_a in list_A:
TypeError: 'FunctionTask' object is not iterable
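For reference, the behaviour requested above can be sketched in plain Python, outside Prefect. This is only an illustration of the intended semantics (run in order, skip everything after the first failure, then reduce), not flow code. The names run_in_sequence, reduce_outputs and example_step are hypothetical. The thread does not say whether task_c should still run after a failure; the sketch assumes it receives the partial results.

```python
from typing import Callable, List, Optional


def run_in_sequence(
    items: List[str],
    step: Callable[[str], str],
) -> List[Optional[str]]:
    """Apply `step` to each item in order; after the first failure, skip the rest.

    Failed or skipped items are recorded as None, so the result has the
    same length as the input.
    """
    outputs: List[Optional[str]] = []
    failed = False
    for item in items:
        if failed:
            outputs.append(None)  # an earlier step failed: skip this one
            continue
        try:
            outputs.append(step(item))
        except Exception:
            failed = True
            outputs.append(None)
    return outputs


def reduce_outputs(outputs: List[Optional[str]]) -> str:
    """Stand-in for task_c: combine whatever the B steps produced."""
    return ",".join(o for o in outputs if o is not None)


def example_step(item: str) -> str:
    """Hypothetical stand-in for task_B; fails on the item "bad"."""
    if item == "bad":
        raise ValueError(f"cannot process {item!r}")
    return item.upper()
```

Calling run_in_sequence(["a", "bad", "c"], example_step) processes "a", fails on "bad", and never calls the step for "c".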
Chris White
Do you know in advance how long the list that task_A returns will be? If so, you can:
with Flow("dynamic tasks in sequence") as flow:
    list_A = task_A()
    item = None
    for idx in range(expected_length):
        new_task = task_B(list_A[idx])
        # chain each task_B after the previous one so they run in order
        if item is not None:
            new_task.set_upstream(item)
        item = new_task
Hui Zheng
10/27/2020, 10:51 PM
What if list_A has fewer items than expected_length? Could task_B(list_A[idx]) fail with an out-of-range index?
Chris White
Could you use None as a padding value? Then task_B could check whether the provided value is None and SKIP if so.
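As a rough illustration of the padding idea, here is a plain-Python sketch (not Prefect code). The helper names pad_to_length and process_item are hypothetical and not from this thread. It assumes task_A can pad its result to expected_length before returning it, and that the body of task_B checks for None first.

```python
from typing import List, Optional


def pad_to_length(items: List[str], expected_length: int) -> List[Optional[str]]:
    """Return a copy of `items` padded with None up to `expected_length`."""
    if len(items) > expected_length:
        raise ValueError(
            f"got {len(items)} items, but expected at most {expected_length}"
        )
    padded: List[Optional[str]] = list(items)
    padded.extend([None] * (expected_length - len(items)))
    return padded


def process_item(item: Optional[str]) -> Optional[str]:
    """Stand-in for the body of task_B: do nothing for padding values."""
    if item is None:
        # Inside a real Prefect task (0.x/1.x), this is where one would
        # raise prefect.engine.signals.SKIP instead of returning.
        return None
    return f"processed {item}"
```

With padding in place, list_A[idx] is valid for every idx in range(expected_length), so the indexing itself can no longer fail.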
I’ll keep thinking about whether there’s a more elegant way, but this is effectively seeking to create dynamic edges, which isn’t something Prefect supports (yet 😉)
Hui Zheng
10/27/2020, 10:54 PM
… None as a padding value.
Chris White
with Flow("dynamic tasks in sequence") as flow:
    list_A = task_A()
    b_items = []
    for idx in range(expected_length):
        new_task = task_B(list_A[idx])
        if b_items:
            new_task.set_upstream(b_items[-1])
        b_items.append(new_task)
    # pass all outputs of B to C
    c_result = task_C(b_items)
(sorry the code is a little messy)
Hui Zheng
10/27/2020, 11:01 PM
Chris White
Hui Zheng
10/28/2020, 11:17 PM
… GetItem and task_B are run because the loop iterates through the list up to expected_length. It’s not pretty, but it works.
task_B in this case is actually DbtShellTask, which requires list_A[idx] to be a valid value for its command arg, so I couldn’t use it directly. One way to make DbtShellTask check whether the provided command value is None and SKIP is to write a subclass that handles that. Do you think there is a better way of doing it?
Chris White
For DbtShellTask, I think that subclassing it is the fastest / cleanest approach 👍. There isn’t a meaningful concept of running a task within a task, but you could always do:
from prefect import task

@task
def my_task():
    ...  # some code

@task
def some_other_task():
    # call the run method to run the task logic with no Prefect governance
    output = my_task.run()