Philip MacMenamin
09/26/2022, 3:19 PM@task(name="Task A")
def task_a(val):
if random.random() > 0.5:
raise ValueError(f"Non-deterministic error has occurred.{val}")
@task
def send_log(val):
<http://logger.info|logger.info>(f"hello {val}")
l = [1,2,3,4]
with Flow("Trigger example") as flow:
ta_s = task_a.map(val=l)
sl_s = send_log.map(val=l, upstream_tasks=[ta_s])
# flow run should be success if ANY of task_a's succeeded.
flow.set_reference_tasks([something])
Mason Menges
09/26/2022, 3:29 PMPhilip MacMenamin
09/26/2022, 5:52 PMimport random
from typing import Set, Optional
from prefect.engine.state import State
import prefect
from prefect.engine import state
from prefect.triggers import all_successful, all_failed, any_failed, always_run, any_successful
from prefect import task, Flow
logger = prefect.context.get("logger")
@task(name="Task A")
def task_a(val):
if random.random() > 0.5:
raise ValueError(f"Non-deterministic error has occured.{val}")
@task(name="Task B", trigger=any_successful)
def cleanup_workdir(val):
<http://logger.info|logger.info>(f"someghing worked, {val}")
def custom_terminal_state_handler(
flow: Flow,
state: State,
reference_task_states: Set[State],
) -> Optional[State]:
success = False
# iterate through reference task states looking for failures
for task_state in reference_task_states:
if not task_state.is_failed():
success = True
# update the terminal state of the Flow and return
if not success:
state.message = "Some important tasks have failed"
return state
l = [1,2,3,4]
with Flow("Trigger example", terminal_state_handler=custom_terminal_state_handler) as flow:
ta_s = task_a.map(val=l)
cleanups = cleanup_workdir.map(val=l, upstream_tasks=[ta_s])
# flow.set_reference_tasks([ps])
flow.run()
Mason Menges
09/26/2022, 6:09 PMPhilip MacMenamin
09/26/2022, 6:11 PMMason Menges
09/26/2022, 7:06 PMimport random
from typing import Set
import prefect
from prefect.triggers import any_successful
from prefect import task, Flow
logger = prefect.context.get("logger")
@task(name="Task A")
def task_a(val):
if random.random() > 0.5:
raise ValueError(f"Non-deterministic error has occured.{val}")
@task(name="Task B", trigger=any_successful)
def cleanup_workdir():
<http://logger.info|logger.info>(f"someghing worked")
with Flow("Trigger example") as flow:
l = [1,2,3,4]
ta_s = task_a.map(val=l)
cleanups = cleanup_workdir(upstream_tasks=[ta_s])
flow.set_reference_tasks([cleanups])
flow.run()
Philip MacMenamin
09/26/2022, 7:12 PMimport random
from typing import Set
import prefect
from prefect.triggers import any_successful
from prefect import task, Flow
logger = prefect.context.get("logger")
@task(name="Task A")
def task_a(val):
# if random.random() > 0.5:
raise ValueError(f"Non-deterministic error has occured.{val}")
@task(name="Task B", trigger=any_successful)
def cleanup_workdir(val):
<http://logger.info|logger.info>(f"someghing worked {val}")
with Flow("Trigger example") as flow:
l = [1,2,3,4]
ta_s = task_a.map(val=l)
cleanups = cleanup_workdir.map(val=l, upstream_tasks=[ta_s])
flow.set_reference_tasks([cleanups])
flow.run()
Mason Menges
09/26/2022, 7:17 PMPhilip MacMenamin
09/26/2022, 7:38 PMMason Menges
09/26/2022, 7:49 PMfrom prefect.engine.state import State, Success, Failed
state = Success(message="Successful", context=flow.context)
though this could lead to some unintended consequences so I'd generally suggest the former for most use casesPhilip MacMenamin
09/26/2022, 7:51 PMMason Menges
09/26/2022, 7:59 PMPhilip MacMenamin
09/26/2022, 8:08 PMAttributeError: 'Flow' object has no attribute 'context'
import random
from typing import Set, Optional
from prefect.engine.state import State
import prefect
from prefect.engine.state import State, Success, Failed
from prefect.triggers import all_successful, all_failed, any_failed, always_run, any_successful
from prefect import task, Flow
logger = prefect.context.get("logger")
@task(name="Task A")
def task_a(val):
if random.random() > 0.5:
raise ValueError(f"Non-deterministic error has occured.{val}")
@task(name="Task B", trigger=any_successful)
def cleanup_workdir(val):
<http://logger.info|logger.info>(f"someghing worked, {val}")
def custom_terminal_state_handler(
flow: Flow,
state: State,
reference_task_states: Set[State],
) -> Optional[State]:
success = False
# iterate through reference task states looking for failures
for task_state in reference_task_states:
if not task_state.is_failed():
success = True
# update the terminal state of the Flow and return
if not success:
state.message = "Some important tasks have failed"
state = Success(message="Successful")
else:
state = Failed(message="fail")
# s = prefect.engine.state.Success()
return state
l = [1,2,3,4]
with Flow("Trigger example", terminal_state_handler=custom_terminal_state_handler) as flow:
ta_s = task_a.map(val=l)
cleanups = cleanup_workdir.map(val=l, upstream_tasks=[ta_s])
# flow.set_reference_tasks([ps])
flow.run()
2022-09-26 21:29:16+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Traceback (most recent call last):
File "map_fail_trigger.py", line 49, in <module>
flow.run()
File "/home/macmenaminpe/code/venv_test_d/image_portal_workflows/qa/lib/python3.8/site-packages/prefect/core/flow.py", line 1302, in run
if task not in state.result:
TypeError: argument of type 'NoneType' is not iterable
Mason Menges
09/26/2022, 10:49 PM