Hello folks, I need some help with running a dynam...
# prefect-community
h
Hello folks, I need some help with running a dynamic set of tasks in sequence (not in parallel). In my flow, I have a
task_A
, which returns a
list_A
. I want to create one
task_B_i
for each
item i in list_A
. However, there is a catch: those task_Bs need to be executed in sequence, that is,
task_B_0
execute first, and then
task_B_1
follows, and then
task_B_2
follows. If a task_B_i fails, all subsequent task_Bs should be skipped. Since those
task_Bs
can NOT be executed in parallel, it seems I cannot use
map()
. Each
task_B_i
returns an output (which is a string),
item_b
. So, after all task_Bs execute, I have a
list_B
of task_B_i outputs. I then need a
task_c
, which takes
list_B
, the results of all task_Bs, to carry out some work, much like a reduce operation over the upstream task_Bs. I tried something like the code below, but it obviously fails, and I have no better ideas for now.
with Flow("dynamic tasks in sequence"):
  list_A = task_A()
  for item_a in list_A:
    item_b = task_B(item_a)
It failed with this error message:
File "/app/flow.py", line 736, in <module>
    for item_a in list_A:
TypeError: 'FunctionTask' object is not iterable
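The error above comes from looping at flow-build time: inside the flow block, `task_A()` hands back an object that represents a future result, not the list itself. A rough plain-Python illustration, where `DeferredResult` is only a hypothetical stand-in for such a task object and not a Prefect class:

```python
class DeferredResult:
    """Stand-in for a task object created while a flow is being built.

    It only records which task will produce the value later; the value
    itself does not exist yet, so there is nothing to iterate over.
    """

    def __init__(self, name):
        self.name = name


def try_to_loop(obj):
    """Attempt a for-loop over obj and report what happens."""
    try:
        for _ in obj:
            pass
    except TypeError as exc:
        return str(exc)
    return "iterable"


message = try_to_loop(DeferredResult("task_A"))
print(message)  # 'DeferredResult' object is not iterable
```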
c
Hi Hui; do you know how long the list that
task_A
returns will be? If so, you can:
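from prefect import Flow  # Prefect 1.x-style API, as used in this thread

# task_A, task_B and expected_length are assumed to be defined elsewhere
with Flow("dynamic tasks in sequence") as flow:
    list_A = task_A()
    item = None
    for idx in range(expected_length):
        new_task = task_B(list_A[idx])
        if item is not None:
            # make each task_B wait for the previous one
            new_task.set_upstream(item)
        item = new_task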
h
The list length would usually be less than 10, but it’s not determined until runtime
Thank you for the example. What if the
list_A
has fewer items than
expected_length
? Could
task_B(list_A[idx])
fail with an out-of-range index?
c
Yea it would fail in that case - could you pad the list so that it’s always a fixed length, and use
None
as a padding value? Then
task_B
could check if the provided value is
None
and
SKIP
if so. I’ll keep thinking whether there’s a more elegant way but this is effectively seeking to create dynamic edges which isn’t something Prefect supports (yet 😉 )
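The padding idea above could be sketched in plain Python roughly as follows. The helper name `pad_to_length` and the choice to raise when the list is too long are assumptions for illustration, not part of Prefect; in a flow, this logic would presumably live inside `task_A` or a small task right after it.

```python
from typing import List, Optional


def pad_to_length(items: List[str], expected_length: int) -> List[Optional[str]]:
    """Return a copy of items padded with None up to expected_length.

    Hypothetical helper: raises if the list is longer than expected_length,
    because the flow only creates expected_length copies of task_B.
    """
    if len(items) > expected_length:
        raise ValueError(
            f"got {len(items)} items, but only {expected_length} slots exist"
        )
    return list(items) + [None] * (expected_length - len(items))


padded = pad_to_length(["a", "b"], 4)
print(padded)  # ['a', 'b', None, None]
```

Each `task_B` can then treat a `None` input as "nothing to do" and skip.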
h
sure, yes, I could use
None
as a padding value.
What would you recommend to collect all outputs of task_Bs, and feed them into a downstream task like task_C?
c
For that you can do:
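from prefect import Flow  # Prefect 1.x-style API, as used in this thread

# task_A, task_B, task_C and expected_length are assumed to be defined elsewhere
with Flow("dynamic tasks in sequence") as flow:
    list_A = task_A()
    b_items = []
    for idx in range(expected_length):
        new_task = task_B(list_A[idx])
        if b_items:
            # run after the previous task_B
            new_task.set_upstream(b_items[-1])
        b_items.append(new_task)

    # pass all outputs of B to C
    c_result = task_C(b_items)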
(sorry the code is a little messy)
h
thank you, that’s great!
will give it a try!
c
👍 👍 anytime!
h
Hi Chris, I tried it out. Just to put some feedback here: first, many redundant
GetItem
and
task_B
tasks are run because the loop iterates through the list up to the
expected_length
. It’s not pretty but it works.
my
task_B
in this case actually is
DbtShellTask
, which requires the
list_A[idx]
to be valid for its
command
arg, so I couldn’t use it directly. One way to make `DbtShellTask` check whether the provided command value is
None
and
SKIP
is to make a subclass that handles that. Do you think there is a better way of doing it?
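A rough sketch of that subclassing idea, using stand-in classes so it runs without Prefect installed. `ShellTaskStandIn` and `SkipSignal` are hypothetical placeholders; in a Prefect 1.x flow they would presumably be replaced by `DbtShellTask` and the `SKIP` signal from `prefect.engine.signals`, and the real `run` signature should be checked against the installed version.

```python
class SkipSignal(Exception):
    """Stand-in for a 'skip this task' signal."""


class ShellTaskStandIn:
    """Stand-in for a shell-style task whose run() needs a valid command."""

    def run(self, command=None, **kwargs):
        if not isinstance(command, str):
            raise TypeError("command must be a string")
        return f"ran: {command}"


class SkipOnNoneTask(ShellTaskStandIn):
    """Skips instead of failing when the padding value None is passed in."""

    def run(self, command=None, **kwargs):
        if command is None:
            raise SkipSignal("no command provided (padding value), skipping")
        # a real command: defer to the parent implementation unchanged
        return super().run(command=command, **kwargs)


task = SkipOnNoneTask()
print(task.run(command="dbt run"))  # ran: dbt run
try:
    task.run(command=None)
except SkipSignal as signal:
    print(f"skipped: {signal}")
```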
btw, could I run tasks inside a task?
c
for the
DbtShellTask
, I think that subclassing it is the fastest / cleanest approach 👍; and there isn’t a meaningful concept of running a task within a task, but you could always do:
from prefect import task

@task
def my_task():
    # some code
    ...

@task
def some_other_task():
    # call the run method to run the task logic with no prefect governance
    output = my_task.run()
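Following the `.run()` idea above, one possible alternative to padding is to do the whole sequence inside a single task, so the loop runs at runtime when the real list is available. This is only a plain-Python sketch: `run_in_sequence`, `step`, and `shout` are hypothetical names, and `step` stands in for something like `my_task.run`. The trade-off is that the individual steps would no longer show up as separate tasks.

```python
from typing import Callable, Iterable, List


def run_in_sequence(items: Iterable[str], step: Callable[[str], str]) -> List[str]:
    """Run step on each item in order and collect the outputs.

    If step raises for one item, the exception propagates and the
    remaining items are never processed.
    """
    outputs = []
    for item in items:
        outputs.append(step(item))
    return outputs


def shout(item: str) -> str:
    """Example step: fails on an empty string."""
    if not item:
        raise ValueError("empty item")
    return item.upper()


list_B = run_in_sequence(["a", "b", "c"], shout)
print(list_B)  # ['A', 'B', 'C']
```

The returned list plays the role of `list_B` and can be handed to a downstream reduce-style step such as `task_C`.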