
Robin

11/26/2020, 5:06 PM
Dear Prefect community, is there an (elegant) way to run only those tasks provided as a list in a (default) parameter? I am having problems applying basic list operations to a parameter in the flow context:
tasks_to_be_executed = Parameter("tasks_to_be_executed", default=["task_a", "task_c"])
The following error is returned when checking `if "get_number_of_all_systems" in tasks_to_be_executed`:
Traceback (most recent call last):
  File ".\awesome_flow.py", line 112, in <module>
    if "get_number_of_all_systems" in tasks_to_be_executed:
TypeError: argument of type 'Parameter' is not iterable

Chris White

11/26/2020, 6:09 PM
You’re running into a common issue; see, e.g., https://stackoverflow.com/questions/64155793/is-it-possible-to-loop-over-a-prefect-parameter/64158793#64158793. One way to achieve this is to provide this parameter value to all tasks and have each task check whether its own name is in the list; if it’s not, the task can raise a SKIP signal.
👍 1
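For readers wondering why the membership test fails in the first place: a parameter is a placeholder whose value only exists once the flow runs, so while the flow is being defined there is no list to search yet. The following is a minimal sketch of that situation in plain Python. `DeferredValue` and its `resolve` method are hypothetical stand-ins invented for illustration, not Prefect's actual `Parameter` implementation.

```python
class DeferredValue:
    """Hypothetical stand-in for a value that is only resolved at run time."""

    def __init__(self, name, default=None):
        self.name = name
        self.default = default

    def resolve(self, overrides=None):
        # At run time, an explicitly supplied value wins over the default.
        overrides = overrides or {}
        return overrides.get(self.name, self.default)


tasks_to_be_executed = DeferredValue(
    "tasks_to_be_executed", default=["task_a", "task_c"]
)

# Definition time: the placeholder defines neither __contains__ nor __iter__,
# so a membership test on it raises TypeError.
try:
    "task_a" in tasks_to_be_executed
    build_time_check_failed = False
except TypeError:
    build_time_check_failed = True

# Run time: the resolved value is an ordinary list and can be searched.
resolved = tasks_to_be_executed.resolve()
run_time_check = "task_a" in resolved
```

This is why the check has to move inside a task: only there does the task receive the resolved list rather than the placeholder.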

Robin

11/26/2020, 6:23 PM
Thanks a lot for the quick answer! Would those tasks still be logged on Prefect Cloud? And I guess for maps this would skip the entire map, as opposed to individual mapped tasks within the map?

Chris White

11/26/2020, 6:24 PM
Yup, they would still be logged and put into a Skipped state. And yes, the same goes for mapped tasks, because each mapped task has the same task name.

Robin

11/26/2020, 6:26 PM
OK! Is this what you meant by "each task can check whether its own name is in the list"?
from prefect.engine.signals import SKIP

...

if task_name in tasks_to_be_executed:
    raise SKIP

Chris White

11/26/2020, 6:27 PM
You’ll want to initialize the SKIP signal, i.e. `SKIP()`, and as long as that code is run from within a task you should be good to go 👍

Robin

11/26/2020, 6:28 PM
OK 🙂
It works, thanks a lot! 🎉 Code:
import prefect
from prefect import Flow, Parameter, task

from prefect.engine.signals import SKIP


@task(log_stdout=True)
def happy_thanksgiving(tasks_to_be_executed):

    if "happy_thanksgiving" not in tasks_to_be_executed:
        raise SKIP()

    print("happy thanksgiving! :)")


@task(log_stdout=True)
def unhappy_thanksgiving(tasks_to_be_executed):

    if "unhappy_thanksgiving" not in tasks_to_be_executed:
        raise SKIP()

    print("failed :(")


with Flow("deletme") as flow:

    tasks_to_be_executed = Parameter(
        "list_of_tasks", default=["extract", "load", "happy_thanksgiving"]
    )

    unhappy_thanksgiving(tasks_to_be_executed)
    happy_thanksgiving(tasks_to_be_executed)


flow.run()
Output:
[2020-11-26 19:35:20+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'deletme'
[2020-11-26 19:35:20+0100] INFO - prefect.TaskRunner | Task 'list_of_tasks': Starting task run...
[2020-11-26 19:35:20+0100] INFO - prefect.TaskRunner | Task 'list_of_tasks': Finished task run for task with final state: 'Success'
[2020-11-26 19:35:20+0100] INFO - prefect.TaskRunner | Task 'happy_thanksgiving': Starting task run...
[2020-11-26 19:35:20+0100] INFO - prefect.TaskRunner | happy thanksgiving! :)
[2020-11-26 19:35:21+0100] INFO - prefect.TaskRunner | Task 'happy_thanksgiving': Finished task run for task with final state: 'Success'      
[2020-11-26 19:35:21+0100] INFO - prefect.TaskRunner | Task 'unhappy_thanksgiving': Starting task run...
[2020-11-26 19:35:21+0100] INFO - prefect.TaskRunner | SKIP signal raised: SKIP(None)
[2020-11-26 19:35:21+0100] INFO - prefect.TaskRunner | Task 'unhappy_thanksgiving': Finished task run for task with final state: 'Skipped'    
[2020-11-26 19:35:21+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
👍 1

Chris White

11/26/2020, 6:40 PM
Haha I love it - glad you figured it out :D
🙏 1
🚀 1

Robin

11/26/2020, 7:45 PM
Maybe it is worth putting the following function into some Prefect utils?
import inspect
from prefect.engine.signals import SKIP

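def _skip_task_if_not_to_be_executed(tasks_to_be_executed=()):
    """Skips the calling task if its name is not in tasks_to_be_executed"""

    # An immutable default (empty tuple) avoids the shared mutable default pitfall.
    # The task calls this function, so look one frame up the stack for the
    # caller's function name (same value as the positional lookup "[1][3]").
    # see: <https://stackoverflow.com/a/5067654>
    task_name = inspect.stack()[1].function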

    if task_name not in tasks_to_be_executed:
        raise SKIP()
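An alternative to inspecting the call stack is a decorator that reads the wrapped function's own `__name__`. The sketch below is plain Python so it can run without Prefect installed: `SkipSignal` is a hypothetical stand-in for `prefect.engine.signals.SKIP`, and `skip_unless_listed` is a name made up for this example, not an existing Prefect utility.

```python
import functools


class SkipSignal(Exception):
    """Hypothetical stand-in for prefect.engine.signals.SKIP."""


def skip_unless_listed(fn):
    """Raise SkipSignal unless fn's own name is in tasks_to_be_executed."""

    @functools.wraps(fn)  # keeps fn.__name__ visible on the wrapper
    def wrapper(tasks_to_be_executed, *args, **kwargs):
        if fn.__name__ not in tasks_to_be_executed:
            raise SkipSignal(f"{fn.__name__} not selected")
        return fn(tasks_to_be_executed, *args, **kwargs)

    return wrapper


@skip_unless_listed
def happy_thanksgiving(tasks_to_be_executed):
    return "happy thanksgiving! :)"


@skip_unless_listed
def unhappy_thanksgiving(tasks_to_be_executed):
    return "failed :("


selected = ["extract", "load", "happy_thanksgiving"]
```

Calling `happy_thanksgiving(selected)` returns its message, while `unhappy_thanksgiving(selected)` raises `SkipSignal`. In a real flow one would presumably raise the real `SKIP()` instead and stack Prefect's `@task` decorator on top of `@skip_unless_listed`; that combination is an assumption and was not tested in this thread.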

Scott Moreland

11/27/2020, 12:45 PM
Just wanted to echo that I had this exact same question. It seems to be a relatively common need when developing and debugging, so it might be good to add the example to the docs. This discussion was helpful, thanks! One question: suppose I have sequential tasks A -> B -> C, and each task checkpoints some data used as input by the next downstream task. How does this work if I skip A and C and run B? Will B use the checkpoint created by A?

Robin

11/27/2020, 3:07 PM
Thanks for appreciating it! I just created this related Prefect issue to discuss whether and how to add information and/or functionality to Prefect.
As for your second question, I guess a new Slack thread would be best, to make it more visible and traceable! 🙂
👍 1