
Jon

03/14/2023, 2:41 PM
In Prefect 1, with the flow-of-flows pattern, get_task_run_result occasionally fails to get the task result of a child flow. Looking at the timestamps, the task result is available. Any sense of what's going on?
Task 'get_task_run_result': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 239, in get_task_run_result
    return task_run.get_result()
  File "/usr/local/lib/python3.9/site-packages/prefect/backend/task_run.py", line 73, in get_result
    raise ValueError("The task result cannot be loaded if it is not finished.")
ValueError: The task result cannot be loaded if it is not finished.
this is a flaky failure. i can't replicate it locally, and only see it sometimes in prefect cloud
bump 😅

Bianca Hoch

03/16/2023, 7:18 PM
Hey Jon, thanks for reaching out! Where are you calling get_task_run_result? Is it from the parent flow, like this example?
My other line of thought is that there may be some sort of race condition going on. Have you tried adding a sleep before calling get_task_run_result?

Jon

04/04/2023, 8:11 PM
hey @Bianca Hoch thanks for hopping in here. i am using the pattern you shared above. we seem to have resolved the issue by wrapping it in a task and retrying it:
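from datetime import timedelta
from typing import Any

import prefect
from prefect.tasks.prefect.flow_run import get_task_run_result

@prefect.task(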
    max_retries=3,
    retry_delay=timedelta(seconds=10),
)
def get_child_flow_task_run_result(
    flow_run_id: str,
    task_slug: str,
) -> Any:
    """Gets the result of a task in a child flow.

    Args:
        flow_run_id (str): child flow run id.
        task_slug (str): child flow task slug

    Returns:
        Any: the result of the task
    """
    logger.info(
        f"getting task result for [flow_run_id: '{flow_run_id}', "
        f"task_slug: {task_slug}]."
    )

    result = get_task_run_result.run(
        flow_run_id=flow_run_id,
        # in any given flow, we might call a task multiple times
        # the "-#" is used as an index to identify which one
        task_slug=task_slug,
        # explicitly wait for the flow to finish before getting its result
    )

    logger.info(
        f"got task result for [flow_run_id: '{flow_run_id}', "
        f"task_slug: {task_slug}]. Result: '{result}'"
    )

    return result
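
For anyone reading later without a Prefect 1 environment handy, the retry idea above can be sketched in plain Python. This is only an illustration under assumptions, not how Prefect implements max_retries or retry_delay: call_with_retries and FlakyResult are hypothetical names, and FlakyResult just imitates a task run whose state lags behind for the first few lookups.

```python
import time
from typing import Any, Callable


def call_with_retries(
    fn: Callable[[], Any],
    max_retries: int = 3,
    retry_delay_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call fn, retrying on ValueError up to max_retries extra times."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except ValueError:
            # Out of retries: surface the original error to the caller.
            if attempt == max_retries:
                raise
            # Give the backend time to record the finished state.
            sleep(retry_delay_seconds)


class FlakyResult:
    """Hypothetical stand-in for a task run whose state is briefly stale."""

    def __init__(self, ready_after: int, value: Any) -> None:
        self.calls = 0
        self.ready_after = ready_after
        self.value = value

    def get_result(self) -> Any:
        self.calls += 1
        # Fail for the first `ready_after` calls, then return the value.
        if self.calls <= self.ready_after:
            raise ValueError(
                "The task result cannot be loaded if it is not finished."
            )
        return self.value
```

The sleep parameter is injectable only so the delay can be skipped or recorded when trying this out. With the default it waits for real between attempts, which is the behavior the retry_delay setting above is meant to provide.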