#prefect-community

Tanasorn Chindasook

05/16/2022, 2:32 PM
Hi everyone! I encountered an error when attempting to restart a flow from the failed task and was wondering if anyone has some best practices on how to handle this?
Context:
• I have a "Flow of Flows" that triggers a few connectors from Fivetran in parallel then triggers a dbt job run for the data modelling for the source once the data has finished syncing
• There is one dbt job that is then run after all the prior dbt jobs have succeeded (this is the one that failed in this instance)
• After fixing the error, when I press restart I get an error saying
  ◦ This then returns a list of all the flow runs on this flow id
Question:
• What would I need to specify in the task parameter so that it only re-runs the latest flow when I press restart on the UI?
Thank you so much in advance for your time and help! 😄

Kevin Kho

05/16/2022, 3:06 PM
Hi @Tanasorn Chindasook, could you move the error to the thread when you get a chance so we don't crowd the main channel? In Prefect 1, it's hard to restart a failed child flow run. See the full writeup here. Are you restarting the subflow and then running this wait? How did you fix the error? You needed to re-register the flow, right? I'm not sure you can re-run an archived flow.

Tanasorn Chindasook

05/17/2022, 9:05 AM
Edited to move error to the thread!
Task 'wait_for_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 884, in get_task_run_state
    logger=self.logger,
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 264, in wait_for_flow_run
    flow_run = FlowRunView.from_flow_run_id(flow_run_id)
  File "/usr/local/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 571, in from_flow_run_id
    flow_run_data = cls._query_for_flow_run(where={"id": {"_eq": flow_run_id}})
  File "/usr/local/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 629, in _query_for_flow_run
    f"Found multiple ({len(flow_runs)}) flow runs while querying for flow "
ValueError: Found multiple (134) flow runs while querying for flow runs
@Kevin Kho Thank you so much for your reply and for linking me to the write-up! I will take a look and try to apply the solution 🙂 The error was thrown on dbt's side, so I simply fixed the dbt code (we use dbt Cloud, so it was easy to trigger the job via the dbt Cloud Prefect job).

Anna Geller

05/17/2022, 12:09 PM
@Tanasorn Chindasook are you good now?
Generally speaking, restarting child flow runs from a parent flow is hard, but the main benefit of the flow of flows is that each part of this "parent flow" is an independent flow that can be triggered individually. So you can just restart the failed dbt flow when needed, and if you need to rerun several flows you could even run them from a terminal:
prefect run --name flow1 --watch
prefect run --name flow2 --watch
prefect run --name flow3 --watch
This triggers all 3 flows one after the other; --watch blocks until each run has finished, so the next one starts only after the previous one has completed. You could do the same from Python with the create_flow_run + wait_for_flow_run tasks.
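As a rough illustration, the terminal commands above could be wrapped in a small Python script that stops as soon as one run does not succeed. This is only a sketch under stated assumptions: `build_run_command` and `run_flows_in_order` are hypothetical helper names, `flow1` to `flow3` are the placeholder flow names from the message, and it assumes that `prefect run --watch` exits with a non-zero code when the watched run fails. It uses only the standard library, not the create_flow_run / wait_for_flow_run tasks.

```python
import subprocess
from typing import Callable, List, Sequence


def build_run_command(flow_name: str) -> List[str]:
    """Build the CLI call from the message above for a single flow."""
    return ["prefect", "run", "--name", flow_name, "--watch"]


def run_flows_in_order(
    flow_names: Sequence[str],
    runner: Callable[[List[str]], int] = subprocess.call,
) -> List[str]:
    """Run each flow in turn and return the names whose command exited with 0.

    Assumption: a non-zero exit code means the watched flow run failed.
    The loop stops at the first non-zero exit code, so later flows are
    not triggered after a failure.
    """
    completed: List[str] = []
    for name in flow_names:
        exit_code = runner(build_run_command(name))
        if exit_code != 0:
            break
        completed.append(name)
    return completed


# Example call (requires the Prefect 1 CLI on PATH and registered flows):
# run_flows_in_order(["flow1", "flow2", "flow3"])
```

Stopping at the first failure is a design choice of this sketch; the three plain shell lines above would run every command regardless of the previous exit code unless they are chained with `&&`. The `runner` parameter exists only so the loop can be exercised without a Prefect installation.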