Jacob Blanco
11/10/2020, 9:33 AM
with Flow("Update Data") as flow:
    last_export = get_last_export("my_table")
    update_from = Parameter("update_from", default=last_export)
    insert_query = parse_insert_query(update_from=update_from)
    run_execute_query(insert_query)
This doesn’t work, but I hope it illustrates what I’m trying to do. Is there any way to do this without having to write an intermediate task to “merge” the two inputs? I realize this pattern is wasteful, since it runs get_last_export for no reason if the parameter is provided.
Avi A
11/10/2020, 9:49 AM
Jim Crist-Harif
11/10/2020, 3:05 PM
from prefect import case, Flow, Parameter
from prefect.tasks.control_flow import merge

with Flow("example"):
    update_from_input = Parameter("update_from", default=None)
    with case(update_from_input, None):
        last_export = get_last_export("my_table")
    update_from = merge(last_export, update_from_input)
I think pushing the optional parameter check into get_last_export might be cleaner, though. Up to you.
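from prefect import task, Flow, Parameter

@task
def maybe_get_last_export(update_from, table):
    # Use the explicit parameter when given; otherwise fall back to the lookup.
    # Assumes get_last_export is callable here as a plain function; if it is
    # itself a Prefect task, its .run() method would be needed instead.
    if update_from is not None:
        return update_from
    return get_last_export(table)

with Flow("example"):
    update_from_input = Parameter("update_from", default=None)
    update_from = maybe_get_last_export(update_from_input, "my_table")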
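For anyone reading along, a rough plain-Python model of the two approaches above may help show why neither one runs the lookup when the parameter is supplied. This is only a sketch under stated assumptions, not Prefect's implementation: model_case and model_merge are hypothetical stand-ins that treat a skipped branch as None, and get_last_export here is a dummy that returns a fixed date and records its calls.

```python
from typing import Any, Callable, List, Optional

# Records every table the stand-in lookup was asked about.
lookups: List[str] = []


def get_last_export(table: str) -> str:
    """Hypothetical stand-in for the real lookup; returns a fixed date."""
    lookups.append(table)
    return "2020-11-01"


def model_case(value: Any, expected: Any, branch: Callable[[], Any]) -> Optional[Any]:
    """Model of a `case` block: run the branch only when value == expected."""
    if value == expected:
        return branch()
    return None  # models a skipped branch


def model_merge(*results: Optional[Any]) -> Optional[Any]:
    """Model of `merge`: return the first result that is not None."""
    return next((r for r in results if r is not None), None)


def resolve_with_merge(update_from: Optional[str], table: str) -> Optional[str]:
    """Flow-level approach: conditional branch, then merge."""
    last_export = model_case(update_from, None, lambda: get_last_export(table))
    return model_merge(last_export, update_from)


def resolve_in_task(update_from: Optional[str], table: str) -> str:
    """Task-level approach: the check lives inside a single function."""
    if update_from is not None:
        return update_from
    return get_last_export(table)
```

In this model, both resolve_with_merge and resolve_in_task return the supplied value untouched when update_from is given, and lookups only grows when update_from is None. The difference is where the decision is visible: in the flow definition for the first, inside the task body for the second.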
Jacob Blanco
11/11/2020, 1:59 AM
get_last_export, the Task definition is gonna get added to an internal Task library, and I think keeping the merge logic visible in the flow is better. Of course, if this was going to be a functional task I’d definitely agree with you.