Josh
04/23/2021, 7:31 AM
apply_map
in conjunction with skipped tasks.
At least one upstream state has an unmappable result.
Josh
04/23/2021, 7:36 AM
from typing import Any
import pendulum
from pendulum import DateTime
from prefect import Flow, task, case, apply_map, Task
class PrettyPrint(Task):
    """Prefect task that logs a ``key: value`` line at INFO level."""

    def run(
        self,
        key: str = None,
        value: Any = None,
    ) -> None:
        """Log ``key`` and ``value`` through the task's logger.

        Args:
            key: Label written before the colon.
            value: Object interpolated after the colon.
        """
        self.logger.info(f"{key}: {value}")
# Task instances shared by the flow-building code in this snippet.
pretty_print = PrettyPrint()
# Built with skip_on_upstream_skip=False; presumably so it still runs when an
# upstream task is skipped -- confirm against the Prefect Task documentation.
pretty_print_always_run = PrettyPrint(skip_on_upstream_skip=False)
@task
def day_of_week(simulated_day: str = None):
    """Return the pendulum ``day_of_week`` value for a date.

    Args:
        simulated_day: Date string handed to ``pendulum.parse``. When falsy,
            ``pendulum.today()`` is used instead.

    Returns:
        The ``day_of_week`` attribute of the resolved date.

    Raises:
        TypeError: If ``simulated_day`` does not parse to a ``DateTime``.
    """
    if not simulated_day:
        return pendulum.today().day_of_week

    day = pendulum.parse(simulated_day)
    # Explicit check rather than ``assert`` so it is not stripped under ``-O``.
    if not isinstance(day, DateTime):
        raise TypeError(
            f"Expected {simulated_day!r} to parse to a DateTime, "
            f"got {type(day).__name__}"
        )
    return day.day_of_week
def backcast_on(date):
    """Add the per-date tasks to the flow; passed to ``apply_map`` by the flow.

    Args:
        date: The date value to process.
    """
    # Only log the Saturday message when day_of_week(date) equals 6.
    with case(day_of_week(date), 6):
        saturday_result = pretty_print(key="This day is Saturday", value=date)
    # NOTE(review): the pasted snippet lost its indentation; this call is
    # placed outside the ``case`` block based on the task's name -- confirm.
    pretty_print_always_run(
        key="Always Running Backcast on", value=date, upstream_tasks=[saturday_result]
    )
@task
def true_task():
    """Return ``True``; its result is used as the value tested by ``case``."""
    return True
with Flow(name="Backcast Flow") as backcast_flow:
    backcast_range = ["2021-04-03", "2021-04-04"]
    # Branch guarded on true_task() == True.
    with case(true_task(), True):
        apply_map(backcast_on, backcast_range)
    # Branch guarded on true_task() == False; since true_task returns True,
    # this is the branch expected to be skipped.
    with case(true_task(), False):
        apply_map(backcast_on, backcast_range)

if __name__ == "__main__":
    backcast_flow.register(project_name="sandbox")
Josh
04/23/2021, 7:37 AMJosh
04/23/2021, 7:37 AMKevin Kho
pretty_print_always_run
sets saturday_result
as an upstream task. Is that intended?Kevin Kho
if
and else
in the backcast_on
function.Kevin Kho
def backcast_on(date):
    """Variant of ``backcast_on`` that branches with a plain ``if``/``else``.

    Args:
        date: The date value to process.
    """
    # NOTE(review): day_of_week is declared with @task earlier in this thread,
    # so this comparison may be evaluated while the flow is being built rather
    # than at run time -- confirm before relying on it.
    if day_of_week(date) == 6:
        saturday_result = pretty_print(key="This day is Saturday", value=date)
        pretty_print_always_run(
            key="Always Running Backcast on", value=date, upstream_tasks=[saturday_result]
        )
    else:
        pretty_print_always_run(
            key="Always Running Backcast on", value=date,
        )
Kevin Kho
Kevin Kho
from prefect.engine import signals

# Sketch: raise SKIP when the day is 6, otherwise carry on with the real work.
if day_of_week(date) == 6:
    raise signals.SKIP()
else:
    ...  # do other stuff
