Hi, I’m trying to create a conditional Flow of Flo...
# ask-community
n
Hi, I’m trying to create a conditional Flow of Flows that runs a flow and then, based on that flow’s final state, runs either `flow_if_success` or `flow_if_failure`. Basically, I want to know how I can take the final state signal from `conditional-flow` and decide whether to run `flow_if_success` or `flow_if_failure`. I was having trouble matching the task signal with a value so that the case works properly, and I was hoping you would be able to help me out. Would I need to use `get_task_run_result`, or is there a better way to just grab the task result? Here is a snippet of my current test code:
Copy code
# Flow parameters with their default values.
var1 = Parameter("var1", default = 4)
var2 = Parameter("var2", default = 12)

# One StartFlowRun task per flow in project "Test", each configured with wait=True.
conditional_id = StartFlowRun(flow_name="Conditional-Flow", project_name="Test", wait=True)
flow_if_success_id = StartFlowRun(flow_name="Flow_if_success", project_name="Test", wait=True)
flow_if_failure_id = StartFlowRun(flow_name="Flow_if_failure", project_name="Test", wait=True)


# Idea: the conditional flow runs first. If it finishes with a Success signal, run
# flow_if_success; if it finishes with a Failed signal, run flow_if_failure.
with Flow("Conditional FoF") as parent_flow:
    conditional_run = conditional_id()

    with case (conditional_run, Success): # Problem: conditional_run never matches Success, even when the task run is successful
        # var1 is forwarded to the child flow as its "num" parameter.
        flow_if_success_run = flow_if_success_id(parameters=dict(num=var1))
    with case (conditional_run, Failed): 
        # var2 is forwarded to the child flow as its "number" parameter.
        flow_if_failure_run = flow_if_failure_id(parameters=dict(number=var2))
a
You would need to switch to create_flow_run + wait_for_flow_run and include a case task to implement this condition. Here is a slightly different use case, but you can use similar syntax and building blocks: https://discourse.prefect.io/t/can-i-run-a-flow-of-flows-that-triggers-each-child-flow-on-a-different-machine/180
n
Alright I'll try this out. Thank you!
a
and if you have any questions about it, let us know, happy to provide more examples when needed
n
So I'm wondering: is there a way to avoid having a case task in my parent flow and instead just match the cond_flow final signal against something? Basically, the problem I get is that it skips the case because of the following output:
SKIP signal raised: SKIP('Provided value "FlowRunView(flow_run_id=\'d0bc26fe-9693-4890-832e-182d5035a90e\', name=\'Conditional-Flow_Run_2022-02-07 19:22:47.235833\', state=<Success: "All reference tasks succeeded.">, labels=<BoxList: []>, cached_task_runs=0)" did not match "<class \'prefect.engine.state.Success\'>"')
a
To get the state from a FlowRunView, you need to add `.state`. Example:
Copy code
# Example parent flow: the child flow run is only created when the `infra`
# task's result equals the string given to `case`.
with Flow("parent_flow") as flow:
    infra = check_the_infrastructure()
    with case(infra, "Docker with label machine-1"):
        # Create the child flow run with a Docker run config labelled "machine-1".
        child_flow_run_id = create_flow_run(
            flow_name="child_flow_name",
            run_config=DockerRun(
                labels=["machine-1"]
            ),  # with a specific condition like image
        )
        # raise_final_state=True and stream_logs=True are passed to the wait task.
        child_flowrunview = wait_for_flow_run(
            child_flow_run_id, raise_final_state=True, stream_logs=True
        )
        # NOTE(review): inside the `with Flow` block this name is bound to a task,
        # and accessing `.state` on it at this point was reported to raise
        # "AttributeError: 'FunctionTask' object has no attribute 'state'".
        # Presumably the attribute has to be read inside a downstream task at run
        # time instead -- TODO confirm.
        child_flowrunview.state # state of the child flow run; use it in a downstream task to decide what should happen next
