Does anyone have a working example of specifying a...
# prefect-community
j
Does anyone have a working example of specifying a
result_handler
and having
write()
&
read()
functions called? (Mainly interested in
result_handler
on a Task, but Flow-level or overall default is fine, too.) I can't seem to get my ResultHandler called and I'm sure it's something silly I'm doing. (I can post a simple example if it helps, but I suspect others have this working just fine.)
a
Copy code
from prefect import task, Flow

def my_state_handler(task, old_state, new_state):
    type = new_state.type if False else None
    print(f'log this stuff! task: {task.name}, type: {type}, message: {new_state.message}, result: {new_state.result}')


@task(state_handlers=[my_state_handler])
def do_something(var = 'Nothing'):
    print(f'doing something: {var}')
    return var


with Flow('doing it', state_handlers=[my_state_handler]) as flow:
    t1 = do_something(1)
    t2 = do_something(t1+ 99)
    t3 = do_something(3)
    t4 = do_something(4)


flow_state = flow.run()
t1_state = flow_state.result[t1]
print(f't1_state: {t1_state}')
@Joe Schmid oh I just realized this is a state handler not a result handler. I'll leave it in case it helps, but prob isn't the same thing
c
Hey @Joe Schmid! Good question - result handlers are never called unless Core is being run on Cloud; I’d be interested to hear if you want them to be called for testing purposes or for actual functionality that you need in Core
j
Hi @Chris White, that makes sense now! (That they're only called when running on Cloud.) My thinking was that having control over storage could work well when we're preparing large feature vectors (hundreds or maybe even 1,000+ features for large numbers of rows). We currently (without Prefect) persist these to parquet files in an S3 bucket.
I don't think we have to have this functionality in Core since we could just do this manually (write & read) in our Tasks, but it seemed more "Prefect-y" to just return data & let a ResultHandler store it and load it.
c
That makes sense! In an older version of Prefect the
checkpoint
flag on tasks would have the effect of calling result handlers regardless of whether it was being run on Cloud, but we ended up making this something that is only turned on in Cloud. However, if you want to see it in action, simply set this environment variable:
Copy code
PREFECT__CONTEXT__CLOUD=True
This should only have the effect of turning on “checkpointing” for tasks (tasks which are initialized with
checkpoint=True
)
Caveat: - in this case all Parameters must be JSON serializable This makes me wonder if we should just rename this context variable to “result_handlers” or something more generic, and elevate it to a first-class toggle in Prefect Core (and this toggle just happens to be always turned on in Cloud)
j
I kinda like that option. (first-class toggle) We'll play with it a bit and try to have some more educated feedback for you. Thanks for the info!
c
Anytime!
Hey @Joe Schmid! -> https://github.com/PrefectHQ/prefect/pull/1283 was just merged, which allows you to set an environment variable / config option
PREFECT__FLOWS__CHECKPOINTING=true
. When this option is
true
, all tasks with
checkpoint=True
will have their result handlers called during execution
this isn’t well documented yet, but wanted to bring your attention to it if you wanted to kick the tires!
j
@Chris White that's great. Thanks for the heads up -- much appreciated!
c
@Marvin archive “Result handlers are not being called”