Kevin Kho
SKIP
will skip all downstream tasks and is treated as a SUCCESS
Josh
04/23/2021, 3:50 PMJosh
04/23/2021, 3:51 PMKevin Kho
case
statement to run everything below or not inside the Flow definition.Kevin Kho
Kevin Kho
# Reusable task that starts a run of another registered flow and waits for it.
start = StartFlowRun(project_name="testing-result", wait=True)

with Flow('master-flow') as flow:
    # True or False
    skipped = check_skip()
    # Insert other flows here
    with case(skipped, False):
        # One StartFlowRun call per date, only in the branch where skipped is False.
        for date in ["2021-04-03", "2021-04-04"]:
            start(flow_name="flow", parameters={'date': date})
Kevin Kho
Josh
04/23/2021, 8:18 PMKevin Kho
wait=True
of StartFlowRun
to ensure that each flows ends before calling the next.Josh
04/27/2021, 3:41 AM
dates = range(1, 10)
date_param = Parameter("date")


class DoSomething(Task):
    """Task that sleeps for one second and logs its input at WARNING level."""

    def run(self, value):
        """Sleep briefly, then log ``value``."""
        sleep(1)
        self.logger.warning(value)


do_something = DoSomething()

with Flow("sub-flow") as sub_flow:
    do_something(date_param)

sub_flow_task = StartFlowRun(project_name="sandbox", flow_name="sub-flow", wait=True)

with Flow("schedule flow") as schedule_flow:
    # One StartFlowRun call per date; no dependencies are declared between the
    # calls in this version.
    for date in dates:
        sub_flow_task(parameters={"date": date})
Josh
04/27/2021, 3:43 AMwait=True
, we’ll have to upgrade to the standard plan right?Kevin Kho
Kevin Kho
with Flow("schedule flow") as schedule_flow:
    tasks = [
        sub_flow_task(parameters={"date": date})
        for date in dates
    ]
    # Chain the runs: each task is set downstream of the one created before it.
    for upstream, downstream in zip(tasks, tasks[1:]):
        downstream.set_upstream(upstream)
Kevin Kho
Kevin Kho
Josh
04/27/2021, 2:13 PMKevin Kho
Josh
04/27/2021, 2:40 PMJosh
04/27/2021, 2:43 PMdates
when starting the schedule flowKevin Kho
Josh
04/27/2021, 4:32 PMStartFlowRun
?Kevin Kho
Josh
04/27/2021, 4:36 PMKevin Kho
Josh
04/27/2021, 4:45 PMStartFlowRun
task but ran into a Pickling context object error. You can take a look at my code here. https://gist.github.com/wangjoshuah/6987991be27c98c14c3eca3e561c3b9f
Josh
04/27/2021, 4:45 PMidempotency_key
, and I don’t understand what that is for 🤷♂️Kevin Kho
Kevin Kho
Kevin Kho
Kevin Kho
StartFlowRun
+ Task LOOP
from prefect import Parameter, Flow, task, Task
import prefect
from prefect.tasks.prefect import StartFlowRun
from time import sleep
from prefect.engine.signals import LOOP, SUCCESS
class DoSomething(Task):
    """Task that sleeps for one second and logs its input at INFO level."""

    def run(self, value):
        """Sleep briefly, then log ``value``."""
        sleep(1)
        self.logger.info(value)
do_something = DoSomething()

# Child flow: takes a single "one_date" parameter and passes it to do_something.
with Flow("sub-flow") as sub_flow:
    one_date = Parameter("one_date")
    do_something(one_date)

sub_flow.register("aws")

