Can tasks accept a Callable argument from another ...
# ask-community
k
Can tasks accept a Callable argument from another task? For example, I have an upstream task that needs to make a decision on whether to use
function a
in a downstream task or
function b
in a down stream task (essentially deciding on a strategy function or factory function). Could the down-stream task properly get what function to execute from the other? Hopefully this makes sense.
a
I’m not sure if this is possible, but you could solve it is by using the conditional case task.
Copy code
with case(cond, True):
        val = function_a_decorated_with_task()
        another_action(val)

    with case(cond, False):
        val = function_b_decorated_with_task()
        another_action(val)
👍 1
k
Thanks, I’ll check it out.
k
I believe you can do this. It just needs to be serializable by cloudpickle for the most part. Do you run into an error?
k
I was trying to get a gut feel if it was a crazy idea or not before I went too far. I’m trying to build some re-usable tasks that depending on some configuration information from an upstream task may need to connect to different datasources. I was hoping to implement this by providing a different factory function than the default from one task to another.
I can give it a whirl and see what happens.
k
I don’t think it’s crazy. My most recent demo does something similar . I instantiate these but you shouldn’t have to
k
That’s awesome, makes me think it will work. Thanks!
Seems to work just fine, cheesy example:
Copy code
from typing import Any, Callable
import prefect
from prefect import task, Flow


def string_factory_one() -> str:
    return "return from string factory one"

def string_factory_two() -> str:
    return "return from string factory two"


@task
def determine_string_factory(value: int) -> Callable[[], str]:
    if value <= 5:
        prefect.context.logger.info("using string factory one")
        return string_factory_one

    prefect.context.logger.info("using string factory two")
    return string_factory_two


@task
def use_string_factory(string_factory: Callable[[], str]) -> None:
    value = string_factory()
    <http://prefect.context.logger.info|prefect.context.logger.info>(f"return from factory: {value}")

with Flow("test-callable-args") as flow:
    for i in range(10):
        factory = determine_string_factory(i)
        use_string_factory(factory)

if __name__ == "__main__":
    flow.run()
k
exactly!