Harry Baker
08/04/2021, 5:58 PM
Kevin Kho
Kevin Kho
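from prefect import Flow, case
from prefect.tasks.control_flow import merge

# check_condition, action_if_true, action_if_false, and another_action
# are tasks assumed to be defined elsewhere.
with Flow("conditional-branches") as flow:
    cond = check_condition()
    with case(cond, True):
        val1 = action_if_true()
    with case(cond, False):
        val2 = action_if_false()
    # merge returns the result of whichever branch actually ran
    val = merge(val1, val2)
    another_action(val)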
Harry Baker
08/04/2021, 6:08 PM
Kevin Kho
Harry Baker
08/04/2021, 6:21 PM
df_list = []
for roster in roster_list:
    r = pull_roster(sheets, roster[0], roster[1], roster[2], roster[3])
    rf = pull_roster_fail(roster[0], upstream_tasks=[r])
    rs = pull_roster_succeed(r)
    rc = COALESCE(rs, rf)
    df_list.append(rc)
I basically want to do the equivalent of a SQL COALESCE statement there, to append either the output of the success task or the output of the failure task, whichever is relevant. Do I need to do that appending within a task as well for the logic to work, or is there a more streamlined pattern?
Harry Baker
08/04/2021, 6:27 PM
df_list = []
for roster in roster_list:
    r = pull_roster(sheets, roster[0], roster[1], roster[2], roster[3])
    with case(r.state, SUCCESS):
        v1 = r.output
    with case(r, FAILURE):
        v2 = failure_task()
    fr = merge(v1, v2)
    df_list.append(fr)
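For reference, the COALESCE behavior asked about above can be sketched in plain Python, with no Prefect involved. This only illustrates the semantics being requested; the names coalesce, success_output, and failure_output are made up for the example. In Prefect 1.x, merge plays a similar role for case branches, as in the conditional-branches snippet earlier in the thread.

```python
from typing import Optional, TypeVar

T = TypeVar("T")


def coalesce(*values: Optional[T]) -> Optional[T]:
    """Return the first argument that is not None, like SQL COALESCE."""
    for value in values:
        if value is not None:
            return value
    return None


# Hypothetical stand-ins for the two branch outputs.
success_output = None        # the success branch did not produce a value
failure_output = "fallback"  # the failure branch did

chosen = coalesce(success_output, failure_output)
```

The catch in a Prefect flow is that task outputs do not exist yet while the flow is being built, so a check like this has to happen inside a task or through case and merge, not in the surrounding Python loop.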
Kevin Kho
Is roster_list defined ahead of time, or is it a task output?
Harry Baker
08/04/2021, 6:32 PM
Kevin Kho
You would need apply_map, because that append won't work.
Harry Baker
08/04/2021, 6:33 PM
Harry Baker
08/04/2021, 6:34 PM
Kevin Kho
Kevin Kho
from prefect import task, Flow, case, apply_map
from prefect.tasks.control_flow import merge
import prefect

@task
def pull_roster(sheets, r1, r2, r3, r4):
    if r1 == "c":
        return ""  # this is my fail
    return sheets + r1 + r2 + r3 + r4  # this is success

@task
def pull_roster_succeed(r):
    return r

@task
def pull_roster_fail(r):
    return 123

sheets = "test"
roster_list = [["a", "b", "c", "d"], ["b", "c", "d", "e"], ["c", "d", "e", "f"]]

def run_one_roster(roster):
    # Not a task: apply_map calls this while the flow is being built,
    # so these log lines appear at build time, not at run time.
    logger = prefect.context.get("logger")
    logger.info("Processing")
    logger.info(roster)
    r = pull_roster(sheets, roster[0], roster[1], roster[2], roster[3])
    # pull_roster returns "" on failure, so a length check separates the two cases.
    success = task(lambda x: len(x) > 1)(r)
    with case(success, True):
        logger.info(f"Entered success for {r}")
        rs = pull_roster_succeed(r)
    with case(success, False):
        rf = pull_roster_fail(roster[0])
    return merge(rs, rf)

@task
def log_output(x):
    logger = prefect.context.get("logger")
    logger.info(x)

with Flow("a") as flow:
    test = apply_map(run_one_roster, roster_list)
    log_output(test)

flow.run()
Harry Baker
08/04/2021, 7:16 PM
Can you explain success = task(lambda x: len(x) > 1)(r) and how it relates to Prefect tasks and flows?
Kevin Kho
The @task decorator is just Python for task(fn), so I am using this shorter version to do it manually with task(fn)(run_the_new_function_on_this_input).
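The equivalence between the decorator form and the manual form can be seen with any Python decorator. The sketch below uses a made-up stand-in named task, not Prefect's; it only shows that @task above a function and task(fn) produce the same kind of object, and that task(fn)(x) wraps and calls in one expression. Prefect's real task returns a Task object, and calling that inside a Flow block registers it with the flow instead of running the function immediately.

```python
import functools


def task(fn):
    """Hypothetical stand-in for a task decorator: wraps fn and tags it."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)
    wrapper.is_task = True
    return wrapper


# Decorator form.
@task
def is_success_decorated(x):
    return len(x) > 1


# Manual form: wrap the function first, call the wrapped result later.
is_success_manual = task(lambda x: len(x) > 1)

decorated_result = is_success_decorated("testabcd")
manual_result = is_success_manual("testabcd")

# Wrap and call in a single expression, as in task(fn)(r).
inline_result = task(lambda x: len(x) > 1)("")
```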
Harry Baker
08/04/2021, 8:16 PM
Kevin Kho
Use from prefect import unmapped and wrap the constant as unmapped(other_variable) when you pass it in. You can do apply_map(fn, arg1, unmapped(arg2)), where arg1 is the list and arg2 is a constant across the mapped task. Similarly, for mapped tasks you can do task.map(arg1, unmapped(arg2)).
Harry Baker
08/04/2021, 8:23 PM
Harry Baker
08/04/2021, 8:37 PM
Kevin Kho
task(fn, name=…, trigger=…)(x)
I think this is what you mean, right? The decorator just does the same thing under the hood.
Harry Baker
08/04/2021, 9:04 PM
import time
from datetime import timedelta

import pandas as pd
import prefect
from prefect import task, Flow, case, apply_map, unmapped
from prefect.tasks.control_flow import merge
from prefect.triggers import all_finished

# sheets and roster_list are assumed to be defined elsewhere.

# all_finished lets this check run even when pull_roster has failed.
@task(trigger=all_finished)
def check_pull_roster(x):
    return isinstance(x, pd.DataFrame)

@task()
def pull_roster_success(df):
    return df

@task()
def pull_roster_fail(cbo_name):
    logger = prefect.context.get("logger")
    logger.warning("Failed to import roster for {}".format(cbo_name))
    # Placeholder data returned in place of the real roster
    data = [['tom', 'fake'], ['nick', 'fake'], ['juli', 'fake']]
    df = pd.DataFrame(data, columns=['EMAIL', 'CBO'])
    return df

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def pull_roster(sheets, cbo_name, spreadsheet_id, worksheet, email_index):
    logger = prefect.context.get("logger")
    logger.info("Pulling Roster for {}".format(cbo_name))
    time.sleep(5)
    df = sheets.get_worksheet(spreadsheet_id, worksheet).to_dataframe()
    email_df = df.iloc[:, email_index].to_frame()
    email_df.columns = ['EMAIL']
    email_df['CBO'] = cbo_name
    return email_df

def pull_one_roster(roster, sheets):
    r = pull_roster(sheets, roster[0], roster[1], roster[2], roster[3])
    success = check_pull_roster(r)
    with case(success, True):
        rs = pull_roster_success(r)
    with case(success, False):
        rf = pull_roster_fail(roster[0])
    return merge(rs, rf)

with Flow('import_roster') as flow:
    df_list = apply_map(pull_one_roster, roster=roster_list, sheets=unmapped(sheets))
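The apply_map call above iterates over roster_list while passing the same sheets object to every call. A rough plain-Python picture of that "map some arguments, hold others constant" idea is sketched below. Unmapped, map_with_constants, and describe are hypothetical names for illustration only and are not how Prefect implements unmapped or apply_map.

```python
from typing import Any, Callable, List


class Unmapped:
    """Hypothetical marker for an argument that stays constant across the map."""

    def __init__(self, value: Any) -> None:
        self.value = value


def map_with_constants(fn: Callable[..., Any], **kwargs: Any) -> List[Any]:
    """Call fn once per element of the mapped arguments, repeating Unmapped ones."""
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("mapped arguments must all have the same length")
    n = lengths.pop()
    return [fn(**{k: v[i] for k, v in mapped.items()}, **constant) for i in range(n)]


def describe(roster, sheets):
    # Stand-in for the per-roster work; just combines the two inputs.
    return f"{sheets}:{roster[0]}"


rosters = [["a", "b", "c", "d"], ["b", "c", "d", "e"]]
results = map_with_constants(describe, roster=rosters, sheets=Unmapped("test"))
```

Without the marker, a mapping helper has no way to tell a list that should be iterated from a value that should be reused as-is, which is the reason for wrapping sheets.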
Kevin Kho