I’d like to be able to run a flow from a set of st...
# prefect-community
m
I’d like to be able to run a flow from a set of start tasks. I understand this used to be supported but might not be any more. What’s the best way to go about this?
k
I’m not following when you say “set of start tasks”. Do you mean like event based flows?
Are those Prefect tasks or external events?
m
Apologies. I mean for a set of tasks within the flow. Right now, we have the ability to restart a flow from a single task, or restart and flow from all failed tasks. I’d like to be able to pick specific tasks, regardless of state, from which to restart the flow.
k
Ohhh I think this was never supported because we just recommend a flow of flows, but I can list this as an idea for Orion @Marvin open “Idea - Start a Flow Run from a Middle Task Onwards”
m
Hm. The FlowRunner used to have a start_tasks parameter no?
On a related note, if I take a successful flow and mark two tasks as failed, then restart the flow run, it doesn’t execute those two failed tasks. It just changes the flow back to Success.
k
Will take a look. Try marking the Flow as failed also before you restart and see if that kicks them off?
Am going a while back and I don’t see a
start_tasks
parameter. I joined Prefect around 0.14.12 and nothing like that rings a bell
m
Thanks Kevin. I tried marking the flow as failed but it still does not seem to properly pick up the failed tasks on restart.
k
Will try this out myself.
So I have a flow like this:
Copy code
from prefect import Flow, task
import prefect

@task
def abc():
     <http://prefect.context.logger.info|prefect.context.logger.info>("I RAN")
     return 1

@task
def bcd():
     <http://prefect.context.logger.info|prefect.context.logger.info>("I RAN")
     return 1

@task
def cde():
     <http://prefect.context.logger.info|prefect.context.logger.info>("I RAN")
     return 1

with Flow("resukt") as flow:
     abc()
     bcd()
     cde()
and I ran it and it succeeded. Marked
bcd
and
cde
as failed. Restarted on the flow run page and it kicked off again. Images of logs below. One shows the restart and one shows the re-run.
Did not have to mark the Flow as failed. When you say it doesn’t run, is it changed to a scheduled state or left in a Failed state?
m
Can you try making cde downstream of bcd downstream of abc? Then mark bcd as failed and restart
k
Will try
I can still restart the same way yep
m
Hmm that’s odd. I used your code and reproduced. When I created dependencies and marked the middle task as failed, restarting did not work. The flow transitioned to a success state, and the failed task transitioned to pending but never executed.
k
Ok i’ll try marking just the middle in a bit
m
Yeah or try clicking on the root task and restarting the flow run from that task
k
Why do you want to restart it from there instead of the flow level?
Restarting just the middle task works also (restarting on flow level)
m
We’ll there could be 10+ root nodes in the flow but I might just want to restart from one of them
Interesting
k
I think I observe what you are saying, but will have a call for the next hour, and then will come back to it
m
Thanks
k
I realized the attempt here. You’re trying to start from a middle task with the retry mechanism right? That’s pretty clever.
So in A->B->C if I mark b as failed. I can restart from B but not from A. I think instead of A, you’d have to restart from the Flow
m
In A-B-C if I mark B as failed then restarting the flow does not work
I can restart from A but not B
The flow goes straight to a success state and doesn’t run any tasks
k
I am utterly confused. Failing task B only and then clicking the restart button from the UI starts it and succeeds. Are you sure it’s just not the delay in restarting? It can take a couple of seconds
m
Are you restarting the task or the flow run?
I am setting the task to failed and then restarting the flow run (from the flow run page)
This is with a Kubernetes run
What happens is that the failed task (B) transitions to a pending state, but never actually runs and the flow transitions to a success state
k
Let me check real quick
3 tests. A->B->C I marked B as failed. 1. Re-run from A - nothing happens 2. Re-run from B - B runs and completes 3. Re-run from Flow - B runs and completes Is the intent here to re-run a section of a Flow? I really think the Flow-of-flows might serve you better if you’re seeing different behavior
m
The intent here is to rerun a section of a flow, but a flow of flows would not be better in the more complex cases. Your third test is what we’re looking for but is not happening for us. I’ve escalated to Darren
k
Ah ok sounds good! Yes they should be able to help you more in-depth
m
Hi Kevin. Can I cc you on this email? Prefect support is saying that “retroactively manipulating the tasks outside of the context of a flow is not supported”, yet it seems to me that you’ve been able to successfully do this. I am quite confused.
k
Yep CC me and I can provide more context there
m
Thanks so much Kevin, I really appreciate it. What’s hour email?
your*
k
I talked to Bianca and went over this thread to give more context. She’ll handle and get resources to work through this with you.
m
Thanks so much Kevin.