Anders Segerberg
01/18/2023, 5:43 PM
check which takes the result of a task run, and returns True if it failed with a TimeOutError, False otherwise
Adding a state handler would let me do this, but all I'm interested in is the terminal state of the task, and it seems a state handler has to handle every state transition. This feels somewhat brittle to me. Moreover, I would have to communicate the state handler's behavior back to the parent Flow.
The other idea is to have a downstream task that sets the timing-out task as its upstream and changes its trigger to any_failed. However, this isn't helpful, because the behavior I want is:
• If the upstream task times out, this is considered successful behavior
• If it returns before timing out, this is considered failed behavior
So I can't really inspect both conditions in the same function.
Kalise Richmond
01/18/2023, 7:40 PM
Anders Segerberg
01/18/2023, 8:19 PM
from prefect import Flow, task

# queue: the queue client being polled (defined elsewhere)

@task(timeout=5)
def A():
    # Return queue response if one is received; or timeout
    while True:
        resp = queue.poll()
        if resp:
            return resp

@task
def B(res):
    # Convert the task state/result to a boolean
    if res timed out:  # pseudocode: this is the check I don't know how to write
        return True
    return False

@task
def C(timed_out):
    if timed_out:
        print("Queue is empty!")
    else:
        print("Queue is non empty!")

with Flow('f') as f:
    a = A()
    f.add_task(a)
    timed_out = B(a)
    C(timed_out)
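The "if res timed out" check in B could be sketched as a plain isinstance test. This is only a sketch under an assumption: that B is configured to run even when A fails (for example with a trigger such as all_finished) and that it then receives A's exception object as res instead of a return value. Which exception class actually arrives for a timed-out task is not confirmed here, so timeout_types is a hypothetical parameter to be replaced with whatever class shows up in practice.

```python
def check(res, timeout_types=(TimeoutError,)):
    """Return True if `res` is a timeout exception, False otherwise.

    Assumption: a failed upstream task is passed downstream as an
    exception instance, while a successful one passes its return value.
    `timeout_types` is a hypothetical knob, not a Prefect API.
    """
    return isinstance(res, timeout_types)


# A timeout maps to True, a normal queue response maps to False.
print(check(TimeoutError("task timed out")))
print(check({"body": "message from queue"}))
# Other failures are not timeouts, so they also map to False.
print(check(ValueError("unrelated failure")))
```

Because both outcomes arrive through the same argument, the timed-out case and the returned-early case can be inspected in one function.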
If I were to use a signal handler to handle A timing out, how could I communicate that back to the main flow context, so that that state transition is available to B?
context.queue_empty = True
but I don't think that will work either
If I could set up the Flow to inspect a task's runtime state, that might also solve things.
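One way to avoid passing state back to the flow at all might be to move the deadline inside the polling task, so that "nothing arrived in time" becomes an ordinary return value rather than a task failure. This swaps the task-level timeout for a manual deadline loop and is not something proposed in the thread; poll_until_deadline, queue_is_empty, their parameters, and the stand-in poll callables are hypothetical names used only for illustration.

```python
import time


def poll_until_deadline(poll, timeout_s, interval_s=0.1):
    """Call `poll()` until it returns something truthy or the deadline passes.

    Returns the response, or None if nothing arrived within `timeout_s`.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        resp = poll()
        if resp:
            return resp
        time.sleep(interval_s)  # avoid a busy loop between polls
    return None


def queue_is_empty(poll, timeout_s, interval_s=0.1):
    """True if no response arrived before the deadline, False otherwise."""
    return poll_until_deadline(poll, timeout_s, interval_s) is None


# Stand-ins for queue.poll: one that never yields, one that always does.
print(queue_is_empty(lambda: None, timeout_s=0.2, interval_s=0.05))
print(queue_is_empty(lambda: "message", timeout_s=0.2, interval_s=0.05))
```

With this shape, A would return the boolean itself and C could consume it with default triggers. It only bounds the wait if each poll() call returns promptly on its own.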