Joël Luijmes
11/05/2020, 7:14 AMLOOP
and write the results on each iteration as documented here. However, it doesn’t seem to work. It only writes the last iteration.
@task()
def log_output(result):
logger = prefect.context.get('logger')
<http://logger.info|logger.info>(result)
@task(result=LocalResult(dir='./results', location='test-{task_loop_count}.prefect'))
def loop_test():
loop_payload = prefect.context.get("task_loop_result", {})
n = loop_payload.get("n", 1)
print(n)
if n > 5:
return n
raise LOOP(f'Iteration {n}', result=dict(n=n+1))
with Flow("Postgres -> BigQuery") as flow:
x = loop_test()
log_output(x)
See output logging and diagnostics in thread[2020-11-05 07:11:07] INFO - prefect.FlowRunner | Beginning Flow run for 'Postgres -> BigQuery'
[2020-11-05 07:11:07] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2020-11-05 07:11:07] DEBUG - prefect.FlowRunner | Flow 'Postgres -> BigQuery': Handling state change from Scheduled to Running
[2020-11-05 07:11:07] INFO - prefect.TaskRunner | Task 'loop_test': Starting task run...
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
1
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
2
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
3
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
4
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
5
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
6
[2020-11-05 07:11:07] DEBUG - prefect.LocalResult | Starting to upload result to test-6.prefect...
[2020-11-05 07:11:07] DEBUG - prefect.LocalResult | Finished uploading result to /Users/joell/joell.dev/Scraper/scraper-next/prefect-sync/results/test-6.prefect...
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Success
[2020-11-05 07:11:07] INFO - prefect.TaskRunner | Task 'loop_test': Finished task run for task with final state: 'Success'
[2020-11-05 07:11:07] INFO - prefect.TaskRunner | Task 'log_output': Starting task run...
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'log_output': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'log_output': Calling task.run() method...
[2020-11-05 07:11:07] INFO - prefect.log_output | 6
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'log_output': Handling state change from Running to Success
[2020-11-05 07:11:07] INFO - prefect.TaskRunner | Task 'log_output': Finished task run for task with final state: 'Success'
[2020-11-05 07:11:07] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-11-05 07:11:07] DEBUG - prefect.FlowRunner | Flow 'Postgres -> BigQuery': Handling state change from Running to Success
{
"config_overrides": {},
"env_vars": [
"PREFECT__LOGGING__LEVEL",
"PREFECT__CONTEXT__SECRETS__POSTGRES",
"PREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS",
"PREFECT__FLOWS__CHECKPOINTING"
],
"flow_information": {
"environment": {
"executor": true,
"labels": true,
"logger": true,
"metadata": {
"image": true
},
"on_exit": false,
"on_start": false,
"type": "LocalEnvironment"
},
"result": {
"type": "LocalResult"
},
"schedule": {},
"storage": {
"_flows": {
"Postgres -> BigQuery": true
},
"_labels": false,
"add_default_labels": true,
"directory": true,
"flows": {
"Postgres -> BigQuery": true
},
"path": false,
"result": true,
"secrets": false,
"stored_as_script": false,
"type": "Local"
},
"task_count": 2
},
"system_information": {
"platform": "macOS-10.15.7-x86_64-i386-64bit",
"prefect_backend": "server",
"prefect_version": "0.13.13",
"python_version": "3.8.6"
}
}
nicholas
Marvin
11/05/2020, 3:30 PM