David Salgado
04/12/2023, 6:51 AM
cleanup task must happen after all the my_task tasks have completed. To make that happen, I'm passing the results of the mapped my_task tasks, even though cleanup doesn't do anything with them.
from prefect import flow, task

@flow
def my_flow():
    results = my_task.map([1, 2, 3])
    cleanup(results)

@task
def my_task(x):
    # ...do whatever
    value = x  # placeholder so the example runs
    return value

@task
def cleanup(dummy):
    # ...do something essential (without using `dummy`)
    pass
Sometimes, one of the my_task tasks fails, and the cleanup task stays stuck in NotReady state, according to the Prefect Cloud web UI.
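The situation described above, a final step that must run once every mapped task has settled whether or not some of them failed, can be sketched outside Prefect with the standard library. This is only an illustration under assumptions: it uses concurrent.futures instead of Prefect's task runner, and run_with_cleanup and the failing input 2 are made up for the example. As far as I know, Prefect 2 also ships an allow_failure annotation for passing failed upstream results to a downstream task; check the docs for your version before relying on it.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def my_task(x):
    # Stand-in for the real work; fails for one input to mimic a failed mapped task.
    if x == 2:
        raise ValueError(f"my_task failed for {x}")
    return x * 10


def run_with_cleanup(inputs, cleanup):
    """Run my_task over inputs, then call cleanup once every run has settled."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(my_task, x) for x in inputs]
        # wait() returns once all futures are done, whether they succeeded or raised.
        wait(futures)

    outcomes = []
    for fut in futures:
        exc = fut.exception()  # None when the task succeeded
        if exc is not None:
            outcomes.append(("failed", repr(exc)))
        else:
            outcomes.append(("ok", fut.result()))

    cleanup()  # hypothetical callback; runs regardless of upstream failures
    return outcomes
```

The key design choice is that the cleanup step depends on the upstream runs being finished, not on them having succeeded.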
Is this expected behaviour? If so, what's the recommended way to ensure that cleanup always runs after all the my_task tasks have finished, whether they succeeded or not?
Leela Surya Teja Mangamuri
04/13/2023, 2:43 AM
Andy Dienes
04/13/2023, 2:41 PM
x. when I do a quick run there is a UI prompt to fill in x. when I set a cron schedule, I see no such prompt. what value is it using for x, just None? how do I set this per-deployment? even better, how do I set it relative to the scheduled run time, like how Airflow has {{ ds_nodash }} for a daily run?
Andy Dienes
04/13/2023, 2:53 PM
prefect.context.get_run_context().start_time, but that seems a little dangerous if the run is late; I'd rather pull the scheduled time if possible
Andy Dienes
04/13/2023, 5:48 PM
Braun Reyes
04/13/2023, 8:39 PM
Downloading flow code from storage at 'orchestration/main-orchestration-flow'
03:31:38 PM
prefect.flow_runs
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 879, in exec_module
  File "<frozen importlib._bootstrap_external>", line 1016, in get_code
  File "<frozen importlib._bootstrap_external>", line 1073, in get_data
FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/main_orchestration_flow.py'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 276, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 217, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(load_flow_from_entrypoint, str(import_path))
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/site-packages/prefect/flows.py", line 809, in load_flow_from_entrypoint
    flow = import_object(entrypoint)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 201, in import_object
    module = load_script_as_module(script_path)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 164, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'main_orchestration_flow.py' encountered an exception: FileNotFoundError(2, 'No such file or directory')
I saw that a bug was fixed in 2.10.3, but this flow is running on that version. The files are in the remote storage.
Scott Cressi
04/14/2023, 3:55 PM
Leela Surya Teja Mangamuri
04/14/2023, 6:27 PM
Leela Surya Teja Mangamuri
04/14/2023, 6:29 PM
nakul bajaj
04/16/2023, 3:44 AM
Data Ops
04/17/2023, 6:14 PM
Theo Sjöstedt
04/18/2023, 11:25 AM
Late. How can I debug what the problem is?
Theo Sjöstedt
04/18/2023, 12:59 PM
jack
04/18/2023, 8:58 PM
Submission failed. KeyError: 'env' when attempting a "Hello World" Prefect flow on ECS.
I'm following the prefect-aws guide and using this invocation to build a deployment:
prefect deployment build run.py:main \
-n jack-test-1 \
-ib ecs-task/ecs-block \
-sb s3/flow-storage-block \
--pool some-pool \
--override env.EXTRA_PIP_PACKAGES=prefect-aws
Jarod Xue
04/19/2023, 2:00 AM
Jarod Xue
04/19/2023, 2:03 AM
  ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
 | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
 |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
 |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|
Agent started! Looking for work from work pool 'default-agent-pool'...
Failed the last 3 attempts. Please check your environment and configuration.
Examples of recent errors:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/h2/connection.py", line 224, in process_input
    func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.SEND_HEADERS: 0>)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/http2.py", line 116, in handle_async_request
    await self._send_request_headers(request=request, stream_id=stream_id)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/http2.py", line 213, in _send_request_headers
    self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
  File "/usr/local/lib/python3.10/dist-packages/h2/connection.py", line 766, in send_headers
    self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
  File "/usr/local/lib/python3.10/dist-packages/h2/connection.py", line 228, in process_input
    raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
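The "Failed the last 3 attempts" message above suggests the agent polls in a loop and gives up after several consecutive errors. A minimal sketch of that kind of loop follows. critical_loop and its parameters are hypothetical names chosen for illustration, and this is not Prefect's actual implementation; a real service loop would also sleep between attempts.

```python
def critical_loop(workload, max_attempts, consecutive_failure_limit=3):
    """Call workload repeatedly; raise once it fails `limit` times in a row."""
    failures = 0
    for _ in range(max_attempts):
        try:
            workload()
        except Exception as exc:
            failures += 1
            if failures >= consecutive_failure_limit:
                # Surface the most recent error as the cause of the shutdown.
                raise RuntimeError(
                    f"Failed the last {failures} attempts."
                ) from exc
        else:
            # Any successful attempt resets the consecutive-failure counter.
            failures = 0
```

Under this model a single transient connection error is tolerated, while a connection that stays closed across several polls stops the process.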
Jarod Xue
04/19/2023, 2:05 AM
Ubuntu 22.04
$ prefect version
Version: 2.10.3
API version: 0.8.4
Python version: 3.10.6
Git commit: f9ddd259
Built: Tue, Apr 11, 2023 11:55 AM
OS/Arch: linux/x86_64
Profile: default
Server type: cloud
Jarod Xue
04/19/2023, 2:07 AM
jack
04/19/2023, 2:20 PM
$ prefect deployment build run.py:main -n jack-test-3 -ib ecs-task/ecs-block -sb s3/flow-storage-block --pool some-pool --override env.EXTRA_PIP_PACKAGES='prefect-aws s3fs'
Found flow 'main'
Deployment YAML created at 'V:\some-dir\ecs-test-3\main-deployment.yaml'.
Successfully uploaded 2 files to s3://some-bucket/some-path/
jack
04/19/2023, 2:32 PM
task_role_arn or the execution_role_arn that needs read access to the S3 bucket where the flow is deployed?
Scott Condo
04/19/2023, 2:57 PM
jack
04/19/2023, 6:21 PM
Theo Sjöstedt
04/20/2023, 8:31 AM
Thomas DUSSOUILLEZ
04/20/2023, 8:36 AM
Slackbot
04/20/2023, 2:40 PM
Andrew
04/20/2023, 8:00 PM
Santiago Gonzalez
04/21/2023, 12:33 PM
    return fn(*args, **kwargs)
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/utilities/asyncutils.py", line 260, in coroutine_wrapper
    return call()
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 245, in __call__
    return self.result()
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/cli/worker.py", line 142, in start
    started_event = await worker._emit_worker_started_event()
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 702, in _run_wrapped_task
    await coro
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/utilities/services.py", line 104, in critical_service_loop
    raise RuntimeError("Service exceeded error threshold.")
RuntimeError: Service exceeded error threshold.
An exception occurred.
And the thing is that no flow was running last night. Do you know what caused the worker to crash? Prefect version 2.10.5.
Scott Condo
04/21/2023, 2:22 PM
Blue Radar
04/21/2023, 2:33 PM
prefect agent start -p <WORK_POOL> --api <WORK_SPACE_API>
But when I try to do the same in the Kubernetes deployment, it does not work.
Scott Condo
04/24/2023, 1:10 PM