# prefect-aws
s
I feel like this diagram from the docs could be the same for the agent-based workflow as well. What do we need to change to shift to the new workflow? What are the fundamental differences? What are some of the implications that we need to consider?
I have a `prefect_deployment.py` file which defines a `deploy()` function. We call this from a `deployment.py` file defined for each flow. If a flow has multiple deployments, we specify them all in `deployment.py`. Our long-running agent (now a worker) polls the work pool looking for jobs and then spins up new ECS tasks for any job. We define a block "ecs-infra" as:
```python
import os

from prefect_aws.ecs import ECSTask

# aws_credentials_block, config, and newrelic_license_key
# are defined elsewhere in this script
ecs = ECSTask(
    aws_credentials=aws_credentials_block,
    image=config["ecr_address"] + ":latest",
    vpc_id=config["vpc_id"],
    # container env vars must be strings
    env={
        "ENV": os.getenv("ENV", "dev"),
        "PREFECT_API_ENABLE_HTTP2": "false",
        "PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER": "json",
    },
    # stream_output=None,
    # configure_cloudwatch_logs=None,
    cluster=config["cluster_name"],
    launch_type="FARGATE",
    execution_role_arn=config["execution_role_arn"],
    task_role_arn=config["task_role_arn"],
    # JSON-patch operations applied to the generated task run request
    task_customizations=[
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
            "value": [config["security_group"]],
        },
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/subnets",
            "value": config["subnets"],
        },
    ],
    task_watch_poll_interval=5,
    task_start_timeout_seconds=120,
    task_definition={
        "cpu": 512,
        "memory": 1024,
        "networkMode": "awsvpc",
        "taskRoleArn": config["task_role_arn"],
        "executionRoleArn": config["execution_role_arn"],
        "containerDefinitions": [
            {
                # FireLens sidecar that routes container logs to New Relic
                "name": "log_router",
                "image": "ouraccount.dkr.ecr.us-region.amazonaws.com/newrelic/logging-firelens-fluentbit",
                "essential": True,
                "firelensConfiguration": {
                    "type": "fluentbit",
                    "options": {"enable-ecs-log-metadata": "true"},
                },
            },
            {
                # main container that runs the flow
                "name": "prefect",
                "cpu": 512,
                "memory": 1024,
                "image": config["ecr_address"] + ":latest",
                "essential": True,
                "logConfiguration": {
                    "logDriver": "awsfirelens",
                    "options": {
                        "Name": "newrelic",
                        "apiKey": newrelic_license_key,
                        "endpoint": "https://log-api.newrelic.com/log/v1",
                    },
                },
            },
        ],
        "placementConstraints": [],
        "requiresCompatibilities": ["FARGATE"],
    },
)
ecs.save("ecs-infra", overwrite=True)
```
This seems similar to the base job template. We then override it for specific flows as needed via `prefect_deployment.deploy()` when we call it from a flow's `deployment.py`:
```python
from prefect.deployments import Deployment

# inside prefect_deployment.deploy()
return Deployment.build_from_flow(
    flow=flow,
    name=deploy_name,
    infrastructure=ecs_task_block,
    infra_overrides=override,
    description=description,
    tags=tags,
    work_queue_name="prefect-agent-queue",
    output=False,
    apply=True,
    storage=storage_block,
    path=path,
    **kwargs,
)
```
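For context, a per-flow `deployment.py` under this setup might look like the sketch below; the flow module, the `deploy()` signature (inferred from the call above), and the override values are hypothetical stand-ins:

```python
# deployment.py for one flow -- hypothetical names throughout
from my_flows.etl import etl_flow  # stand-in flow module

import prefect_deployment

prefect_deployment.deploy(
    flow=etl_flow,
    deploy_name="etl-daily",
    # patches fields on the saved ECSTask block for just this flow,
    # e.g. more CPU/memory than the 512/1024 base
    override={"cpu": 1024, "memory": 2048},
    description="Daily ETL run",
    tags=["etl"],
)
```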
I found some more docs that say workers are essentially (or very similar to) what we have done, but wrap the base infra as part of the work-pool definition. This makes sense and is conceptually the same. I think we just need to define a base work-pool config, figure out a way to spin it up from GitHub Actions, and then change a few lines of deployment code. Am I missing anything big?
n
> I think we just need to define a base work-pool config, figure out a way to spin it up from GitHub Actions, and then change a few lines of deployment code. Am I missing anything big?
I don't think so! most of your process should not need to change - this previous message might be helpful for you
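For the "change a few lines of deployment code" step, a sketch of a worker-era version of the `build_from_flow` call above; `prefect-ecs-pool` is a hypothetical pool name, and `work_pool_name` replaces the `infrastructure` and `work_queue_name` arguments:

```python
# Hypothetical worker-era version of the build_from_flow call above;
# "prefect-ecs-pool" is a stand-in pool name.
from prefect.deployments import Deployment

deployment = Deployment.build_from_flow(
    flow=flow,
    name=deploy_name,
    work_pool_name="prefect-ecs-pool",  # replaces infrastructure= and work_queue_name=
    infra_overrides=override,           # now patches the pool's base job template
    description=description,
    tags=tags,
    output=False,
    apply=True,
    storage=storage_block,
    path=path,
)
```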
s
@Nate I saw a function that publishes a work pool from an infra block. I'm assuming that infra blocks aren't going away and it's OK to just leverage that function using our current setup. Am I safe in that assumption? Are there any plans on the horizon that would require us to do more rework later? I couldn't find any other way to define a work pool using the Python API. I actually would prefer that we define a work pool in Python rather than resort to creating it from an infra block. Did I miss a function that can do that?
n
the `publish_as_work_pool` method on infra blocks is intended as a one-time-use method: it translates your infra block into a work pool that you can use going forward (as opposed to copying your infra block config over piecemeal to the analogous fields on the work pool yourself). infra blocks are deprecated and will eventually be removed from new versions of prefect. you can create work pools via the SDK if you want, although you can also dynamically create work pools by simply starting a corresponding worker, for example:
```
prefect worker start --pool my-k8s-pool --type kubernetes --base-job-template my-template.json
```
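A minimal sketch of that one-time translation, assuming the `ecs-infra` block saved earlier and a hypothetical target pool name:

```python
# One-time migration: publish the saved "ecs-infra" block as a work pool.
# "prefect-ecs-pool" is a hypothetical name.
from prefect_aws.ecs import ECSTask

ecs = ECSTask.load("ecs-infra")
ecs.publish_as_work_pool(work_pool_name="prefect-ecs-pool")
```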
s
I did see the `orchestration.create_work_pool` function, actually. I guess I didn't look close enough. Now that I've looked at `WorkPoolCreate` I see that I can pass the template in. Thanks so much. I am having trouble calling `orchestration.get_client` (not sure what `httpx_settings` I should pass in) as well as `PrefectClient` directly. Any insights that might be helpful?
n
you have to use the client as an async context manager, so:
```python
from prefect import get_client

async with get_client() as client:
    await client.hello()
```
etc.
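Putting those pieces together, a sketch of creating a typed work pool from Python, assuming Prefect 2.x with a base job template already exported to a local `base-job-template.json` (the file name and pool name are hypothetical):

```python
import asyncio
import json

from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import WorkPoolCreate

async def main():
    # Base job template exported beforehand, e.g. with
    # `prefect work-pool get-default-base-job-template --type ecs`
    with open("base-job-template.json") as f:
        base_job_template = json.load(f)

    # get_client() needs no httpx_settings for the common case; the client
    # reads PREFECT_API_URL / PREFECT_API_KEY from your Prefect settings.
    async with get_client() as client:
        pool = await client.create_work_pool(
            work_pool=WorkPoolCreate(
                name="prefect-ecs-pool",
                type="ecs",
                base_job_template=base_job_template,
            )
        )
        print(f"created work pool {pool.name!r}")

asyncio.run(main())
```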