Amit Gal
12/09/2020, 3:53 PM
[...] `map` [...] `case`.
In more detail: I am converting an existing process into Prefect. It currently runs locally through a simple CLI and fetches and handles datasets from a remote server. One of the first steps in this process is checking the dataset version using `check_version(id)`, which returns a `bool`. In the CLI this is then used in a simple logical block to determine how to continue. The CLI is handed a list of ids and loops over it, making that check for each element in the list (sequentially).
I want to convert this into Prefect and take advantage of the parallelisation opportunities. I therefore thought that `case` and `map` would work great here, something like this (which is not possible):
ids = [id1, id2, id3]
with Flow('Check version') as flow:
    with case(check_version.map(id=ids), True):
        val1 = action_if_true()
    with case(check_version.map(id=ids), False):
        val2 = action_if_false()
The issue here is that the output from `check_version(id)` is mapped, and is therefore a list of bools, e.g. `[True, True, False]`, which cannot be compared against a single `bool`. I found no option to map `case`.
I did however see the option to map `switch`, and I could imagine using that if I have to, but since it must take strings as input (it uses a `dict` to get the different scenarios) it would require a conversion or changing the original functions in the CLI, which I would rather not do.
This is a pretty simple thing, and I imagine someone else has solved it before, but I could not find anything in the docs or elsewhere. Or perhaps I am thinking about this in the wrong way. Any input is welcome, thanks!
josh
12/09/2020, 4:03 PM
[...] `apply_map` solve your use case? (bottom of the page here https://docs.prefect.io/api/latest/utilities/tasks.html#functions)
Amit Gal
12/09/2020, 4:15 PM
[...] `ids` list is run through twice - one time per case, correct?
My implementation would be this:
check_version_task = task(check_version)
@task
def action_if_true():
    print("TRUE!")
@task
def action_if_false():
    print("FALSE!")
def do_if_v4(id):
    with case(check_version_task(id), True):
        val1 = action_if_true()
    with case(check_version_task(id), False):
        val2 = action_if_false()
ids = [id1, id2, id3]
with Flow("Check Version") as flow:
    apply_map(do_if_v4, ids)
state = flow.run()
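A minimal sketch of the `apply_map` approach discussed here, assuming Prefect 0.14-era imports (`Flow`, `task`, `case`, `apply_map`, and `merge` from `prefect.tasks.control_flow`). It calls the check task once per id and reuses that result in both `case` blocks, so each id should be checked a single time rather than once per branch. The function bodies are stand-ins, and `build_flow`, `handle_one` and the `-v4` id suffix are hypothetical names introduced only for illustration.

```python
def check_version(dataset_id):
    """Stand-in for the CLI check (assumption: ids ending in '-v4' pass)."""
    return dataset_id.endswith("-v4")


def action_if_true(dataset_id):
    """Stand-in for the work done when the check passes."""
    return f"{dataset_id}: handled as v4"


def action_if_false(dataset_id):
    """Stand-in for the work done when the check fails."""
    return f"{dataset_id}: handled as v3"


def build_flow(ids):
    # Prefect is imported here (assumption: a 0.14-era release) so the plain
    # CLI functions above stay usable and unchanged without Prefect installed.
    from prefect import Flow, apply_map, case, task
    from prefect.tasks.control_flow import merge

    check_task = task(check_version)
    true_task = task(action_if_true)
    false_task = task(action_if_false)

    def handle_one(dataset_id):
        # One check task per id; both case blocks reuse the same result.
        is_v4 = check_task(dataset_id)
        with case(is_v4, True):
            res_true = true_task(dataset_id)
        with case(is_v4, False):
            res_false = false_task(dataset_id)
        # merge yields the output of whichever branch was not skipped.
        return merge(res_true, res_false)

    with Flow("Check Version") as flow:
        results = apply_map(handle_one, ids)
    return flow, results
```

With Prefect installed, `flow, results = build_flow(["a-v4", "b-v3"])` followed by `flow.run()` would run the check and the matching branch for each id. Passing `dataset_id` into the branch tasks also keeps the id available inside each branch.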
josh
12/09/2020, 4:30 PM
Amit Gal
12/09/2020, 4:45 PM
[...] `do_if_v4` function? As in:
def do_if_v4(id):
    with case(check_version_task(id), True):
        val1 = get_data_task()
        val2 = do_something_task()
        val3 = do_another_thing_task()
        .
        .
        .
        val4 = upload_somewhere_task()
    with case(check_version_task(id), False):
        same_things_here
        ...
        ...
        ...
with Flow('conditional flow') as flow:
    apply_map(do_if_v4, ids)
I'm asking because that seems to break from the regular way of writing each task as a line in the flow (which I really like).
josh
12/09/2020, 4:46 PM
Amit Gal
12/09/2020, 4:47 PM
[...] `get_urls_task()` as I can't give it the parameter it needs - namely `id1`, `id2` or `id3`, depending on which id got me into that branch. Is there a way to access that data within the `v4_branch` context?
# Converting functions to tasks
check_v4_task = task(check_v4)  # This requires an id as input
get_urls_task = task(get_urls)  # This requires an id as input
ids = [id1, id2, id3]
# V4 Branch
@task
def v4_branch(odata_version="v4"):
    urls = get_urls_task(id=id_that_got_us_to_this_branch)  # THIS THROWS AN ERROR, since id is not actually defined anywhere.
    return urls
# V3 Branch
@task
def v3_branch(odata_version="v3"):
    # similar_Things_here
    pass
go_to_v4 = {True: v4_branch, False: v3_branch}
with Flow("Statline-BQ") as flow:
    branch = switch(check_v4_task.map(ids), go_to_v4, mapped=True)
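One possible way around the missing id, sketched under the assumption of Prefect 0.14-era `apply_map`, `case` and `merge`: keep the branches as plain Python functions that wire tasks together, rather than tasks that call other tasks. The id then reaches every task in the branch as an ordinary argument. The bodies of `check_v4` and `get_urls`, the `count_urls` step, the URL pattern and the names `build_flow` and `route` are all hypothetical stand-ins, not taken from the original CLI.

```python
def check_v4(dataset_id):
    """Stand-in for the CLI check (assumption: ids ending in '-v4' are v4)."""
    return dataset_id.endswith("-v4")


def get_urls(dataset_id):
    """Stand-in for the CLI helper; the URL pattern is made up."""
    return [f"https://example.invalid/{dataset_id}/part-{n}" for n in (1, 2)]


def count_urls(urls):
    """Hypothetical follow-up step consuming the previous task's output."""
    return len(urls)


def build_flow(ids):
    # Assumption: a Prefect 0.14-era release; imported here so the plain
    # functions above stay importable without Prefect.
    from prefect import Flow, apply_map, case, task
    from prefect.tasks.control_flow import merge

    check_v4_task = task(check_v4)
    get_urls_task = task(get_urls)
    count_urls_task = task(count_urls)

    # Branches are plain functions, so the id is just a parameter and each
    # task inside the branch still appears as its own line.
    def v4_branch(dataset_id):
        urls = get_urls_task(dataset_id)
        return count_urls_task(urls)

    def v3_branch(dataset_id):
        return get_urls_task(dataset_id)

    def route(dataset_id):
        is_v4 = check_v4_task(dataset_id)  # checked once per id
        with case(is_v4, True):
            v4_result = v4_branch(dataset_id)
        with case(is_v4, False):
            v3_result = v3_branch(dataset_id)
        return merge(v4_result, v3_result)

    with Flow("Statline-BQ") as flow:
        results = apply_map(route, ids)
    return flow, results
```

The trade-off is that the branching lives in `route` instead of in a `switch` dictionary, but no conversion of the `bool` check result is needed.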