What would be the best structure for when I have a...
# ask-community
h
What would be the best structure when I have a task that, on success, should just return data that is appended to a list, but on failure should run another task that gets data from another source and appends that instead? Can you use "with case" blocks on a task's state, rather than on a task's output? Basically, if the original task fails, I want to replace its output with the output of another task/function, which is then appended the same way.
k
Seems like you need a task that checks whether the return value is as expected and returns True or False. Then you could use the logic described here, and then merge the branches.
This one specifically:
with Flow("conditional-branches") as flow:
    cond = check_condition()

    with case(cond, True):
        val1 = action_if_true()

    with case(cond, False):
        val2 = action_if_false()

    val = merge(val1, val2)

    another_action(val)
h
so it would need to work like:
1. Attempt my original task
2. Have another task check whether that original task succeeded, returning True or False
3. Have a case block based on this True/False helper task to either set a variable with the original output if True, or to run my backup task if False
4. Merge
k
Yes, or you could explore triggers and have two tasks: one that triggers on upstream failure and one that triggers on success. But the merge would be better if the logic somehow connects together.
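For reference, the four steps listed above correspond to ordinary Python control flow. The sketch below is plain Python, not Prefect code: `pull_primary`, `pull_backup`, and `coalesce_pull` are hypothetical names, and the failure condition is invented for illustration. Inside a Prefect flow the same branching would need to be expressed with tasks, `case`, and `merge`, as in the snippet k quotes.

```python
def pull_primary(name):
    # Hypothetical primary source: fails for one made-up input.
    if name == "c":
        raise RuntimeError(f"could not pull {name}")
    return f"primary:{name}"


def pull_backup(name):
    # Hypothetical backup source that always succeeds.
    return f"backup:{name}"


def coalesce_pull(name):
    # Steps 1-2: attempt the original pull and note whether it succeeded.
    try:
        result = pull_primary(name)
        succeeded = True
    except RuntimeError:
        result = None
        succeeded = False
    # Steps 3-4: keep the original output, or substitute the backup output.
    return result if succeeded else pull_backup(name)


results = [coalesce_pull(name) for name in ["a", "b", "c"]]
print(results)
```

The point of the sketch is only the shape of the logic: one value comes out per input, whichever branch produced it, so the caller can append it without caring which source it came from.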
h
This is roughly what I'm trying to get
df_list = []
for roster in roster_list:
    r = pull_roster(sheets, roster[0], roster[1], roster[2], roster[3])

    rf = pull_roster_fail(roster[0], upstream_tasks=[r])
    rs = pull_roster_succeed(r)
    
    rc = COALESCE(rs, rf)
    
    df_list.append(rc)
i basically want to do the equivalent of a SQL COALESCE statement there, to append the output of either the success task or the failure task, whichever is relevant. Do i need to do that appending within a task as well for the logic to work, or is there a more streamlined pattern?
however i'm not sure if this is appropriate
df_list = []
for roster in roster_list:
    r = pull_roster(sheets, roster[0], roster[1], roster[2], roster[3])

    with case(r.state, SUCCESS):
       v1 = r.output

    with case(r, FAILURE):
       v2 = failure_task()

    fr = merge(v1, v2)
    df_list.append(fr)
k
Is roster_list defined ahead of time or is it a task output?
h
it's defined ahead of time, it's basically just a list of google sheets url information for importing into our pipeline
k
Ok I’ll make an example of how I think this will work. I think we need apply_map because that append won't work.
h
yeah i figured i might need that. was trying to keep it simple with the iteration while i got more familiar with prefect, but can see how this pattern doesn't work
i didn't really want to parallelize it in order to avoid hitting the google sheets API rate limit
k
You can map with LocalExecutor and it will run sequentially
I think this is what you want:
from prefect import task, Flow, case, apply_map
from prefect.tasks.control_flow import merge
import prefect

@task
def pull_roster(sheets, r1, r2, r3, r4):
    if r1 == "c":
        return ""  # this is my fail
    return sheets + r1 + r2 + r3 + r4 # this is success

@task
def pull_roster_succeed(r):
    return r

@task
def pull_roster_fail(r):
    return 123

sheets = "test"
roster_list = [["a","b","c","d"],["b","c","d","e"],["c","d","e","f"]]

def run_one_roster(roster):
    logger = prefect.context.get("logger")
    logger.info("Processing")
    logger.info(roster)
    r = pull_roster(sheets, roster[0], roster[1], roster[2], roster[3])
    success = task(lambda x: len(x) > 1)(r)

    with case(success, True):
        logger.info(f"Entered success for {r}")
        rs = pull_roster_succeed(r)
    with case(success, False):
        rf = pull_roster_fail(roster[0])
    return merge(rs, rf)

@task
def log_output(x):
    logger = prefect.context.get("logger")
    logger.info(x)

with Flow("a") as flow:
    test = apply_map(run_one_roster, roster_list)
    log_output(test)
    
flow.run()
h
tyvm, this makes a lot of sense. I think i understand, but can you describe what's happening with success = task(lambda x: len(x) > 1)(r) and how it relates to prefect tasks+flows?
k
The @task decorator is just Python for task(fn), so I am using this shorter version to do it manually with task(fn)(run_the_new_function_on_this_input)
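The decorator equivalence k describes is a general Python rule and can be checked without Prefect. In the sketch below, `task` is a hypothetical stand-in that only wraps the function; it is not Prefect's `task` and the other names are made up for illustration.

```python
import functools


def task(fn):
    # Hypothetical stand-in for a task decorator: it only wraps the function.
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)
    return wrapper


@task
def is_long_decorated(x):
    return len(x) > 1


def _is_long(x):
    return len(x) > 1


# Same wrapping, applied by hand instead of with the @ syntax.
is_long_manual = task(_is_long)

# Wrap a lambda and call the result immediately, as in task(fn)(input).
inline_result = task(lambda x: len(x) > 1)("abc")

print(is_long_decorated("abc"), is_long_manual("abc"), inline_result)
```

One difference to keep in mind: this stand-in runs the function as soon as it is called, whereas calling a Prefect task inside a `with Flow(...)` block adds it to the flow so it can be run later.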
h
gotcha. btw, is there any way to pass in another parameter to the function that is being apply_mapped? When I try to pass in the sheets result that was instantiated, it yells at me that "At least one upstream state has an unmappable result". The only way i was able to get around it was to drop my instantiation tasks into "run_one_roster"
k
You use from prefect import unmapped and unmapped(other_variable) when you pass it in. You can do apply_map(fn, arg1, unmapped(arg2)) where arg1 is the list and arg2 is a constant across the mapped task. Similarly for mapped tasks, you can do task.map(arg1, unmapped(arg2)).
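As a rough mental model of what k describes, mapped arguments are iterated element by element while unmapped ones are passed whole to every call. The sketch below is plain Python with hypothetical names (`Unmapped`, `simple_map`, `describe`); it is not how Prefect implements mapping, only an illustration of the calling pattern.

```python
class Unmapped:
    # Hypothetical marker meaning "pass this value unchanged to every call".
    def __init__(self, value):
        self.value = value


def simple_map(fn, *args):
    # Mapped arguments are iterated in lockstep; Unmapped ones are repeated.
    mapped = [a for a in args if not isinstance(a, Unmapped)]
    length = min(len(a) for a in mapped)
    results = []
    for i in range(length):
        call_args = [a.value if isinstance(a, Unmapped) else a[i] for a in args]
        results.append(fn(*call_args))
    return results


def describe(roster, sheets):
    # Stand-in for the per-roster function; sheets is the shared constant.
    return f"{sheets}:{roster[0]}"


roster_list = [["a", "b"], ["c", "d"]]
out = simple_map(describe, roster_list, Unmapped("client"))
print(out)
```

Under this model, an argument that is neither a sequence nor marked as unmapped has nothing to iterate over, which is presumably the situation behind the "unmappable result" message.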
h
ohhh thats what that meant, got it
how would i set the trigger for this lambda task?
k
task(fn, name= …, trigger = …)(x)
I think this is what you mean right? The decorator just does the same thing under the hood.
h
got it thank you! I finally was able to get it working. There's probably room for extensive refactoring but this at least seems to execute correctly:
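# Imports this snippet relies on (Prefect 1.x style, matching the earlier example)
import time
from datetime import timedelta

import pandas as pd
import prefect
from prefect import task, Flow, case, apply_map, unmapped
from prefect.tasks.control_flow import merge
from prefect.triggers import all_finished


# all_finished lets this check run even when the upstream pull failed
@task(trigger=all_finished)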
def check_pull_roster(x):
    return isinstance(x, pd.DataFrame)


@task()
def pull_roster_success(df):
    return df


@task()
def pull_roster_fail(cbo_name):
    logger = prefect.context.get("logger")
    logger.warning("Failed to import roster for {}".format(cbo_name))

    data = [['tom', 'fake'], ['nick', 'fake'], ['juli', 'fake']]
    df = pd.DataFrame(data, columns=['EMAIL', 'CBO'])
    return df


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def pull_roster(sheets, cbo_name,spreadsheet_id, worksheet, email_index):
    logger = prefect.context.get("logger")
    logger.info("Pulling Roster for {}".format(cbo_name))
    time.sleep(5)
    df = sheets.get_worksheet(spreadsheet_id, worksheet).to_dataframe()
    email_df = df.iloc[:,email_index].to_frame()
    email_df.columns = ['EMAIL']  # rename the single column; avoids mutating columns.values in place
    email_df['CBO'] = cbo_name
    return email_df


def pull_one_roster(roster, sheets):
    r = pull_roster(sheets, roster[0], roster[1], roster[2], roster[3])
    success = check_pull_roster(r)

    with case(success, True):
        rs = pull_roster_success(r)
    with case(success, False):
        rf = pull_roster_fail(roster[0])
    return merge(rs, rf)


with Flow('import_roster') as flow:
    df_list = apply_map(pull_one_roster, roster=roster_list, sheets=unmapped(sheets))
k
Looks good to me! Glad you got it working. It should also execute sequentially. The append would not have worked in the last setup.