Jake
03/23/2022, 3:44 PM
Florian Guily
03/23/2022, 4:33 PM
Rachel Funk
03/23/2022, 5:20 PM
"no heartbeat detected" error that I've seen a few other folks flag. I read through the Discourse documentation and the FAQ. I don't think it's a memory issue, because a much larger Flow runs from the same agent and Google Cloud VM without any issues.
I can't use the proposed solutions for configuring heartbeats to use threads, because I'm realizing that my Flows don't import anything from prefect.run_configs.
So I'm not using UniversalRun or ECSRun. I'm guessing the setup of my Flows might predate that module. Is that possible? Should I import it if my Flows have been running fine without it?
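Run configs are not the only way to change the heartbeat mode. In Prefect 1.x, config keys can also be set through environment variables named PREFECT__<SECTION>__<KEY>, so the heartbeat setting the proposed solutions change (cloud.heartbeat_mode) corresponds to PREFECT__CLOUD__HEARTBEAT_MODE. Below is a minimal sketch of that naming rule, assuming the double-underscore convention. The helper name is hypothetical, and "thread" is the threaded heartbeat mode the proposed solutions refer to.

```python
def prefect_env_var(config_key: str) -> str:
    """Map a dotted Prefect 1.x config key to its environment-variable name.

    Assumes the PREFECT__<SECTION>__<KEY> convention, e.g.
    "cloud.heartbeat_mode" -> "PREFECT__CLOUD__HEARTBEAT_MODE".
    """
    parts = config_key.split(".")
    return "PREFECT__" + "__".join(part.upper() for part in parts)


# Environment to attach to the flow run so heartbeats run in a thread
# rather than a separate process.
heartbeat_env = {prefect_env_var("cloud.heartbeat_mode"): "thread"}
print(heartbeat_env)  # {'PREFECT__CLOUD__HEARTBEAT_MODE': 'thread'}
```

With Prefect 1.x installed, a dict like this would typically be passed as the env argument of a run config, for example flow.run_config = UniversalRun(env=heartbeat_env) with UniversalRun imported from prefect.run_configs.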
I should mention that this heartbeat issue only started to crop up a few days ago.
Jason Bertman
03/23/2022, 5:26 PM
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
    state = self.client.set_task_run_state(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1922, in set_task_run_state
    result = self.graphql(
  ...
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='prefecthq-apollo.prefect', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f12486259d0>: Failed to establish a new connection: [Errno -2] Name or service not known'))
This error would seem to indicate that the service can't even be resolved, but this DNS name is perfectly resolvable from the dask pods, and many tasks succeed before a couple fail. I see signs of distress from the apollo service in the form of:
BadRequestError: request aborted
    at IncomingMessage.onAborted (/apollo/node_modules/raw-body/index.js:231:10)
    at IncomingMessage.emit (events.js:315:20)
    at abortIncoming (_http_server.js:561:9)
    at socketOnClose (_http_server.js:554:3)
    at Socket.emit (events.js:327:22)
    at TCP.<anonymous> (net.js:673:12)
This is repeated pretty much as many times as the dask pod tries the request. Light research on this case points to the service being overwhelmed. Is this case covered by the request retrier here? https://github.com/PrefectHQ/prefect/blob/a2041c7ff1a619e611f614ed3394fccd05bb2005/src/prefect/client/client.py#L633
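Whether the client's built-in retrier covers a failed DNS lookup depends on how its HTTP adapter is configured, and the traceback alone doesn't settle that. If you want an extra retry layer around a call you control, below is a generic sketch of retrying with capped exponential backoff and full jitter. It catches OSError by default because requests.exceptions.ConnectionError derives from RequestException, which subclasses IOError (an alias of OSError). The function name, defaults, and the simulated flaky call are illustrative assumptions, not Prefect's implementation.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying on retry_on with capped exponential backoff and full jitter."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error to the caller
            # Cap the exponential delay, then sleep a random fraction of it so
            # many failing clients don't retry against the server in lockstep.
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, delay))
    raise AssertionError("unreachable")


# Usage: a hypothetical call that fails twice, then succeeds.
calls = {"count": 0}


def flaky_request() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated: Name or service not known")
    return "ok"


result = call_with_backoff(flaky_request, sleep=lambda _: None)  # skip real sleeps
print(result, calls["count"])  # ok 3
```

The jitter matters if the Apollo pod really is overwhelmed: synchronized retries from many dask workers would add to the load rather than spreading it out.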
If not, what's the best way to handle this case? Are there any configuration changes I could make to the Apollo pod?
Ken Nguyen
03/23/2022, 6:08 PM
Anatoly Myachev
03/23/2022, 6:21 PM
prefect orion kubernetes-manifest | kubectl apply -f - command, the kubernetes queue is not created, although according to the documentation it should be. Has anyone run into something similar?
Hedgar
03/23/2022, 6:37 PM
Rajan Subramanian
03/23/2022, 6:45 PM
Chris Reuter
03/23/2022, 6:55 PM
kevin
03/23/2022, 8:06 PM
from prefect import Flow, task

with Flow('foo') as f:
    a = task(lambda x: {'key': x})('value')
    b = task(lambda y: y.pop('key'))(a)
    c = task(lambda z: z.get('key'))(a)
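In this flow, b and c both take a's output as input. If the executor hands both tasks the same dict object (plausible for an in-process run, but an assumption here; with Dask the input may be serialized when it moves between workers), the pop in b can change what c sees, and the outcome then depends on which task runs first. Below is a plain-Python sketch of the hazard, with the lambdas pulled out into hypothetical named functions, followed by a copy-based workaround.

```python
import copy


def make_payload(x):
    return {"key": x}


def pop_key(d):
    return d.pop("key")  # removes "key" from the dict it receives


def get_key(d):
    return d.get("key")  # returns None if "key" is missing


# Shared object: both consumers get the same dict, and pop_key happens to run first.
a = make_payload("value")
b = pop_key(a)
c = get_key(a)  # the dict was already mutated, so this is None

# Workaround: give each consumer its own copy (or avoid mutating inputs at all).
a2 = make_payload("value")
b2 = pop_key(copy.deepcopy(a2))
c2 = get_key(copy.deepcopy(a2))

print(b, c, b2, c2)  # value None value value
```

The simplest fix inside the flow is usually to read with get instead of pop, or to copy the input inside the task before mutating it.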
Anatoly Myachev
03/23/2022, 8:20 PM
DaskTaskRunner that uses dask_cloudprovider.aws.FargateCluster? There are no logs in the Orion UI except "'task-b610b3b0-0' - Crash detected! Execution was interrupted by an unexpected exception." and "RuntimeError: IOLoop is closed", but the full error can be seen in the CloudWatch console: Exception: 'RuntimeError("Cannot orchestrate task run \'9987d480-fad1-433a-9749-077a63fbdc0a\'. Failed to connect to API at http://orion:4200/api/.")'
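The failing URL, http://orion:4200/api/, uses the bare host name orion. That looks like a Kubernetes Service name, and such names normally resolve only inside the cluster, so Dask workers running on Fargate probably cannot reach it. One way to check is to run a small lookup on the workers themselves, for example through the Dask client's run method. Below is a minimal stdlib sketch of such a check. The function name is hypothetical, and the likely fix (exposing Orion at an address the workers can reach and pointing PREFECT_API_URL at it) is a suggestion, not something confirmed in this thread.

```python
import socket
from urllib.parse import urlsplit


def api_host_resolves(api_url: str) -> bool:
    """Return True if the host in api_url resolves via DNS from this machine."""
    parts = urlsplit(api_url)
    host = parts.hostname
    if not host:
        raise ValueError(f"no hostname in {api_url!r}")
    # Fall back to the scheme's default port when the URL doesn't name one
    port = parts.port or (443 if parts.scheme == "https" else 80)
    try:
        socket.getaddrinfo(host, port)
    except socket.gaierror:
        return False  # e.g. "[Errno -2] Name or service not known"
    return True


print(api_host_resolves("http://localhost:4200/api/"))
```

Running api_host_resolves("http://orion:4200/api/") inside the cluster and on a Fargate worker should show whether name resolution is what differs between the two environments.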
Note: Orion is deployed on Kubernetes via the prefect orion kubernetes-manifest command.
Adam
03/23/2022, 8:32 PM
Edward Chen
03/23/2022, 8:46 PM
Ken Nguyen
03/23/2022, 11:21 PM
with Flow("flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    json_data = get_json_data(
        url, query, headers,
        task_args={"name": "Getting Flow Data"}
    )
How can I then access json_data as a Python object, rather than a FunctionTask object?
Richard Freeman
03/24/2022, 1:25 AM
Ken Nguyen
03/24/2022, 2:23 AM
StartFlowRun vs create_flow_run? They seem slightly different in the documentation but have the same end result.
Jeff Kehler
03/24/2022, 2:52 AM
my_module/
flows/
my_flow.py
tasks/
lib/
tasks/
shared_task.py
So in my_flow.py I am importing from my_module.lib.tasks.shared_task, where I've created a reusable Task class. But I am unable to register this flow using the prefect command:
prefect register --project Test -m my_module.flows.my_flow
I've tried many different combinations of the prefect register command, and it keeps raising ModuleNotFoundError: No module named 'my_module'
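prefect register -m imports the flow module by its dotted name, so the directory that contains my_module has to be on sys.path for that process. One likely cause is that a console script such as prefect puts its own directory on sys.path rather than the current working directory, so it cannot see my_module even when launched from the project root. Setting PYTHONPATH to the project root, or installing the project (for example with pip install -e ., assuming it has a setup.py or pyproject.toml), are the usual fixes. The self-contained sketch below recreates a similar layout in a temporary directory and shows the same ModuleNotFoundError going away once the root is on sys.path. The file contents are placeholders.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def make_layout(root: Path) -> None:
    """Create my_module/flows/my_flow.py importing my_module.lib.tasks.shared_task."""
    for pkg in ("my_module", "my_module/flows", "my_module/lib", "my_module/lib/tasks"):
        (root / pkg).mkdir(parents=True, exist_ok=True)
        (root / pkg / "__init__.py").write_text("")
    (root / "my_module/lib/tasks/shared_task.py").write_text("SHARED = 'shared task'\n")
    (root / "my_module/flows/my_flow.py").write_text(
        "from my_module.lib.tasks.shared_task import SHARED\n"
    )


def can_import(dotted_name: str) -> bool:
    """Try to import a module by dotted name, roughly as a -m style loader would."""
    importlib.invalidate_caches()  # pick up files created after interpreter startup
    try:
        importlib.import_module(dotted_name)
    except ModuleNotFoundError:
        return False
    return True


with tempfile.TemporaryDirectory() as tmp:
    make_layout(Path(tmp))
    before = can_import("my_module.flows.my_flow")  # project root not on sys.path yet
    sys.path.insert(0, tmp)  # what PYTHONPATH=<project root> would do
    try:
        after = can_import("my_module.flows.my_flow")
    finally:
        sys.path.remove(tmp)

print(before, after)  # False True
```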
However, if I just use flow.register() inside my_flow.py and execute it using python my_module/flows/my_flow.py, it registers just fine.
Muddassir Shaikh
03/24/2022, 5:03 AM
Abuzar Shaikh
03/24/2022, 6:48 AM
Ievgenii Martynenko
03/24/2022, 9:20 AM
Ievgenii Martynenko
03/24/2022, 9:32 AM
Ievgenii Martynenko
03/24/2022, 9:44 AM
# with concurrent.futures.thread.ThreadPoolExecutor(max_workers=10,
#                                                   thread_name_prefix="flow_") as executor:
#     futures = [executor.submit(self._run, df) for df in self.dfs]
#
#     for future in as_completed(futures):
#         try:
#             future.result()
#         except Exception as exc:
#             logger.exception(exc)
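The commented-out block follows the standard submit / as_completed pattern. Below is a runnable, standalone sketch of it, importing ThreadPoolExecutor from concurrent.futures, its documented public location. Here worker and items stand in for the original self._run and self.dfs, and square_non_negative is a hypothetical placeholder worker. Mapping each future back to its input lets the log line say which item failed.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)


def run_all(items, worker, max_workers=10):
    """Run worker(item) for every item on a thread pool.

    Failures are logged and collected instead of aborting the whole batch.
    Results come back in completion order, not input order.
    """
    results, errors = [], []
    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="flow_") as executor:
        # Map each future back to its input so failures can be attributed
        futures = {executor.submit(worker, item): item for item in items}
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as exc:
                logger.exception("worker failed for input %r", futures[future])
                errors.append(exc)
    return results, errors


def square_non_negative(n):
    """Hypothetical worker standing in for self._run."""
    if n < 0:
        raise ValueError(f"negative input: {n}")
    return n * n


results, errors = run_all([1, 2, -3, 4], square_non_negative)
print(sorted(results), len(errors))  # [1, 4, 16] 1
```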
Florian Guily
03/24/2022, 10:48 AM
Vadym Dytyniak
03/24/2022, 12:19 PM
ephemeralStorage in the ECSRun configuration. Can I use run_task_kwargs to modify it? If so, what format should this dict use to pass ephemeralStorage?
Joshua Greenhalgh
03/24/2022, 12:28 PM
jobs.batch is forbidden: User \"system:serviceaccount:default:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"prefect\"
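The error says the default service account in the default namespace (system:serviceaccount:default:default) is trying to create Jobs in the prefect namespace without permission. In Kubernetes RBAC, that permission is normally granted with a Role in the prefect namespace plus a RoleBinding whose subject is that service account. Below is a minimal sketch that builds both objects as a v1 List and prints JSON, which kubectl apply -f - accepts. The object name is hypothetical, and the verb list is an assumption about what an agent that launches and watches Jobs needs, so trim or extend it to match your agent's documentation.

```python
import json

ROLE_NAME = "prefect-job-launcher"  # hypothetical name


def job_rbac_manifests(job_namespace: str, sa_name: str = "default",
                       sa_namespace: str = "default") -> dict:
    """Build a Role and RoleBinding letting one service account manage Jobs in job_namespace."""
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": ROLE_NAME, "namespace": job_namespace},
        "rules": [
            # Jobs live in the "batch" API group, matching the error message
            {"apiGroups": ["batch"], "resources": ["jobs"],
             "verbs": ["create", "get", "list", "watch", "delete"]},
            # Pods are in the core API group (""); read access for status and logs
            {"apiGroups": [""], "resources": ["pods", "pods/log"],
             "verbs": ["get", "list", "watch"]},
        ],
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        # A RoleBinding grants a Role from its own namespace
        "metadata": {"name": ROLE_NAME, "namespace": job_namespace},
        # The subject may live in a different namespace (here: default/default)
        "subjects": [{"kind": "ServiceAccount", "name": sa_name, "namespace": sa_namespace}],
        "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "Role", "name": ROLE_NAME},
    }
    return {"apiVersion": "v1", "kind": "List", "items": [role, binding]}


if __name__ == "__main__":
    # e.g. python make_rbac.py | kubectl apply -f -
    print(json.dumps(job_rbac_manifests("prefect"), indent=2))
```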
Didier Marin
03/24/2022, 1:33 PM
16:46:32 Task 'x': Starting task run...
16:46:33 Task 'x': Finished task run for task with final state: 'TriggerFailed'
01:39:32 Task 'x': Starting task run...
01:39:46 Task 'x': Starting task run...
I don't know if it's related, but I had to retry the parent task that failed (hence the "TriggerFailed" you can see above).
I'm running with a k8s executor on Core version 0.14.20.
Any idea what could have happened?
Raimundo Pereira De Souza Neto
03/24/2022, 2:21 PM
from prefect import flow
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule
from datetime import timedelta

@flow(name="TestingFlow")
async def testing_flow(name="raimundo"):
    try:
        print(f"Hello {name}!")
    except Exception as e:
        print(e)

DeploymentSpec(
    flow=testing_flow,
    name="hw-30s",
    schedule=IntervalSchedule(interval=timedelta(seconds=20)),
    tags=["rai", "20s"],
)
When I run prefect deployment create my_file.py, the deployment is created correctly, but the tasks don't run.
Can anyone help me, please? 💙
Nick Hart
03/24/2022, 2:24 PM
Ramzi A
03/24/2022, 3:17 PM
Steve R
03/24/2022, 3:21 PM