Stephen Lloyd
04/23/2024, 10:17 AM
return Deployment.build_from_flow(
    flow=flow,
    name=deploy_name,
    infrastructure=ecs_task_block,
    infra_overrides=override,
    description=description,
    tags=tags,
    work_queue_name="prefect-agent-queue",
    output=False,
    apply=True,
    storage=storage_block,
    path=path,
    **kwargs,
)
Now I'm trying to switch to flow.deploy(). However, there are no examples of how to do this with S3 and I'm running into problems. Any suggestions or examples?
Nate
04/23/2024, 11:01 AM
Stephen Lloyd
04/23/2024, 11:23 AM
flow.py:flow
Traceback (most recent call last):
File "/home/runner/work/prefect-orchestration/prefect-orchestration/flows/heartbeat/deployment.py", line 12, in <module>
deployment = deploy(
^^^^^^^
File "/home/runner/work/prefect-orchestration/prefect-orchestration/utilities/prefect_deployment.py", line 65, in deploy
flow.from_source(source=storage_block, entrypoint=path).deploy(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 259, in coroutine_wrapper
return call()
^^^^^^
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__
return self.result()
^^^^^^^^^^^^^
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
result = await coro
^^^^^^^^^^
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/flows.py", line 940, in from_source
flow: "Flow" = await from_async.wait_for_call_in_new_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 164, in wait_for_call_in_new_thread
return call.result()
^^^^^^^^^^^^^
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/flows.py", line 1665, in load_flow_from_entrypoint
path, func_name = entrypoint.rsplit(".", maxsplit=1)
^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 2, got 1)
Stephen Lloyd
04/23/2024, 11:28 AM
flow.py
import platform
from prefect import flow, task, get_run_logger, __version__
from prefect.server.api.server import SERVER_API_VERSION
from emoji import emojize
from utilities.example import Do_Something

@task
def log_platform_info():
    logger = get_run_logger()
    emoji = emojize(":thumbs_up:")
    wiki = Do_Something()
    logger.info(f"Hello from Wiki HeartBeat with ecr push changes: {wiki}")
    logger.info("Python version = %s", platform.python_version())
    logger.info(f"Prefect Version = {__version__}, Prefect API Version = {SERVER_API_VERSION} 🚀, I am {emoji}")

@flow
def heartbeat_flow():
    log_platform_info()
deployment.py
from flow import heartbeat_flow
from utilities.prefect_deployment import deploy
from prefect.server.schemas.schedules import CronSchedule
import os

schedule = (
    CronSchedule(cron="0 * * * *")
    if os.getenv("ENV", "dev").lower() == "prod"
    else None
)

deployment = deploy(
    flow=heartbeat_flow,
    tags=["poc", "heartbeat", "NOOP", "hourly", "workpool"],
    description="Health check flow",
    schedule=schedule,
)

if __name__ == "__main__":
    deployment.apply()
prefect_deployment.py
import os
from prefect.flows import Flow
from prefect.deployments import Deployment
from prefect_aws.ecs import ECSTask
from prefect.filesystems import S3
from collections import OrderedDict
from typing import Optional, Iterable

ACCOUNT_ID = {"dev": "1234", "prod": "6789"}

def deploy(
    flow: Flow,
    suffix_name: Optional[str] = None,
    cpu: Optional[int] = None,
    memory: Optional[int] = None,
    tags: Optional[Iterable[str]] = None,
    description: Optional[str] = None,
    **kwargs,
):
    """
    Configure a deployment for a given flow.

    Args:
        flow (required): A flow function to deploy
        suffix_name (optional): A name for the deployment
        cpu (optional): Number of CPUs to run the task. Default: 1024
        memory (optional): Size of memory to run the task. Default: 2048
        description (optional): Description of the flow.
        tags (optional): Tags associated with the flow.
        **kwargs: other keyword arguments to pass to the constructor for the `Deployment` class
    """
    run_env = os.getenv("ENV", "dev")
    tag_name = os.getenv("TAG_NAME")
    if tag_name is None:
        print("TAG_NAME is not set")
        exit(1)
    name = os.getenv("DEPLOYMENT_NAME")
    if name is None:
        print("DEPLOYMENT_NAME is not set")
        exit(1)
    list_name = [name, suffix_name]
    list_name = list(OrderedDict.fromkeys(list_name))
    deploy_name = "-".join(filter(None, list_name))
    path = os.getenv("STORAGE_PATH")
    if path is None:
        print("STORAGE_PATH is not set")
        exit(1)
    ecs_task_block = ECSTask.load("ecs-infra")
    storage_block = S3.load("prefect-s3-storage")
    image = f"{ACCOUNT_ID[run_env]}.dkr.ecr.us-east-1.amazonaws.com/prefect:{tag_name}"
    override = {
        "image": image,
    }
    if cpu is not None:
        override["cpu"] = cpu
    if memory is not None:
        override["memory"] = memory
    flow.from_source(source=storage_block, entrypoint=path).deploy(
        name=deploy_name,
        job_variables=override,
        description=description,
        work_pool_name="engineering-work-pool",
        image=image,
    )
    # return Deployment.build_from_flow(
    #     flow=flow,
    #     name=deploy_name,
    #     infrastructure=ecs_task_block,
    #     job_variables=override,
    #     description=description,
    #     tags=tags,
    #     work_pool_name="engineering-work-pool",
    #     output=False,
    #     apply=True,
    #     storage=storage_block,
    #     path=path,
    #     **kwargs,
    # )
Nate
04/23/2024, 11:41 AM
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/flows.py", line 1665, in load_flow_from_entrypoint
    path, func_name = entrypoint.rsplit(".", maxsplit=1)
    ^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 2, got 1)
this seems to be because you're not specifying a :flow_decorated_function at the end of your entrypoint
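e.g. something like this (a sketch using the block and variable names from your script above — assuming the flow file is flow.py at the root of your storage path and the decorated function is heartbeat_flow; adjust to your actual layout):
flow.from_source(
    source=storage_block,
    entrypoint="flow.py:heartbeat_flow",  # "<path to file>:<flow function name>", not just the file path
).deploy(
    name=deploy_name,
    work_pool_name="engineering-work-pool",
    image=image,
)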
"I don't understand why it is necessary to restate the flows function if we are already using the flow function to execute the deployment."
this should not be necessary. what is the second file deployment.py for? the apply() method was used for agent-based deployments but is not valid for workers
Stephen Lloyd
04/23/2024, 12:09 PM
"this seems to be because you're not specifying a :flow_decorated_function at the end of your entrypoint"
exactly. my question was more why do I need to have an object that is my @flow decorated function subsequently call from_source(), which references my flow decorated function explicitly again.
It seems like perhaps our utility prefect_deployment.py isn't totally necessary now and that I still need to remove other agent-based deployment legacy stuff.
I don't like the idea of referencing the function explicitly in our py file after we are already importing it, but I'm not sure exactly why. I'll have to think more about that.
Any other pointers?
Stephen Lloyd
04/24/2024, 5:41 AM
Failed to submit flow run 'ded982c5-ccc0-4ed4-8cb3-1ca9eba37211' to infrastructure.
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 904, in _submit_run_and_capture_errors
result = await self.run(
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_aws/workers/ecs_worker.py", line 638, in run
) = await run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 95, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_aws/workers/ecs_worker.py", line 750, in _create_task_and_wait_for_start
self._report_task_run_creation_failure(configuration, task_run_request, exc)
File "/usr/local/lib/python3.11/site-packages/prefect_aws/workers/ecs_worker.py", line 746, in _create_task_and_wait_for_start
task = self._create_task_run(ecs_client, task_run_request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 289, in wrapped_f
return self(f, *args, **kw)
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 379, in __call__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 325, in iter
raise retry_exc.reraise()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 158, in reraise
raise self.last_attempt.result()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 382, in __call__
result = fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_aws/workers/ecs_worker.py", line 1632, in _create_task_run
task = ecs_client.run_task(**task_run_request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/botocore/client.py", line 553, in _api_call
return self._make_api_call(operation_name, kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/botocore/client.py", line 962, in _make_api_call
request_dict = self._convert_to_request_dict(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/botocore/client.py", line 1036, in _convert_to_request_dict
request_dict = self._serializer.serialize_to_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/botocore/validate.py", line 381, in serialize_to_request
raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Missing required parameter in networkConfiguration.awsvpcConfiguration: "subnets"
Unknown parameter in networkConfiguration.awsvpcConfiguration: "awsvpcConfiguration", must be one of: subnets, securityGroups, assignPublicIp
But subnets is definitely included under awsvpcConfiguration. This is from the flow run debug:
"networkConfiguration": {
"awsvpcConfiguration": {
"awsvpcConfiguration": {
"subnets": [
"subnet-subone",
"subnet-subtwo"
],
"securityGroups": [
"sg-mysg"
]
}
and it is in the work pool definition (image)
Any ideas?
Stephen Lloyd
04/24/2024, 6:02 AM
The problem seems to be the duplicated awsvpcConfiguration key nested inside awsvpcConfiguration.
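Presumably the corrected block should have just one level of nesting, e.g. (reconstructed from the debug output above):
"networkConfiguration": {
    "awsvpcConfiguration": {
        "subnets": ["subnet-subone", "subnet-subtwo"],
        "securityGroups": ["sg-mysg"]
    }
}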
Nate
04/24/2024, 1:39 PM"awsvpcConfiguration": {
"awsvpcConfiguration": {looks weird to me is this from your work pools job template?
Stephen Lloyd
04/25/2024, 3:17 AM
Stephen Lloyd
04/25/2024, 3:19 AM
Stephen Lloyd
04/25/2024, 3:25 AM
Olli Kavén
05/03/2024, 9:34 AM
Stephen Lloyd
05/06/2024, 4:55 AM
publish_as_work_pool() doesn't work quite right; it messed up a few things in the work pool base template.

networkConfiguration
This needed manual editing. Remove any reference to awsvpcConfiguration and just insert the object with the subnets key and value:
{
    "subnets": [
        "subnet-number1",
        "subnet-number2"
    ],
    "securityGroups": [
        "sg-number3"
    ]
}

Custom Images
These were not being referenced correctly. We provide a custom image for every unique flow or flow source system.
1. We had to go into the advanced tab and edit the JSON for the image with a Jinja reference to the image variable:
"{{ image }}"
2. We had to remove the hard-coded image reference on the Defaults page of the base template. This hard-coded value overrode the advanced json value.
job_variables vs overrides
One half-confession, half-takeaway: when I read the docs I assumed job_variables was the same as overrides, but this is not the case. That was a silly mistake on my part. They are actually variables, and as far as I could tell, you need to reference those variables specifically somewhere in the template. Previously, one could just override an existing key's value, but now you need to use a variable to do that. We used an image variable to override the image, but I think we could have made it xyz if we wanted. I think the new system will be much more flexible for a number of reasons, but it felt counterintuitive at first.
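For example (an illustrative sketch, not our exact template), the base job template references a declared variable with Jinja, and the deployment supplies the value through job_variables:
# in the work pool's base job template (advanced JSON), reference the variable:
#     "image": "{{ image }}"
# then pass a value for it at deployment time (names here are illustrative):
flow.from_source(source=storage_block, entrypoint="flow.py:heartbeat_flow").deploy(
    name="heartbeat",
    work_pool_name="engineering-work-pool",
    job_variables={"image": image},
)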
from_source().deploy()
As far as I can tell, this does not deploy code to a source like S3. We were already providing an S3 path in our deployment script, and build_from_flow() would actually upload the relevant flow files to S3 for us. With the new method, I had to do this upload explicitly. Fortunately, the new S3Bucket class in prefect-aws made it easy with put_directory:
path = os.getenv("STORAGE_PATH")
if path is None:
    print("STORAGE_PATH is not set")
    exit(1)
full_path = path + "/" + entrypoint
storage_block = S3Bucket.load("prefect-aws-s3-storage")
storage_block.put_directory(
    to_path=path,
    ignore_file=".prefectignore",
)
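Putting it together, the deploy step now looks roughly like this (a sketch; the deployment name and entrypoint are illustrative, other names match the snippet above):
storage_block = S3Bucket.load("prefect-aws-s3-storage")
# upload the flow code first, since from_source().deploy() does not do it for us
storage_block.put_directory(to_path=path, ignore_file=".prefectignore")
flow.from_source(
    source=storage_block,
    entrypoint=f"{path}/flow.py:heartbeat_flow",  # path within the bucket + ":" + flow function
).deploy(
    name="heartbeat",
    work_pool_name="engineering-work-pool",
    job_variables={"image": image},
)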
prefect.filesystems is deprecated
We had to move from prefect.filesystems.S3 to prefect_aws.S3Bucket, as mentioned above.
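Registering the replacement block looks something like this (a sketch; the bucket and credentials block names are illustrative):
from prefect_aws import AwsCredentials, S3Bucket

# create and save the new-style storage block once; load it later with S3Bucket.load(...)
s3_block = S3Bucket(
    bucket_name="my-prefect-bucket",  # illustrative bucket name
    credentials=AwsCredentials.load("aws-credentials"),  # illustrative credentials block name
)
s3_block.save("prefect-aws-s3-storage", overwrite=True)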
Jeff Hale
05/06/2024, 1:35 PM
Jeff Hale
05/06/2024, 2:25 PM
Stephen Lloyd
05/07/2024, 8:13 AM