Joe Schmid
07/25/2019, 3:01 PMresult_handler
and having write()
& read()
functions called? (Mainly interested in result_handler
on a Task, but Flow-level or overall default is fine, too.) I can't seem to get my ResultHandler called and I'm sure it's something silly I'm doing. (I can post a simple example if it helps, but I suspect others have this working just fine.)Adam Roderick
07/25/2019, 3:22 PMfrom prefect import task, Flow
def my_state_handler(task, old_state, new_state):
type = new_state.type if False else None
print(f'log this stuff! task: {task.name}, type: {type}, message: {new_state.message}, result: {new_state.result}')
@task(state_handlers=[my_state_handler])
def do_something(var = 'Nothing'):
print(f'doing something: {var}')
return var
with Flow('doing it', state_handlers=[my_state_handler]) as flow:
t1 = do_something(1)
t2 = do_something(t1+ 99)
t3 = do_something(3)
t4 = do_something(4)
flow_state = flow.run()
t1_state = flow_state.result[t1]
print(f't1_state: {t1_state}')
Adam Roderick
07/25/2019, 3:23 PMChris White
Joe Schmid
07/25/2019, 3:37 PMJoe Schmid
07/25/2019, 3:40 PMChris White
checkpoint
flag on tasks would have the effect of calling result handlers regardless of whether it was being run on Cloud, but we ended up making this something that is only turned on in Cloud. However, if you want to see it in action, simply set this environment variable:
PREFECT__CONTEXT__CLOUD=True
This should only have the effect of turning on “checkpointing” for tasks (tasks which are initialized with checkpoint=True
)Chris White
Joe Schmid
07/25/2019, 4:00 PMChris White
Chris White
PREFECT__FLOWS__CHECKPOINTING=true
. When this option is true
, all tasks with checkpoint=True
will have their result handlers called during executionChris White
Joe Schmid
07/30/2019, 6:45 AMChris White
Marvin
08/05/2019, 9:48 PM