# Task used by the parent flow to start a run of "sub-flow" (wait=True).
sub_flow_task = StartFlowRun(project_name="aws", flow_name="sub-flow", wait=True)
@task()
def loop_over_dates(dates):
    """Start one sub-flow run per date, handling one date per loop iteration.

    Each iteration reads the remaining dates from the ``task_loop_result``
    context entry (falling back to the ``dates`` argument when that entry is
    absent), runs the sub-flow for the first date, and raises ``LOOP`` with the
    rest of the dates as the result.

    Args:
        dates: Dates to process, in order.

    Raises:
        LOOP: While dates remain after the current one.
    """
    # Starting state
    loop_payload = prefect.context.get("task_loop_result", {"dates": dates})
    dates = loop_payload.get("dates", [])

    logger = prefect.context.get("logger")
    logger.info(dates)

    if not dates:
        return  # nothing to process

    # Unpack into a new list so the incoming list is not mutated.
    one_date, *remaining = dates
    logger.info(f"Checking {one_date}")

    # NOTE(review): no idempotency_key is passed here; per the StartFlowRun
    # docs the key then defaults to the current flow run ID, so repeated calls
    # from this loop may resolve to the same sub-flow run -- confirm.
    try:
        sub_flow_task.run(parameters={"one_date": one_date})
    except SUCCESS:
        # Don't exit the loop on Flow Run success
        pass

    if not remaining:
        return  # return statements end the loop
    raise LOOP(message=f"Processing {remaining[0]}", result=dict(dates=remaining))
# Parent flow: takes a "dates" list parameter and feeds it to the looping task.
with Flow("schedule flow") as schedule_flow:
    date_param = Parameter("dates", default=[1, 2, 3, 4, 5])
    loop_over_dates(date_param)

schedule_flow.register("aws")
Kevin Kho
StartFlowRun
exiting the loop when it succeeds. This will be fixed soon. The try-except will make this work for now. This preserves your sequential dependency as the loop gets processed in order.Josh
04/27/2021, 9:53 PMloop_over_dates
task, but only one run of the sub-flow
Flow is ever createdJosh
04/27/2021, 10:28 PMKevin Kho
StartFlowRun.run()
call. Do you still want me to continue with the example?Josh
04/27/2021, 10:35 PMKevin Kho
Kevin Kho
from prefect import Parameter, Flow, task, Task
import prefect
from prefect.tasks.prefect import StartFlowRun
from time import sleep
from prefect.engine.signals import LOOP, SUCCESS
import datetime
class DoSomething(Task):
    """Task that sleeps for one second and logs its input at INFO level."""

    def run(self, value):
        """Sleep briefly, then log ``value``."""
        sleep(1)
        self.logger.info(value)
do_something = DoSomething()

# Child flow: takes a single "one_date" parameter and passes it to do_something.
with Flow("sub-flow") as sub_flow:
    one_date = Parameter("one_date")
    do_something(one_date)

sub_flow.register("aws")

# Task used by the parent flow to start a run of "sub-flow" (wait=True).
sub_flow_task = StartFlowRun(project_name="aws", flow_name="sub-flow", wait=True)
@task()
def loop_over_dates(dates):
    """Start one sub-flow run per date, handling one date per loop iteration.

    Each iteration reads the remaining dates from the ``task_loop_result``
    context entry (falling back to the ``dates`` argument when that entry is
    absent), runs the sub-flow for the first date with a timestamp-based
    idempotency key, and raises ``LOOP`` with the rest of the dates as the
    result.

    Args:
        dates: Dates to process, in order.

    Raises:
        LOOP: While dates remain after the current one.
    """
    # Starting state
    loop_payload = prefect.context.get("task_loop_result", {"dates": dates})
    dates = loop_payload.get("dates", [])

    logger = prefect.context.get("logger")
    logger.info(dates)

    if not dates:
        return  # nothing to process

    # Unpack into a new list so the incoming list is not mutated.
    one_date, *remaining = dates
    logger.info(f"Checking {one_date}")

    # A key that differs per call, so each iteration asks for its own run.
    # NOTE(review): the key has one-second resolution; two iterations starting
    # within the same second would share a key -- confirm this cannot happen.
    run_key = datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
    try:
        sub_flow_task.run(
            parameters={"one_date": one_date},
            idempotency_key=run_key,
        )
    except SUCCESS:
        # Don't exit the loop on Flow Run success
        pass

    if not remaining:
        return  # return statements end the loop
    raise LOOP(message=f"Processing {remaining[0]}", result=dict(dates=remaining))
# Parent flow: takes a "dates" list parameter and feeds it to the looping task.
with Flow("schedule flow") as schedule_flow:
    date_param = Parameter("dates", default=[1, 2, 3, 4, 5])
    loop_over_dates(date_param)

schedule_flow.register("aws")
Kevin Kho