Hi there, quick question: I saw once how to capture inside a task that the upstream tasks were SKIPPED but I'm finding any more. Does anyone knows where I can find it?
c
Chris White
11/05/2019, 12:50 AM
Hi @Argemiro Neto -> States are typically how Prefect internally manages your tasks, so they aren’t normally exposed during execution. The only places they’re really exposed to users are:
- state handlers for your tasks (but in this case the only state you’d have access to is the current task’s state)
- the Cloud API; you can always query for information directly from Cloud
- task triggers; task triggers are always provided all upstream states but this is called prior to your task’s run
If you share some more info on why you need this I might be able to help more
a
Argemiro Neto
11/05/2019, 12:52 AM
that's fine. I thought I had seen something like that. I'm currently testing if there is a SKIP or FAIL error before proceed in an
always_run
task.
c
Chris White
11/05/2019, 12:53 AM
ah ok great! hang on i can share some code you can use a starting point
Chris White
11/05/2019, 12:57 AM
Copy code
from prefect import task, Flow, Parameter
from prefect.engine.signals import FAIL, SKIP
def custom_trigger(upstream_states):
if any(isinstance(s.result, FAIL) for s in upstream_states):
return False
else:
return True
@task(trigger=custom_trigger)
def my_task():
print('running')
@task
def upstream(x):
if x == 1:
raise FAIL("1")
else:
raise SKIP("other")
with Flow("example") as flow:
x = Parameter("x", default=1)
result = my_task(upstream_tasks=[upstream(x)])
flow.run() # fails
flow.run(x=0) # succeeds
a
Argemiro Neto
11/05/2019, 1:25 AM
yeah, this is basically what I am doing. I just wondered if there was a better way than that
c
Chris White
11/05/2019, 1:49 AM
this is ultimately the way I would recommend -> any decisions about whether your task should run or not based on upstream information should ideally occur within your trigger
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.