Robin
09/30/2020, 12:11 PM
write_task_result_to_table: a task that gets the results of other tasks and writes, e.g. into a table, whether each task succeeded, failed, had missing data, or is empty (= task was not yet executed).
We have a flow that runs several tasks for many systems.
So the table we would like to have as result would look like:
system id, task_1, task_2, task_3, ...
and then one row for each system.
Is there already a more elegant way to write all the results of a flow into a table or something like this?
Or is the approach described above the way to go?
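A minimal sketch of the table layout described above, assuming the per-task outcomes are already available as (system id, task name, status) records. The function name build_status_table and the status strings are hypothetical, and a task with no record yet is left empty.

```python
def build_status_table(records, task_names):
    """Pivot (system_id, task_name, status) records into one row per system."""
    rows = {}
    for system_id, task_name, status in records:
        # Start every system with empty cells (= task not yet executed).
        row = rows.setdefault(system_id, {name: "" for name in task_names})
        row[task_name] = status
    header = ["system id"] + list(task_names)
    body = [[sid] + [rows[sid][n] for n in task_names] for sid in sorted(rows)]
    return [header] + body


# Hypothetical example records, not taken from the real flow.
records = [
    (3123, "task_1", "success"),
    (3123, "task_2", "failed"),
    (4711, "task_1", "missing data"),
]
for line in build_status_table(records, ["task_1", "task_2", "task_3"]):
    print(line)
```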
Cheers 🙂
josh
09/30/2020, 1:10 PM
Robin
09/30/2020, 1:19 PM
josh
09/30/2020, 1:21 PM
Robin
09/30/2020, 3:46 PM
josh
09/30/2020, 3:47 PM
Robin
09/30/2020, 3:49 PM
def log_copy_task_state_to_snowflake(task, old_state, new_state):
    '''logs the state of the copy attempt to a table in snowflake'''
    # states: to be executed (tbe), missing, failed, success
    if new_state.is_failed:
        print(f"{task} failed.")
    if new_state.is_successful:
        print(f"{task} succeeded.")
    return new_state
and added the state handler to the mapped task as follows:
@task(max_retries=3, retry_delay=timedelta(seconds=60), timeout=300, state_handlers=[log_copy_task_state_to_snowflake])
def copy_configuration(...)
But got the following output:
[2020-10-05 09:56:19] INFO - prefect.copy_configuration[5] | system_id=3123
elapsed time for get_configuration: 0.00 minutes
elapsed time for create_sqlalchemy_engine: 0.00 minutes
elapsed time for create_connection: 0.01 minutes
elapsed time for write_huge_table: 0.08 minutes
<Task: copy_configuration> failed.
<Task: copy_configuration> succeeded.
Any ideas why the task seems to be both failed and successful?
josh
10/05/2020, 1:32 PM
Robin
10/05/2020, 3:48 PM
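One likely explanation for both lines appearing in the output above: on Prefect's State class, is_failed and is_successful are methods. Referencing them without parentheses yields a bound method object, which is always truthy, so both print branches run on every state change. The sketch below reproduces this with a hypothetical stand-in class, FakeState, so that it runs without Prefect installed. It illustrates the suspected cause under that assumption and is not a confirmed diagnosis of the flow.

```python
class FakeState:
    """Hypothetical stand-in for a Prefect state; not the real class."""

    def __init__(self, failed):
        self._failed = failed

    def is_failed(self):
        return self._failed

    def is_successful(self):
        return not self._failed


def describe_without_call(state):
    """Mirrors the handler above: the methods are referenced, not called."""
    messages = []
    if state.is_failed:  # bound method object, always truthy
        messages.append("failed")
    if state.is_successful:  # bound method object, always truthy
        messages.append("succeeded")
    return messages


def describe_with_call(state):
    """Calls the methods, so only the matching branch runs."""
    messages = []
    if state.is_failed():
        messages.append("failed")
    if state.is_successful():
        messages.append("succeeded")
    return messages


ok = FakeState(failed=False)
print(describe_without_call(ok))  # ['failed', 'succeeded']
print(describe_with_call(ok))     # ['succeeded']
```

If this is the cause, writing new_state.is_failed() and new_state.is_successful() in the handler should leave only the matching message.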