n
I was trying that before, but it gives me an `AttributeError: 'FunctionTask' object has no attribute 'state'` error. Is there something I need to import to get that to work?
a
Can you share your flow?
n
Copy code
import prefect
from prefect import Flow, Parameter, case
from prefect.engine.state import Failed, Success
from prefect.storage import Docker
from prefect.tasks.prefect import RenameFlowRun, create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor
import datetime

# Flow parameters with their default values.
var1 = Parameter("var1", default=4)
var2 = Parameter("var2", default=12)

# Logger looked up from the Prefect context when this module is loaded.
logger = prefect.context.get("logger")


def renameFlowRun(obj, old_state, new_state):
    """Rename the current flow run when it enters a running state.

    Used as a flow state handler (it is passed via ``state_handlers=[...]``).
    Prefect calls state handlers positionally as
    ``handler(obj, old_state, new_state)``, so the parameters are declared in
    that order. With the two state parameters swapped, ``is_running()`` would
    be evaluated on the state being left rather than the one being entered.

    Args:
        obj: Object whose state is changing (unused).
        old_state: State being left (unused).
        new_state: State being entered; the rename happens only when
            ``new_state.is_running()`` is true.

    Returns:
        None.
    """
    if new_state.is_running():
        # Embed the current timestamp in the new flow run name.
        run_name = "Conditional_FoF_{}".format(datetime.datetime.now())
        RenameFlowRun().run(flow_run_name=run_name)

with Flow("Conditional FoF", state_handlers=[renameFlowRun]) as parent_flow:
    # Create the run of the flow that acts as the condition, then wait on it.
    conditional_id = create_flow_run(flow_name="Conditional-Flow", project_name="Test")
    conditional_run = wait_for_flow_run(conditional_id, raise_final_state=True, stream_logs=True)
    # NOTE(review): `conditional_run` is a task while the flow is being built;
    # accessing `.state` on it (here and in both `case` calls below) is what was
    # reported to raise "AttributeError: 'FunctionTask' object has no attribute
    # 'state'". The code is kept as posted because the discussion refers to it.
    logger.info("Conditional_task State: "+str(conditional_run.state))

    # Branch meant to run when the conditional flow run ended in Success.
    with case(conditional_run.state, Success):
        flow_if_success_id = create_flow_run(flow_name="Flow_if_success", project_name="Test", parameters=dict(num=var1))
        flow_if_success_run = wait_for_flow_run(flow_if_success_id, raise_final_state=True, stream_logs=True)
    # Branch meant to run when the conditional flow run ended in Failed.
    with case(conditional_run.state, Failed):
        flow_if_failure_id = create_flow_run(flow_name="Flow_if_failure", project_name="Test", parameters=dict(num=var2))
        flow_if_failure_run = wait_for_flow_run(flow_if_failure_id, raise_final_state=True, stream_logs=True)

# Docker storage for the flow, configured with stored_as_script=True and
# local_image=True; the local script is copied to the path given in `path`.
parent_flow.storage = Docker(
    image_name="prefect/conditionalfof",
    image_tag="7feb2022",
    files={
        "/home/test/Documents/conditional-fof.py": "/usr/local/share/prefect/conditional-fof.py"
    },
    stored_as_script=True,
    local_image=True,
    path="/usr/local/share/prefect/conditional-fof.py",
    prefect_directory="/usr/local/share/prefect",
)
parent_flow.executor = LocalDaskExecutor()

# Run the flow directly when this file is executed as a script.
if __name__ == "__main__":
    parent_flow.run()
a
This is not as straightforward so I can totally understand why this is difficult to configure. Here is one idea of how you can approach this:
Copy code
import prefect
from prefect import Flow, Parameter, task, Task
from prefect.engine.state import Success
from prefect.storage import Docker
from prefect.tasks.prefect import RenameFlowRun, create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor
import datetime


