Pedro Machado
05/23/2023, 8:10 PMcheck_data_availability
task that calls the API. If the data is not ready, it should raise an exception.
• If the data is ready, it will continue
Can you think of a better pattern? If I go with this approach, I was thinking that it would be good to define a new state waiting for data availability
or something along those lines to differentiate it from a failure. Does this make sense? Are custom states possible? Can Prefect Cloud render custom states? How would I specify a custom state?
Thanks!jhenry
05/23/2023, 9:19 PMJacob Goldberg
05/23/2023, 10:01 PMfrom prefect.storage import Docker
from prefect.run_configs import DockerRun
import os
from prefect.utilities.storage import extract_flow_from_file
# locate all flows and extract from files
flow_directory = "..."
flow_file_names = [...]
all_flows = [
extract_flow_from_file(os.path.join(flow_directory, file_name))
for file_name in flow_file_names
]
# define flow storage/config
STORAGE = Docker(...)
RUN_CONFIG = DockerRun(...)
def build_and_register_all_flows(
flow_list,
storage=STORAGE,
run_config=RUN_CONFIG,
):
# assign run config and storage to all flows
for flow in flow_list:
flow.run_config = run_config
storage.add_flow(flow)
# build the Docker Image
storage = storage.build()
for flow in flow_list:
# Reassign the new storage object to each Flow
flow.storage = storage
# Register each flow without building a second time
flow.register(...)
if __name__ == "__main__":
build_and_register_all_flows(all_flows)
This pattern relied heavily on the extract_flow_from_file
utility from prefect 1.x which I am not sure exists in prefect 2.x
I have found this deployment tutorial on the prefect site that shows a reasonably familiar pattern in the “Deployment creation with Python” section that I hope to follow. However, it only applies to one flow.
Can someone help me understand how to apply this to multiple flows? is there a utility similar to the extract_flow_from_file
utility from prefect 1.x that can help me to bundle up a bunch of flows and deploy them together?Shaoyi Zhang
05/23/2023, 10:16 PMNikhil Jain
05/23/2023, 11:50 PMMatt Alhonte
05/24/2023, 1:40 AMprefect register <filepath>
in Prefect 1? Assume it's a bunch of py
files with Deployments defined in Python code.Josh Lopez
05/24/2023, 2:31 AMprefect server start
results in server starting then immediately stopping after prefect reinstall in new conda environmentNitin Bansal
05/24/2023, 7:12 AMDenys Volokh
05/24/2023, 7:22 AMBen Muller
05/24/2023, 9:36 AMstorage.get_directory()
is triggered I get the following error in 🧵eli yosef
05/24/2023, 9:42 AMasmundo
05/24/2023, 9:43 AMAndreas
05/24/2023, 10:44 AMIvan Ksaver Šušnjara
05/24/2023, 12:07 PMGiorgio Basile
05/24/2023, 2:12 PMSam Debruyn
05/24/2023, 3:46 PM-h
instead of --help
because every other CLI allows this as well. It would be nice if prefect
would also just understand -h
Faizul
05/24/2023, 4:33 PMJosh Lopez
05/24/2023, 5:18 PMJimmy Le
05/24/2023, 6:17 PMBrayyan Yanes
05/24/2023, 8:26 PMBrayyan Yanes
05/24/2023, 8:27 PMBrayyan Yanes
05/24/2023, 8:27 PMBrayyan Yanes
05/24/2023, 8:28 PMBrayyan Yanes
05/24/2023, 8:28 PMBrayyan Yanes
05/24/2023, 8:29 PMRobert Banick
05/24/2023, 10:06 PM21:53:15.511 | INFO | prefect.agent - Submitting flow run '7bec67b1-ddba-429a-a3a6-dc071401433b'
21:53:17.265 | INFO | prefect.infrastructure.ecs-task - ECSTask 'pretty-mussel': Retrieving task definition 'arn:aws:ecs:us-east-1:421396523132:task-definition/zarr-etl-ec2-efs:44'...
21:53:17.517 | INFO | prefect.infrastructure.ecs-task - ECSTask 'pretty-mussel': Retrieving task definition 'zarr-etl-ec2-efs'...
21:53:17.540 | WARNING | prefect.infrastructure.ecs-task - ECSTask 'pretty-mussel': Settings require changes to the linked task definition. A new task definition will be registered.
21:53:17.603 | INFO | prefect.infrastructure.ecs-task - ECSTask 'pretty-mussel': Creating task run...
21:53:17.726 | ERROR | prefect.agent - Failed to submit flow run '7bec67b1-ddba-429a-a3a6-dc071401433b' to infrastructure.
Traceback (most recent call last):
File "/root/.local/lib/python3.10/site-packages/prefect/agent.py", line 490, in _submit_run_and_capture_errors
result = await infrastructure.run(task_status=task_status)
File "/root/.local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 621, in run
) = await run_sync_in_worker_thread(
File "/root/.local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
File "/root/.local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/root/.local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/root/.local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/root/.local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 814, in _create_task_and_wait_for_start
self._report_task_run_creation_failure(task_run, exc)
File "/root/.local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 810, in _create_task_and_wait_for_start
task = self._run_task(ecs_client, task_run)
File "/root/.local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 1487, in _run_task
return ecs_client.run_task(**task_run)["tasks"][0]
IndexError: list index out of range
We would like to avoid a one-deployment-per-flow setup if at all possible as it’s considerably clunkier code-wise and on the Cloud interface. We must run some of our ETLs on EC2 however.
We encountered this error running on prefect 2.10.10
with flow runs using ECSTasks.
Any ideas?
Note that this appears a repeat of a previous, unanswered issue in this channel https://linen.prefect.io/t/10301091/hi-i-am-running-a-deployment-multiple-times-from-a-master-fl. @Tibs did you ever find a solution?jack
05/24/2023, 10:20 PMSET "PREFECT_LOGGING_FORMATTERS_STANDARD_TASK_RUN_FMT=%(asctime)s.%(msecs)03d | %(levelname)-7s | %(task_name)s | %(message)s"
Walter Cavinaw
05/24/2023, 11:34 PMGiacomo Chiarella
05/25/2023, 6:23 AMGiacomo Chiarella
05/25/2023, 7:20 AM