j

    Jen Evenson

    3 months ago
    Hello! We're trying to figure out the conditional logic for a piece of our flow and struggling to get it right. The problem is that I want
    unlock_hosts
    to wait for the entire
    with case(did_deletes_fail, True):
    block (starts on line 16) to finish. But since I can't name that
    case
    task, I can't set that state dependency. If we get to the
    update_state()
    task, we need to immediately fail the flow and NOT run any other task, especially
    unlock_hosts()
    and
    daemon_start().
    I thought maybe I should use a
    switch
    instead, but my "b branch" is "do nothing". What conditional logic and dependencies should I be using here? Thanks!
    Here's a little diagram of those two case statements.
    (And oops, line 8 should be
    did_moves_fail, failed_moves = get_failed_moves(move_tables)
    )
    Kevin Kho

    Kevin Kho

    3 months ago
    This is quite tricky. Let me try it
    I am trying this and it seems to work:
    from random import random
    
    from prefect import task, Flow, case, Parameter
    from prefect.tasks.control_flow import merge
    import prefect
    
    @task
    def check_first_condition(x):
        return x
    
    @task
    def make_jira():
        <http://prefect.context.logger.info|prefect.context.logger.info>("Make Jira")
        return True
    
    @task
    def fail_hard():
        <http://prefect.context.logger.info|prefect.context.logger.info>("Fail Hard")
        return True
    
    @task
    def action_if_false():
        <http://prefect.context.logger.info|prefect.context.logger.info>("Carry On")
        return 
    
    
    with Flow("conditional-branches") as flow:
        cond = Parameter("cond", True)
        cond2 = Parameter("cond2", False)
    
        with case(cond, True):
            with case(cond2, True):
                a = fail_hard()
    
            with case(cond2, False):
                b = make_jira()
    
            c = merge(a,b)
        
        with case(cond, False):
            d = action_if_false()
    
        val = merge(c, d)
    
    flow.run(parameters={"cond": True, "cond2": False})
    Or am I missing something? You can edit
    cond
    and
    cond2
    at the bottom to test
    For your
    unlock_host
    , you can set the upstream to the second
    merge
    j

    Jen Evenson

    3 months ago
    Thank you! Let me try this.
    Kevin Kho

    Kevin Kho

    3 months ago
    And then for failing the Flow, you can’t just end a Flow simply. You need to use the GraphQL API. The
    ENDRUN
    signal only applies to a given task. You need to do something like this inside your task:
    from prefect.client import Client
    client = Client()
    query = """
    mutation SetFlowRunStates($flowRunId: UUID!, $version: Int!, $state: JSON!) {
      set_flow_run_states(
        input: {
          states: [{ flow_run_id: $flowRunId, state: $state, version: $version }]
        }
      ) {
        states {
          id
          status
          message
        }
      }
    }
    """
    res = client.graphql(query, variables={
        "flowRunId": "e8a2a732-ac68-4ca9-81cc-1fa587021d30",
        "version": 4,
      	"state": {"type": "Failed", "message":"test"}
    }
    )
    print(res)
    Use context to get the current flow run id. I don’t think version matters
    j

    Jen Evenson

    3 months ago
    Yes, that's what my update_state() task attempts to do. Based on this help doc: https://docs.prefect.io/orchestration/flow-runs/setting-states.html
    Thanks!!
    Kevin Kho

    Kevin Kho

    3 months ago
    Ah ok sounds good!
    j

    Jen Evenson

    3 months ago
    @Kevin Kho - thanks for the help yesterday. That logic totally works for our use case. I'm wondering if there's a way to fail a flow when running the flow from the prefect command line using
    server
    as the backend. I'm assuming my graphql mutation is failing because I'm running this locally (while I develop it) vs in our (self-hosted) prefect
    cloud
    ...
    Kevin Kho

    Kevin Kho

    3 months ago
    Ah unfortunately not. You are saying terminate
    flow.run()
    right? That is hard. But I think the way to reframe it is if you can use the SKIP signal instead?
    Maybe you can use the
    context
    , there is a field called
    run_with_backend
    I think
    To distinguish between local run and cloud run
    j

    Jen Evenson

    3 months ago
    Interesting. I will look for it in the docs. Thanks!
    Kevin Kho

    Kevin Kho

    3 months ago
    Not in the docs. Try a cloud run with:
    print(dict(prefect.context))
    to see all available context fields but there is certainly done