def rename_flow_run(obj, old_state, new_state):
    """Rename the current flow run when it enters a running state.

    Used as a flow state handler (it is passed via ``state_handlers=[...]``).
    Prefect calls state handlers positionally as
    ``handler(obj, old_state, new_state)``, so the parameters are declared in
    that order. With the two state parameters swapped, ``is_running()`` would
    be evaluated on the state being left rather than the one being entered.

    Args:
        obj: Object whose state is changing (unused).
        old_state: State being left (unused).
        new_state: State being entered; the rename happens only when
            ``new_state.is_running()`` is true.

    Returns:
        None.
    """
    if new_state.is_running():
        # Embed the current timestamp in the new flow run name.
        run_name = "Conditional_FoF_{}".format(datetime.datetime.now())
        RenameFlowRun().run(flow_run_name=run_name)


@task
def trigger_conditional_flow_runs(flow_run_view):
    """Start the follow-up child flow that matches ``flow_run_view.state``.

    ``Flow_if_success`` is targeted when ``flow_run_view.state`` is a
    ``Success`` instance; ``Flow_if_failure`` is targeted otherwise. The child
    flow's ``num`` parameter is read from ``prefect.context["parameters"]``
    (``var1`` for the success branch, ``var2`` for the failure branch).

    NOTE(review): ``create_flow_run`` and ``wait_for_flow_run`` are called here
    from inside a task, i.e. outside a ``with Flow(...)`` block. This was
    reported to raise ``ValueError: Could not infer an active Flow context
    while creating edge to <Task: create_flow_run>``, whose message suggests
    calling ``create_flow_run.run(...)`` instead. The success branch calls
    ``.run()`` on the result of the task call rather than on the task itself,
    and the failure branch never calls ``.run`` -- TODO confirm and fix.

    Args:
        flow_run_view: Object exposing the finished flow run's ``state``.
    """
    if isinstance(flow_run_view.state, Success):
        flow_if_success_id = create_flow_run(
            flow_name="Flow_if_success",
            project_name="Test",
            parameters=dict(num=prefect.context["parameters"].get("var1")),
        ).run()
        flow_if_success_run = wait_for_flow_run(
            flow_if_success_id, raise_final_state=True, stream_logs=True
        ).run()
    else:
        # Any state that is not a Success instance takes this branch.
        flow_if_failure_id = create_flow_run(
            flow_name="Flow_if_failure",
            project_name="Test",
            parameters=dict(num=prefect.context["parameters"].get("var2")),
        )
        flow_if_failure_run = wait_for_flow_run(
            flow_if_failure_id, raise_final_state=True, stream_logs=True
        )


# Parent flow: storage, executor and the rename state handler are passed
# directly to the Flow constructor.
with Flow(
    "Conditional FoF",
    state_handlers=[rename_flow_run],
    storage=Docker(
        image_name="prefect/conditionalfof",
        image_tag="7feb2022",
        files={
            "/home/test/Documents/conditional-fof.py": "/usr/local/share/prefect/conditional-fof.py"
        },
        stored_as_script=True,
        local_image=True,
        path="/usr/local/share/prefect/conditional-fof.py",
        prefect_directory="/usr/local/share/prefect",
    ),
    executor=LocalDaskExecutor(),
) as parent_flow:
    var1 = Parameter("var1", default=4)
    var2 = Parameter("var2", default=12)
    # Create the run of the flow that acts as the condition, then wait on it.
    conditional_id = create_flow_run(flow_name="Conditional-Flow", project_name="Test")
    conditional_run = wait_for_flow_run(
        conditional_id, raise_final_state=True, stream_logs=True
    )
    # The branching itself happens inside this task, based on the flow run's state.
    trigger_conditional_flow_runs(conditional_run)
    # No task takes var1/var2 as an input (they are read from prefect.context
    # inside trigger_conditional_flow_runs), so they are added to the flow
    # explicitly -- presumably so that they are registered as flow parameters.
    parent_flow.add_task(var1)
    parent_flow.add_task(var2)


