
Anders Segerberg

01/18/2023, 5:43 PM
Prefect 1.x: I have a task I want to run, and then be able to inspect the resulting state of the task, including if it is a failure (in particular, a timeout). Specifically, the pattern I want is: within a Flow, define a function `check` which takes the result of a task run, and returns `True` if it failed with a TimeOutError, `False` otherwise.

Adding a state handler would let me do this, but all I'm interested in is the terminal state of the task, and it seems one has to handle each state transition in the state handler. This feels somewhat brittle to me. Moreover, I would have to communicate the state handler's behavior back to the parent Flow.

The other idea is to have a downstream task that sets the timing-out task as an upstream, and changes the trigger to be any_failed. However, this isn't helpful, because the behavior I want is:
• If the upstream task times out, this is considered successful behavior
• If it returns before timing out, this is considered failed behavior
So I can't really inspect both conditions in the same function.

Kalise Richmond

01/18/2023, 7:40 PM
Have you thought about using Signals?

Anders Segerberg

01/18/2023, 8:19 PM
I'm not sure how to get that to work. For some context, I have a Task that polls a queue in AWS. If it receives a response before the Task times out, I consider the queue non-empty; else, I consider it empty. I basically want a function/task that I can call from within the flow to determine if the queue is empty or not, and to proceed from there. In a simplified example, something like this:
from prefect import Flow, task

@task(timeout=5)
def A():
    # Return the queue response if one is received; otherwise keep polling until the task times out.
    # `queue` is the AWS queue client, defined elsewhere.
    while True:
        resp = queue.poll()
        if resp:
            return resp

@task
def B(res):
    # Convert the task state/result to a boolean
    if res timed out:  # pseudocode: this is the check I don't know how to write
        return True
    return False

@task
def C(timed_out):
    if timed_out:
        print("Queue is empty!")
    else:
        print("Queue is non empty!")

with Flow('f') as f:
    a = A()
    f.add_task(a)

    timed_out = B(a)
    C(timed_out)
If I were to use a signal handler to handle `A` timing out, how could I communicate that back to the main flow context, so that that state transition is available to `B`?
I was looking into using prefect.context for a state handler for `A` to set `context.queue_empty = True`, but I don't think that will work either. If I could set up the Flow to inspect a task's runtime state, that might also solve things.
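
One way to avoid inspecting the task's state at all is to let the polling function enforce its own deadline and return a sentinel when nothing arrives, so that "empty" becomes an ordinary return value rather than a failure. The sketch below is plain Python with no Prefect calls. It uses the standard-library `queue.Queue` as a stand-in for the AWS queue, and the names `poll_with_deadline` and `queue_is_empty` are hypothetical; it illustrates the idea under those assumptions and is not a confirmed Prefect pattern.

```python
import queue
import time
from typing import Optional


def poll_with_deadline(
    q: "queue.Queue[str]", timeout_s: float, interval_s: float = 0.05
) -> Optional[str]:
    """Return the first message seen before the deadline, or None if none arrives.

    `q` stands in for the real AWS queue client (an assumption for this sketch).
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # Non-blocking read, analogous to one `queue.poll()` call.
            return q.get_nowait()
        except queue.Empty:
            pass
        if time.monotonic() >= deadline:
            # Deadline reached without a message: report "nothing received".
            return None
        time.sleep(interval_s)


def queue_is_empty(response: Optional[str]) -> bool:
    """Convert the polling result to the boolean that the downstream step needs."""
    return response is None
```

With this shape, the body of `A` would return the value of `poll_with_deadline(...)` and `B` would reduce to `queue_is_empty(res)`, so neither a state handler nor a trigger change is needed. The task-level timeout can then be set somewhat above `timeout_s` as a safety net only.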