# ask-community
p
Hi everyone. I have a flow that triggers another flow using create_flow_run and get_task_run_result. I am using these two tasks inside another task so they can be retried as a unit (see code in thread). Last night, the child flow was triggered three times (by the retries in the parent flow), and all three times the flow succeeded, so it looks like the parent flow misread the child flow's status. I am attaching the logs in the thread. Am I misusing these tasks? Any ideas as to why this could be happening? This has been running every night for a couple of weeks, and it's the first time this problem has happened. Thanks!
triggering child flow.py
log showing retries.txt
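(For orientation, a minimal sketch of the wrapper-task pattern described above; the attached code may differ, and the flow name, project name, and task slug below are placeholders.)

```python
from datetime import timedelta

from prefect import Flow, task
from prefect.tasks.prefect import create_flow_run, get_task_run_result


@task(max_retries=3, retry_delay=timedelta(minutes=10))
def run_child_flow():
    # Kick off the child flow run; flow/project names are placeholders.
    flow_run_id = create_flow_run.run(
        flow_name="child-flow", project_name="my-project"
    )
    # Wait for the child flow run to finish and return one of its task results.
    # "terminal-task-1" is a placeholder slug for a task in the child flow.
    return get_task_run_result.run(flow_run_id, task_slug="terminal-task-1")


with Flow("parent-flow") as flow:
    run_child_flow()
```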
s
@Pedro Machado It looks like you were experiencing connection errors to your Postgres backend. If you deploy on Kubernetes with the Prefect defaults, depending on load it's really easy to lock up your backend resources and hit connection errors. You may need to increase the maximum number of connections your Postgres server can accept and/or increase the number of workers your GraphQL app uses. Depending on your use case, it might also be helpful to use Kubernetes-based tasks to manage sub jobs: https://docs.prefect.io/api/latest/tasks/kubernetes.html#createnamespacedjob
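(If that route applied, a minimal sketch of launching a sub job with the Kubernetes tasks might look like the following; the job manifest, image, and secret name are all placeholders.)

```python
from prefect import Flow
from prefect.tasks.kubernetes import CreateNamespacedJob

# Placeholder Kubernetes Job manifest for the sub job.
job_body = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "child-job"},
    "spec": {
        "template": {
            "spec": {
                "containers": [{"name": "main", "image": "my-image:latest"}],
                "restartPolicy": "Never",
            }
        }
    },
}

create_job = CreateNamespacedJob(
    body=job_body,
    namespace="default",
    kubernetes_api_key_secret="KUBERNETES_API_KEY",  # Prefect Secret holding the API key
)

with Flow("launch-sub-job") as flow:
    create_job()
```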
p
Hi Sam. Thanks for your reply. I am confused, though. We are using Prefect Cloud. I did not set up the Kubernetes piece myself, but I am not aware of any Postgres database involved in the setup.
s
Ah, my mistake. I assumed from the KubernetesRun run config that you had an entire on-prem Prefect Kubernetes deployment. There are still connection errors to Postgres in your logs, though, which would imply some issue on the Cloud side in that case:
prefect.exceptions.ClientError: [{'path': ['flow_run'], 'message': 'connection error', 'extensions': {'path': '$', 'code': 'postgres-error', 'exception': {'message': 'connection error'}}}]
p
Got it. What could I do to make the code more robust against this type of issue? This child flow is a long-running process that, while idempotent, should not run repeatedly. I will be setting flow limit labels, but I wonder: what would be a good pattern to ensure it does not run more than once per day?
s
I'd say unless you absolutely need the first flow to report the result of the second, just get rid of the part where it checks the child flow run's result. That way, you decouple the runs of the two flows. The first will still error if it fails to create the child flow run, but it won't error out and retry if something goes wrong after that. You could also implement a custom on_failure routine for the task to handle failure cases and set the task state there so you don't retry unnecessarily.
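(A minimal sketch of that decoupled version, again with placeholder flow and project names.)

```python
from prefect import Flow
from prefect.tasks.prefect import create_flow_run

with Flow("parent-flow") as flow:
    # Fire-and-forget: the parent only fails if the child run can't be created;
    # it no longer waits on (or reads) the child flow's result.
    create_flow_run(flow_name="child-flow", project_name="my-project")
```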
p
I do need the dependency. Which task would have the on_failure handling? I suppose the parent one, correct? The issue here is that the parent flow thinks the child failed when it didn't. Could you comment a little more on how this would work?
k
Yeah, I think this should be solved with idempotency_key so that the retries will just trigger the same flow run. I think I'll open a ticket for this. Maybe you can try using StartFlowRun, because that uses an idempotency key by default, so only one flow run will trigger in a given day unless you specify otherwise.
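(A minimal sketch of that approach, assuming placeholder flow and project names; wait=True makes the parent wait on the child run.)

```python
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

# wait=True blocks until the child flow run finishes and fails the task if it fails.
start_child = StartFlowRun(
    flow_name="child-flow",
    project_name="my-project",
    wait=True,
)

with Flow("parent-flow") as flow:
    start_child()
```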
z
Note that this connection error was due to a service disruption in Prefect Cloud -- we're rolling out retries for that particular error code very soon.
As Kevin has pointed out, the idempotency_key is the solution, but it's not exposed on create_flow_run yet. It's a 3-line change if anyone is interested in contributing!
p
Thanks guys.
> Yeah, I think this should be solved with idempotency_key so that the retries will just trigger the same flow run.
In this case, would Prefect Cloud ignore requests to start a new flow run if subsequent requests have the same idempotency_key? What would you set idempotency_key to when triggering the child flow?
z
Yes. The task_run_id is a good choice if you want to avoid retriggering on retries.
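(A minimal sketch of that, assuming a wrapper task that calls the Client directly until the create_flow_run task exposes idempotency_key; the flow id is a placeholder.)

```python
from datetime import timedelta

import prefect
from prefect import task
from prefect.client import Client


@task(max_retries=3, retry_delay=timedelta(minutes=10))
def trigger_child_flow():
    # Reuse this task run's id as the idempotency key so retries of the task
    # map back to the same child flow run instead of creating new ones.
    idempotency_key = prefect.context.get("task_run_id")
    return Client().create_flow_run(
        flow_id="<child-flow-id>",  # placeholder: the child flow's id
        idempotency_key=idempotency_key,
    )
```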
p
Hi Michael. As far as making the change and opening a PR, I'll try to find some time in the next week or so. Do we just have to add the idempotency_key to the create_flow_run task and then forward it to the client call in this segment? https://github.com/PrefectHQ/prefect/blob/b79d004cfba878b73d459fec1bc0f2cd52e9d050/src/prefect/tasks/prefect/flow_run.py#L132-L140 I'd also implement the default logic used here: https://github.com/PrefectHQ/prefect/blob/b79d004cfba878b73d459fec1bc0f2cd52e9d050/src/prefect/tasks/prefect/flow_run.py#L413-L414
z
I actually don't like the default logic from the old task, so I'd prefer not to implement that by default; perhaps a boolean rerun_on_retry would be better? Otherwise, yeah, it's just passing the key down to the client call.