# ask-community
t
Is there a way to map sequential tasks, with a dependency between them? I.e., I want to create a series of tasks where task 1 returns a value and, depending on a condition on that value, task 2 kicks off, and I want to do this for A, B, C, etc. I've got a setup that almost does this now, but when I call the other task from inside my first task, it tells me the context is missing and suggests adding a `run()`, which doesn't help. Is there a restructure that would work here?
k
This might be better with apply_map
t
Ahh, that sounds like exactly what I need, thanks!
@Kevin Kho, I got `apply_map` working, but when it runs, the task diagrams sit on top of one another (first pic), and the names mostly just list "Mapped Child X" (second pic). Is there a way to split the mapped flows apart in the diagram, or to make the names more descriptive? As you can see in the bottom two cases, I have mapped names going to the sub-task being called, but all the various Mapped Child names aren't particularly helpful. These correspond roughly to the `inc_or_negate` and `case` statements in your https://docs.prefect.io/core/concepts/mapping.html#complex-mapped-pipelines example (along with a `create_flow_run` and `wait_for_flow_run` in each mapped instance); is there a way to give those more sensible names?
k
Have you seen the templating of task run names?
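For context, Prefect 1.x lets you give a task a templated run name through the `task_run_name` argument (for example `@task(task_run_name="{x}")`), with the placeholders filled from the task's inputs at run time. The sketch below only mimics that idea in plain Python to show why it helps with mapped runs: each mapped input gets its own name, so a failure points straight at the input that caused it. `render_run_name` is a hypothetical helper, not Prefect code.

```python
def render_run_name(template: str, inputs: dict) -> str:
    """Hypothetical helper: fill placeholders like "{x}" from a task's inputs."""
    return template.format(**inputs)


def is_even(x: int) -> bool:
    # Raises for x == 2, so one of the mapped inputs fails.
    if x == 2:
        raise ValueError("x == 2 is not allowed")
    return x % 2 == 0


failed_runs = []
for x in range(4):  # stand-in for mapping over the inputs
    run_name = render_run_name("is_even-{x}", {"x": x})
    try:
        is_even(x)
    except ValueError:
        # The descriptive name identifies the failing child without digging in logs.
        failed_runs.append(run_name)

print(failed_runs)  # ['is_even-2']
```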
t
I have, and I'm successfully using that in the task runs at the bottom of my second picture! The difficulty is all these auto-created mapped child runs: if one of them fails, I have to dig into logs or other details to know what it was (and this flow will have a growing list of such mappings). In the example linked above, the `inc_or_negate` function isn't actually a task, and I modeled mine on that documentation, so while I can add templated task run names to my equivalents of the `inc` or `negate` functions, the `inc_or_negate` function itself and the `case`, `create_flow_run`, and `wait_for_flow_run` calls inside it all get these generic mapped names.
k
I think I need to play around with this for a bit. It might take some time to get back to you.
t
No rush; this at least isn't a blocker to my progress, it would just be good to work out.
FYI, I just had my first instance of one of the mapped flows failing; it really is harder to track which went wrong without some better naming here.
k
Oh, I didn’t get a chance to test over the weekend. I will try in a bit.
So I have a suggestion that might help: you can template a name inside the `Flow` by passing `task_args` to the task call, like this:
```python
from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge

@task
def inc(x):
    return x + 1

@task
def negate(x):
    return -x

@task
def is_even(x):
    # Raises for x == 2, so one mapped child fails
    if x == 2:
        raise ValueError()
    return x % 2 == 0

# Plain function (not a task); apply_map maps the tasks it builds over the input
def inc_or_negate(x):
    cond = is_even(x)
    # If x is even, increment it (task_args customizes this call's task copy)
    with case(cond, True):
        res1 = inc(x, task_args={"name": "test-{x}"})
    # If x is odd, negate it
    with case(cond, False):
        res2 = negate(x)
    return merge(res1, res2)

with Flow("apply-map example") as flow:
    result = apply_map(inc_or_negate, range(4))

flow.run()
```
This should at least help with `create_flow_run` and `wait_for_flow_run`. The `case` is special, so it doesn’t work for that.
t
Hmm, that would help, but when I try it, it doesn't seem to work. Instead of the value, I get the type of what I passed in, so the name shows up as `<Task: Constant[list]>` (I'm passing a list to the `apply_map` function). Is there a way to get the value instead of the type?
k
I think you might be using an f-string? The interpolation should be on a normal string.
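To illustrate the difference (a plain-Python sketch; `FakeTask` is a hypothetical stand-in): code inside a Prefect 1.x `with Flow(...)` block runs while the flow is being built, when a mapped input is still a `Task` object rather than its value. An f-string is evaluated right then and captures the object's repr, which is where something like `<Task: Constant[list]>` comes from; a plain string keeps its `{...}` placeholder for whatever formats it later with the real value.

```python
class FakeTask:
    """Hypothetical stand-in for the Task object a flow holds at build time."""

    def __repr__(self) -> str:
        return "<Task: Constant[list]>"


flow_name = FakeTask()  # at flow-build time this is a Task, not a value

# An f-string is evaluated immediately, so it captures the Task's repr.
eager = f"Retry: {flow_name}"

# A plain string keeps the placeholder until something formats it later.
template = "Retry: {flow_name}"
deferred = template.format(flow_name="my-flow")  # the runtime value

print(eager)     # Retry: <Task: Constant[list]>
print(deferred)  # Retry: my-flow
```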
t
Oooh, good point. I'll try that...
Hmm, if I do that then it looks like I need an f-string.
```python
current_flow = create_flow_run(
    flow_name=flow_name,
    run_name=f'{datetime.now()} {flow_name} attempt.',
    project_name=PROJECT_NAME,
    task_args={"name": "Retry: {flow_name}"},
)
wait_for_flow_run(
    current_flow,
    raise_final_state=True,
    task_args={"name": "Wait for Retry: {flow_name}"},
)
```
In those cases, `flow_name` is one of the mapped values that I pass in to my equivalent of the `inc_or_negate` function.
k
The placeholder in the template has to be an input to the function. That’s how the interpolation happens in my example.
Also, I don’t think you want an f-string in the `create_flow_run` `run_name`; there is a chance that gets fixed at registration time.
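This is the same eager-evaluation issue applied to time: if the flow object is built once (for example at registration) and reused, a timestamp computed while building it is frozen, whereas one computed during each run stays current. The plain-Python sketch below uses a hypothetical fake clock so the effect is visible; in Prefect, the analogous move would presumably be building the run name inside a task that executes at run time, which this thread doesn't confirm.

```python
from datetime import datetime
from itertools import count


def make_clock(start: datetime):
    """Hypothetical fake clock: each call returns a time one minute later."""
    ticks = count()
    return lambda: start.replace(minute=next(ticks))


def build_flow(clock):
    # Runs once, when the "flow" is built (e.g. at registration time).
    frozen_name = f"{clock():%H:%M} my-flow attempt."

    def run():
        # Runs on every execution, so this timestamp is current.
        fresh_name = f"{clock():%H:%M} my-flow attempt."
        return frozen_name, fresh_name

    return run


run = build_flow(make_clock(datetime(2022, 1, 1, 9, 0)))
first = run()
second = run()
print(first)   # ('09:00 my-flow attempt.', '09:01 my-flow attempt.')
print(second)  # ('09:00 my-flow attempt.', '09:02 my-flow attempt.')
```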
Maybe you can wrap `wait_for_flow_run` in your own task so you can pass your name into the task call, and then use it in the template lol
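A plain-Python sketch of the wrapper idea, with hypothetical names throughout: the wrapper accepts an extra `flow_name` input that it never uses for the actual work, purely so a run-name template can refer to it, and delegates the waiting to the original function. In Prefect 1.x the wrapper would presumably be a `@task(task_run_name="Wait for Retry: {flow_name}")` whose body calls `wait_for_flow_run.run(...)`; treat that as an untested assumption.

```python
def wait_for_flow_run(flow_run_id: str, raise_final_state: bool = False) -> str:
    """Hypothetical stand-in for the real waiting task."""
    return f"finished {flow_run_id}"


def wait_for_named_run(flow_run_id: str, flow_name: str,
                       raise_final_state: bool = True) -> str:
    """Wrapper: flow_name is accepted only so the run-name template can use it."""
    return wait_for_flow_run(flow_run_id, raise_final_state=raise_final_state)


TEMPLATE = "Wait for Retry: {flow_name}"

inputs = {"flow_run_id": "run-123", "flow_name": "A"}
run_name = TEMPLATE.format(**inputs)  # placeholders come from the wrapper's inputs
result = wait_for_named_run(**inputs)

print(run_name)  # Wait for Retry: A
print(result)    # finished run-123
```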
t
Wait, I'm a bit confused: your example above was f-string format (though without the actual `f`), so isn't that what I should do? You had `task_args={"name":"test-{x}"}`, which is pretty much the same as my `task_args={"name":"Wait for Retry: {flow_name}"}`, where `flow_name` is my equivalent of `x` (the input to the function). Am I missing something? When I try it that way I still don't get any value; it all just shows up as `{flow_name}`. Weird.
k
Your call was:
```python
wait_for_flow_run(current_flow, raise_final_state=True, task_args={"name": "Wait for Retry: {flow_name}"})
```
so `{flow_name}` is not the name of an input into the function. I think you need this to be:
```python
wait_for_flow_run(current_flow, raise_final_state=True, task_args={"name": "Wait for Retry: {current_flow}"})
```
Ah sorry, it should be the keyword argument that is the input. So it should be:
```python
wait_for_flow_run(current_flow, raise_final_state=True, task_args={"name": "Wait for Retry: {flow_run_id}"})
```
Like in this doc
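The rule described here (the placeholder must match the task's own parameter name, `flow_run_id`, not the caller's variable name) can be sketched in plain Python with `inspect.signature`; the stand-in function and the `render_run_name` helper are hypothetical. One further point worth checking, as an assumption not confirmed in this thread: in Prefect 1.x the templated field is `task_run_name`, while `name` is the task's static name, so the template may need to go in `task_args={"task_run_name": ...}` rather than `task_args={"name": ...}`.

```python
import inspect


def wait_for_flow_run(flow_run_id, raise_final_state=False):
    """Hypothetical stand-in with the parameter name used in the thread."""
    return flow_run_id


def render_run_name(template, func, *args, **kwargs):
    # Map positional/keyword arguments onto the function's parameter names.
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    return template.format(**bound.arguments)


current_flow = "run-123"  # the caller's variable name is irrelevant to the template

ok = render_run_name("Wait for Retry: {flow_run_id}", wait_for_flow_run,
                     current_flow, raise_final_state=True)
print(ok)  # Wait for Retry: run-123

try:
    render_run_name("Wait for Retry: {current_flow}", wait_for_flow_run,
                    current_flow, raise_final_state=True)
except KeyError as err:
    print("no such input:", err)  # no such input: 'current_flow'
```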
t
Hmm, I tried both of those and it didn't work. The string just doesn't evaluate; I keep getting whatever value I put in there printed out. Does the example with the bigger code block that you pasted above work for you, with `task_args={"name":"test-{x}"}`? If so, I must be doing something else differently; the only difference I can see that should matter is that `x` is a list in my case, but everything I try either just uses the variable name I put in or gives me back `<Task: Constant[list]>`.
k
You should be able to run it, yep. It just uses `flow.run()`, so it’s easy to test.