Can tasks accept a Callable argument from another task? For example, I have an upstream task that needs to decide whether a downstream task should use function a or function b (essentially choosing a strategy function or factory function). Can the downstream task properly get the function to execute from the upstream task? Hopefully this makes sense.
Anna Geller
01/27/2022, 5:28 PM
I’m not sure if this is possible, but you could solve it by using the conditional case task.
with case(cond, True):
    val = function_a_decorated_with_task()
    another_action(val)
with case(cond, False):
    val = function_b_decorated_with_task()
    another_action(val)
Kevin Mullins
01/27/2022, 5:30 PM
Thanks, I’ll check it out.
Kevin Kho
01/27/2022, 5:47 PM
I believe you can do this. It just needs to be serializable by cloudpickle for the most part. Do you run into an error?
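A rough way to check up front whether a callable will survive serialization. This sketch uses the standard library pickle as a stand-in, which is an assumption: the message above refers to cloudpickle, which is more permissive and can also serialize lambdas and locally defined functions. The helper name survives_pickle is hypothetical.

```python
import pickle
from typing import Any


def survives_pickle(obj: Any) -> bool:
    """Return True if obj can be round-tripped through stdlib pickle."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # Stdlib pickle stores functions by importable name, so anything
        # without one (lambdas, nested functions) ends up here.
        return False


print(survives_pickle(len))          # built-in function, pickled by reference
print(survives_pickle(lambda: "x"))  # a lambda has no importable name
```

If a callable passes this stricter check, it should not be the part that breaks when handed from one task to another. A failure here does not necessarily mean cloudpickle would fail too.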
Kevin Mullins
01/27/2022, 5:48 PM
I was trying to get a gut feel if it was a crazy idea or not before I went too far.
I’m trying to build some re-usable tasks that, depending on some configuration information from an upstream task, may need to connect to different datasources.
I was hoping to implement this by providing a different factory function than the default from one task to another.
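The idea described above might look roughly like this in plain Python, leaving Prefect out entirely. Every name here (connect_default, connect_warehouse, FACTORIES, choose_factory, load) is hypothetical, and the factories return strings only so the sketch stays self-contained; real ones would return client or connection objects.

```python
from typing import Callable, Dict


# Hypothetical connection factories, returning strings for illustration.
def connect_default() -> str:
    return "connection to default source"


def connect_warehouse() -> str:
    return "connection to warehouse"


# Maps a configuration value to the factory that should handle it.
FACTORIES: Dict[str, Callable[[], str]] = {"warehouse": connect_warehouse}


def choose_factory(config: Dict[str, str]) -> Callable[[], str]:
    """Upstream step: pick a factory, falling back to the default."""
    return FACTORIES.get(config.get("source", ""), connect_default)


def load(factory: Callable[[], str] = connect_default) -> str:
    """Downstream step: call whichever factory it was handed."""
    return f"loaded via {factory()}"


print(load(choose_factory({"source": "warehouse"})))
print(load(choose_factory({})))
```

Passing the function itself, rather than a name to look up later, keeps the downstream step unaware of how the choice was made.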
Kevin Mullins
01/27/2022, 5:50 PM
I can give it a whirl and see what happens.
Kevin Kho
01/27/2022, 5:50 PM
I don’t think it’s crazy. My most recent demo does something similar. I instantiate these, but you shouldn’t have to.
Kevin Mullins
01/27/2022, 5:51 PM
That’s awesome, makes me think it will work. Thanks!
Kevin Mullins
01/27/2022, 6:00 PM
Seems to work just fine, cheesy example:
from typing import Any, Callable

import prefect
from prefect import task, Flow


def string_factory_one() -> str:
    return "return from string factory one"


def string_factory_two() -> str:
    return "return from string factory two"


@task
def determine_string_factory(value: int) -> Callable[[], str]:
    if value <= 5:
        prefect.context.logger.info("using string factory one")
        return string_factory_one
    prefect.context.logger.info("using string factory two")
    return string_factory_two


@task
def use_string_factory(string_factory: Callable[[], str]) -> None:
    value = string_factory()
    prefect.context.logger.info(f"return from factory: {value}")


with Flow("test-callable-args") as flow:
    for i in range(10):
        factory = determine_string_factory(i)
        use_string_factory(factory)

if __name__ == "__main__":
    flow.run()