Hey, I’m trying to use the `LOOP` and write the re...
# prefect-community
j
Hey, I’m trying to use `LOOP` and write the results on each iteration as documented here. However, it doesn’t seem to work: only the last iteration is written.
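import prefect
from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.engine.signals import LOOP


@task()
def log_output(result):
    # Log the upstream task's final value through the Prefect run logger.
    logger = prefect.context.get('logger')
    logger.info(result)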


@task(result=LocalResult(dir='./results', location='test-{task_loop_count}.prefect'))
def loop_test():
    loop_payload = prefect.context.get("task_loop_result", {})

    n = loop_payload.get("n", 1)
    print(n)

    if n > 5:
        return n

    raise LOOP(f'Iteration {n}', result=dict(n=n+1))


with Flow("Postgres -> BigQuery") as flow:
    x = loop_test()
    log_output(x)
See output logging and diagnostics in thread
[2020-11-05 07:11:07] INFO - prefect.FlowRunner | Beginning Flow run for 'Postgres -> BigQuery'
[2020-11-05 07:11:07] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2020-11-05 07:11:07] DEBUG - prefect.FlowRunner | Flow 'Postgres -> BigQuery': Handling state change from Scheduled to Running
[2020-11-05 07:11:07] INFO - prefect.TaskRunner | Task 'loop_test': Starting task run...
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
1
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
2
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
3
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
4
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
5
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
6
[2020-11-05 07:11:07] DEBUG - prefect.LocalResult | Starting to upload result to test-6.prefect...
[2020-11-05 07:11:07] DEBUG - prefect.LocalResult | Finished uploading result to /Users/joell/joell.dev/Scraper/scraper-next/prefect-sync/results/test-6.prefect...
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Success
[2020-11-05 07:11:07] INFO - prefect.TaskRunner | Task 'loop_test': Finished task run for task with final state: 'Success'
[2020-11-05 07:11:07] INFO - prefect.TaskRunner | Task 'log_output': Starting task run...
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'log_output': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'log_output': Calling task.run() method...
[2020-11-05 07:11:07] INFO - prefect.log_output | 6
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'log_output': Handling state change from Running to Success
[2020-11-05 07:11:07] INFO - prefect.TaskRunner | Task 'log_output': Finished task run for task with final state: 'Success'
[2020-11-05 07:11:07] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-11-05 07:11:07] DEBUG - prefect.FlowRunner | Flow 'Postgres -> BigQuery': Handling state change from Running to Success
{
  "config_overrides": {},
  "env_vars": [
    "PREFECT__LOGGING__LEVEL",
    "PREFECT__CONTEXT__SECRETS__POSTGRES",
    "PREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS",
    "PREFECT__FLOWS__CHECKPOINTING"
  ],
  "flow_information": {
    "environment": {
      "executor": true,
      "labels": true,
      "logger": true,
      "metadata": {
        "image": true
      },
      "on_exit": false,
      "on_start": false,
      "type": "LocalEnvironment"
    },
    "result": {
      "type": "LocalResult"
    },
    "schedule": {},
    "storage": {
      "_flows": {
        "Postgres -> BigQuery": true
      },
      "_labels": false,
      "add_default_labels": true,
      "directory": true,
      "flows": {
        "Postgres -> BigQuery": true
      },
      "path": false,
      "result": true,
      "secrets": false,
      "stored_as_script": false,
      "type": "Local"
    },
    "task_count": 2
  },
  "system_information": {
    "platform": "macOS-10.15.7-x86_64-i386-64bit",
    "prefect_backend": "server",
    "prefect_version": "0.13.13",
    "python_version": "3.8.6"
  }
}
n
Hi @Joël Luijmes - thanks for reporting this. I can confirm that each iteration of the loop overwrites the previous result as described here, despite a templated location being present (this holds for both strings and callables passed as the location kwarg).
@Marvin open "Templated results are overwritten on raised LOOP"
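For reference, a minimal plain-Python sketch of the behaviour the original post expected: one file per loop iteration, named from the `test-{task_loop_count}.prefect` template. It does not use Prefect at all. `render_location`, `write_iteration`, and `simulate_loop` are hypothetical helpers, the use of `str.format` and `pickle` is an assumption made for illustration, and the loop count is assumed to start at 1 (consistent with `test-6.prefect` being written on the sixth run in the log above). It is not a fix for the overwrite issue, only a picture of the expected output.

```python
import pickle
import tempfile
from pathlib import Path

# Same template string as in the task definition in the original post.
LOCATION_TEMPLATE = "test-{task_loop_count}.prefect"


def render_location(template: str, task_loop_count: int) -> str:
    """Render the file name for one iteration (assumes str.format templating)."""
    return template.format(task_loop_count=task_loop_count)


def write_iteration(directory: Path, task_loop_count: int, value) -> Path:
    """Hypothetical helper: pickle one iteration's value to its own file."""
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / render_location(LOCATION_TEMPLATE, task_loop_count)
    with target.open("wb") as fh:
        pickle.dump(value, fh)
    return target


def simulate_loop(directory: Path, stop_after: int = 5) -> list:
    """Plain-Python stand-in for loop_test: loop while n <= stop_after."""
    written = []
    n = 1
    loop_count = 1  # assumed to start at 1
    while n <= stop_after:
        # Mirrors `raise LOOP(..., result=dict(n=n+1))`: persist the loop payload.
        written.append(write_iteration(directory, loop_count, {"n": n + 1}))
        n += 1
        loop_count += 1
    # Mirrors the final `return n` once n > stop_after.
    written.append(write_iteration(directory, loop_count, n))
    return written


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for path in simulate_loop(Path(tmp)):
            print(path.name)
```

Run as a script, this lists six file names, `test-1.prefect` through `test-6.prefect`, whereas the flow run in the log only leaves `test-6.prefect` behind.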