# prefect-aws
**Stephen Lloyd:**
I am working on moving an agent-based deployment to a worker-based deployment. Previously we used:

```python
return Deployment.build_from_flow(
    flow=flow,
    name=deploy_name,
    infrastructure=ecs_task_block,
    infra_overrides=override,
    description=description,
    tags=tags,
    work_queue_name="prefect-agent-queue",
    output=False,
    apply=True,
    storage=storage_block,
    path=path,
    **kwargs,
)
```
Now I'm trying to switch to `flow.deploy()`. However, there are no examples of how to do this with S3 and I'm running into problems. Any suggestions or examples?
**Nate:**
hi @Stephen Lloyd - this message may be useful in your case. you'll want to convert your ECS block into a work pool with `publish_as_work_pool`, and then have something like `flow.from_source(source=storage_block, entrypoint=…as before…).deploy(…)`
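A minimal sketch of that migration, assuming the block and pool names that appear later in this thread ("ecs-infra", "prefect-s3-storage", "engineering-work-pool") and an illustrative entrypoint path; treat it as a sketch rather than a verified recipe for your Prefect version:

```python
from prefect import flow
from prefect.filesystems import S3
from prefect_aws.ecs import ECSTask

# One-time step: publish the agent-era ECSTask block as an ECS work pool.
ecs_task_block = ECSTask.load("ecs-infra")
ecs_task_block.publish_as_work_pool(work_pool_name="engineering-work-pool")

# Per-deployment step: load the flow from S3 storage and deploy to the pool.
# Note the entrypoint format is "<path/to/file.py>:<flow_function_name>".
storage_block = S3.load("prefect-s3-storage")
flow.from_source(
    source=storage_block,
    entrypoint="flows/heartbeat/flow.py:heartbeat_flow",  # illustrative path
).deploy(
    name="heartbeat",
    work_pool_name="engineering-work-pool",
)
```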
**Stephen Lloyd:**
I get this error, actually. I'm assuming that's because your entrypoint is something like `flow.py:flow`:
```
Traceback (most recent call last):
  File "/home/runner/work/prefect-orchestration/prefect-orchestration/flows/heartbeat/deployment.py", line 12, in <module>
    deployment = deploy(
                 ^^^^^^^
  File "/home/runner/work/prefect-orchestration/prefect-orchestration/utilities/prefect_deployment.py", line 65, in deploy
    flow.from_source(source=storage_block, entrypoint=path).deploy(
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 259, in coroutine_wrapper
    return call()
           ^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__
    return self.result()
           ^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/flows.py", line 940, in from_source
    flow: "Flow" = await from_async.wait_for_call_in_new_thread(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 164, in wait_for_call_in_new_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/flows.py", line 1665, in load_flow_from_entrypoint
    path, func_name = entrypoint.rsplit(".", maxsplit=1)
    ^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 2, got 1)
```
I don't understand why it is necessary to restate the flow's function if we are already using the flow function to execute the deployment. `flow.py`:
```python
import platform
from prefect import flow, task, get_run_logger, __version__
from prefect.server.api.server import SERVER_API_VERSION
from emoji import emojize
from utilities.example import Do_Something

@task
def log_platform_info():
    logger = get_run_logger()
    emoji = emojize(":thumbs_up:")
    wiki = Do_Something()
    logger.info(f"Hello from Wiki HeartBeat with ecr push changes: {wiki}")
    logger.info("Python version = %s", platform.python_version())
    logger.info(f"Prefect Version = {__version__}, Prefect API Version = {SERVER_API_VERSION} 🚀, I am {emoji}")

@flow
def heartbeat_flow():
    log_platform_info()
```
`deployment.py`:
```python
from flow import heartbeat_flow
from utilities.prefect_deployment import deploy
from prefect.server.schemas.schedules import CronSchedule
import os

schedule = (
    CronSchedule(cron="0 * * * *")
    if os.getenv("ENV", "dev").lower() == "prod"
    else None
)

deployment = deploy(
    flow=heartbeat_flow,
    tags=["poc", "heartbeat", "NOOP", "hourly", "workpool"],
    description="Health check flow",
    schedule=schedule,
)

if __name__ == "__main__":
    deployment.apply()
```
`prefect_deployment.py`:
```python
import os
from prefect.flows import Flow
from prefect.deployments import Deployment
from prefect_aws.ecs import ECSTask
from prefect.filesystems import S3
from collections import OrderedDict
from typing import Optional, Iterable

ACCOUNT_ID = {"dev": "1234", "prod": "6789"}


def deploy(
    flow: Flow,
    suffix_name: Optional[str] = None,
    cpu: Optional[int] = None,
    memory: Optional[int] = None,
    tags: Optional[Iterable[str]] = None,
    description: Optional[str] = None,
    **kwargs,
):
    """
    Configure a deployment for a given flow.

    Args:
        flow (required): A flow function to deploy
        suffix_name (optional): A name for the deployment
        cpu (optional): Number of CPUs to run the task. Default: 1024
        memory (optional): Size of memory to run the task. Default: 2048
        description (optional): Description of the flow.
        tags (optional): Tags associated with the flow.
        **kwargs: other keyword arguments to pass to the constructor for
            the `Deployment` class
    """
    run_env = os.getenv("ENV", "dev")

    tag_name = os.getenv("TAG_NAME")
    if tag_name is None:
        print("TAG_NAME is not set")
        exit(1)

    name = os.getenv("DEPLOYMENT_NAME")
    if name is None:
        print("DEPLOYMENT_NAME is not set")
        exit(1)
    # Deduplicate while preserving order, then join the non-empty parts.
    list_name = [name, suffix_name]
    list_name = list(OrderedDict.fromkeys(list_name))
    deploy_name = "-".join(filter(None, list_name))

    path = os.getenv("STORAGE_PATH")
    if path is None:
        print("STORAGE_PATH is not set")
        exit(1)

    # The ECSTask block is only used by the commented-out agent-based path below.
    ecs_task_block = ECSTask.load("ecs-infra")
    storage_block = S3.load("prefect-s3-storage")
    image = f"{ACCOUNT_ID[run_env]}.dkr.ecr.us-east-1.amazonaws.com/prefect:{tag_name}"

    override = {
        "image": image,
    }
    if cpu is not None:
        override["cpu"] = cpu
    if memory is not None:
        override["memory"] = memory

    flow.from_source(source=storage_block, entrypoint=path).deploy(
        name=deploy_name,
        job_variables=override,
        description=description,
        work_pool_name="engineering-work-pool",
        image=image,
    )
    # return Deployment.build_from_flow(
    #     flow=flow,
    #     name=deploy_name,
    #     infrastructure=ecs_task_block,
    #     job_variables=override,
    #     description=description,
    #     tags=tags,
    #     work_pool_name="engineering-work-pool",
    #     output=False,
    #     apply=True,
    #     storage=storage_block,
    #     path=path,
    #     **kwargs,
    # )
```
**Nate:**
```
File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/prefect/flows.py", line 1665, in load_flow_from_entrypoint
    path, func_name = entrypoint.rsplit(".", maxsplit=1)
    ^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 2, got 1)
```
this seems to be because you're not specifying a `:flow_decorated_function` at the end of your entrypoint
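Concretely, a hedged sketch of the fix inside the `deploy` utility above (the `:heartbeat_flow` suffix assumes this particular flow; a generic utility would take the function name as a parameter):

```python
# The entrypoint needs "<path/to/file.py>:<flow_function_name>";
# passing only the path is what raises the ValueError above.
flow.from_source(
    source=storage_block,
    entrypoint=path + ":heartbeat_flow",  # assumes STORAGE_PATH points at the .py file
).deploy(
    name=deploy_name,
    job_variables=override,
    description=description,
    work_pool_name="engineering-work-pool",
    image=image,
)
```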
> I don't understand why it is necessary to restate the flows function if we are already using the flow function to execute the deployment.
this should not be necessary. what is the second file `deployment.py` for? the `apply()` method was used for agent-based deployments but is not valid for workers
**Stephen Lloyd:**
`deployment.py` allows us to have many deployments described in a single .py file. We have some flows with 30+ deployments in production when we have multiple clients on a certain system.
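For illustration, a hedged sketch of that many-deployments-per-file pattern using the `deploy` utility shown earlier (the client names are hypothetical):

```python
from flow import heartbeat_flow
from utilities.prefect_deployment import deploy

# Hypothetical client list; in practice there may be 30+ entries.
CLIENTS = ["client-a", "client-b", "client-c"]

for client in CLIENTS:
    deploy(
        flow=heartbeat_flow,
        suffix_name=client,  # yields e.g. "<DEPLOYMENT_NAME>-client-a"
        tags=["heartbeat", client],
        description=f"Health check flow for {client}",
    )
```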
> this seems to be because you're not specifying a `:flow_decorated_function` at the end of your entrypoint
exactly. my question was more: why do I need to have an object that is my `@flow`-decorated function subsequently call `from_source()`, which references my flow-decorated function explicitly again? It seems like perhaps our utility `prefect_deployment.py` isn't totally necessary now, and that I still need to remove other agent-based deployment legacy stuff. I don't like the idea of referencing the function explicitly in our .py file after we are already importing it, but I'm not sure exactly why. I'll have to think more about that. Any other pointers?
@Nate, I received this error:
```
Failed to submit flow run 'ded982c5-ccc0-4ed4-8cb3-1ca9eba37211' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 904, in _submit_run_and_capture_errors
    result = await self.run(
             ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_aws/workers/ecs_worker.py", line 638, in run
    ) = await run_sync_in_worker_thread(
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 95, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_aws/workers/ecs_worker.py", line 750, in _create_task_and_wait_for_start
    self._report_task_run_creation_failure(configuration, task_run_request, exc)
  File "/usr/local/lib/python3.11/site-packages/prefect_aws/workers/ecs_worker.py", line 746, in _create_task_and_wait_for_start
    task = self._create_task_run(ecs_client, task_run_request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 289, in wrapped_f
    return self(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 379, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 325, in iter
    raise retry_exc.reraise()
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 158, in reraise
    raise self.last_attempt.result()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 382, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_aws/workers/ecs_worker.py", line 1632, in _create_task_run
    task = ecs_client.run_task(**task_run_request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/botocore/client.py", line 553, in _api_call
    return self._make_api_call(operation_name, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/botocore/client.py", line 962, in _make_api_call
    request_dict = self._convert_to_request_dict(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/botocore/client.py", line 1036, in _convert_to_request_dict
    request_dict = self._serializer.serialize_to_request(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/botocore/validate.py", line 381, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Missing required parameter in networkConfiguration.awsvpcConfiguration: "subnets"
Unknown parameter in networkConfiguration.awsvpcConfiguration: "awsvpcConfiguration", must be one of: subnets, securityGroups, assignPublicIp
```
But `subnets` is definitely included under `awsvpcConfiguration`. This is from the flow run debug:
```
"networkConfiguration": {
  "awsvpcConfiguration": {
    "awsvpcConfiguration": {
      "subnets": [
        "subnet-subone",
        "subnet-subtwo"
      ],
      "securityGroups": [
        "sg-mysg"
      ]
    }
  }
}
```
and it is in the work pool definition (see the attached image). Any ideas?

Interesting: Prefect is adding a layer to the dict it is using. Why is Prefect adding the `awsvpcConfiguration` key inside `awsvpcConfiguration`?
**Nate:**
yeah, this

```
"awsvpcConfiguration": {
  "awsvpcConfiguration": {
```

looks weird to me. is this from your work pool's job template?
**Stephen Lloyd:**
Yes.
The job template itself looks fine, as shown in the image. The definition that is printed out in a flow debug has the "double" nesting.
FYI - I'd prefer to dialog here and work things out, but I also opened a support ticket.
**o:**
@Stephen Lloyd did you get this sorted out? We're having somewhat similar trouble with `awsvpcConfiguration` in ECS push work pools. It seems that the subnets we set in work pool settings > networkConfiguration are not respected.
**Stephen Lloyd:**
Yes. Essentially, `publish_as_work_pool()` doesn't work quite right; it messed up a few things in the work pool base template. `networkConfiguration` needed manual editing: remove any reference to `awsvpcConfiguration` and just insert the object with the `subnets` key and value:
```json
{
  "subnets": [
    "subnet-number1",
    "subnet-number2"
  ],
  "securityGroups": [
    "sg-number3"
  ]
}
```
**Custom Images**

These were not being referenced correctly. We provide a custom image for every unique flow or flow source system.
1. We had to go into the advanced tab and edit the JSON for the image with a Jinja reference to the image variable: `"{{ image }}"`.
2. We had to remove the hard-coded image reference on the Defaults page of the base template. This hard-coded value overrode the advanced JSON value.

**job_variables vs overrides**

One half-confession/half-takeaway is that when I read the docs I assumed `job_variables` was the same as `overrides`, but this is not the case. This was a silly mistake on my part. They are actually variables, and as far as I could tell you need to reference those variables specifically somewhere in the template. Previously, one could just override an existing key's value, but now you need to use a variable to do that. We used an `image` variable to override the image, but I think we could have made it `xyz` if we wanted. I think the new system will be much more flexible for a number of reasons, but it felt counterintuitive to me at first for some reason. A sketch of how the pieces fit together is below.
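A hedged illustration of the variable/template relationship; this assumes the work pool's base job template references `"{{ image }}"` somewhere in its job configuration, and reuses names from earlier in this thread:

```python
# With "{{ image }}" referenced in the base job template, the value is
# supplied per deployment through job_variables. Any variable name works
# ("xyz" included) as long as the template actually references it.
heartbeat_flow.from_source(
    source=storage_block,
    entrypoint="flows/heartbeat/flow.py:heartbeat_flow",  # illustrative
).deploy(
    name="heartbeat",
    work_pool_name="engineering-work-pool",
    job_variables={"image": image},  # fills "{{ image }}" in the template
)
```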
**from_source().deploy()**

As far as I can tell, this does not deploy code to a source like S3. We were already providing an S3 path in our deployment script, and `build_from_flow()` would actually upload the relevant flow files to S3 for us. With the new method, I had to do this upload explicitly. Fortunately, the new `S3Bucket` class in `prefect-aws` made it easy with `put_directory`:
```python
import os

from prefect_aws import S3Bucket

path = os.getenv("STORAGE_PATH")
if path is None:
    print("STORAGE_PATH is not set")
    exit(1)
# entrypoint (the "<file.py>" part) is supplied by the caller of our utility
full_path = path + "/" + entrypoint

# Upload the flow files ourselves; from_source().deploy() no longer does it.
storage_block = S3Bucket.load("prefect-aws-s3-storage")
storage_block.put_directory(
    to_path=path,
    ignore_file=".prefectignore",
)
```
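Presumably the same `S3Bucket` block can then serve as the deployment source; a sketch under that assumption (the exact entrypoint path relative to the bucket root may differ in your setup):

```python
# Continuing the snippet above: deploy from the freshly uploaded code.
# ":heartbeat_flow" is the flow-function part of the entrypoint.
heartbeat_flow.from_source(
    source=storage_block,
    entrypoint=full_path + ":heartbeat_flow",
).deploy(
    name="heartbeat",
    work_pool_name="engineering-work-pool",
)
```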
**prefect.filesystems is deprecated**

We had to move from `prefect.filesystems.S3` to `prefect_aws.S3Bucket`, as mentioned above.
**j:**
Thank you @Stephen Lloyd. I used an excerpt of the above to create this issue. Feel free to add any additional context and follow there. We’ve got a member of the team assigned. Thanks again!
PR fix is merged! Should be released next Thurs. Thank you.
**Stephen Lloyd:**
Awesome, thanks!