#prefect-community

Steve s

05/14/2022, 2:48 PM
Hi all, I'm seeing a new error crop up in a flow I've been running stably for a few months. The flow is a top-level pipeline that runs a series of `create_flow_run` (and `wait_for_flow_run`) tasks. One of these steps is followed by a `get_task_run_result`, which had always worked without issue until today. Now it's throwing this error: `ValueError: The task result cannot be loaded if it is not finished`. I don't see how this could be, since the logs show that the upstream task did in fact finish successfully. I tried explicitly setting the result of `wait_for_flow_run` as an upstream dependency of `get_task_run_result` (which I think shouldn't be needed), and I also tried setting `poll_time` to `30`, but still no luck. Does anyone have any ideas?

Anna Geller

05/14/2022, 3:43 PM
Can you share the flow structure? Do you wait until the child run finishes, and did you set the state dependencies correctly so that you get the task run result only once all task runs in the child flow run have finished?
Seeing the flow code would help.

Steve s

05/14/2022, 3:49 PM
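Here's the part that's failing:
```python
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, get_task_run_result, wait_for_flow_run

# check_run_terminal_state is the custom task shown later in this thread;
# "parent" is a hypothetical name for the enclosing flow.
with Flow("parent") as flow:
    flow_etl_trips_id = create_flow_run(flow_name="ETL Trips")
    check_flow_etl_trips_id = check_run_terminal_state(
        wait_for_flow_run(flow_etl_trips_id)
    )

    latest_file_data = get_task_run_result(flow_etl_trips_id, "latest_file-1", poll_time=30)
    latest_file_data.set_upstream(check_flow_etl_trips_id)
```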
I believe I'm setting the dependencies correctly here, but as I understand it, I shouldn't have to set them explicitly (see the note labeled "Results require completion" at this link). I originally left out that `set_upstream` and had no issues until today, but unfortunately adding it hasn't fixed my problem.

Anna Geller

05/14/2022, 3:54 PM
You may need to set `raise_final_state=True` so that you don't try to retrieve results if the child flow run fails:
```python
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

PROJECT_NAME = "community"


with Flow("sample_flow") as flow:
    child_run_id = create_flow_run(
        flow_name="child_flow",
        project_name=PROJECT_NAME,
        task_args={"name": "Friendly name for a DAG node"},
    )
    extract_load_wait_task = wait_for_flow_run(
        child_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args={"name": "Friendly wait task name"},
    )
```
I agree that your dependencies are set correctly.

Steve s

05/14/2022, 3:56 PM
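I'm handling an upstream failure with this (maybe hacky) task:
```python
from prefect import task
from prefect.engine import signals
from prefect.engine.state import Success


@task
def check_run_terminal_state(view):
    # `view` is the flow run view returned by wait_for_flow_run:
    # succeed only if the child flow run ended in a Success state.
    if isinstance(view.state, Success):
        raise signals.SUCCESS()
    else:
        raise signals.FAIL()
```
But in this case, the upstream flow run is succeeding.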

Anna Geller

05/14/2022, 3:57 PM
I'd recommend sticking with `raise_final_state`: it already polls for the end state and might be more reliable than manually raising signals.
I need to run, but I can check tomorrow. Keep us posted on your progress!

Steve s

05/14/2022, 3:57 PM
Okay, thank you for the suggestion, I'll give that a try!
@Anna Geller thanks again for the suggestion. I think I must be running an old version of Prefect (0.15.1), because now it's throwing `got an unexpected keyword argument 'raise_final_state'`. I pulled up the definition of `wait_for_flow_run` on my local machine and confirmed that this keyword argument isn't in its signature.
Today another flow of mine that follows the same basic pattern started failing with the same error I posted yesterday. That flow hasn't changed in 2 months and has run daily without issue until now. Is there any possibility that something changed on the Cloud platform? Or do you think my best bet at this point is to upgrade Prefect and try the `raise_final_state` approach again?
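The check described above (whether the installed `wait_for_flow_run` accepts `raise_final_state`) can also be done programmatically. Below is a minimal sketch using the standard library's `inspect` module. `accepts_kwarg` and the two stand-in functions are hypothetical and only mimic an older and a newer signature. They are not Prefect's real definitions.

```python
import inspect


def accepts_kwarg(fn, name):
    """Return True if `fn` can be called with the keyword argument `name`."""
    for param in inspect.signature(fn).parameters.values():
        # A named parameter that may be passed by keyword...
        if param.name == name and param.kind in (
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.KEYWORD_ONLY,
        ):
            return True
        # ...or a catch-all **kwargs, which accepts any keyword.
        if param.kind is inspect.Parameter.VAR_KEYWORD:
            return True
    return False


# Hypothetical stand-ins: NOT Prefect's real signatures.
def older_wait(flow_run_id):
    return flow_run_id


def newer_wait(flow_run_id, raise_final_state=False):
    return flow_run_id


print(accepts_kwarg(older_wait, "raise_final_state"))  # False
print(accepts_kwarg(newer_wait, "raise_final_state"))  # True
```

With Prefect installed, you could print `prefect.__version__` and call `accepts_kwarg(wait_for_flow_run.run, "raise_final_state")`. That call assumes the object created by `@task` exposes the wrapped function as `.run`, which is untested here.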

Anna Geller

05/15/2022, 2:34 PM
Yes, you're right: upgrading to a more recent version seems like the best approach.
We've made a lot of improvements to those tasks since that version, including the ability to stream logs, raise the final state, set a scheduled date, etc.

Steve s

05/15/2022, 2:35 PM
Okay cool, I'll give that a shot. Thanks again for your help, I really appreciate it!
Hi again! Quick update:
• the original problem started to occur only intermittently on 0.15.1
• I upgraded to 1.2.1 anyway and found that it's intermittent there too
• I gave your fix a try (dropping my hacky state check and using `raise_final_state` instead), but this didn't change the error behavior; it still occurs intermittently

At this point I'm just going to refactor some things and stop using `get_task_run_result`, but I'm starting to think this might be a bug in Prefect. I'm not sure how to diagnose it further, though 😞

Kevin Kho

05/23/2022, 3:34 PM
That's really weird. Are you writing out something big? I'm wondering if the task is done but the checkpoint is still being saved.
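If that hypothesis is right (the task run reports done while its checkpoint is still being written), the failure would be a transient race. Retrying the retrieval a few times is one hedged workaround. Below is a minimal, framework-free sketch. `call_with_retries` and `flaky_lookup` are hypothetical names, and the returned dict only imitates the small filename/URL payload from this thread.

```python
import time


def call_with_retries(fn, attempts=3, delay_seconds=30.0, retry_on=(ValueError,)):
    """Call fn(); on one of the `retry_on` exceptions, wait and try again.

    The final exception is re-raised once every attempt has failed.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise
            time.sleep(delay_seconds)


# Hypothetical stand-in that fails once, as if the result were not saved yet.
calls = {"n": 0}


def flaky_lookup():
    calls["n"] += 1
    if calls["n"] == 1:
        raise ValueError("The task result cannot be loaded if it is not finished")
    return {"filename": "latest.csv", "url": "https://example.com/latest.csv"}


print(call_with_retries(flaky_lookup, delay_seconds=0))
# {'filename': 'latest.csv', 'url': 'https://example.com/latest.csv'}
```

In Prefect 1.x, the same idea should be expressible through task-level retries. For example, you could pass `task_args={"max_retries": 3, "retry_delay": timedelta(seconds=30)}` when calling `get_task_run_result`. That variant is an untested assumption.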

Steve s

05/23/2022, 3:36 PM
No, it's just a dict containing one filename and one URL.

Kevin Kho

05/23/2022, 3:39 PM
Yeah, unfortunately I can't think of anything that would cause this if the task is already done. One thing you could do is write the result out yourself, persist its location in the KV store, and then retrieve it from there. How intermittent is this? Happening every other day? Once a week?
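The workaround above (write the result out yourself, store its location in the KV store, then read it back) can be sketched as follows, under these assumptions:

- The in-memory `_kv` dict stands in for the Prefect Cloud KV store.
- The key name and the `publish_result`/`fetch_result` helpers are hypothetical.
- The local temporary directory only works when both flows share a filesystem. In practice you would likely write to shared storage instead.

```python
import json
import tempfile
from pathlib import Path

# In-memory stand-in for the KV store (assumption: illustration only).
_kv = {}


def kv_set(key, value):
    _kv[key] = value


def kv_get(key):
    return _kv[key]


def publish_result(data, out_dir, key="latest_file_location", setter=kv_set):
    """Child-flow side: write `data` as JSON, then record where it lives."""
    path = Path(out_dir) / "latest_file.json"
    path.write_text(json.dumps(data))
    setter(key, str(path))
    return str(path)


def fetch_result(key="latest_file_location", getter=kv_get):
    """Parent-flow side: look up the stored location, then load the data."""
    path = Path(getter(key))
    return json.loads(path.read_text())


with tempfile.TemporaryDirectory() as tmp:
    publish_result({"filename": "trips.csv", "url": "https://example.com/trips.csv"}, tmp)
    print(fetch_result())
    # {'filename': 'trips.csv', 'url': 'https://example.com/trips.csv'}
```

In a Prefect 1.x deployment, `prefect.backend.set_key_value` and `get_key_value` should be able to replace the in-memory stand-ins as `setter` and `getter`. Check the KV store documentation for your version before relying on that.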

Steve s

05/23/2022, 3:43 PM
Luckily, I can safely offload the failing part to one of the sub-flows, so I'm just going to refactor, no big deal. I'd say roughly every other day is right. I have two flows that are impacted, though, and today one failed but the other succeeded.

Kevin Kho

05/23/2022, 3:44 PM
OK, yeah, that's weird. I'll keep this in mind for other reports.
If the `wait_for_flow_run` finished, this should work.

Steve s

05/23/2022, 3:46 PM
Yeah, I can see from the logs that that part is succeeding.