Jen Evenson
06/16/2022, 6:51 PMunlock_hosts
to wait for the entire with case(did_deletes_fail, True):
block (starts on line 16) to finish. But since I can't name that case
task, I can't set that state dependency.
If we get to the update_state()
task, we need to immediately fail the flow and NOT run any other task, especially unlock_hosts()
and daemon_start().
I thought maybe I should use a switch
instead, but my "b branch" is "do nothing".
What conditional logic and dependencies should I be using here? Thanks!did_moves_fail, failed_moves = get_failed_moves(move_tables)
)Kevin Kho
from random import random
from prefect import task, Flow, case, Parameter
from prefect.tasks.control_flow import merge
import prefect
@task
def check_first_condition(x):
return x
@task
def make_jira():
<http://prefect.context.logger.info|prefect.context.logger.info>("Make Jira")
return True
@task
def fail_hard():
<http://prefect.context.logger.info|prefect.context.logger.info>("Fail Hard")
return True
@task
def action_if_false():
<http://prefect.context.logger.info|prefect.context.logger.info>("Carry On")
return
with Flow("conditional-branches") as flow:
cond = Parameter("cond", True)
cond2 = Parameter("cond2", False)
with case(cond, True):
with case(cond2, True):
a = fail_hard()
with case(cond2, False):
b = make_jira()
c = merge(a,b)
with case(cond, False):
d = action_if_false()
val = merge(c, d)
flow.run(parameters={"cond": True, "cond2": False})
cond
and cond2
at the bottom to testunlock_host
, you can set the upstream to the second merge
Jen Evenson
06/16/2022, 7:28 PMKevin Kho
ENDRUN
signal only applies to a given task. You need to do something like this inside your task:
from prefect.client import Client
client = Client()
query = """
mutation SetFlowRunStates($flowRunId: UUID!, $version: Int!, $state: JSON!) {
set_flow_run_states(
input: {
states: [{ flow_run_id: $flowRunId, state: $state, version: $version }]
}
) {
states {
id
status
message
}
}
}
"""
res = client.graphql(query, variables={
"flowRunId": "e8a2a732-ac68-4ca9-81cc-1fa587021d30",
"version": 4,
"state": {"type": "Failed", "message":"test"}
}
)
print(res)
Jen Evenson
06/16/2022, 7:38 PMKevin Kho
Jen Evenson
06/17/2022, 4:29 PMserver
as the backend. I'm assuming my graphql mutation is failing because I'm running this locally (while I develop it) vs in our (self-hosted) prefect cloud
...Kevin Kho
flow.run()
right? That is hard. But I think the way to reframe it is if you can use the SKIP signal instead?context
, there is a field called run_with_backend
I thinkJen Evenson
06/17/2022, 4:38 PMKevin Kho
print(dict(prefect.context))
to see all available context fields but there is certainly done