Another question concerning metadata storage of task results: we are thinking of writing a task that gets the results of other tasks and writes, e.g. into a table, whether each task was a success, a failure, or empty (= task not yet executed). We have a flow that runs several tasks for many systems, so the table we would like to have as a result would look like:
system id, task_1, task_2, task_3, ...
and then one row for each system. Is there already a more elegant way to write all the results of a flow into a table, or something like this? Or is the approach described above the way to go? Cheers 🙂
Hi @Robin, yeah, I think what you outlined above would be the way to go! It's pretty much the load task in a standard ETL flow 🙂
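A minimal sketch of such a load step, assuming each task's outcome has already been collected as a `(system_id, task_name, status)` tuple. The function name `build_status_table`, the status strings and the `"tbe"` (to be executed) placeholder are hypothetical, and the records are illustrative, not real results. It pivots the records into one row per system with one column per task:

```python
from typing import Dict, Iterable, List, Tuple


def build_status_table(
    records: Iterable[Tuple[int, str, str]],
    task_names: List[str],
    empty: str = "tbe",  # hypothetical placeholder for "to be executed"
) -> List[List[object]]:
    """Return a header row followed by one row per system_id."""
    # Group statuses by system: {system_id: {task_name: status}}
    by_system: Dict[int, Dict[str, str]] = {}
    for system_id, task_name, status in records:
        by_system.setdefault(system_id, {})[task_name] = status

    header: List[object] = ["system_id", *task_names]
    rows: List[List[object]] = [header]
    for system_id in sorted(by_system):
        statuses = by_system[system_id]
        # Tasks without a recorded status get the placeholder value
        rows.append([system_id, *(statuses.get(name, empty) for name in task_names)])
    return rows


# Illustrative records only
records = [
    (3123, "task_1", "success"),
    (3123, "task_2", "failed"),
    (4711, "task_1", "missing"),
]
for row in build_status_table(records, ["task_1", "task_2", "task_3"]):
    print(row)
```

Systems with no records at all do not get a row in this sketch; the resulting rows could then be written with whatever database client the flow already uses.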
Ah, I think I didn't ask my question clearly. We would like to save the state of each task (failed or success, i.e. red or green in the Cloud UI) in a table. For some specific cases we would add a "missing data" state.
Hmm, you might want to look into state handlers, which let you react to state changes on tasks. I'm not sure that fully gets you what you want, though. A custom state handler would let you take the state directly and write it to a table.
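A minimal sketch of such a custom state handler, assuming the Prefect 0.x signature used later in this thread (`handler(task, old_state, new_state)`, returning the state to use). It only writes final states, so intermediate transitions such as Running are skipped. To keep it runnable without Prefect or Snowflake, the hypothetical `StubState` stands in for Prefect's `State` (whose `is_finished()`, `is_failed()` and `is_successful()` are methods), and an in-memory `sqlite3` table named `task_states` (also hypothetical) stands in for the Snowflake table:

```python
import sqlite3


class StubState:
    """Hypothetical stand-in for a Prefect 0.x State (its checks are methods)."""

    def __init__(self, kind: str):
        self.kind = kind  # e.g. "Pending", "Running", "Success", "Failed"

    def is_finished(self) -> bool:
        return self.kind in ("Success", "Failed")

    def is_failed(self) -> bool:
        return self.kind == "Failed"

    def is_successful(self) -> bool:
        return self.kind == "Success"


# In-memory SQLite table standing in for the Snowflake table (hypothetical schema)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_states (task_name TEXT, status TEXT)")


def log_state_to_table(task, old_state, new_state):
    """Record final states only, then hand the state back unchanged."""
    if new_state.is_finished():
        if new_state.is_failed():
            status = "failed"
        elif new_state.is_successful():
            status = "success"
        else:
            status = "other"  # finished but neither failed nor successful
        conn.execute(
            "INSERT INTO task_states (task_name, status) VALUES (?, ?)",
            (str(task), status),
        )
        conn.commit()
    return new_state


# Simulated transitions for one task run: Pending -> Running -> Success
log_state_to_table("copy_configuration", StubState("Pending"), StubState("Running"))
log_state_to_table("copy_configuration", StubState("Running"), StubState("Success"))
print(conn.execute("SELECT task_name, status FROM task_states").fetchall())
# [('copy_configuration', 'success')]
```

With Prefect, the handler would be attached via `@task(state_handlers=[log_state_to_table])`; the simulated transitions would go away and `conn` would be a real Snowflake connection.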
Yes, thanks! That's what I envisioned, but I mistakenly searched for result handlers, which are deprecated, right? 🙂
Yeah result handlers predate what are now just called results!
Thanks for the link, state handlers are really exactly what I was looking for!!
Hey @josh, I am a bit confused: a task seems to be both failed and successful 🤔 I implemented the following state handler:
def log_copy_task_state_to_snowflake(task, old_state, new_state):
    '''logs the state of the copy attempt to a table in snowflake'''

    # states: to be executed (tbe), missing, failed, success

    if new_state.is_failed:
        print(f"{task} failed.")
    if new_state.is_successful:
        print(f"{task} succeeded.")

    return new_state
and added the state handler to the mapped task as follows:
from datetime import timedelta
from prefect import task

@task(max_retries=3, retry_delay=timedelta(seconds=60), timeout=300, state_handlers=[log_copy_task_state_to_snowflake])
def copy_configuration(...)
But I got the following output:
[2020-10-05 09:56:19] INFO - prefect.copy_configuration[5] | system_id=3123
elapsed time for get_configuration: 0.00 minutes
elapsed time for create_sqlalchemy_engine: 0.00 minutes
elapsed time for create_connection: 0.01 minutes
elapsed time for write_huge_table: 0.08 minutes
<Task: copy_configuration> failed.
<Task: copy_configuration> succeeded.
Any ideas why the task seems to be both failed and successful?
Hmm, not off the top of my head, but it could be related to the mapping. Would you mind opening a GitHub issue with more information on the flow so I can look at it there?
OK, I actually found that a state handler is probably not capable of what I intended to do, so it might take some time until I come back to this. Should I open the issue anyway?
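A likely cause of the "both failed and successful" output, assuming Prefect 0.x: on `State`, `is_failed` and `is_successful` are methods, so `if new_state.is_failed:` tests the bound method object itself, which is always truthy, and both `print` calls run on every state change. The sketch below reproduces this with a hypothetical `StubState` instead of Prefect; the demo handlers return their messages instead of `new_state` only so the difference is easy to check:

```python
class StubState:
    """Hypothetical stand-in for a Prefect 0.x State: the checks are methods."""

    def __init__(self, failed: bool):
        self._failed = failed

    def is_failed(self) -> bool:
        return self._failed

    def is_successful(self) -> bool:
        return not self._failed


# Demo handlers return messages for easy checking; a real state handler returns new_state.
def buggy_handler(task, old_state, new_state):
    messages = []
    if new_state.is_failed:  # bound method object: always truthy
        messages.append(f"{task} failed.")
    if new_state.is_successful:  # same problem
        messages.append(f"{task} succeeded.")
    return messages


def fixed_handler(task, old_state, new_state):
    messages = []
    if new_state.is_failed():  # call the method to get a bool
        messages.append(f"{task} failed.")
    if new_state.is_successful():
        messages.append(f"{task} succeeded.")
    return messages


ok = StubState(failed=False)
print(buggy_handler("copy_configuration", None, ok))
# ['copy_configuration failed.', 'copy_configuration succeeded.']
print(fixed_handler("copy_configuration", None, ok))
# ['copy_configuration succeeded.']
```

Calling the methods (`new_state.is_failed()`) fixes the check. A state handler also runs on every transition (for example to Running and then to Success), so checking `new_state.is_finished()` first limits logging to final states.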