I am getting an unexpected error when trying to us...
# ask-community
j
I am getting an unexpected error when trying to use `apply_map` in conjunction with skipped tasks.
At least one upstream state has an unmappable result.
Minimal flow to reproduce
from typing import Any

import pendulum
from pendulum import DateTime
from prefect import Flow, task, case, apply_map, Task


class PrettyPrint(Task):
    def run(
        self,
        key: str = None,
        value: Any = None,
    ) -> None:
        self.logger.info(f"{key}: {value}")


pretty_print = PrettyPrint()
pretty_print_always_run = PrettyPrint(skip_on_upstream_skip=False)


@task
def day_of_week(simulated_day: str = None):
    if simulated_day:
        day = pendulum.parse(simulated_day)
        assert isinstance(day, DateTime)
        return day.day_of_week
    else:
        return pendulum.today().day_of_week


def backcast_on(date):
    with case(day_of_week(date), 6):
        saturday_result = pretty_print(key="This day is Saturday", value=date)
    pretty_print_always_run(
        key="Always Running Backcast on", value=date, upstream_tasks=[saturday_result]
    )


@task
def true_task():
    return True


with Flow(name="Backcast Flow") as backcast_flow:
    backcast_range = ["2021-04-03", "2021-04-04"]

    with case(true_task(), True):
        apply_map(backcast_on, backcast_range)

    with case(true_task(), False):
        apply_map(backcast_on, backcast_range)


if __name__ == "__main__":
    backcast_flow.register(project_name="sandbox")
Is there a proper way to conditionally skip everything in the apply_map based on the case?
k
Hi @Josh! I think the issue here is the dependency on an upstream task that’s inside a case statement, so it might not get executed. `pretty_print_always_run` sets `saturday_result` as an upstream task. Is that intended?
Case is also a task used for flow definitions, so I think you want to use Python `if` and `else` in the `backcast_on` function.
def backcast_on(date):
    if day_of_week(date) == 6:
        saturday_result = pretty_print(key="This day is Saturday", value=date)
        pretty_print_always_run(
            key="Always Running Backcast on", value=date, upstream_tasks=[saturday_result]
        )
    else:
        pretty_print_always_run(
            key="Always Running Backcast on", value=date,
        )
Maybe do something like this?
To your last question, have you seen SKIP signals before? Use them like:
from prefect.engine import signals

if day_of_week(date) == 6:
    raise signals.SKIP()
else:
    pass  # do other stuff
`SKIP` will skip all downstream tasks and is treated as a `SUCCESS`.
j
Thanks for looking into this. I want to retrain a model every Saturday and make daily forecasts based on that model. So on Saturday, I want to make sure that the forecasts run after the model is re-trained. For the rest of the week, I can skip the model training and just do forecasting.
On that note, I also want to know if it’s possible to order the task runs or somehow make them dependent on each other. If I backcast a forecast over a 1 week period, how can I guarantee that the forecasting for April 4, 2021 runs after the training and forecasting for April 3rd (Saturday)?
k
So for the retrain every Saturday, you can make a task to check for Saturday and return True/False. Then use the `case` statement inside the Flow definition to run everything below it or not.
I suspect you want a Flow-of-flows type script where you kick off a defined Flow and pass in the date. That way, you can pass the dates sequentially.
from prefect import Flow, case
from prefect.tasks.prefect import StartFlowRun

start = StartFlowRun(project_name="testing-result", wait=True)
with Flow('master-flow') as flow:

    # True or False; check_skip is a task you define yourself
    skipped = check_skip()

    # Insert other flows here
    with case(skipped, False):
        for date in ["2021-04-03", "2021-04-04"]:
            start(flow_name="flow", parameters={'date': date})
Does this make sense?
j
By using the for loop to kick off flows, will they be run in order? Does this mean it’s a sequential for loop?
k
Yes, sequential loops are possible. Order is preserved. Use the `wait=True` of `StartFlowRun` to ensure that each flow ends before calling the next.
j
@Kevin Kho I just tried it with a toy example, and the order of the flows is not preserved
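from time import sleep

from prefect import Flow, Parameter, Task
from prefect.tasks.prefect import StartFlowRun

dates = range(1, 10)
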
date_param = Parameter("date")


class DoSomething(Task):
    def run(self, value):
        sleep(1)
        self.logger.warning(value)


do_something = DoSomething()

with Flow("sub-flow") as sub_flow:
    do_something(date_param)

sub_flow_task = StartFlowRun(project_name="sandbox", flow_name="sub-flow", wait=True)

with Flow("schedule flow") as schedule_flow:
    for date in dates:
        sub_flow_task(parameters={"date": date})
Am I missing something? Also, if we need concurrent flows with `wait=True`, we’ll have to upgrade to the standard plan, right?
k
I guess I was wrong. Sorry about that! Maybe you can try this, which sets the upstream tasks on the sub_flow_tasks so they run sequentially:
with Flow("schedule flow") as schedule_flow:
    tasks = [
        sub_flow_task(parameters={"date": date})
        for date in dates
    ]
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])
About the concurrent flows, do you know if you’re on a usage based plan or legacy plan?
I confirmed this code works for me.
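For intuition on why the chained `set_upstream` calls fix the ordering: a `for` loop inside the Flow block only runs while the flow is being built, so it adds tasks without by itself ordering them. The sketch below uses Python's standard `graphlib` module (3.9+) rather than Prefect, with date strings standing in for the sub-flow tasks. It only illustrates the dependency chain and is not how Prefect schedules work.

```python
from graphlib import TopologicalSorter

# Date strings stand in for the sub-flow tasks (an assumption for illustration).
dates = ["2021-04-03", "2021-04-04", "2021-04-05"]

# Map each node to the set of nodes that must finish before it.
# Linking node i to node i - 1 mirrors tasks[i].set_upstream(tasks[i - 1]).
dependencies = {dates[0]: set()}
for i in range(1, len(dates)):
    dependencies[dates[i]] = {dates[i - 1]}

# With a single chain there is exactly one valid execution order.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

Without the chain, every node would have an empty dependency set and any order would be valid, which matches the unordered runs seen in the toy example.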
j
I’m on legacy.
k
Will DM you
j
This only works if dates is a pre-defined iterable, right? If it’s a Parameter or the result of a Task, it won’t be able to create the tasks. It can only create the tasks at Flow creation time?
Is there a way to map over a set of Parameters dynamically? Ideally the user should be able to define `dates` when starting the schedule flow.
k
For looping over dynamic lengths, we have Task looping. Have you seen this before?
j
Is there a way to have a Dynamic DAG of Task Looping over `StartFlowRun`?
k
This sounds a bit tricky. I’ll reevaluate your use case and try to make an example of this if it still makes sense.
j
I want a set of tasks to execute iteratively in a specific order. It’s easier to have a Flow represent a set of tasks than a task of tasks. I want to have them iterate in a specific order, and Looping with Dynamic DAGs seems like a good solution for that.
k
Gotcha. Yeah I think what it’ll look like is you pass a parameter in and have a Task get the length of it and then loop over that length value. Will try it out myself and get back. Sorry this has taken a bunch of detours.
j
I tried subclassing the `StartFlowRun` task but ran into a Pickling context object error. You can take a look at my code here. https://gist.github.com/wangjoshuah/6987991be27c98c14c3eca3e561c3b9f
It works when I comment out the section around `idempotency_key`, and I don’t understand what that is for 🤷‍♂️
k
The idempotency_key is for situations where you’re not sure a request will go through. Like for example, you hit the GraphQL API 3-4 times but you really only need the flow to run once. The key will make sure that the flow will only have run once in 24 hours.
An example is a request sent from a place with an unstable internet connection.
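To make the idea concrete, here is a toy model of idempotent run creation. It is only a sketch: `ToyRunStore` and `create_run` are hypothetical names invented for illustration, not part of Prefect, and the sketch ignores the 24-hour window mentioned above.

```python
import uuid


class ToyRunStore:
    """Hypothetical in-memory stand-in for a backend that creates flow runs."""

    def __init__(self):
        self._runs_by_key = {}

    def create_run(self, flow_name, idempotency_key=None):
        # A repeated key returns the existing run instead of creating a new one.
        if idempotency_key is not None and idempotency_key in self._runs_by_key:
            return self._runs_by_key[idempotency_key]
        run_id = f"{flow_name}-{uuid.uuid4()}"
        if idempotency_key is not None:
            self._runs_by_key[idempotency_key] = run_id
        return run_id


store = ToyRunStore()

# Sending the same request twice with the same key yields a single run.
first = store.create_run("sub-flow", idempotency_key="2021-04-03")
retry = store.create_run("sub-flow", idempotency_key="2021-04-03")

# A different key yields a separate run.
other = store.create_run("sub-flow", idempotency_key="2021-04-04")
```

The same mechanism means that calls which are meant to create distinct runs need distinct keys.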
I haven’t tried the looping over StartFlowRun myself so I’ll need to really try it in a bit.
I finally have a working example for you. This is `StartFlowRun` + Task LOOP
from prefect import Parameter, Flow, task, Task
import prefect
from prefect.tasks.prefect import StartFlowRun
from time import sleep
from prefect.engine.signals import LOOP, SUCCESS

class DoSomething(Task):
    def run(self, value):
        sleep(1)
        self.logger.info(value)

do_something = DoSomething()

with Flow("sub-flow") as sub_flow:
    one_date = Parameter("one_date")
    do_something(one_date)

sub_flow.register("aws")

sub_flow_task = StartFlowRun(project_name="aws", flow_name="sub-flow", wait=True)

@task()
def loop_over_dates(dates):
    # Starting state
    loop_payload = prefect.context.get("task_loop_result", {"dates": dates})
    dates = loop_payload.get("dates", [])

    logger = prefect.context.get("logger")
    logger.info(dates)

    one_date = dates[0]

    logger.info(f"Checking {one_date}")

    try:
        sub_flow_task.run(parameters={"one_date": one_date})
    except SUCCESS:
        # Don't exit the loop on Flow Run success
        pass

    # Drop the first date
    dates.pop(0)

    if len(dates) == 0:
        return  # return statements end the loop
    raise LOOP(message=f"Processing {dates[0]}", result=dict(dates = dates))

with Flow("schedule flow") as schedule_flow:
    date_param = Parameter("dates", default=[1,2,3,4,5])
    loop_over_dates(date_param)

schedule_flow.register("aws")
Note the try-except. There is an issue with `StartFlowRun` exiting the loop when it succeeds. This will be fixed soon. The try-except will make this work for now. This preserves your sequential dependency as the loop gets processed in order.
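The looping pattern above can be hard to picture, so here is a plain-Python sketch of the control flow. `ToyLoop` and `run_with_looping` are hypothetical stand-ins invented for this illustration; they are not Prefect's `LOOP` signal or task runner, which also deal with state, context, and logging.

```python
class ToyLoop(Exception):
    """Hypothetical stand-in for a loop signal that carries a payload."""

    def __init__(self, result):
        super().__init__("loop again")
        self.result = result


def process_next(payload, processed):
    # Work on a copy so the caller's list is not mutated.
    dates = list(payload["dates"])
    processed.append(dates.pop(0))  # handle the first date, then drop it
    if not dates:
        return  # returning normally ends the loop
    raise ToyLoop(result={"dates": dates})


def run_with_looping(fn, initial_payload):
    """Hypothetical driver: re-invoke fn with each new payload until it returns."""
    processed = []
    payload = initial_payload
    while True:
        try:
            fn(payload, processed)
            return processed
        except ToyLoop as signal:
            payload = signal.result


order = run_with_looping(process_next, {"dates": [1, 2, 3, 4, 5]})
print(order)
```

Each iteration sees only the remaining dates, so the items are handled strictly one after another.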
j
I tried this and it loops over the dates in the `loop_over_dates` task, but only one run of the `sub-flow` Flow is ever created.
I was able to get it working without the idempotency_key here. https://gist.github.com/wangjoshuah/0b84bce74253133193dc38a74a6c0e9f
k
Oh I see what you mean from earlier. My bad. The example will work if you pass a unique idempotency key to the `StartFlowRun.run()` call. Do you still want me to continue with the example?
j
I have never used idempotency keys before, so if you know how to do that, it would be amazing. If you think this is a good working example, I’ll submit it to the project as well to make it available for others to use.
k
Ok I’ll continue on with that. I think it’s just a one line change
I confirmed this is working now. The subflows are being started. The idempotent run creation docs are very short if you’re interested.
from prefect import Parameter, Flow, task, Task
import prefect
from prefect.tasks.prefect import StartFlowRun
from time import sleep
from prefect.engine.signals import LOOP, SUCCESS
import datetime

class DoSomething(Task):
    def run(self, value):
        sleep(1)
        self.logger.info(value)

do_something = DoSomething()

with Flow("sub-flow") as sub_flow:
    one_date = Parameter("one_date")
    do_something(one_date)

sub_flow.register("aws")

sub_flow_task = StartFlowRun(project_name="aws", flow_name="sub-flow", wait=True)

@task()
def loop_over_dates(dates):
    # Starting state
    loop_payload = prefect.context.get("task_loop_result", {"dates": dates})
    dates = loop_payload.get("dates", [])

    logger = prefect.context.get("logger")
    logger.info(dates)

    one_date = dates[0]

    logger.info(f"Checking {one_date}")

    try:
        # A fresh key per call so each iteration creates its own flow run
        sub_flow_task.run(
            parameters={"one_date": one_date},
            idempotency_key=datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S"),
        )
    except SUCCESS:
        # Don't exit the loop on Flow Run success
        pass

    # Drop the first date
    dates.pop(0)

    if len(dates) == 0:
        return  # return statements end the loop
    raise LOOP(message=f"Processing {dates[0]}", result=dict(dates = dates))

with Flow("schedule flow") as schedule_flow:
    date_param = Parameter("dates", default=[1,2,3,4,5])
    loop_over_dates(date_param)

schedule_flow.register("aws")
Thanks for your patience!
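One thing to keep in mind with the timestamp key: it has one-second resolution, so two calls within the same second would share a key, and a retry at a later second would not be deduplicated. A possible alternative, sketched here under assumptions, is to derive the key from values that identify the iteration. `make_idempotency_key` and its `parent_run_id` argument are hypothetical; where the parent run ID comes from is left open.

```python
def make_idempotency_key(parent_run_id: str, one_date) -> str:
    """Hypothetical helper: one stable key per (parent run, date) pair."""
    return f"{parent_run_id}:{one_date}"


# "parent-run-123" is a made-up identifier used only for this illustration.
keys = [make_idempotency_key("parent-run-123", d) for d in [1, 2, 3]]

# Rebuilding the key for the same date gives the same value, so a retry
# of that iteration would map to the run that already exists.
retry_key = make_idempotency_key("parent-run-123", 1)
```

Keys built this way differ between dates but stay stable across retries of the same date within one parent run.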