Kevin Mullins

01/27/2022, 5:20 PM
Can tasks accept a Callable argument from another task? For example, I have an upstream task that needs to decide whether a downstream task should use `function a` or `function b` (essentially choosing a strategy function or factory function). Could the downstream task properly get the function to execute from the upstream one? Hopefully this makes sense.

Anna Geller

01/27/2022, 5:28 PM
I’m not sure if this is possible, but you could solve it by using the conditional case task:
with case(cond, True):
    val = function_a_decorated_with_task()
    another_action(val)

with case(cond, False):
    val = function_b_decorated_with_task()
    another_action(val)

Kevin Mullins

01/27/2022, 5:30 PM
Thanks, I’ll check it out.

Kevin Kho

01/27/2022, 5:47 PM
I believe you can do this. It just needs to be serializable by cloudpickle for the most part. Do you run into an error?

Kevin Mullins

01/27/2022, 5:48 PM
I was trying to get a gut feel for whether it was a crazy idea before I went too far. I’m trying to build some reusable tasks that, depending on configuration information from an upstream task, may need to connect to different datasources. I was hoping to implement this by passing a factory function other than the default from one task to another.
I can give it a whirl and see what happens.
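A rough plain-Python sketch of the idea described above, with no Prefect involved: an upstream step picks a datasource factory from configuration and a downstream step calls whatever it was handed. All names here (`connect_default`, `connect_warehouse`, `FACTORIES`, the `"source"` config key) are hypothetical and only for illustration; in a flow, `choose_factory` and `use_factory` would presumably each be decorated with `@task`.

```python
from typing import Callable, Dict


# Hypothetical stand-ins for real datasource connections.
def connect_default() -> str:
    return "connected to default datasource"


def connect_warehouse() -> str:
    return "connected to warehouse datasource"


# Hypothetical mapping from a config value to a factory function.
FACTORIES: Dict[str, Callable[[], str]] = {
    "default": connect_default,
    "warehouse": connect_warehouse,
}


def choose_factory(config: Dict[str, str]) -> Callable[[], str]:
    """Upstream step: pick a factory, falling back to the default one."""
    name = config.get("source", "default")
    return FACTORIES.get(name, connect_default)


def use_factory(factory: Callable[[], str]) -> str:
    """Downstream step: call the factory without knowing which one it is."""
    return factory()
```

Keeping the factories as module-level functions, rather than lambdas or closures, is a deliberate choice in this sketch, since plain named functions are usually the easiest callables to serialize.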

Kevin Kho

01/27/2022, 5:50 PM
I don’t think it’s crazy. My most recent demo does something similar. I instantiate these, but you shouldn’t have to.

Kevin Mullins

01/27/2022, 5:51 PM
That’s awesome, makes me think it will work. Thanks!
Seems to work just fine, cheesy example:
from typing import Any, Callable
import prefect
from prefect import task, Flow


def string_factory_one() -> str:
    return "return from string factory one"

def string_factory_two() -> str:
    return "return from string factory two"


@task
def determine_string_factory(value: int) -> Callable[[], str]:
    if value <= 5:
        prefect.context.logger.info("using string factory one")
        return string_factory_one

    prefect.context.logger.info("using string factory two")
    return string_factory_two


@task
def use_string_factory(string_factory: Callable[[], str]) -> None:
    value = string_factory()
    prefect.context.logger.info(f"return from factory: {value}")

with Flow("test-callable-args") as flow:
    for i in range(10):
        factory = determine_string_factory(i)
        use_string_factory(factory)

if __name__ == "__main__":
    flow.run()

Kevin Kho

01/27/2022, 6:02 PM
exactly!