Florian Guily
03/23/2022, 10:39 AM
Nelson Griffiths
03/23/2022, 1:20 PM
Traceback (most recent call last):
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/cli/base.py", line 59, in wrapper
    return fn(*args, **kwargs)
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 120, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 67, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 56, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 233, in run
    return native_run(wrapper(), debug=debug)
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
    return await func(*args)
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/cli/agent.py", line 71, in start
    await agent.get_and_submit_flow_runs()
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/agent.py", line 88, in get_and_submit_flow_runs
    submittable_runs = await self.client.get_runs_in_work_queue(
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/client.py", line 747, in get_runs_in_work_queue
    response = await self._client.post(
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/utilities/httpx.py", line 137, in post
    return await self.request(
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/utilities/httpx.py", line 80, in request
    response.raise_for_status()
  File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/httpx/_models.py", line 1510, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '403 Forbidden' for url 'https://api-beta.prefect.io/api/accounts/df4b7089-cc2a-48ae-b4ce-baea44b163d6/workspaces/b22af91f-f810-4bc3-ac90-a1fa0e042c55/work_queues/c91e1439-be7e-4a98-8df0-da39515197b2/get_runs'
For more information check: https://httpstatuses.com/403
An exception occurred.
Any ideas what might be causing this?
Shrikkanth
03/23/2022, 2:16 PM
Chris Reuter
03/23/2022, 2:33 PM
Kyle Austin
03/23/2022, 2:34 PM
Daniel Komisar
03/23/2022, 3:44 PM
Domenico Di Gangi
03/23/2022, 3:44 PM
@task(log_stdout=True) gives an error in Orion. Is there an alternative way to log stdout in Orion?
Jake
03/23/2022, 3:44 PM
Florian Guily
03/23/2022, 4:33 PM
Rachel Funk
03/23/2022, 5:20 PM
no heartbeat detected error that I've seen a few other folks flag. I read through the Discourse documentation and the FAQ. I don't think it's a memory issue because a much larger Flow is running from the same agent + Google Cloud VM without any issues.
I can't seem to use the proposed solutions to configure heartbeats to use threads because I'm realizing that my Flows do not use:
from prefect.run_configs
So, I'm not using UniversalRun or ECSRun. I'm guessing the setup of my flow might pre-date that library. Is that possible? Should I import that library if my Flows have been running fine without it?
I should mention that this heartbeat issue only started to crop up a few days ago.
Jason Bertman
03/23/2022, 5:26 PM
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
    state = self.client.set_task_run_state(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1922, in set_task_run_state
    result = self.graphql(
  ...
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='prefecthq-apollo.prefect', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f12486259d0>: Failed to establish a new connection: [Errno -2] Name or service not known'))
This error would seem to indicate that the service can't even be resolved, but this DNS name is perfectly resolvable from the dask pods, and many tasks succeed before a couple fail. I see signs of distress from the apollo service in the form of:
│ BadRequestError: request aborted │
│ at IncomingMessage.onAborted (/apollo/node_modules/raw-body/index.js:231:10) │
│ at IncomingMessage.emit (events.js:315:20) │
│ at abortIncoming (_http_server.js:561:9) │
│ at socketOnClose (_http_server.js:554:3) │
│ at Socket.emit (events.js:327:22) │
│ at TCP.<anonymous> (net.js:673:12)
repeated pretty much as many times as the dask pod tries it. Light research on this case points to the service being overwhelmed. Is this case covered by the request retrier here? https://github.com/PrefectHQ/prefect/blob/a2041c7ff1a619e611f614ed3394fccd05bb2005/src/prefect/client/client.py#L633
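For reference, a minimal client-side sketch of retrying a call with exponential backoff when the connection fails. This is not Prefect's retry logic from the linked client.py; call_with_retries and all of its parameters are hypothetical names used only for illustration. Note that requests.exceptions.ConnectionError is a different class from Python's built-in ConnectionError, so it would have to be passed explicitly via retry_on.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying on the given exception types with exponential backoff.

    Hypothetical helper: the delay doubles after each failed attempt
    (base_delay, 2 * base_delay, 4 * base_delay, ...). The last failure
    is re-raised unchanged.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the original error
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")
```

Wrapping only the failing call keeps the backoff local; whether that actually relieves an overloaded Apollo service is not something this sketch can show.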
If not, what's the best way to handle this case? Are there any configuration changes I could make to the Apollo pod?
Ken Nguyen
03/23/2022, 6:08 PM
Anatoly Myachev
03/23/2022, 6:21 PM
prefect orion kubernetes-manifest | kubectl apply -f - command, the kubernetes queue is not created, although according to the documentation it should be. Has anyone seen something similar?
Hedgar
03/23/2022, 6:37 PM
Rajan Subramanian
03/23/2022, 6:45 PM
Chris Reuter
03/23/2022, 6:55 PM
kevin
03/23/2022, 8:06 PM
with Flow('foo') as f:
    a = task(lambda x: {'key': x})('value')
    b = task(lambda y: y.pop('key'))(a)
    c = task(lambda z: z.get('key'))(a)
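The question that went with this snippet is not preserved. Assuming the concern is that b mutates the dict that c also reads, here is a plain-Python sketch (outside Prefect, with hypothetical function names) of a defensive fix: copy the input before popping. Whether Prefect hands the same object to both tasks depends on the executor and is not established here.

```python
import copy


def make_payload(x):
    return {"key": x}


def pop_key(d):
    # Work on a shallow copy so the caller's dict is left intact.
    d = copy.copy(d)
    return d.pop("key")


def get_key(d):
    return d.get("key")


a = make_payload("value")
b = pop_key(a)  # does not remove "key" from a
c = get_key(a)  # still sees "key", regardless of call order
```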
Anatoly Myachev
03/23/2022, 8:20 PM
DaskTaskRunner that uses dask_cloudprovider.aws.FargateCluster? There are no logs in the Orion UI except
'task-b610b3b0-0' - Crash detected! Execution was interrupted by an unexpected exception.
and
RuntimeError: IOLoop is closed
but the logs are visible in the CloudWatch console. They contain the following error:
Exception: 'RuntimeError("Cannot orchestrate task run \'9987d480-fad1-433a-9749-077a63fbdc0a\'. Failed to connect to API at http://orion:4200/api/.")'
Note: Orion is deployed on Kubernetes via the orion kubernetes-manifest command.
Kevin Kho
03/23/2022, 8:27 PM
Adam
03/23/2022, 8:32 PM
Edward Chen
03/23/2022, 8:46 PM
Ken Nguyen
03/23/2022, 11:21 PM
with Flow("flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    json_data = get_json_data(
        url, query, headers,
        task_args={"name": "Getting Flow Data"}
    )
How can I then access json_data as a Python object, rather than a FunctionTask object?
Richard Freeman
03/24/2022, 1:25 AM
Ken Nguyen
03/24/2022, 2:23 AM
StartFlowRun vs create_flow_run? They seem to be slightly different in the documentation but have the same end result.
Jeff Kehler
03/24/2022, 2:52 AM
my_module/
flows/
my_flow.py
tasks/
lib/
tasks/
shared_task.py
So in my_flow.py I am importing from my_module.lib.tasks.shared_task, where I've created a reusable Task class. But I am unable to register this flow using the prefect command
prefect register --project Test -m my_module.flows.my_flow
I've tried many different combinations of the prefect register command and it just continues to generate a ModuleNotFoundError: No module named my_module
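One thing that may be worth checking (an assumption; the thread does not confirm the cause) is whether the directory that contains my_module is on sys.path when the CLI tries to import it. The sketch below uses only the standard library and a throwaway package with a made-up name to show how importability depends on that; can_import is a hypothetical helper, not part of Prefect.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def can_import(module_name: str) -> bool:
    """Return True if module_name is importable with the current sys.path."""
    importlib.invalidate_caches()
    try:
        importlib.import_module(module_name)
        return True
    except ModuleNotFoundError:
        return False


# Build a throwaway package that loosely mirrors the layout above.
root = Path(tempfile.mkdtemp())
flows_dir = root / "demo_pkg_for_path_check" / "flows"
flows_dir.mkdir(parents=True)
(root / "demo_pkg_for_path_check" / "__init__.py").write_text("")
(flows_dir / "__init__.py").write_text("")
(flows_dir / "demo_flow.py").write_text("NAME = 'demo'\n")

# Not importable while the parent directory is missing from sys.path ...
before = can_import("demo_pkg_for_path_check.flows.demo_flow")

# ... and importable once the parent directory is added.
sys.path.insert(0, str(root))
after = can_import("demo_pkg_for_path_check.flows.demo_flow")
```

Outside a script, the usual equivalents are running the command from the directory that contains the package, setting PYTHONPATH, or installing the package into the environment.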
However, if I just use flow.register() inside of my_flow.py and execute it using python my_module/flows/my_flow.py it registers just fine.
Muddassir Shaikh
03/24/2022, 5:03 AM
Abuzar Shaikh
03/24/2022, 6:48 AM
Ievgenii Martynenko
03/24/2022, 9:20 AM
Ievgenii Martynenko
03/24/2022, 9:32 AM
Ievgenii Martynenko
03/24/2022, 9:44 AM
# with concurrent.futures.thread.ThreadPoolExecutor(max_workers=10,
#                                                   thread_name_prefix="flow_") as executor:
#     futures = [executor.submit(self._run, df) for df in self.dfs]
#
#     for future in as_completed(futures):
#         try:
#             future.result()
#         except Exception as exc:
#             logger.exception(exc)
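A runnable version of the commented-out pattern above, as a sketch only. self._run and self.dfs are not shown in the thread, so run_all, work, items and double are hypothetical stand-ins, and the logger is a plain standard-library logger passed in explicitly, not a Prefect logger.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logging.basicConfig(level=logging.INFO)


def run_all(items, work, logger, max_workers=10):
    """Run work(item, logger) for each item in a thread pool.

    Returns (results, errors); results arrive in completion order.
    """
    results, errors = [], []
    with ThreadPoolExecutor(max_workers=max_workers,
                            thread_name_prefix="flow_") as executor:
        # Map each future back to its input so failures can be attributed.
        futures = {executor.submit(work, item, logger): item for item in items}
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as exc:
                logger.exception("worker failed for %r", futures[future])
                errors.append(exc)
    return results, errors


def double(x, logger):
    # Hypothetical worker standing in for self._run.
    if x < 0:
        raise ValueError("negative input")
    logger.info("processing %s", x)
    return x * 2


results, errors = run_all([1, 2, -1], double, logging.getLogger("demo"))
```

Passing the logger as an argument avoids relying on any thread-local context inside the worker.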
Anna Geller
03/24/2022, 1:34 PM
LocalDaskExecutor that uses multithreading under the hood by default, then Prefect will make sure that the context stays thread-safe.
Ievgenii Martynenko
03/24/2022, 2:23 PM
Kevin Kho
03/24/2022, 2:46 PM
logger as an input into your multiprocessing function, it will work, but really just use LocalDask and then you get observability into your tasks as well.
Anna Geller
03/24/2022, 3:34 PM
"Yeap, for DaskCluster it's clear. It controls multithreading itself and all things work fine."
Actually, using LocalDaskExecutor doesn't use Dask, it uses multithreading. Check out https://discourse.prefect.io/t/what-is-the-difference-between-a-daskexecutor-and-a-localdaskexecutor/374