Thread
#prefect-community
    João Amorim
    1 year ago
    Another issue: in a flow that runs other flows, it is triggering a failure in some of the subflows, but in the Prefect UI there are no failures in the histories of these subflows... First this occurs:
    prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    and after some time, another error shows up:
    prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID B: provided a running state but associated flow run A is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    does anyone have any guesses as to what is causing this? 😅
    Kevin Kho
    1 year ago
    Hey @João Amorim, are you calling flow.run() in your flow by chance? Is your flow small enough to share?
    João Amorim
    1 year ago
    I don't understand, what do you mean by "flow by chance"? The flow:
    import prefect
    import datetime
    
    from prefect.tasks.prefect.flow_run import FlowRunTask
    from prefect import Flow
    
    def roundTime(dt=None, roundTo=60):
        if dt is None:
            dt = datetime.datetime.now()
        seconds = (dt.replace(tzinfo=None) - dt.min).seconds
        rounding = (seconds + roundTo / 2) // roundTo * roundTo
    
        dt = dt + datetime.timedelta(0, rounding - seconds, -dt.microsecond)
        return dt.replace(tzinfo=datetime.timezone.utc)
    
    with Flow("GPM: Download, enrichment and classify") as flow:
        dt = roundTime(roundTo=30*60) # round to 30 and 30 min
        date = datetime.datetime.strftime(dt, "%Y-%m-%d")
        time = datetime.datetime.strftime(dt, "%H:%M")
    
        downloadL_flow = FlowRunTask(flow_name='Download GPM',
                                   project_name="A",
                                   new_flow_context=prefect.context.get('config'),
                                   wait=True,
                                   parameters={'type_data': "LATE"})
    
    
        downloadE_flow = FlowRunTask(flow_name='Download GPM',
                                   project_name="CPFL",
                                   new_flow_context=prefect.context.get('config'),
                                   wait=True,
                                   parameters={'type_data': "EARLY"})
    
    
        enrichment_flow = FlowRunTask(flow_name='Enrichment GPM',
                                      project_name="A",
                                      new_flow_context=prefect.context.get('config'),
                                      wait=True,
                                      parameters={'date': date,
                                                  'time': time})
    
        ia_flow = FlowRunTask(flow_name='Classify GPM',
                              project_name="A",
                              new_flow_context=prefect.context.get('config'),
                              wait=True,
                              parameters={'date': date,
                                          'time': time})
    
        downloadL_flow.set_downstream(downloadE_flow)
        downloadE_flow.set_downstream(enrichment_flow)
        enrichment_flow.set_downstream(ia_flow)
    Sometimes this works, and sometimes it fails after the first subflow.
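    A standalone sketch of the rounding that roundTime performs, written as a plain function so it can be checked without Prefect. It makes one assumed change: when no datetime is given it uses datetime.now(timezone.utc), because the original takes datetime.now() (naive local time) and then stamps it with UTC via replace(), which relabels the value rather than converting it. round_time is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def round_time(dt: Optional[datetime] = None, round_to: int = 60) -> datetime:
    """Round dt to the nearest multiple of round_to seconds (ties round up).

    Hypothetical helper mirroring roundTime, except that the default is the
    current UTC time instead of local time relabelled as UTC.
    """
    if dt is None:
        dt = datetime.now(timezone.utc)
    # Whole seconds elapsed since midnight (microseconds handled below).
    seconds = dt.hour * 3600 + dt.minute * 60 + dt.second
    # Integer rounding to the nearest multiple of round_to.
    rounded = (seconds + round_to // 2) // round_to * round_to
    # Shift by the difference and drop the microseconds; this may roll over
    # into the next day (e.g. 23:50 -> 00:00 with 30-minute rounding).
    return dt + timedelta(seconds=rounded - seconds, microseconds=-dt.microsecond)
```

    With roundTo=30*60, 17:46:10 becomes 18:00 and 17:44:59 becomes 17:30; formatting the result with "%Y-%m-%d" and "%H:%M" then gives the date and time strings passed to the subflows.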
    Kevin Kho
    1 year ago
    Ignore the “by chance”. Taking a look.
    How did you register this?
    João Amorim
    1 year ago
    python3 -m prefect register --project "A" -p /home/hi/app/ -l "03"  --watch --force> /home/hi/flow_log.txt &
    Kevin Kho
    1 year ago
    The FlowRunTask doesn’t have the wait parameter here. Did you want to use StartFlowRun instead? I think it’s not waiting for the subflows to finish because of this.
    Also -
    FlowRunTask(flow_name='Classify GPM',
                project_name="A",
                new_flow_context=prefect.context.get('config'),
                wait=True,
                parameters={'date': date,
                            'time': time})
    doesn’t call the run method; this is just the init. You might want another () at the end, like:
        a = FlowRunTask(…)()
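    The split Kevin describes is between values given to the constructor, which only act as defaults, and values given in the second call, which reach run(). The traceback later in this thread passes through a wrapper in prefect/utilities/tasks.py, and Prefect 0.x task-library tasks appear to use a "defaults from attributes" decorator for this. Below is a toy model of that idea only (not Prefect's code, and it does not model how a flow schedules the call); defaults_from_attrs here is a simplified re-implementation and ToyStartFlowRun is hypothetical.

```python
import functools
from typing import Any, Callable, Dict, Optional, Tuple


def defaults_from_attrs(*attr_names: str) -> Callable:
    """Simplified sketch: a run() keyword left as None falls back to the
    instance attribute of the same name, set in __init__."""
    def decorator(run: Callable) -> Callable:
        @functools.wraps(run)
        def wrapper(self: Any, **kwargs: Any) -> Any:
            for name in attr_names:
                if kwargs.get(name) is None:
                    kwargs[name] = getattr(self, name)
            return run(self, **kwargs)
        return wrapper
    return decorator


class ToyStartFlowRun:
    """Hypothetical toy: __init__ only stores defaults; run() does the work."""

    def __init__(self, flow_name: Optional[str] = None,
                 parameters: Optional[Dict[str, Any]] = None) -> None:
        self.flow_name = flow_name
        self.parameters = parameters

    @defaults_from_attrs("flow_name", "parameters")
    def run(self, flow_name: Optional[str] = None,
            parameters: Optional[Dict[str, Any]] = None
            ) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
        # A real task would create a flow run here; the toy just returns
        # what it would have submitted.
        return flow_name, parameters
```

    Run-time arguments win over constructor defaults, which is why Kevin's later example can pass parameters computed by other tasks in the second call.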
    João Amorim
    1 year ago
    Which option is better: using StartFlowRun, or adding the () to FlowRunTask? That point makes sense, but it's just weird that it works sometimes haha
    Kevin Kho
    1 year ago
    StartFlowRun is used the same way:
        a = StartFlowRun(…)()
    I actually only see people using StartFlowRun rather than FlowRunTask.
    João Amorim
    1 year ago
    Okay, thanks.
    So, this doesn't resolve my issue; I still have the same problem when it runs on the schedule... What I don't understand is that when the flow is started by the scheduler, 3 processes are submitted for execution (like the picture in my other issue), but when it is started from the UI with the button, only one process starts. In the manual execution everything works, but with the scheduler there are sometimes failures...
    Kevin Kho
    1 year ago
    When you go to the dashboard for that subflow, do you see 3 separate flow runs created?
    João Amorim
    1 year ago
    No, just one.
    Like I tried to explain at the start, it seems like the process "lost the connection": the previous flow finished successfully but did not trigger the next flow. But sometimes everything works...
    Kevin Kho
    1 year ago
    Looking into this now… just so I understand correctly: we have two separate issues. One is the triple submission of the flow run and the second is this TriggerFailed, right?
    Even successful flows are running two processes?
    Are these scheduled runs, or are they using “Quick Run”?
    João Amorim
    1 year ago
    The problems occur with scheduled runs; with "Quick Run" I don't have this issue... The triple submission seems to happen only at the start; after that, it prints each log line three times, but all with the same UUID... The successful scheduled flows show two processes; with Quick Run they don't. Here is the log of one failure (I don't know if it is too big to post):
    7 June 2021,01:31:29 	ia	INFO	Submitted for execution: PID: 61924
    7 June 2021,01:31:29 	ia	INFO	Submitted for execution: PID: 61925
    7 June 2021,01:31:29 	ia	INFO	Submitted for execution: PID: 61926
    7 June 2021,01:31:30 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'GPM: Download, enrichment and classification'
    7 June 2021,01:31:30 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'GPM: Download, enrichment and classification'
    7 June 2021,01:31:30 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'GPM: Download, enrichment and classification'
    7 June 2021,01:31:30 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Starting task run...
    7 June 2021,01:31:30 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Starting task run...
    7 June 2021,01:31:30 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Starting task run...
    7 June 2021,01:31:30 	prefect.CloudTaskRunner	ERROR	Unexpected error: ClientError([{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/home/hi/.local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/home/hi/.local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/home/hi/.local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/home/hi/.local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 454, in method
        return run_method(self, *args, **kwargs)
      File "/home/hi/.local/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 177, in run
        flow_run_id = client.create_flow_run(
      File "/home/hi/.local/lib/python3.8/site-packages/prefect/client/client.py", line 1108, in create_flow_run
        res = self.graphql(create_mutation, variables=dict(input=inputs))
      File "/home/hi/.local/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    7 June 2021,01:31:30 	prefect.Flow Download GPM	INFO	Flow Run: <https://cloud.prefect.io/myacc-hi/flow-run/AAAAAAAAA8e66-82bb58f3cf6b>
    7 June 2021,01:31:31 	prefect.Flow Download GPM	INFO	Flow Run: <https://cloud.prefect.io/myacc-hi/flow-run/AAAAAAAAA8e66-82bb58f3cf6b>
    7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Finished task run for task with final state: 'Failed'
    7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Finished task run for task with final state: 'Success'
    7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Starting task run...
    7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Starting task run...
    7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Finished task run for task with final state: 'Success'
    7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Finished task run for task with final state: 'TriggerFailed'
    7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Starting task run...
    7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Enrichment GPM': Starting task run...
    7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Enrichment GPM': Finished task run for task with final state: 'TriggerFailed'
    7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Classify GPM': Starting task run...
    7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Classify GPM': Finished task run for task with final state: 'TriggerFailed'
    7 June 2021,01:31:31 	prefect.CloudFlowRunner	INFO	Flow run FAILED: some reference tasks failed.
    7 June 2021,01:31:31 	prefect.Flow Download GPM	INFO	Flow Run: <https://cloud.prefect.io/myacc-hi/flow-run/BBBBBBBBBBBBBB-aefa7320bfed>
    7 June 2021,01:31:31 	prefect.Flow Download GPM	INFO	Flow Run: <https://cloud.prefect.io/myacc-hi/flow-run/BBBBBBBBBBBBBB-aefa7320bfed>
    7 June 2021,01:31:45 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
    7 June 2021,01:31:45 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
    7 June 2021,01:31:51 	prefect.CloudTaskRunner	INFO	FAIL signal raised: FAIL('BBBBBBBBBBBBBB-aefa7320bfed finished in state <Failed: "Some reference tasks failed.">')
    7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Finished task run for task with final state: 'Failed'
    7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	FAIL signal raised: FAIL('BBBBBBBBBBBBBB-aefa7320bfed finished in state <Failed: "Some reference tasks failed.">')
    7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Finished task run for task with final state: 'Failed'
    7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Enrichment GPM': Starting task run...
    7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Enrichment GPM': Finished task run for task with final state: 'TriggerFailed'
    7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Classify GPM': Starting task run...
    7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Enrichment GPM': Starting task run...
    7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Enrichment GPM': Finished task run for task with final state: 'TriggerFailed'
    7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Classify GPM': Finished task run for task with final state: 'TriggerFailed'
    7 June 2021,01:31:52 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
    7 June 2021,01:31:52 	prefect.CloudFlowRunner	INFO	Flow run FAILED: some reference tasks failed.
    7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Classify GPM': Starting task run...
    7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Classify GPM': Finished task run for task with final state: 'TriggerFailed'
    7 June 2021,01:31:52 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
    7 June 2021,01:31:52 	prefect.CloudFlowRunner	INFO	Flow run FAILED: some reference tasks failed.
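    The log above already shows the symptom: three "Submitted for execution" lines with different PIDs for one flow run, followed by the same messages appearing several times. A small stdlib helper can make that count explicit when comparing a scheduled run with a Quick Run. It assumes the tab-separated timestamp, logger, level, message layout of the pasted log; submitted_pids and duplicated_messages are hypothetical names.

```python
import re
from collections import Counter
from typing import Dict, List, Tuple

SUBMIT_RE = re.compile(r"Submitted for execution: PID: (\d+)")


def submitted_pids(log_text: str) -> List[str]:
    """PIDs from every 'Submitted for execution' line, in log order."""
    return SUBMIT_RE.findall(log_text)


def duplicated_messages(log_text: str, min_count: int = 2) -> Dict[Tuple[str, str], int]:
    """Count (logger, message) pairs that appear at least min_count times.

    The timestamp column is ignored, so the same message logged by several
    parallel processes is counted together.
    """
    counts: Counter = Counter()
    for line in log_text.splitlines():
        parts = line.split("\t")
        if len(parts) >= 4:
            logger, message = parts[1].strip(), "\t".join(parts[3:]).strip()
            counts[(logger, message)] += 1
    return {key: n for key, n in counts.items() if n >= min_count}
```

    More than one PID for a single flow run ID suggests the run was picked up or executed more than once, which matches the duplicated task-run lines above.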
    Kevin Kho
    1 year ago
    Is that flow you showed me the whole code? I’ve seen this once where the person was configuring their own logger (though that wouldn’t explain why it works for quick run). Are you configuring any loggers on your own?
    João Amorim
    1 year ago
    Yes, it's the flow I sent earlier.
    No, only in the subflows: I get the context and use Prefect's logger.info...
    Kevin Kho
    1 year ago
    I don’t think this is related, but how did you set the schedule?
    João Amorim
    1 year ago
    in the UI
    Kevin Kho
    1 year ago
    Ok, let me write a simple flow that follows your pattern; then we can work from that and see if we get the same behavior.
    Sorry, I realized something is off about your flow.
    These three lines will be evaluated at build time, which means they will take their values at the time of registration. You won’t get the time during execution. If you need them evaluated at execution time, they need to be in a task, because tasks are executed at run time.
        dt = roundTime(roundTo=30*60) # round to 30 and 30 min
        date = datetime.datetime.strftime(dt, "%Y-%m-%d")
        time = datetime.datetime.strftime(dt, "%H:%M")
    So the parameters passed to the downstream flows will always carry the registration time. I assume that’s not what you wanted, right?
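    Kevin's build-time versus run-time distinction in miniature: a plain function call inside the with Flow(...) block runs once while the flow object is built, so its result is baked in, whereas a task body runs each time the flow executes. Whether the build step happens only at registration or again before every run can depend on how the flow is stored, so this toy (no Prefect) only shows the two behaviours. A fake counter clock stands in for datetime.now() so the result is deterministic; make_clock, build_eager and build_deferred are hypothetical names.

```python
import itertools
from typing import Callable


def make_clock(start: int = 0) -> Callable[[], int]:
    """Fake clock returning start, start + 1, ... (stands in for datetime.now())."""
    ticks = itertools.count(start)
    return lambda: next(ticks)


def build_eager(clock: Callable[[], int]) -> Callable[[], int]:
    # Like a plain call inside `with Flow(...)`: evaluated once, while the
    # flow is being built, and the result is frozen into it.
    value = clock()
    return lambda: value


def build_deferred(clock: Callable[[], int]) -> Callable[[], int]:
    # Like a @task: only the recipe is stored; the clock is read on every run.
    return lambda: clock()
```

    Calling the eager version twice returns the same reading both times, while the deferred version reads the clock anew on each call.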
    I also have a simple flow here; can you try it and see if you still get the extra logs? I would fix the datetime issue first, though. I suspect this datetime issue might be causing the flow failures.
    from prefect import Flow, task
    from prefect.tasks.prefect.flow_run import StartFlowRun
    import prefect
    
    @task
    def test(x):
        return x+1
    
    with Flow("flowA") as flow1:
        test(1)
    
    with Flow("flowB") as flow2:
        test(1)
    
    with Flow("flowC") as flow3:
        test(1)
    
    with Flow("flowD") as flow4:
        test(1)
    
    flow1.register("omlds")
    flow2.register("omlds")
    flow3.register("omlds")
    flow4.register("omlds")
    
    
    
    with Flow("main") as flow:
        a = StartFlowRun("flowA", "omlds", 
                         new_flow_context=prefect.context.get('config'),
                         wait=True,
                         parameters={'x': 1})()
    
        b = StartFlowRun("flowA", "omlds", 
                         new_flow_context=prefect.context.get('config'),
                         wait=True,
                         parameters={'x': 2})()
    
        c = StartFlowRun("flowA", "omlds", 
                         new_flow_context=prefect.context.get('config'),
                         wait=True,
                         parameters={'x': 3})()
    
        d = StartFlowRun("flowA", "omlds", 
                         new_flow_context=prefect.context.get('config'),
                         wait=True,
                         parameters={'x': 4})()
    
        a.set_downstream(b)
        b.set_downstream(c)
        c.set_downstream(d)
    
    flow.register("omlds")
    João Amorim
    1 year ago
    Yes, it isn't the registration time... it is working fine; it runs those three lines for every new flow run... I didn't put this in a task because I don't understand how to pass the return value of a task as a parameter to the flows... I want to set the same datetime for all flows, so how can I do this?
    Kevin Kho
    1 year ago
    Sorry, I’m a bit confused. Do you want it set for all flows to the registration time or the execution time?
    João Amorim
    1 year ago
    At execution time.
    Kevin Kho
    1 year ago
    Let me write a sample for you
    João Amorim
    1 year ago
    But it's weird, because this works with "Quick Run"... I put a logger in the subflow to see the date and time:
    Time of the set time! date=2021-06-07 | time=18:00
    This seems to work, but I agree with you that it should be a task in the main flow.
    Kevin Kho
    1 year ago
    A couple of things you need to do:
    1. Make roundTime a task.
    2. Make get_time and get_date into tasks.
    3. Use the tasks inside your Flow.
    4. Tasks are deferred, so you need to pass them as arguments to the run method, i.e. in the second () call.
    Full example:
    from prefect import Flow, task
    from prefect.tasks.prefect.flow_run import StartFlowRun
    import prefect
    import datetime
    
    @task
    def roundTime(dt=None, roundTo=60):
        if dt is None:
            dt = datetime.datetime.now()
        seconds = (dt.replace(tzinfo=None) - dt.min).seconds
        rounding = (seconds + roundTo / 2) // roundTo * roundTo
        dt = dt + datetime.timedelta(0, rounding - seconds, -dt.microsecond)
        return dt.replace(tzinfo=datetime.timezone.utc)
    
    @task
    def get_date(dt):
        return datetime.datetime.strftime(dt, "%Y-%m-%d")
    
    @task
    def get_time(dt):
        return datetime.datetime.strftime(dt, "%H:%M")
    
    @task
    def test(x):
        return x
    
    with Flow("flowA") as flow1:
        test(1)
    
    with Flow("flowB") as flow2:
        test(1)
    
    with Flow("flowC") as flow3:
        test(1)
    
    with Flow("flowD") as flow4:
        test(1)
    
    flow1.register("omlds")
    flow2.register("omlds")
    flow3.register("omlds")
    flow4.register("omlds")
    
    
    
    with Flow("main") as flow:
        dt = roundTime(roundTo=30*60) # round to 30 and 30 min
        date = get_date(dt)
        time = get_time(dt)
    
        a = StartFlowRun("flowA", "omlds", 
                         new_flow_context=prefect.context.get('config'),
                         wait=True)(parameters={'x': date})
    
        b = StartFlowRun("flowB", "omlds", 
                         new_flow_context=prefect.context.get('config'),
                         wait=True)(parameters={'x': time})
    
        c = StartFlowRun("flowC", "omlds", 
                         new_flow_context=prefect.context.get('config'),
                         wait=True,
                         parameters={'x': 3})()
    
        d = StartFlowRun("flowD", "omlds", 
                         new_flow_context=prefect.context.get('config'),
                         wait=True,
                         parameters={'x': 4})()
    
        a.set_downstream(b)
        b.set_downstream(c)
        c.set_downstream(d)
    
    # flow.register("omlds")
    flow.run()
    Oh, I see. My mistake. I guess you’re fine because the function is deferred and takes care of getting the time with datetime.now(). Can you try my simple flow and see if you get the multiple logs?
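    This is what answers the "same datetime for all flows" question: in a deferred graph, the time is read by a single node once per flow run, and every downstream node receives that same result. The toy below is plain Python, not Prefect; ToyFlow, fixed_now and the lambdas standing in for the StartFlowRun calls are hypothetical, and a fixed clock replaces datetime.now() so the output is deterministic. The "params" node loosely mirrors passing a dict that contains task results, as in parameters={'x': date}.

```python
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Tuple


class ToyFlow:
    """Hypothetical mini DAG: each node runs once per flow run, and its result
    is handed to every downstream node that names it as an input."""

    def __init__(self) -> None:
        self.nodes: List[Tuple[str, Callable[..., Any], Tuple[str, ...]]] = []

    def add(self, name: str, fn: Callable[..., Any], *upstream: str) -> None:
        self.nodes.append((name, fn, upstream))

    def run(self) -> Dict[str, Any]:
        results: Dict[str, Any] = {}
        # Nodes are assumed to be added in dependency order.
        for name, fn, upstream in self.nodes:
            results[name] = fn(*(results[u] for u in upstream))
        return results


clock_reads: List[datetime] = []


def fixed_now() -> datetime:
    # Stands in for datetime.now(); records each read so they can be counted.
    value = datetime(2021, 6, 7, 17, 46, tzinfo=timezone.utc)
    clock_reads.append(value)
    return value


flow = ToyFlow()
flow.add("dt", fixed_now)
flow.add("date", lambda dt: dt.strftime("%Y-%m-%d"), "dt")
flow.add("time", lambda dt: dt.strftime("%H:%M"), "dt")
flow.add("params", lambda d, t: {"date": d, "time": t}, "date", "time")
# Stand-ins for the StartFlowRun calls: each just reports what it would submit.
flow.add("enrichment", lambda p: ("Enrichment GPM", p), "params")
flow.add("classify", lambda p: ("Classify GPM", p), "params")
results = flow.run()
```

    The clock is read exactly once per run, and both subflow stand-ins receive the identical parameters object.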
    João Amorim
    1 year ago
    Yes, I will test it now. Thanks for the example!!
    Kevin Kho
    1 year ago
    Of course! We’ll try to fix the other issues!
    João Amorim
    1 year ago
    It still starts more than one process at the start... But I ran it 3 times: 2 failures (one with the same problem and the other because of my script) and one success. I will leave this scheduled flow running until tomorrow to verify whether it works, and tomorrow I will give you feedback about it.
    Kevin Kho
    1 year ago
    Are you on server or cloud? And do you have multiple agents running at the same time?
    You may be running into this
    João Amorim
    1 year ago
    Hi, the problem persists. GitHub's CSS isn't loading for me; in an hour or two I'll take a look at that issue. The server is on Cloud, using different agents (on different machines/VMs).
    Now I can see GitHub... I have different agents on different machines, but each machine runs a different flow. For example, I have a machine for each step of the ETL process: one runs the flows that extract values, another machine transforms these values, and another loads them. So each machine has an agent running, and the flow I am trying to create needs to trigger the ETL process... So my main flow needs to trigger the subflows for each step.
    But I still have this issue of many "Submitted for execution" entries at the start of the flow, in another flow too... That one is just a data-transformation flow that runs every hour... When the flow starts, the logs show:
    Consequently, this flow (from the screenshot) fails at a step of my script because more than one process tries to read/write the same file...
    I don't know if both problems are related to this strange behavior of being submitted several times at the beginning... But with "Quick Run" only one run is submitted, and everything works well.
    Kevin Kho
    1 year ago
    What Prefect version are you on?
    João Amorim
    1 year ago
    prefect==0.14.20
    Kevin Kho
    1 year ago
    Ok, will ask the team for more ideas here.
    Can I have a flow run ID that contains multiple processes?
    When you said “the server is on cloud”, do you mean you’re using cloud.prefect.io (http://cloud.prefect.io)? Or did you spin up Prefect Server and host it in the cloud?
    João Amorim
    1 year ago
    Yes, I am using cloud.prefect.io.
    Kevin Kho
    1 year ago
    Sounds good. Can I have a flow run ID?
    João Amorim
    1 year ago
    How do I get this? Is it the UUID in the URL?
    Kevin Kho
    1 year ago
    Yes
    João Amorim
    1 year ago
    3c97f2a5-5a20-4250-b3fa-170ae9742fc3
    Kevin Kho
    1 year ago
    Ok, the multiple processes might be a bug on the Prefect side, but the recommendation is also not to run 4 Local Agents. One agent should be enough to handle your needs. The problem is multiple agents picking up the same Flow. You can use Labels to isolate which agents pick up which flows.
    João Amorim
    1 year ago
    But I have different PCs / virtual machines; how can I use just one agent?
    Kevin Kho
    1 year ago
    If that is the case, then Labels will help you isolate which VMs can take which Flows. Do you want the Flow to be able to be picked up by any of those PCs or virtual machines?
    Or were you aware of Labels already?
    João Amorim
    1 year ago
    No, I have specific flows for specific VMs... and I'd like to use a main flow to run these flows on each VM. Is this possible?
    Kevin Kho
    1 year ago
    Yes, this is possible! First, we need to understand that labels let you specify which flows can be picked up by which agents. Then we can add labels to the RunConfig that we pass to StartFlowRun:
    from prefect import Flow, task
    from prefect.run_configs import LocalRun
    from prefect.tasks.prefect.flow_run import StartFlowRun
    import prefect
    @task
    def test(x):
        return x+1
    with Flow("flowA") as flow1:
        test(1)
    with Flow("flowB") as flow2:
        test(1)
    
    flow1.register("omlds")
    flow2.register("omlds")
    
    with Flow("main") as flow:
        a = StartFlowRun("flowA", "omlds", 
                         new_flow_context=prefect.context.get('config'),
                         wait=True,
                         run_config=LocalRun(labels=["vm1"]),
                         parameters={'x': 1})()
        b = StartFlowRun("flowB", "omlds", 
                         new_flow_context=prefect.context.get('config'),
                         run_config=LocalRun(labels=["vm2"]),
                         wait=True,
                         parameters={'x': 2})()
    
        b.set_upstream(a)
    flow.register("omlds")
    Now flowA will run on the agent with label vm1 and flowB will run on the agent with label vm2
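    The routing rule behind this can be sketched as a set check, assuming the Prefect 0.x behaviour in which an agent only picks up flow runs whose labels are all contained in the agent's own label set. Edge cases such as runs with no labels are deliberately not modelled. can_pick_up and eligible_agents are hypothetical helpers; they also show why two agents sharing the same labels (for example the "03" label used at registration) can both be eligible for one run.

```python
from typing import Dict, Iterable, List


def can_pick_up(agent_labels: Iterable[str], run_labels: Iterable[str]) -> bool:
    """Assumed rule: every label on the flow run must also be on the agent.
    NOTE: the real handling of runs with no labels may differ; not modelled."""
    return set(run_labels) <= set(agent_labels)


def eligible_agents(agents: Dict[str, List[str]], run_labels: Iterable[str]) -> List[str]:
    """Names of all agents that could pick up a run with these labels."""
    wanted = list(run_labels)
    return sorted(name for name, labels in agents.items() if can_pick_up(labels, wanted))
```

    With one distinct label per VM, each subflow has exactly one eligible agent; if several agents share a label, any of them may take the run.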
    João Amorim
    1 year ago
    Okay, I'll try this.
    Kevin Kho
    1 year ago
    You need to start the VM agents again with the appropriate labels
    Sorry this has taken a while to resolve but thanks for being patient!
    João Amorim
    1 year ago
    I already have labels on the VMs; I just wasn't using them.
    It seems to have decreased the number of failures, but the problem persists. The failures are:
    prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    Trigger was "all_successful" but some of the upstream tasks failed.
    Kevin Kho
    1 year ago
    Do you get this every time you run the Flow?
    João Amorim
    1 year ago
    only when it fails
    Kevin Kho
    1 year ago
    So sometimes it succeeds and sometimes it fails?
    João Amorim
    1 year ago
    yes
    Kevin Kho
    1 year ago
    Ok, asking the team about this.
    How often are you running this? It seems you’re running the same Flow in parallel on multiple agents? We need to provide a unique identifier so that these runs don’t collide on Prefect Cloud. It seems like passing an idempotency key will make sure that the flow run created by StartFlowRun is a new one. You can do this with:
        StartFlowRun()(idempotency_key="any_string")
    Just make sure to use something like the datetime, so that each run gets a unique key.
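    What an idempotency key buys, as a toy model: the backend remembers which run a key produced and hands back that same run when the key is reused, so retries or duplicate submissions do not create extra runs, while a fresh key always yields a new run. ToyBackend is hypothetical and single-threaded; it is not how Prefect Cloud implements this, and a real service also has to handle two requests racing with the same key, which this sketch ignores.

```python
import uuid
from typing import Dict, Optional


class ToyBackend:
    """Hypothetical in-memory stand-in for a backend's create_flow_run."""

    def __init__(self) -> None:
        self._by_key: Dict[str, str] = {}   # idempotency key -> run id
        self.runs: Dict[str, str] = {}      # run id -> flow name

    def create_flow_run(self, flow_name: str, idempotency_key: Optional[str] = None) -> str:
        # A repeated key returns the run created the first time.
        if idempotency_key is not None and idempotency_key in self._by_key:
            return self._by_key[idempotency_key]
        run_id = str(uuid.uuid4())
        self.runs[run_id] = flow_name
        if idempotency_key is not None:
            self._by_key[idempotency_key] = run_id
        return run_id
```

    Without a key every call creates a new run; with a key, only the first call does.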
    João Amorim
    1 year ago
    This flow runs every 40 minutes and takes between 5 and 10 minutes to execute all the steps successfully... so it never runs in parallel.
    Kevin Kho
    1 year ago
    Ok I think this is related to the relatively frequent runs. I think the idempotency_key should fix this issue.
    João Amorim
    1 year ago
    Can I use:
        StartFlowRun()(idempotency_key=datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d__%H-%M-%S--%f"))
    or do I need to put this datetime in a task?
    Kevin Kho
    1 year ago
    I think a task will be safer here
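    Why a task is safer for the key: the expression in the question is evaluated once while the flow is built, so every run could end up reusing the same string. A sketch of a key builder meant to be called inside a task at run time; make_idempotency_key is hypothetical, the strftime format is the one from the question, and prefixing the flow name is an added assumption so that different subflows started in the same run do not share a key. The now argument exists only so the function can be tested with a fixed time.

```python
from datetime import datetime, timezone
from typing import Optional


def make_idempotency_key(flow_name: str, now: Optional[datetime] = None) -> str:
    """Hypothetical helper: build the key when it is called (inside a task,
    at run time), not when the flow is defined."""
    if now is None:
        now = datetime.now(timezone.utc)
    return f"{flow_name}:{now.strftime('%Y-%m-%d__%H-%M-%S--%f')}"
```

    Wrapped in a task, its result would be passed in the second call, the same way the date and time strings are passed in Kevin's full example.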
    João Amorim
    1 year ago
    Okay.
    Now this is triggering the same subflows to run three times: two always fail and one succeeds, but this makes the main flow fail... I really don't know why the flow works with "Quick Run" but has this issue when scheduled...
    Here is the subflow that was executed three times, as I said above...
    Chris White
    1 year ago
    Hi @João Amorim, I'm having trouble following your use case here; there are two Prefect concepts you should read about:
    • flow labels: these associate each run of a flow with a particular agent. To run a flow on 3 different VMs, you should have 3 different agents, each with its own unique labels, and then create 3 different runs with the corresponding label sets. Your "parent" flow that is calling StartFlowRun also needs to be associated with one and only one agent via labels.
    • run idempotency keys: these ensure that you don't unintentionally create duplicate runs. Prefect's StartFlowRun task automatically uses its own task run ID as an idempotency key, but you can override this for other purposes.
    Also note, if you are running your flow on multiple machines, you need to make sure that you use a storage class that is available on all 3 machines.
    João Amorim
    1 year ago
    I think the only thing I didn't get to touch was this storage class...
    When I tried to use the idempotency key, it caused the last issue above and always made my main flow fail... I made some modifications to my subflows and removed the idempotency key from the main flow, and in the last few hours I have stopped having failures... let's see if it keeps working over the next hours 😉
    The only failure in the last hours is still the same as when I opened this thread. In the log I get:
    prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    and in the UI dashboard and schematic it says "TriggerFailed".