# prefect-community
r
Another question concerning metadata storage of task results: We are thinking of writing a `write_task_result_to_table` task that gets the results of other tasks and writes into a table whether each task was a `success`, `failed`, `missing data`, or empty (= task was not yet executed). We have a flow that runs several tasks for many systems, so the table we would like to have as a result would look like:
system id, task_1, task_2, task_3, ...
with one row for each system. Is there already a more elegant way to write all the results of a flow into a table, or is the way described above the way to go? Cheers 🙂
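A minimal sketch of such a terminal "load" task, assuming Prefect 1.x and assuming the upstream results have already been reduced to a dict of status labels per task; SQLite stands in for the real database just to keep the example self-contained:

```python
import sqlite3

from prefect import task


@task
def write_task_result_to_table(system_id, statuses):
    """Write one row per system with the outcome of each upstream task.

    `statuses` is assumed to look like
    {"task_1": "success", "task_2": "failed", "task_3": "missing data"}.
    Swap the SQLite connection for your own database.
    """
    conn = sqlite3.connect("task_results.db")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS task_results "
        "(system_id TEXT, task_1 TEXT, task_2 TEXT, task_3 TEXT)"
    )
    conn.execute(
        "INSERT INTO task_results VALUES (?, ?, ?, ?)",
        (
            system_id,
            statuses.get("task_1", "empty"),
            statuses.get("task_2", "empty"),
            statuses.get("task_3", "empty"),
        ),
    )
    conn.commit()
    conn.close()
```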
j
Hi @Robin yeah I think what you outlined above would be the way to go! Pretty much the load task in a standard ETL flow 🙂
r
Ah, I think I didn't ask my question clearly. We would like to save the state of the task (failed or success, green or red in the Cloud UI) in a table. For some specific cases we would add the "missing data" state.
j
Hmm, you might want to look into state handlers, which let you react to state changes on tasks: https://docs.prefect.io/core/concepts/notifications.html#state-handlers. Not sure if that fully gets you what you want, though. Making a custom state handler would let you take that state directly and write it to a table.
❤️ 1
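Per the linked docs, a Prefect 1.x state handler is any callable with the signature `(obj, old_state, new_state)` that returns a state. A minimal sketch of a handler that records terminal task states in a table (again using SQLite only to keep it self-contained; the table name and schema are assumptions):

```python
import sqlite3


def write_state_to_table(task, old_state, new_state):
    """Prefect 1.x state handler: record every finished task state."""
    if new_state.is_finished():
        conn = sqlite3.connect("task_states.db")
        conn.execute(
            "CREATE TABLE IF NOT EXISTS task_states (task_name TEXT, state TEXT)"
        )
        conn.execute(
            "INSERT INTO task_states VALUES (?, ?)",
            (task.name, type(new_state).__name__),
        )
        conn.commit()
        conn.close()
    # A state handler must return a state; returning new_state leaves the run unchanged.
    return new_state
```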
r
Yes, thanks! That's what I envisioned, but I wrongly searched for result handler, which is deprecated, right? 🙂
j
Yeah result handlers predate what are now just called results!
r
Thanks for the link, state handlers are really exactly what I was looking for!!
Hey @josh, I am a bit confused: a task seems to be both failed and successful 🤔 I implemented the following state handler:
```python
def log_copy_task_state_to_snowflake(task, old_state, new_state):
    '''logs the state of the copy attempt to a table in snowflake'''

    # states: to be executed (tbe), missing, failed, success

    if new_state.is_failed:
        print(f"{task} failed.")

    if new_state.is_successful:
        print(f"{task} succeeded.")

    return new_state
```
and added the state handler to the mapped task as follows:
```python
@task(max_retries=3, retry_delay=timedelta(seconds=60), timeout=300, state_handlers=[log_copy_task_state_to_snowflake])
def copy_configuration(...)
```
But I got the following output:
```
[2020-10-05 09:56:19] INFO - prefect.copy_configuration[5] | system_id=3123
elapsed time for get_configuration: 0.00 minutes
elapsed time for create_sqlalchemy_engine: 0.00 minutes
elapsed time for create_connection: 0.01 minutes
elapsed time for write_huge_table: 0.08 minutes
<Task: copy_configuration> failed.
<Task: copy_configuration> succeeded.
```
Any ideas why the task seems to be both failed and successful?
j
Hmm, not off the top of my head, but it could be something related to the mapping. Would you mind opening a GitHub issue with some more information on the flow so I could look at it there?
r
OK, I actually found that the state handler is probably not capable of what I intended to do, so it might take some time until I come back to this issue. Shall I post the issue anyway?
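An aside on the question left open above: in Prefect 1.x, `State.is_failed()` and `State.is_successful()` are methods, not boolean attributes, so `if new_state.is_failed:` tests a bound method, which is always truthy, and both branches print on every state change. Assuming that is indeed the cause, the fix would be to call the methods:

```python
def log_copy_task_state_to_snowflake(task, old_state, new_state):
    '''logs the state of the copy attempt to a table in snowflake'''

    # Note the parentheses: is_failed/is_successful are methods on
    # Prefect 1.x State objects, not boolean attributes.
    if new_state.is_failed():
        print(f"{task} failed.")

    if new_state.is_successful():
        print(f"{task} succeeded.")

    return new_state
```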