
Amit Gal

12/09/2020, 3:53 PM
Hey all, new to the community and Prefect in general, but a bit in love with it already 🙂 I have a basic architectural question, and I hope this is a good place to ask it; otherwise, please let me know! In a nutshell, I am looking to `map` a `case`. In more detail, I am converting an existing process into Prefect. It currently runs locally through a simple CLI and fetches and handles datasets from a remote server. One of the first steps in this process is checking the dataset version using `check_version(id)`, which returns a `bool`. In the CLI, this is then used in a simple logical block to determine how to continue. The CLI is handed a list of ids and loops over it, making that check for each element in the list (sequentially). I want to convert this into Prefect and take advantage of the parallelisation opportunities. Therefore, I thought that `case` and `map` would work great here, something like this (which is not possible):
```python
from prefect import Flow, case

ids = [id1, id2, id3]
with Flow("Check version") as flow:
    with case(check_version.map(id=ids), True):
        val1 = action_if_true()
    with case(check_version.map(id=ids), False):
        val2 = action_if_false()
```
The issue here is that the output from `check_version(id)` is mapped, so it is a list of bools (`[True, True, False]`) that cannot be compared against a `bool`. I found no option to map `case`. I did, however, see the option to map `switch`, and I could imagine using that if I have to. But since it must take strings as input (it uses a `dict` to get the different scenarios), it would require a conversion or changes to the original functions in the CLI, which I would rather not do. This is a pretty simple thing, and I imagine someone else has solved this before, but I could not find anything in the docs or elsewhere. Or perhaps I am thinking about this in the wrong way. Any input is welcome, thanks!

josh

12/09/2020, 4:03 PM
Hey @Amit Gal, would `apply_map` solve your use case? (See the bottom of the page here: https://docs.prefect.io/api/latest/utilities/tasks.html#functions)

Amit Gal

12/09/2020, 4:15 PM
Indeed, I believe so; thanks for the prompt assist!
And just to make sure I got it right: this would mean the `ids` list is run through twice, once per case, correct? My implementation would be this:
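```python
from prefect import Flow, case, task
from prefect.utilities.tasks import apply_map

# check_version is the existing CLI function (returns a bool)
check_version_task = task(check_version)


@task
def action_if_true():
    print("TRUE!")


@task
def action_if_false():
    print("FALSE!")


def do_if_v4(id):
    # Each call to check_version_task(id) creates a separate task,
    # so the version check runs once per case block.
    with case(check_version_task(id), True):
        val1 = action_if_true()
    with case(check_version_task(id), False):
        val2 = action_if_false()


ids = [id1, id2, id3]

with Flow("Check Version") as flow:
    apply_map(do_if_v4, ids)

state = flow.run()
```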

josh

12/09/2020, 4:30 PM
I believe so, yes

Amit Gal

12/09/2020, 4:45 PM
Great, thanks. That's not ideal, but not a big issue. And then all following tasks would be nested under the `do_if_v4` function? As in:
```python
from prefect import Flow, case
from prefect.utilities.tasks import apply_map


def do_if_v4(id):
    with case(check_version_task(id), True):
        val1 = get_data_task()
        val2 = do_something_task()
        val3 = do_another_thing_task()
        # ... more tasks ...
        val4 = upload_somewhere_task()
    with case(check_version_task(id), False):
        # ... the same kind of steps for this branch ...
        ...


with Flow("conditional flow") as flow:
    apply_map(do_if_v4, ids)
```
I'm asking because that seems to break from the regular way of writing each task as a line in the flow (which I really like).

josh

12/09/2020, 4:46 PM
Yeah

Amit Gal

12/09/2020, 4:47 PM
Got it, thanks for your help!
Following up within the same scenario (I actually ended up using `switch`), I am having trouble accessing the id currently being iterated on when inside a conditional block. Is that possible? With the following setup, I cannot call `get_urls_task()`, because I can't give it the parameter it needs: `id1`, `id2` or `id3`, depending on which id got me into that branch. Is there a way to access that data within the `v4_branch` context?
```python
from prefect import Flow, task
from prefect.tasks.control_flow import switch

# Converting functions to tasks
check_v4_task = task(check_v4)  # This requires an id as input
get_urls_task = task(get_urls)  # This requires an id as input

ids = [id1, id2, id3]


# V4 Branch
@task
def v4_branch(odata_version="v4"):
    urls = get_urls_task(id=id_that_got_us_to_this_branch)  # THIS THROWS AN ERROR, since the id is not actually defined anywhere.
    return urls


# V3 Branch
@task
def v3_branch(odata_version="v3"):
    ...  # similar things here


go_to_v4 = {True: v4_branch, False: v3_branch}

with Flow("Statline-BQ") as flow:
    branch = switch(check_v4_task.map(ids), go_to_v4, mapped=True)
```