Robin
11/26/2020, 5:06 PM
tasks_to_be_executed = Parameter("tasks_to_be_executed", default=["task_a", "task_c"])
The following error is returned when checking `if "get_number_of_all_systems" in tasks_to_be_executed`:
Traceback (most recent call last):
  File ".\awesome_flow.py", line 112, in <module>
    if "get_number_of_all_systems" in tasks_to_be_executed:
TypeError: argument of type 'Parameter' is not iterable
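For context, the error arises because a `Parameter` is a placeholder task while the flow is being built; its value only exists once the flow runs. The following is a minimal sketch of that behavior in plain Python. `DeferredValue` and `membership_error` are hypothetical names made up for illustration and are not part of Prefect.

```python
class DeferredValue:
    """Hypothetical stand-in for a value that is only known when the flow runs."""

    def __init__(self, name, default=None):
        self.name = name
        self.default = default


def membership_error(container, item):
    """Return the TypeError raised by `item in container`, or None if it works."""
    try:
        item in container
    except TypeError as exc:
        return exc
    return None


placeholder = DeferredValue("tasks_to_be_executed", default=["task_a", "task_c"])

# At build time only the placeholder exists, so the membership test fails.
build_time_error = membership_error(placeholder, "task_a")

# Once the value has been resolved to a real list, the same test works.
run_time_error = membership_error(placeholder.default, "task_a")
```

This suggests doing the membership check somewhere that receives the resolved list, for example inside a task, rather than at module level.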
Chris White
Robin
11/26/2020, 6:23 PM
Chris White
Robin
11/26/2020, 6:26 PM
each task can check whether its own name is in the list
from prefect.engine.signals import SKIP
...
if task_name not in tasks_to_be_executed:
    raise SKIP
Chris White
`SKIP()`, and as long as that code is run from within a task you should be good to go 👍
Robin
11/26/2020, 6:28 PM
import prefect
from prefect import Flow, Parameter, task
from prefect.engine.signals import SKIP

@task(log_stdout=True)
def happy_thanksgiving(tasks_to_be_executed):
    # Skip unless this task's name is in the parameter value
    if "happy_thanksgiving" not in tasks_to_be_executed:
        raise SKIP()
    print("happy thanksgiving! :)")

@task(log_stdout=True)
def unhappy_thanksgiving(tasks_to_be_executed):
    # Not in the default list below, so this task is skipped
    if "unhappy_thanksgiving" not in tasks_to_be_executed:
        raise SKIP()
    print("failed :(")

with Flow("deletme") as flow:
    tasks_to_be_executed = Parameter(
        "list_of_tasks", default=["extract", "load", "happy_thanksgiving"]
    )
    unhappy_thanksgiving(tasks_to_be_executed)
    happy_thanksgiving(tasks_to_be_executed)

flow.run()
Output:
[2020-11-26 19:35:20+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'deletme'
[2020-11-26 19:35:20+0100] INFO - prefect.TaskRunner | Task 'list_of_tasks': Starting task run...
[2020-11-26 19:35:20+0100] INFO - prefect.TaskRunner | Task 'list_of_tasks': Finished task run for task with final state: 'Success'
[2020-11-26 19:35:20+0100] INFO - prefect.TaskRunner | Task 'happy_thanksgiving': Starting task run...
[2020-11-26 19:35:20+0100] INFO - prefect.TaskRunner | happy thanksgiving! :)
[2020-11-26 19:35:21+0100] INFO - prefect.TaskRunner | Task 'happy_thanksgiving': Finished task run for task with final state: 'Success'
[2020-11-26 19:35:21+0100] INFO - prefect.TaskRunner | Task 'unhappy_thanksgiving': Starting task run...
[2020-11-26 19:35:21+0100] INFO - prefect.TaskRunner | SKIP signal raised: SKIP(None)
[2020-11-26 19:35:21+0100] INFO - prefect.TaskRunner | Task 'unhappy_thanksgiving': Finished task run for task with final state: 'Skipped'
[2020-11-26 19:35:21+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Chris White
Robin
11/26/2020, 7:45 PM
import inspect
from typing import Sequence

from prefect.engine.signals import SKIP

def _skip_task_if_not_to_be_executed(tasks_to_be_executed: Sequence[str] = ()):
    """Skips task if task name is not in tasks_to_be_executed"""
    # the task calls this function, so here we check for the caller name "[1][3]"
    # see: <https://stackoverflow.com/a/5067654>
    task_name = inspect.stack()[1][3]
    if task_name not in tasks_to_be_executed:
        raise SKIP()
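To see how the caller-name lookup behaves without a Prefect installation, here is a small sketch in plain Python. `SkipSignal`, `skip_if_not_selected`, `extract`, `load`, and `run` are hypothetical names used only for this illustration; `SkipSignal` merely stands in for Prefect's `SKIP` signal.

```python
import inspect


class SkipSignal(Exception):
    """Hypothetical stand-in for prefect.engine.signals.SKIP."""


def skip_if_not_selected(selected=()):
    # inspect.stack()[0] is this helper; [1] is the function that called it.
    caller_name = inspect.stack()[1].function
    if caller_name not in selected:
        raise SkipSignal(caller_name)


def extract(selected):
    skip_if_not_selected(selected)
    return "extract ran"


def load(selected):
    skip_if_not_selected(selected)
    return "load ran"


def run(func, selected):
    """Call func and report a skip instead of propagating the signal."""
    try:
        return func(selected)
    except SkipSignal:
        return "skipped"


results = [run(f, ["extract"]) for f in (extract, load)]
```

One caveat with this approach: the lookup returns the name of the immediate caller, so if the helper is invoked through another wrapper function, the wrapper's name is what gets compared against the list.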
Scott Moreland
11/27/2020, 12:45 PM
Robin
11/27/2020, 3:07 PM