# Only render the flow graph when executed as a script; the flow is not run here.
if __name__ == "__main__":
    parent_flow.visualize()
Does this pattern work for you?
n
It looks like it should, but for some reason it gave me `ValueError: Could not infer an active Flow context while creating edge to <Task: create_flow_run>. This often means you called a task outside a with Flow(...) block. If you're trying to run this task outside of a Flow context, you need to call create_flow_run.run(...)`. I tried putting `.run()` in different spots in the `trigger_conditional_flow_runs` task, but it didn't seem to help. I'll try again tomorrow morning. Thank you for all your help, I appreciate it! I'll let you know if I get it working.
a
@Nick Hart with a help from @Kevin Kho I found a much better solution! Here is an example flow you may try.
Copy code
from prefect import Flow, task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.triggers import all_successful, any_failed
from random import random


@task(log_stdout=True)
def say_hi(flow_name):
    """Print a greeting that names ``flow_name``."""
    greeting = f"Hi from {flow_name}"
    print(greeting)


@task(log_stdout=True)
def randomly_fail():
    """Print a random draw and raise ``ValueError`` when it is above 0.5.

    Raises:
        ValueError: If the value returned by ``random()`` is greater than 0.5.
    """
    draw = random()
    print(f"Number: {draw}")
    if draw <= 0.5:
        return
    raise ValueError("Big number!")


# child1: greets, then runs randomly_fail downstream of the greeting, so this
# flow can end in failure.
with Flow("child1") as child1:
    say_hello = say_hi("child1")
    randomly_fail(upstream_tasks=[say_hello])


# child2: greeting only.
with Flow("child2") as child2:
    say_hi("child2")

# child3: greeting only.
with Flow("child3") as child3:
    say_hi("child3")


# Parent flow: runs child1 as the condition, then child2 or child3 depending on
# how the "Condition" task ends. The branching is expressed with task triggers
# instead of `case` blocks.
with Flow("conditional_flow_of_flows",) as parent_flow:
    hi = say_hi("Parent flow")
    # Create the child1 flow run after the greeting task.
    child1_id = create_flow_run(
        flow_name="child1",
        project_name="xyz",
        task_args=dict(name="Main Flow Run"),
        upstream_tasks=[hi],
    )
    # raise_final_state=True is passed to the wait task named "Condition";
    # both branches below list this task as their upstream.
    conditional_run = wait_for_flow_run(
        child1_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Condition"),
    )
    # on success of conditional_run:
    # (trigger=all_successful, with conditional_run as the only upstream task)
    child2_id = create_flow_run(
        flow_name="child2",
        project_name="xyz",
        upstream_tasks=[conditional_run],
        task_args=dict(name="Flow run on Success", trigger=all_successful),
    )
    conditional_run_child2 = wait_for_flow_run(
        child2_id, raise_final_state=True, stream_logs=True
    )
    # on failure of conditional_run:
    # (trigger=any_failed, with conditional_run as the only upstream task)
    child3_id = create_flow_run(
        flow_name="child3",
        project_name="xyz",
        upstream_tasks=[conditional_run],
        task_args=dict(name="Flow run on Failure", trigger=any_failed),
    )
    conditional_run_child3 = wait_for_flow_run(
        child3_id, raise_final_state=True, stream_logs=True
    )
    # Only `hi` is a reference task -- presumably so the parent flow's final
    # state does not depend on whichever branch did not run; TODO confirm.
    parent_flow.set_reference_tasks([hi])

if __name__ == "__main__":
    # parent_flow.visualize()
    # Register every flow in project "xyz": the three children first, then the parent.
    for flow_to_register in (child1, child2, child3, parent_flow):
        flow_to_register.register("xyz")
It works exactly as you wanted - based on a condition, it runs the respective child flow:
btw here is a transcription of this discussion if you need it for a later reference
n
oh no way this is awesome! Yeah that's exactly what I'm looking for. I just tested it on my machine and it works! Thank you again I appreciate all the help!
🙌 1