Jen Evenson

06/16/2022, 6:51 PM
Hello! We're trying to figure out the conditional logic for a piece of our flow and struggling to get it right. The problem is that I want unlock_hosts to wait for the entire with case(did_deletes_fail, True): block (starts on line 16) to finish. But since I can't name that case task, I can't set that state dependency. If we get to the update_state() task, we need to immediately fail the flow and NOT run any other task, especially unlock_hosts() and daemon_start(). I thought maybe I should use a switch instead, but my "b branch" is "do nothing". What conditional logic and dependencies should I be using here? Thanks!
Here's a little diagram of those two case statements.
(And oops, line 8 should be did_moves_fail, failed_moves = get_failed_moves(move_tables).)

Kevin Kho

06/16/2022, 7:06 PM
This is quite tricky. Let me try it
I am trying this and it seems to work:
import prefect
from prefect import task, Flow, case, Parameter
from prefect.tasks.control_flow import merge

@task
def check_first_condition(x):
    return x

@task
def make_jira():
    prefect.context.logger.info("Make Jira")
    return True

@task
def fail_hard():
    prefect.context.logger.info("Fail Hard")
    return True

@task
def action_if_false():
    prefect.context.logger.info("Carry On")
    return


with Flow("conditional-branches") as flow:
    cond = Parameter("cond", True)
    cond2 = Parameter("cond2", False)

    # Outer branch: runs only when cond is True
    with case(cond, True):
        # Inner branches: exactly one of these runs, depending on cond2
        with case(cond2, True):
            a = fail_hard()

        with case(cond2, False):
            b = make_jira()

        # First merge: resolves to whichever inner branch actually ran
        c = merge(a, b)

    # Outer branch: runs only when cond is False
    with case(cond, False):
        d = action_if_false()

    # Second merge: a single task that downstream tasks can depend on
    val = merge(c, d)

flow.run(parameters={"cond": True, "cond2": False})
Or am I missing something? You can edit cond and cond2 at the bottom to test.
For your unlock_hosts, you can set the upstream to the second merge.
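A minimal sketch of that suggestion, assuming Prefect 1.x: the downstream task takes the merge result as a state-only dependency through upstream_tasks. The task bodies and the names build_flow, branch_true and branch_false are hypothetical stand-ins, and prefect is imported inside the function only so the module loads without it.

```python
def build_flow():
    """Build a small Prefect 1.x flow where unlock_hosts waits on a merge."""
    # Imported lazily; requires Prefect 1.x to actually build the flow.
    from prefect import Flow, Parameter, case, task
    from prefect.tasks.control_flow import merge

    @task
    def branch_true():
        return "true branch ran"

    @task
    def branch_false():
        return "false branch ran"

    @task
    def unlock_hosts():
        # Hypothetical stand-in for the real unlock_hosts task.
        return "unlocked"

    with Flow("unlock-after-merge") as flow:
        cond = Parameter("cond", default=True)

        with case(cond, True):
            a = branch_true()

        with case(cond, False):
            b = branch_false()

        # The merge gives the conditional block a single, nameable end point.
        merged = merge(a, b)

        # State-only dependency: no data is passed, unlock_hosts just waits.
        unlock_hosts(upstream_tasks=[merged])

    return flow
```

With Prefect 1.x installed, build_flow().run(parameters={"cond": False}) should exercise the other branch.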

Jen Evenson

06/16/2022, 7:28 PM
Thank you! Let me try this.

Kevin Kho

06/16/2022, 7:36 PM
As for failing the flow: there is no simple way to end a flow run from inside it. The ENDRUN signal only applies to a given task, so you need to use the GraphQL API. Do something like this inside your task:
from prefect.client import Client
client = Client()
query = """
mutation SetFlowRunStates($flowRunId: UUID!, $version: Int!, $state: JSON!) {
  set_flow_run_states(
    input: {
      states: [{ flow_run_id: $flowRunId, state: $state, version: $version }]
    }
  ) {
    states {
      id
      status
      message
    }
  }
}
"""
res = client.graphql(
    query,
    variables={
        "flowRunId": "e8a2a732-ac68-4ca9-81cc-1fa587021d30",
        "version": 4,
        "state": {"type": "Failed", "message": "test"},
    },
)
print(res)
Use context to get the current flow run id. I don’t think version matters
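A hedged sketch of what that could look like inside a task, assuming Prefect 1.x and a flow run started by a backend (so that flow_run_id is present in prefect.context). The helper names build_failed_variables and fail_current_flow_run and the default version value are illustrative assumptions, not something from the thread or the library.

```python
SET_FLOW_RUN_STATES = """
mutation SetFlowRunStates($flowRunId: UUID!, $version: Int!, $state: JSON!) {
  set_flow_run_states(
    input: {
      states: [{ flow_run_id: $flowRunId, state: $state, version: $version }]
    }
  ) {
    states {
      id
      status
      message
    }
  }
}
"""


def build_failed_variables(flow_run_id, message, version=1):
    """Build the GraphQL variables that mark one flow run as Failed.

    The default version is a placeholder assumption; per the thread it
    may not matter.
    """
    return {
        "flowRunId": flow_run_id,
        "version": version,
        "state": {"type": "Failed", "message": message},
    }


def fail_current_flow_run(message):
    """Set the current flow run to Failed through the GraphQL API."""
    # Imported lazily; requires Prefect 1.x and a configured backend.
    import prefect
    from prefect.client import Client

    flow_run_id = prefect.context.get("flow_run_id")
    if flow_run_id is None:
        # A plain local flow.run() has no backend flow run to update.
        raise RuntimeError("No flow_run_id in context; not a backend run?")

    variables = build_failed_variables(flow_run_id, message)
    return Client().graphql(SET_FLOW_RUN_STATES, variables=variables)
```

Keeping the variable construction separate from the API call makes it possible to check the payload without a running backend.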

Jen Evenson

06/16/2022, 7:38 PM
Yes, that's what my update_state() task attempts to do. Based on this help doc: https://docs.prefect.io/orchestration/flow-runs/setting-states.html
Thanks!!

Kevin Kho

06/16/2022, 7:39 PM
Ah ok sounds good!

Jen Evenson

06/17/2022, 4:29 PM
@Kevin Kho - thanks for the help yesterday. That logic totally works for our use case. I'm wondering if there's a way to fail a flow when running the flow from the prefect command line using server as the backend. I'm assuming my graphql mutation is failing because I'm running this locally (while I develop it) vs in our (self-hosted) prefect cloud...
k

Kevin Kho

06/17/2022, 4:30 PM
Ah, unfortunately not. You mean terminating flow.run(), right? That is hard. But maybe you can reframe it: can you use the SKIP signal instead?
Maybe you can use the context; there is a field called run_with_backend, I think, to distinguish between a local run and a cloud run.
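A small sketch of that check, written against any mapping so it can be tried without a backend. The key names are assumptions: the thread recalls run_with_backend, while Prefect 1.x appears to use running_with_backend, so the helper (is_backend_run, a hypothetical name) accepts either.

```python
from typing import Any, Mapping

# Assumption: candidate names for the context flag. Verify the real key
# by inspecting the context during a backend run.
BACKEND_FLAG_KEYS = ("running_with_backend", "run_with_backend")


def is_backend_run(context: Mapping[str, Any]) -> bool:
    """Return True if any candidate backend flag is set and truthy."""
    return any(bool(context.get(key)) for key in BACKEND_FLAG_KEYS)
```

Inside a task this could be called as is_backend_run(prefect.context) to decide whether a GraphQL state change makes sense, or whether a local flow.run() needs a different path.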

Jen Evenson

06/17/2022, 4:38 PM
Interesting. I will look for it in the docs. Thanks!

Kevin Kho

06/17/2022, 4:41 PM
Not in the docs. Try a cloud run with:
print(dict(prefect.context))
to see all available context fields, but there is certainly one.