https://prefect.io logo
Title
j

Jon

11/01/2022, 8:31 PM
can you pass a task as a flow parameter in prefect 1?
1
m

Mason Menges

11/01/2022, 8:33 PM
Not really as Parameters in prefect 1 are tasks themselves so this would be calling a task from within another task which isn't supported, What are you trying to solve for in this case by doing so?
j

Jon

11/01/2022, 8:33 PM
i have a flow that has a ton of boilerplate, with just one task that is different depending on the pipeline we're running
task_a = always_the_same_a()
task_b = always_the_same_b(task_a)
task_c = something_new_each_time(task_b)
task_d = always_the_same_c(task_c)
m

Mason Menges

11/01/2022, 8:39 PM
you could add some conditional logic to the new task c to try and account for that, it's difficult to do this in the flow itself for prefect 1 but if that specific task is going to be doing different things depending on the pipeline then it might make sense to either expand logic in the task to account for those scenarios or to break out that task into a separate flow that task d can then reference depending on where it's output is stored.
Though thinking on it more a separate flow is probably a bad thought here so you can ignore that haha, The other option would be to write a unique task for these specific scenarios under each flow.
j

Jon

11/01/2022, 8:41 PM
if i were to break it out into a separate flow, don't we have the same issue?
hmph, yeah i really just want the ability to have a config that's passed to our flows which calls specific tasks
we have this pattern for a bunch of flows, where there's just one task that needs to change
so one option is to copy and paste each file and just change that one line, but that feels insane to me
m

Mason Menges

11/01/2022, 9:01 PM
For sure, it can kinda depend on the use case but if that one task has different behavior depending on the pipeline it's running you'd either need to do that or you'd need to account for those conditions within the task itself at least to the best of my knowledge here anyway. you might also be able to leverage the kv store or to store some variable that you set at the beginning of the flow which the task can then reference to determine the behavior of the task, though that still entails modifying the task behavior to account for this. or you could leverage the flow run context to reference based on the current flow run https://docs-v1.prefect.io/core/concepts/execution.html#adding-context-globally https://docs-v1.prefect.io/orchestration/concepts/kv_store.html#ui
j

Jon

11/01/2022, 9:02 PM
@prefect.task
def extract_task(
    extract_module_path: str,
    extract_function_name: str,
    credentials: str,
    bronze_file_downloads_dir: str,
) -> list[str]:

    extract_function = getattr(
        importlib.import_module(extract_module_path), extract_function_name
    )

    return extract_function(credentials, bronze_file_downloads_dir)
i can dynamically import the logic i want to use
not ideal but works
lmk if i need to clarify anything from the snippet i sent
m

Mason Menges

11/01/2022, 9:04 PM
Brilliant 😄 yeah prefect 2 is better equipped to handle this kind of dynamic task setup most workarounds for prefect 1 involve a bit more effort
j

Jon

11/01/2022, 9:07 PM
thx. yeah we're doing flows of flows and will hopefully move to prefect 2 at some point
looks way more approachable