Another issue: in a flow that runs other flows,...
# ask-community
j
Another issue: I have a flow that runs other flows, and it is triggering a failure in some of the subflows, but in the Prefect UI there are no failures in the histories of these subflows... First this occurs:
prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
and after some time, another error shows:
prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID B: provided a running state but associated flow run A is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Does anyone have any guesses as to what is causing this? 😅
k
Hey @João Amorim, are you calling flow.run() in your flow by chance? Is your flow small enough to share?
j
I don't understand, what do you mean by "flow by chance"? The flow:
import prefect
import datetime

from prefect.tasks.prefect.flow_run import FlowRunTask
from prefect import Flow

def roundTime(dt=None, roundTo=60):
   if dt == None : dt = datetime.datetime.now()
   seconds = (dt.replace(tzinfo=None) - dt.min).seconds
   rounding = (seconds+roundTo/2) // roundTo * roundTo

   dt = dt + datetime.timedelta(0,rounding-seconds,-dt.microsecond)
   return dt.replace(tzinfo=datetime.timezone.utc)

with Flow("GPM: Download, enrichment and classify") as flow:
    dt = roundTime(roundTo=30*60) # round to 30 and 30 min
    date = datetime.datetime.strftime(dt, "%Y-%m-%d")
    time = datetime.datetime.strftime(dt, "%H:%M")

    downloadL_flow = FlowRunTask(flow_name='Download GPM',
                               project_name="A",
                               new_flow_context=prefect.context.get('config'),
                               wait=True,
                               parameters={'type_data': "LATE"})


    downloadE_flow = FlowRunTask(flow_name='Download GPM',
                               project_name="CPFL",
                               new_flow_context=prefect.context.get('config'),
                               wait=True,
                               parameters={'type_data': "EARLY"})


    enrichment_flow = FlowRunTask(flow_name='Enrichment GPM',
                                       project_name="A",
                                       new_flow_context=prefect.context.get('config'),
                                       wait=True,
                                       parameters={'date': date ,
                                           'time': time })

    ia_flow = FlowRunTask(flow_name='Classify GPM',
                                       project_name="A",
                                       new_flow_context=prefect.context.get('config'),
                                       wait=True,
                                       parameters={'date': date ,
                                           'time': time })

    downloadL_flow.set_downstream(downloadE_flow)
    downloadE_flow.set_downstream(enrichment_flow)
    enrichment_flow.set_downstream(ia_flow)
Sometimes this works, and sometimes it fails after the first subflow.
k
Remove “by chance”, taking a look
How did you register this?
j
python3 -m prefect register --project "A" -p /home/hi/app/ -l "03"  --watch --force> /home/hi/flow_log.txt &
k
The FlowRunTask doesn’t have the wait parameter here. Did you want to use StartFlowRun instead? I think it’s not waiting for the subflows to run because of this.
Also -
FlowRunTask(flow_name='Classify GPM',
                                       project_name="A",
                                       new_flow_context=prefect.context.get('config'),
                                       wait=True,
                                       parameters={'date': date ,
                                           'time': time })
doesn’t call the run method. This is just the init. You might want another () at the end, like a = FlowRunTask(…)()
j
Which option is better: using StartFlowRun, or adding the () to FlowRunTask? That point makes sense, but it's just weird that it works sometimes now, haha.
k
StartFlowRun will be used the same way: a = StartFlowRun(…)().
I actually only see people use StartFlowRun instead of FlowRunTask.
j
Okay, thanks
So, this doesn't resolve my issue; I still have the same problem when the flow runs on the schedule... What I don't understand is that when the flow is started by the scheduler, 3 processes are submitted for execution (like in the picture in my other issue), but when it is started from the UI with the button, only one process starts. With manual execution everything works fine, but with the scheduler there are sometimes failures...
k
When you go to the dashboard for that subflow, do you see 3 separate flow runs created?
j
No, just one
Like I tried to explain at the start, it seems like the process "lost the connection", because the previous flow finished successfully but did not trigger the next flow. But sometimes everything works...
k
Looking into this now…just so I understand correctly. We have two separate issues. One is the triple submissions of the flow run and second is this TriggerFailed right?
Even successful flows are running two processes?
Are these scheduled runs or are they using the “Quick Run”?
j
The problem occurs with scheduled runs; with the "quick run" I don't have this issue... The triple submission seems to happen only at the start; after that it prints each log line three times, but all with the same UUID... The successful scheduled flows show two processes; with the quick run they don't. Here is the log of one failure (I don't know if it is too big to post):
7 June 2021,01:31:29 	ia	INFO	Submitted for execution: PID: 61924
7 June 2021,01:31:29 	ia	INFO	Submitted for execution: PID: 61925
7 June 2021,01:31:29 	ia	INFO	Submitted for execution: PID: 61926
7 June 2021,01:31:30 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'GPM: Download, enrichment and classification'
7 June 2021,01:31:30 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'GPM: Download, enrichment and classification'
7 June 2021,01:31:30 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'GPM: Download, enrichment and classification'
7 June 2021,01:31:30 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Starting task run...
7 June 2021,01:31:30 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Starting task run...
7 June 2021,01:31:30 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Starting task run...
7 June 2021,01:31:30 	prefect.CloudTaskRunner	ERROR	Unexpected error: ClientError([{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
  File "/home/hi/.local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/hi/.local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/hi/.local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/home/hi/.local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 454, in method
    return run_method(self, *args, **kwargs)
  File "/home/hi/.local/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 177, in run
    flow_run_id = client.create_flow_run(
  File "/home/hi/.local/lib/python3.8/site-packages/prefect/client/client.py", line 1108, in create_flow_run
    res = self.graphql(create_mutation, variables=dict(input=inputs))
  File "/home/hi/.local/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
7 June 2021,01:31:30 	prefect.Flow Download GPM	INFO	Flow Run: <https://cloud.prefect.io/myacc-hi/flow-run/AAAAAAAAA8e66-82bb58f3cf6b>
7 June 2021,01:31:31 	prefect.Flow Download GPM	INFO	Flow Run: <https://cloud.prefect.io/myacc-hi/flow-run/AAAAAAAAA8e66-82bb58f3cf6b>
7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Finished task run for task with final state: 'Failed'
7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Finished task run for task with final state: 'Success'
7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Starting task run...
7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Starting task run...
7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Finished task run for task with final state: 'Success'
7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Starting task run...
7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Enrichment GPM': Starting task run...
7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Enrichment GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Classify GPM': Starting task run...
7 June 2021,01:31:31 	prefect.CloudTaskRunner	INFO	Task 'Flow Classify GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:31 	prefect.CloudFlowRunner	INFO	Flow run FAILED: some reference tasks failed.
7 June 2021,01:31:31 	prefect.Flow Download GPM	INFO	Flow Run: <https://cloud.prefect.io/myacc-hi/flow-run/BBBBBBBBBBBBBB-aefa7320bfed>
7 June 2021,01:31:31 	prefect.Flow Download GPM	INFO	Flow Run: <https://cloud.prefect.io/myacc-hi/flow-run/BBBBBBBBBBBBBB-aefa7320bfed>
7 June 2021,01:31:45 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
7 June 2021,01:31:45 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
7 June 2021,01:31:51 	prefect.CloudTaskRunner	INFO	FAIL signal raised: FAIL('BBBBBBBBBBBBBB-aefa7320bfed finished in state <Failed: "Some reference tasks failed.">')
7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Finished task run for task with final state: 'Failed'
7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	FAIL signal raised: FAIL('BBBBBBBBBBBBBB-aefa7320bfed finished in state <Failed: "Some reference tasks failed.">')
7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Download GPM': Finished task run for task with final state: 'Failed'
7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Enrichment GPM': Starting task run...
7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Enrichment GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Classify GPM': Starting task run...
7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Enrichment GPM': Starting task run...
7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Enrichment GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Classify GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:52 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
7 June 2021,01:31:52 	prefect.CloudFlowRunner	INFO	Flow run FAILED: some reference tasks failed.
7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Classify GPM': Starting task run...
7 June 2021,01:31:52 	prefect.CloudTaskRunner	INFO	Task 'Flow Classify GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:52 	prefect.CloudFlowRunner	WARNING	Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
7 June 2021,01:31:52 	prefect.CloudFlowRunner	INFO	Flow run FAILED: some reference tasks failed.
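A log like the one above can be checked for duplicate submissions mechanically. The following is a minimal sketch in plain Python, not a Prefect utility: `submitted_pids` and `repeated_lines` are hypothetical helper names, and the regular expression assumes the "Submitted for execution: PID: N" wording seen in this log.

```python
import re
from collections import Counter

# Assumption: the agent logs submissions with exactly this wording.
SUBMIT_RE = re.compile(r"Submitted for execution: PID: (\d+)")


def submitted_pids(log_lines):
    """Return the PIDs of every 'Submitted for execution' entry, in order."""
    pids = []
    for line in log_lines:
        match = SUBMIT_RE.search(line)
        if match:
            pids.append(match.group(1))
    return pids


def repeated_lines(log_lines):
    """Return each non-empty log line that appears more than once, with its count."""
    counts = Counter(line.strip() for line in log_lines if line.strip())
    return {line: n for line, n in counts.items() if n > 1}


if __name__ == "__main__":
    sample = [
        "7 June 2021,01:31:29 \tia\tINFO\tSubmitted for execution: PID: 61924",
        "7 June 2021,01:31:29 \tia\tINFO\tSubmitted for execution: PID: 61925",
        "7 June 2021,01:31:29 \tia\tINFO\tSubmitted for execution: PID: 61926",
    ]
    print(submitted_pids(sample))
```

More than one PID for a single flow run ID suggests that the same run was handed to several processes.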
k
Is that flow you showed me the whole code? I’ve seen this once where the person was configuring their own logger (though that wouldn’t explain why it works for quick run). Are you configuring any loggers on your own?
j
Yes, it's the flow I sent earlier.
No, in the subflows I just get the context and use Prefect's logger.info...
k
I don’t think this is related, but how did you set the schedule?
j
in the UI
k
Ok let me write a simple flow that has the pattern you have and then we can work off that and see if we get the same behavior for that
Sorry I realized something off about your flow
These three lines will be evaluated during build time, which means that they will take the values at the time of registration. You won’t get the time during execution. If you need them to be execution time, they need to be in a task because tasks are executed during run time.
    dt = roundTime(roundTo=30*60) # round to 30 and 30 min
    date = datetime.datetime.strftime(dt, "%Y-%m-%d")
    time = datetime.datetime.strftime(dt, "%H:%M")
So those parameters to downstream flows will always take the time of the registration. I assume that’s not what you wanted, right?
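The build-time versus run-time distinction described here can be illustrated without Prefect. This is a minimal sketch in plain Python, not Prefect's implementation: `make_eager` and `make_deferred` are hypothetical names, and the `clock` argument is an assumption added so the behavior is easy to check. A value computed while the pipeline is being defined is frozen, while a value computed inside a deferred callable is evaluated again on every run.

```python
import datetime


def make_eager(clock):
    """Compute the timestamp once, while the pipeline is being built."""
    stamp = clock()  # evaluated immediately, at definition time

    def run():
        return stamp  # every run sees the same build-time value

    return run


def make_deferred(clock):
    """Defer the timestamp until the pipeline actually runs."""

    def run():
        return clock()  # evaluated on each run

    return run


if __name__ == "__main__":
    eager = make_eager(datetime.datetime.now)
    deferred = make_deferred(datetime.datetime.now)
    print("eager:   ", eager())
    print("deferred:", deferred())
```

Calling `eager()` repeatedly keeps returning the first timestamp, whereas `deferred()` returns a fresh one each time.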
I also have a simple flow here that you can try and see if you still get the extra logs? I would fix the datetime issue before this though. I assume this datetime issue might be causing flow failures
from prefect import Flow, task
from prefect.tasks.prefect.flow_run import StartFlowRun
import prefect

@task
def test(x):
    return x+1

with Flow("flowA") as flow1:
    test(1)

with Flow("flowB") as flow2:
    test(1)

with Flow("flowC") as flow3:
    test(1)

with Flow("flowD") as flow4:
    test(1)

flow1.register("omlds")
flow2.register("omlds")
flow3.register("omlds")
flow4.register("omlds")



with Flow("main") as flow:
    a = StartFlowRun("flowA", "omlds", 
                     new_flow_context=prefect.context.get('config'),
                     wait=True,
                     parameters={'x': 1})()

    b = StartFlowRun("flowA", "omlds", 
                     new_flow_context=prefect.context.get('config'),
                     wait=True,
                     parameters={'x': 2})()

    c = StartFlowRun("flowA", "omlds", 
                     new_flow_context=prefect.context.get('config'),
                     wait=True,
                     parameters={'x': 3})()

    d = StartFlowRun("flowA", "omlds", 
                     new_flow_context=prefect.context.get('config'),
                     wait=True,
                     parameters={'x': 4})()

    a.set_downstream(b)
    b.set_downstream(c)
    c.set_downstream(d)

flow.register("omlds")
j
Yes, I don't want the registration time... but this is working fine; it runs those three lines on every new flow run... I didn't put this in a task because I don't understand how to pass the return value of a task as a parameter to the flows... I want to set the same datetime for all flows, so how can I do this?
k
Sorry I’m a bit confused. You want it to be set for all flows as the registration time or execution time?
j
At execution time
k
Let me write a sample for you
j
But it's weird, because this works with "quick run"... I put a logger in the subflow to see the date and time:
Time of the set time! date=2021-06-07 | time=18:00
this seems to work, but I agree with you that it should be a task in the main flow
k
A couple of things you need to do:
1. Make roundTime a task.
2. Make get_time and get_date into tasks.
3. Use the tasks inside your Flow.
4. Tasks are deferred, so you need to pass them as parameters in the run method.
Full example:
from prefect import Flow, task
from prefect.tasks.prefect.flow_run import StartFlowRun
import prefect
import datetime

@task
def roundTime(dt=None, roundTo=60):
   if dt is None:
      dt = datetime.datetime.now()
   seconds = (dt.replace(tzinfo=None) - dt.min).seconds
   rounding = (seconds+roundTo/2) // roundTo * roundTo
   dt = dt + datetime.timedelta(0,rounding-seconds,-dt.microsecond)
   return dt.replace(tzinfo=datetime.timezone.utc)

@task
def get_date(dt):
    return datetime.datetime.strftime(dt, "%Y-%m-%d")

@task
def get_time(dt):
    return datetime.datetime.strftime(dt, "%H:%M")

@task
def test(x):
    return x

with Flow("flowA") as flow1:
    test(1)

with Flow("flowB") as flow2:
    test(1)

with Flow("flowC") as flow3:
    test(1)

with Flow("flowD") as flow4:
    test(1)

flow1.register("omlds")
flow2.register("omlds")
flow3.register("omlds")
flow4.register("omlds")



with Flow("main") as flow:
    dt = roundTime(roundTo=30*60) # round to 30 and 30 min
    date = get_date(dt)
    time = get_time(dt)

    a = StartFlowRun("flowA", "omlds", 
                     new_flow_context=prefect.context.get('config'),
                     wait=True)(parameters={'x': date})

    b = StartFlowRun("flowB", "omlds", 
                     new_flow_context=prefect.context.get('config'),
                     wait=True)(parameters={'x': time})

    c = StartFlowRun("flowC", "omlds", 
                     new_flow_context=prefect.context.get('config'),
                     wait=True,
                     parameters={'x': 3})()

    d = StartFlowRun("flowD", "omlds", 
                     new_flow_context=prefect.context.get('config'),
                     wait=True,
                     parameters={'x': 4})()

    a.set_downstream(b)
    b.set_downstream(c)
    c.set_downstream(d)

# flow.register("omlds")
flow.run()
Oh I see. My mistake. I guess you’re fine because the function is deferred and takes care of getting the time with datetime.now(). Can you try my simple flow and see if you get the multiple logs?
j
yes I will test it now, but thanks for the example!!
k
Of course! We’ll try to fix the other issues!
j
It still starts more than one process at the start... But I ran it 3 times and got 2 failures (one with the same problem and the other because of my script) and one success. I will leave this scheduled flow running until tomorrow to verify whether it works, and tomorrow I will give you feedback about it.
k
Are you on server or cloud? And do you have multiple agents running at the same time?
You may be running into this
j
Hi, the problem persists. The CSS of GitHub is offline for me; in an hour or two I'll take a look at this issue. The server is on cloud, using different agents (on different machines/VMs).
Now I can see GitHub... I have different agents on different machines, but each machine runs a different flow. For example: I have a machine for each step of the ETL process, one to run the flows that extract values, another to transform these values, and another to load them. So on each machine I have an agent running, and the flow that I am trying to create needs to trigger the ETL process... So my main flow needs to trigger the subflows for each step.
But I still have this issue of many "Submitted for execution" entries at the start of the flow in another flow too... This one is just a flow that transforms my data and runs every hour... When the flow starts, the logs show:
Consequently, this flow (from the screenshot) fails in a step of my script because more than one process tries to read/write the same file...
I don't know if both problems are related to this strange behavior of being submitted for several runs at the beginning... But when run with "quick run", it submits just one run and everything works well.
k
What Prefect version are you on?
j
prefect==0.14.20
k
Ok will ask the team for more ideas here
Can I have a flow run ID that contains multiple processes?
When you said “the server is on cloud”, do you mean you’re using cloud.prefect.io? Or did you spin up Prefect Server and host it on the cloud?
j
Yes, I am using cloud.prefect.io
k
Sounds good. Can I have a flow run ID?
j
How do I get this? Is it the UUID in the URL?
k
Yes
j
3c97f2a5-5a20-4250-b3fa-170ae9742fc3
k
Ok the multiple processes might be a bug on the Prefect side, but the recommendation also is to not run 4 Local Agents. One agent should be enough to handle your needs. The problem is multiple agents picking up the same Flow. You can use Labels to isolate which agents pick up which flows.
j
But I have different PCs / virtual machines; how can I use just one agent?
k
If that is the case, then Labels will help you isolate which VMs can take which Flows. Do you want the Flow to be able to be picked up by any of those PCs or virtual machines?
Or were you aware of Labels already?
j
No, I have specific flows for specific VMs... and I would like to use a main flow to run these flows on each VM. Is this possible?
k
Yes this is possible! So first, we need to understand that labels allow you to specify which flows can be picked up by which agents. And then we can add labels to the RunConfig that we pass to StartFlowRun:
from prefect import Flow, task
from prefect.tasks.prefect.flow_run import StartFlowRun
import prefect
from prefect.run_configs import LocalRun  # needed for LocalRun(labels=...) below
@task
def test(x):
    return x+1
with Flow("flowA") as flow1:
    test(1)
with Flow("flowB") as flow2:
    test(1)

flow1.register("omlds")
flow2.register("omlds")

with Flow("main") as flow:
    a = StartFlowRun("flowA", "omlds", 
                     new_flow_context=prefect.context.get('config'),
                     wait=True,
                     run_config=LocalRun(labels=["vm1"]),
                     parameters={'x': 1})()
    b = StartFlowRun("flowB", "omlds", 
                     new_flow_context=prefect.context.get('config'),
                     run_config=LocalRun(labels=["vm2"]),
                     wait=True,
                     parameters={'x': 2})()

    b.set_upstream(a)
flow.register("omlds")
Now flowA will run on the agent with label vm1 and flowB will run on the agent with label vm2
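How labels route flow runs to agents can be sketched in plain Python. This is an illustration only, not Prefect's code: it assumes the rule that an agent picks up a flow run only when all of the run's labels are among the agent's labels, and `agent_can_pick_up`, `eligible_agents`, and the agent names are hypothetical.

```python
def agent_can_pick_up(agent_labels, run_labels):
    """Return True if every label on the run is also present on the agent."""
    return set(run_labels) <= set(agent_labels)


def eligible_agents(agents, run_labels):
    """Return the sorted names of agents (name -> labels) that could take the run."""
    return sorted(
        name
        for name, labels in agents.items()
        if agent_can_pick_up(labels, run_labels)
    )


if __name__ == "__main__":
    # Hypothetical agents, one per VM, each with its own label.
    agents = {"agent-vm1": ["vm1"], "agent-vm2": ["vm2"]}
    print(eligible_agents(agents, ["vm1"]))
    print(eligible_agents(agents, ["vm2"]))
```

If two agents share a label, both appear as eligible for the same run, which is the situation the labels are meant to rule out here.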
j
Okay, I'll try this
k
You need to start the VM agents again with the appropriate labels
Sorry this has taken a while to resolve but thanks for being patient!
j
I already have labels on the VMs, I just wasn't using them.
It seems to have decreased the number of failures, but the problem persists. The failures are:
prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Trigger was "all_successful" but some of the upstream tasks failed.
k
Do you get this every time you run the Flow?
j
only when it fails
k
So sometimes it succeeds and sometimes it fails?
j
yes
k
Ok asking the team about this
How often are you running this? It seems you’re running the same Flow in parallel on multiple agents? We need to provide a unique identifier so that these don’t hit each other on Prefect cloud. It seems like passing an idempotency key will make sure that the Flow run being created by StartFlowRun is a new one. You can do this with StartFlowRun()(idempotency_key="any_string"). Just make sure to use something like the datetime that will give a unique run each time.
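A datetime-based key of the kind suggested here could be built as follows. This is a minimal sketch under stated assumptions: `make_idempotency_key` is a hypothetical helper, not part of Prefect, and the timestamp format is just one choice with microsecond resolution. The key has to be computed when the flow runs; a value computed when the flow is built would be the same string on every run.

```python
import datetime


def make_idempotency_key(flow_name, now=None):
    """Build a string that is unique per flow name and per timestamp.

    `now` can be passed explicitly for testing; by default the current
    UTC time is used.
    """
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    stamp = now.strftime("%Y-%m-%d__%H-%M-%S--%f")  # %f = microseconds
    return f"{flow_name}--{stamp}"


if __name__ == "__main__":
    print(make_idempotency_key("Download GPM"))
```

Including the flow name keeps keys for different subflows apart even if they are generated in the same microsecond.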
j
This flow runs every 40 minutes and takes between 5 and 10 minutes to successfully execute all the steps... so it never runs in parallel.
k
Ok I think this is related to the relatively frequent runs. I think the idempotency_key should fix this issue.
j
Can I use:
StartFlowRun()(idempotency_key=datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d__%H-%M-%S--%f"))
or do I need to put this datetime in a task?
k
I think a task will be safer here
j
Okay
Now this is triggering the same subflows to run three times; always two fail and one succeeds, but this makes the main flow fail... I really don't know why the flow works with the "quick run" and has this issue when scheduled...
Here is the subflow that was executed three times, as I said above...
c
Hi @João Amorim, I'm having trouble following your use case here; there are two Prefect concepts you should read about:
• flow labels: these associate each run of a flow with a particular agent; to run a flow on 3 different VMs, you should have 3 different agents, each with their own unique labels, and then create 3 different runs with the corresponding label sets. Your "parent" flow that is calling StartFlowRun also needs to be associated with one and only one agent via labels.
• run idempotency keys: these ensure that you don't accidentally create duplicate runs. Prefect's StartFlowRun task automatically uses its own task run ID as an idempotency key, but you can override this for other purposes.
Also note, if you are running your flow on multiple machines, you need to make sure that you use a storage class that is available on all 3 machines.
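The general effect of an idempotency key can be sketched with an in-memory store. This only illustrates the idea that repeating a request with the same key should not produce a second run; it is not how Prefect Cloud implements create_flow_run, and `RunStore` and its method are hypothetical.

```python
import uuid


class RunStore:
    """Toy in-memory store that creates at most one run per idempotency key."""

    def __init__(self):
        self._runs_by_key = {}

    def create_run(self, flow_name, idempotency_key=None):
        # Without a key, every call creates a brand-new run ID.
        if idempotency_key is None:
            return str(uuid.uuid4())
        # With a key, repeated calls return the run created the first time.
        key = (flow_name, idempotency_key)
        if key not in self._runs_by_key:
            self._runs_by_key[key] = str(uuid.uuid4())
        return self._runs_by_key[key]


if __name__ == "__main__":
    store = RunStore()
    first = store.create_run("Download GPM", idempotency_key="task-run-1")
    second = store.create_run("Download GPM", idempotency_key="task-run-1")
    print(first == second)
```

In this toy model, three processes that ask for a run with the same key all receive the same run ID, while a new key yields a new run.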
j
I think the only thing I didn't get to touch was this storage class...
When I tried to use the idempotency key, it generated the last issue above and always made my main flow fail... I made some modifications to my subflows and removed the idempotency key from the main flow, and in the last few hours I have stopped having failures... Let's see if this continues to work in the next hours ;)
The only failure in the last few hours is still the same as when I opened this thread. In the log I get:
prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
and the UI dashboard and schematic say
"TriggerFailed"