João Amorim
06/07/2021, 12:48 PMprefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
and after some time, other error shows:
prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID B: provided a running state but associated flow run A is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
does anyone have any guesses as to what is causing this? š
Kevin Kho
flow.run()
in your flow by chance? Is your flow small enough to share?João Amorim
06/07/2021, 1:24 PMimport prefect
import datetime
from prefect.tasks.prefect.flow_run import FlowRunTask
from prefect import Flow
def roundTime(dt=None, roundTo=60):
if dt == None : dt = datetime.datetime.now()
seconds = (dt.replace(tzinfo=None) - dt.min).seconds
rounding = (seconds+roundTo/2) // roundTo * roundTo
dt = dt + datetime.timedelta(0,rounding-seconds,-dt.microsecond)
return dt.replace(tzinfo=datetime.timezone.utc)
with Flow("GPM: Download, enrichment and classify") as flow:
dt = roundTime(roundTo=30*60) # round to 30 and 30 min
date = datetime.datetime.strftime(dt, "%Y-%m-%d")
time = datetime.datetime.strftime(dt, "%H:%M")
downloadL_flow = FlowRunTask(flow_name='Download GPM',
project_name="A",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'type_data': "LATE"})
downloadE_flow = FlowRunTask(flow_name='Download GPM',
project_name="CPFL",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'type_data': "EARLY"})
enrichment_flow = FlowRunTask(flow_name='Enrichment GPM',
project_name="A",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'date': date ,
'time': time })
ia_flow = FlowRunTask(flow_name='Classify GPM',
project_name="A",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'date': date ,
'time': time })
downloadL_flow.set_downstream(downloadE_flow)
downloadE_flow.set_downstream(enrichment_flow)
enrichment_flow.set_downstream(ia_flow)
João Amorim
06/07/2021, 1:26 PMKevin Kho
Kevin Kho
João Amorim
06/07/2021, 1:30 PMpython3 -m prefect register --project "A" -p /home/hi/app/ -l "03" --watch --force> /home/hi/flow_log.txt &
Kevin Kho
FlowRunTask(flow_name='Classify GPM',
project_name="A",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'date': date ,
'time': time })
doesnāt call the run method. This is just the init. You might want another ()
at the end like a = FlowRunTask(ā¦)()
João Amorim
06/07/2021, 2:09 PMStartFlowRun
or the ()
in FlowRunTask
? Make sense this point, but just is weird that is working sometimes now kkkKevin Kho
StartFlowRun
will be used the same way. a = StartFlowRun(ā¦)()
.Kevin Kho
João Amorim
06/07/2021, 2:37 PMJoão Amorim
06/07/2021, 4:12 PMKevin Kho
João Amorim
06/07/2021, 4:50 PMJoão Amorim
06/07/2021, 5:28 PMKevin Kho
Kevin Kho
Kevin Kho
João Amorim
06/07/2021, 5:56 PM7 June 2021,01:31:29 ia INFO Submitted for execution: PID: 61924
7 June 2021,01:31:29 ia INFO Submitted for execution: PID: 61925
7 June 2021,01:31:29 ia INFO Submitted for execution: PID: 61926
7 June 2021,01:31:30 prefect.CloudFlowRunner INFO Beginning Flow run for 'GPM: Download, enrichment and classification'
7 June 2021,01:31:30 prefect.CloudFlowRunner INFO Beginning Flow run for 'GPM: Download, enrichment and classification'
7 June 2021,01:31:30 prefect.CloudFlowRunner INFO Beginning Flow run for 'GPM: Download, enrichment and classification'
7 June 2021,01:31:30 prefect.CloudTaskRunner INFO Task 'Flow Download GPM': Starting task run...
7 June 2021,01:31:30 prefect.CloudTaskRunner INFO Task 'Flow Download GPM': Starting task run...
7 June 2021,01:31:30 prefect.CloudTaskRunner INFO Task 'Flow Download GPM': Starting task run...
7 June 2021,01:31:30 prefect.CloudTaskRunner ERROR Unexpected error: ClientError([{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
File "/home/hi/.local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/home/hi/.local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/home/hi/.local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/home/hi/.local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 454, in method
return run_method(self, *args, **kwargs)
File "/home/hi/.local/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 177, in run
flow_run_id = client.create_flow_run(
File "/home/hi/.local/lib/python3.8/site-packages/prefect/client/client.py", line 1108, in create_flow_run
res = self.graphql(create_mutation, variables=dict(input=inputs))
File "/home/hi/.local/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
7 June 2021,01:31:30 prefect.Flow Download GPM INFO Flow Run: <https://cloud.prefect.io/myacc-hi/flow-run/AAAAAAAAA8e66-82bb58f3cf6b>
7 June 2021,01:31:31 prefect.Flow Download GPM INFO Flow Run: <https://cloud.prefect.io/myacc-hi/flow-run/AAAAAAAAA8e66-82bb58f3cf6b>
7 June 2021,01:31:31 prefect.CloudTaskRunner INFO Task 'Flow Download GPM': Finished task run for task with final state: 'Failed'
7 June 2021,01:31:31 prefect.CloudTaskRunner INFO Task 'Flow Download GPM': Finished task run for task with final state: 'Success'
7 June 2021,01:31:31 prefect.CloudTaskRunner INFO Task 'Flow Download GPM': Starting task run...
7 June 2021,01:31:31 prefect.CloudTaskRunner INFO Task 'Flow Download GPM': Starting task run...
7 June 2021,01:31:31 prefect.CloudTaskRunner INFO Task 'Flow Download GPM': Finished task run for task with final state: 'Success'
7 June 2021,01:31:31 prefect.CloudTaskRunner INFO Task 'Flow Download GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:31 prefect.CloudTaskRunner INFO Task 'Flow Download GPM': Starting task run...
7 June 2021,01:31:31 prefect.CloudTaskRunner INFO Task 'Flow Enrichment GPM': Starting task run...
7 June 2021,01:31:31 prefect.CloudTaskRunner INFO Task 'Flow Enrichment GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:31 prefect.CloudTaskRunner INFO Task 'Flow Classify GPM': Starting task run...
7 June 2021,01:31:31 prefect.CloudTaskRunner INFO Task 'Flow Classify GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:31 prefect.CloudFlowRunner INFO Flow run FAILED: some reference tasks failed.
7 June 2021,01:31:31 prefect.Flow Download GPM INFO Flow Run: <https://cloud.prefect.io/myacc-hi/flow-run/BBBBBBBBBBBBBB-aefa7320bfed>
7 June 2021,01:31:31 prefect.Flow Download GPM INFO Flow Run: <https://cloud.prefect.io/myacc-hi/flow-run/BBBBBBBBBBBBBB-aefa7320bfed>
7 June 2021,01:31:45 prefect.CloudFlowRunner WARNING Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
7 June 2021,01:31:45 prefect.CloudFlowRunner WARNING Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
7 June 2021,01:31:51 prefect.CloudTaskRunner INFO FAIL signal raised: FAIL('BBBBBBBBBBBBBB-aefa7320bfed finished in state <Failed: "Some reference tasks failed.">')
7 June 2021,01:31:52 prefect.CloudTaskRunner INFO Task 'Flow Download GPM': Finished task run for task with final state: 'Failed'
7 June 2021,01:31:52 prefect.CloudTaskRunner INFO FAIL signal raised: FAIL('BBBBBBBBBBBBBB-aefa7320bfed finished in state <Failed: "Some reference tasks failed.">')
7 June 2021,01:31:52 prefect.CloudTaskRunner INFO Task 'Flow Download GPM': Finished task run for task with final state: 'Failed'
7 June 2021,01:31:52 prefect.CloudTaskRunner INFO Task 'Flow Enrichment GPM': Starting task run...
7 June 2021,01:31:52 prefect.CloudTaskRunner INFO Task 'Flow Enrichment GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:52 prefect.CloudTaskRunner INFO Task 'Flow Classify GPM': Starting task run...
7 June 2021,01:31:52 prefect.CloudTaskRunner INFO Task 'Flow Enrichment GPM': Starting task run...
7 June 2021,01:31:52 prefect.CloudTaskRunner INFO Task 'Flow Enrichment GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:52 prefect.CloudTaskRunner INFO Task 'Flow Classify GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:52 prefect.CloudFlowRunner WARNING Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
7 June 2021,01:31:52 prefect.CloudFlowRunner INFO Flow run FAILED: some reference tasks failed.
7 June 2021,01:31:52 prefect.CloudTaskRunner INFO Task 'Flow Classify GPM': Starting task run...
7 June 2021,01:31:52 prefect.CloudTaskRunner INFO Task 'Flow Classify GPM': Finished task run for task with final state: 'TriggerFailed'
7 June 2021,01:31:52 prefect.CloudFlowRunner WARNING Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
7 June 2021,01:31:52 prefect.CloudFlowRunner INFO Flow run FAILED: some reference tasks failed.
Kevin Kho
João Amorim
06/07/2021, 7:05 PMJoão Amorim
06/07/2021, 7:06 PMKevin Kho
João Amorim
06/07/2021, 7:08 PMKevin Kho
Kevin Kho
Kevin Kho
dt = roundTime(roundTo=30*60) # round to 30 and 30 min
date = datetime.datetime.strftime(dt, "%Y-%m-%d")
time = datetime.datetime.strftime(dt, "%H:%M")
Kevin Kho
Kevin Kho
from prefect import Flow, task
from prefect.tasks.prefect.flow_run import StartFlowRun
import prefect
@task
def test(x):
return x+1
with Flow("flowA") as flow1:
test(1)
with Flow("flowB") as flow2:
test(1)
with Flow("flowC") as flow3:
test(1)
with Flow("flowD") as flow4:
test(1)
flow1.register("omlds")
flow2.register("omlds")
flow3.register("omlds")
flow4.register("omlds")
with Flow("main") as flow:
a = StartFlowRun("flowA", "omlds",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'x': 1})()
b = StartFlowRun("flowA", "omlds",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'x': 2})()
c = StartFlowRun("flowA", "omlds",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'x': 3})()
d = StartFlowRun("flowA", "omlds",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'x': 4})()
a.set_downstream(b)
b.set_downstream(c)
c.set_downstream(d)
flow.register("omlds")
João Amorim
06/07/2021, 7:45 PMKevin Kho
João Amorim
06/07/2021, 7:48 PMKevin Kho
João Amorim
06/07/2021, 8:01 PMTime of the set time! date=2021-06-07 | time=18:00
this seems to work, but I agree with you that it should be a task in the main flowKevin Kho
roundTime
a task. 2. make the get_time
and get_date
into tasks. 3. use the tasks inside your Flow. 4. Tasks are deferred so you need to pass them as parameters in the run
method. Full example:
from prefect import Flow, task
from prefect.tasks.prefect.flow_run import StartFlowRun
import prefect
import datetime
@task
def roundTime(dt=None, roundTo=60):
if dt == None : dt = datetime.datetime.now()
seconds = (dt.replace(tzinfo=None) - dt.min).seconds
rounding = (seconds+roundTo/2) // roundTo * roundTo
dt = dt + datetime.timedelta(0,rounding-seconds,-dt.microsecond)
return dt.replace(tzinfo=datetime.timezone.utc)
@task
def get_date(dt):
return datetime.datetime.strftime(dt, "%Y-%m-%d")
@task
def get_time(dt):
return datetime.datetime.strftime(dt, "%H:%M")
@task
def test(x):
return x
with Flow("flowA") as flow1:
test(1)
with Flow("flowB") as flow2:
test(1)
with Flow("flowC") as flow3:
test(1)
with Flow("flowD") as flow4:
test(1)
flow1.register("omlds")
flow2.register("omlds")
flow3.register("omlds")
flow4.register("omlds")
with Flow("main") as flow:
dt = roundTime(roundTo=30*60) # round to 30 and 30 min
date = get_date(dt)
time = get_time(dt)
a = StartFlowRun("flowA", "omlds",
new_flow_context=prefect.context.get('config'),
wait=True)(parameters={'x': date})
b = StartFlowRun("flowB", "omlds",
new_flow_context=prefect.context.get('config'),
wait=True)(parameters={'x': time})
c = StartFlowRun("flowC", "omlds",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'x': 3})()
d = StartFlowRun("flowD", "omlds",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'x': 4})()
a.set_downstream(b)
b.set_downstream(c)
c.set_downstream(d)
# flow.register("omlds")
flow.run()
Kevin Kho
datetime.now()
. Can you try my simple flow and see if you get the multiple logs?João Amorim
06/07/2021, 8:03 PMKevin Kho
João Amorim
06/07/2021, 8:36 PMKevin Kho
Kevin Kho
João Amorim
06/08/2021, 10:39 AMJoão Amorim
06/08/2021, 12:10 PMJoão Amorim
06/08/2021, 12:13 PMJoão Amorim
06/08/2021, 12:16 PMJoão Amorim
06/08/2021, 12:18 PMKevin Kho
João Amorim
06/08/2021, 2:24 PMKevin Kho
Kevin Kho
Kevin Kho
<http://cloud.prefect.io|cloud.prefect.io>
? Or did you spin up Prefect Server and host it on the cloud?João Amorim
06/08/2021, 4:20 PMKevin Kho
João Amorim
06/08/2021, 4:21 PMKevin Kho
João Amorim
06/08/2021, 4:23 PMKevin Kho
João Amorim
06/08/2021, 5:34 PMKevin Kho
Kevin Kho
João Amorim
06/08/2021, 5:45 PMKevin Kho
RunConfig
that we pass to StartFlowRun
from prefect import Flow, task
from prefect.tasks.prefect.flow_run import StartFlowRun
import prefect
@task
def test(x):
return x+1
with Flow("flowA") as flow1:
test(1)
with Flow("flowB") as flow2:
test(1)
flow1.register("omlds")
flow2.register("omlds")
with Flow("main") as flow:
a = StartFlowRun("flowA", "omlds",
new_flow_context=prefect.context.get('config'),
wait=True,
run_config=LocalRun(labels=["vm1"]),
parameters={'x': 1})()
b = StartFlowRun("flowB", "omlds",
new_flow_context=prefect.context.get('config'),
run_config=LocalRun(labels=["vm2"]),
wait=True,
parameters={'x': 2})()
b.set_upstream(a)
flow.register("omlds")
Kevin Kho
João Amorim
06/08/2021, 6:16 PMKevin Kho
Kevin Kho
João Amorim
06/08/2021, 6:21 PMJoão Amorim
06/09/2021, 11:05 AMprefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Trigger was "all_successful" but some of the upstream tasks failed.
Kevin Kho
João Amorim
06/09/2021, 5:17 PMKevin Kho
João Amorim
06/09/2021, 5:34 PMKevin Kho
Kevin Kho
StartFlowRun()(idempotency_key="any_string")
. Just make sure to use something like the datetime that will give a unique run each time.João Amorim
06/09/2021, 6:59 PMKevin Kho
João Amorim
06/09/2021, 7:07 PMStartFlowRun()(idempotency_key=datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d__%H-%M-%S--%f"))
or need to put this datetime in a task?Kevin Kho
João Amorim
06/09/2021, 7:14 PMJoão Amorim
06/10/2021, 11:59 AMJoão Amorim
06/10/2021, 12:19 PMChris White
StartFlowRun
also needs to be associated with one and only one agent via labels.
⢠run idempotency keys: these ensure that you don't accidentally create duplicate runs unintentionally. Prefect's StartFlowRun
task automatically uses its own task run ID as an idempotency key, but you can override this for other purposes
Also note, if you are running your flow on multiple machines, you need to make sure that you use a storage class that is available on all 3 machines.João Amorim
06/10/2021, 8:15 PMJoão Amorim
06/10/2021, 8:19 PMJoão Amorim
06/10/2021, 8:29 PMprefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
and in the UI dashboard and schematic says "TriggerFailed"