Joe Schmid
07/25/2019, 3:01 PM
[...] result_handler and having write() & read() functions called? (Mainly interested in result_handler on a Task, but Flow-level or overall default is fine, too.) I can't seem to get my ResultHandler called and I'm sure it's something silly I'm doing. (I can post a simple example if it helps, but I suspect others have this working just fine.)
Adam Roderick
07/25/2019, 3:22 PM
from prefect import task, Flow

def my_state_handler(task, old_state, new_state):
    # Log each state transition, reporting the state's class name as its type
    state_type = type(new_state).__name__
    print(f'log this stuff! task: {task.name}, type: {state_type}, message: {new_state.message}, result: {new_state.result}')
    # Hand the state back so the runner keeps using it
    return new_state

@task(state_handlers=[my_state_handler])
def do_something(var='Nothing'):
    print(f'doing something: {var}')
    return var

with Flow('doing it', state_handlers=[my_state_handler]) as flow:
    t1 = do_something(1)
    t2 = do_something(t1 + 99)
    t3 = do_something(3)
    t4 = do_something(4)

flow_state = flow.run()
t1_state = flow_state.result[t1]
print(f't1_state: {t1_state}')
Chris White
07/25/2019, 3:28 PM
Joe Schmid
07/25/2019, 3:37 PM
Chris White
07/25/2019, 3:45 PM
[...] checkpoint flag on tasks would have the effect of calling result handlers regardless of whether it was being run on Cloud, but we ended up making this something that is only turned on in Cloud. However, if you want to see it in action, simply set this environment variable:
PREFECT__CONTEXT__CLOUD=True
This should only have the effect of turning on “checkpointing” for tasks (tasks which are initialized with checkpoint=True).
Joe Schmid
07/25/2019, 4:00 PM
Chris White
07/25/2019, 4:10 PM
[...] PREFECT__FLOWS__CHECKPOINTING=true. When this option is true, all tasks with checkpoint=True will have their result handlers called during execution.
Joe Schmid
07/30/2019, 6:45 AM
Chris White
08/05/2019, 9:48 PM
Marvin
08/05/2019, 9:48 PM
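To illustrate the behavior described in the thread (a result handler is only called for a task with checkpoint=True once checkpointing is switched on), here is a minimal stand-alone sketch. It deliberately does not import Prefect and is not how Prefect implements this. The names LocalJSONHandler, checkpointing_enabled and run_task are hypothetical; only the write()/read() method pair and the PREFECT__FLOWS__CHECKPOINTING variable come from the discussion above. Treating any casing of "true" as "on" is also an assumption of the sketch.

```python
import json
import os
import tempfile


class LocalJSONHandler:
    """Hypothetical handler exposing the write()/read() pair discussed above."""

    def __init__(self, directory):
        self.directory = directory
        self.write_calls = 0  # lets a caller observe whether write() ran

    def write(self, result):
        # Persist the result as JSON and return the location it was stored at
        self.write_calls += 1
        fd, path = tempfile.mkstemp(suffix=".json", dir=self.directory)
        with os.fdopen(fd, "w") as f:
            json.dump(result, f)
        return path

    def read(self, loc):
        # Load a previously written result back from its location
        with open(loc) as f:
            return json.load(f)


def checkpointing_enabled(environ=os.environ):
    # Assumption of this sketch: any casing of "true" switches the option on
    return environ.get("PREFECT__FLOWS__CHECKPOINTING", "").lower() == "true"


def run_task(fn, handler, checkpoint, environ=os.environ):
    """Run fn; call the handler only if the task and the global option both opt in."""
    result = fn()
    location = None
    if checkpoint and checkpointing_enabled(environ):
        location = handler.write(result)
    return result, location


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        handler = LocalJSONHandler(tmp)
        on = {"PREFECT__FLOWS__CHECKPOINTING": "true"}
        # Option unset: write() is skipped even though checkpoint=True
        print(run_task(lambda: [1, 2], handler, checkpoint=True, environ={}))
        # Option set and checkpoint=True: write() runs and returns a location
        _, loc = run_task(lambda: [1, 2], handler, checkpoint=True, environ=on)
        print(handler.read(loc))
```

In this model, a handler that never seems to be called corresponds to the first case: the task opted in with checkpoint=True, but the global option was not set in the environment.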