Stephen Lloyd
04/16/2024, 3:25 AM
Stephen Lloyd
04/16/2024, 5:39 AM
...`prefect_deployment.py` file, which defines a `deploy()` function. We call this from a `deployment.py` file defined for each flow. If a flow has multiple deployments, we specify them all in that flow's `deployment.py`.
Our long-running agent (now a worker) polls the work pool for jobs and spins up a new ECS task for each job it picks up.
We define an `ECSTask` infrastructure block named "ecs-infra" as follows:
```python
import os

from prefect_aws.ecs import ECSTask

# `config`, `aws_credentials_block`, and `newrelic_license_key` are defined
# elsewhere in our deployment tooling (our own config and secrets).
ecs = ECSTask(
    aws_credentials=aws_credentials_block,
    image=config["ecr_address"] + ":latest",
    vpc_id=config["vpc_id"],
    env={
        "ENV": os.getenv("ENV", "dev"),
        # ECS environment variable values should be strings, not booleans
        "PREFECT_API_ENABLE_HTTP2": "false",
        "PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER": "json",
    },
    # stream_output=None,
    # configure_cloudwatch_logs=None,
    cluster=config["cluster_name"],
    launch_type="FARGATE",
    execution_role_arn=config["execution_role_arn"],
    task_role_arn=config["task_role_arn"],
    # JSON Patch (RFC 6902) operations applied to the ECS task run request
    task_customizations=[
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
            "value": [config["security_group"]],
        },
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/subnets",
            "value": config["subnets"],
        },
    ],
    task_watch_poll_interval=5,
    task_start_timeout_seconds=120,
    task_definition={
        "cpu": 512,
        "memory": 1024,
        "networkMode": "awsvpc",
        "taskRoleArn": config["task_role_arn"],
        "executionRoleArn": config["execution_role_arn"],
        "containerDefinitions": [
            {
                # FireLens sidecar that routes container logs to New Relic
                "name": "log_router",
                "image": "ouraccount.dkr.ecr.us-region.amazonaws.com/newrelic/logging-firelens-fluentbit",
                "essential": True,
                "firelensConfiguration": {
                    "type": "fluentbit",
                    "options": {"enable-ecs-log-metadata": "true"},
                },
            },
            {
                # Container that runs the flow
                "name": "prefect",
                "cpu": 512,
                "memory": 1024,
                "image": config["ecr_address"] + ":latest",
                "essential": True,
                "logConfiguration": {
                    "logDriver": "awsfirelens",
                    "options": {
                        "Name": "newrelic",
                        "apiKey": newrelic_license_key,
                        "endpoint": "https://log-api.newrelic.com/log/v1",
                    },
                },
            },
        ],
        "placementConstraints": [],
        "requiresCompatibilities": ["FARGATE"],
    },
)
ecs.save("ecs-infra", overwrite=True)
```
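As we understand it, the `task_customizations` entries above are JSON Patch (RFC 6902) operations that `ECSTask` applies to the ECS task run request before launching. To make the two `add` operations concrete, here is a minimal sketch that implements only the `add` op. It is not the code prefect-aws runs (a complete library such as the `jsonpatch` package covers the whole spec); the function names, the trimmed `run_request`, and the `sg-123`/`subnet-abc` IDs are hypothetical placeholders.

```python
import copy
from typing import Any, Dict, List


def _unescape(token: str) -> str:
    # JSON Pointer (RFC 6901) escapes: "~1" stands for "/", "~0" for "~"
    return token.replace("~1", "/").replace("~0", "~")


def apply_add_ops(document: Dict[str, Any], ops: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Apply RFC 6902 "add" operations to a deep copy of `document`.

    Only "add" is handled; any other op raises ValueError.
    """
    result = copy.deepcopy(document)
    for op in ops:
        if op["op"] != "add":
            raise ValueError(f"unsupported op in this sketch: {op['op']!r}")
        # "/a/b/c" -> ["a", "b", "c"]; the empty token before the first "/" is dropped
        *parents, last = [_unescape(t) for t in op["path"].split("/")[1:]]
        target: Any = result
        for token in parents:
            # RFC 6902 requires every parent location to exist already
            target = target[int(token)] if isinstance(target, list) else target[token]
        if isinstance(target, list):
            # "-" appends; a numeric index inserts before that position
            index = len(target) if last == "-" else int(last)
            if not 0 <= index <= len(target):
                raise IndexError(f"array index out of range in {op['path']!r}")
            target.insert(index, op["value"])
        else:
            # On an object, "add" creates the member or replaces an existing one
            target[last] = op["value"]
    return result


run_request = {  # trimmed, hypothetical task run request
    "networkConfiguration": {
        "awsvpcConfiguration": {"subnets": [], "assignPublicIp": "ENABLED"}
    }
}
patched = apply_add_ops(run_request, [
    {"op": "add", "path": "/networkConfiguration/awsvpcConfiguration/securityGroups", "value": ["sg-123"]},
    {"op": "add", "path": "/networkConfiguration/awsvpcConfiguration/subnets", "value": ["subnet-abc"]},
])
print(patched["networkConfiguration"]["awsvpcConfiguration"])
# {'subnets': ['subnet-abc'], 'assignPublicIp': 'ENABLED', 'securityGroups': ['sg-123']}
```

Note that `add` on an existing object member replaces it, which is why the `subnets` op works even when the request already has a `subnets` key.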
This seems similar to the base job template, which we then override for specific flows as needed via `prefect_deployment.deploy()` when we call it from a flow's `deployment.py`:
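```python
from prefect.deployments import Deployment


# Hypothetical signature for illustration; only the body mirrors our real deploy()
def deploy(flow, deploy_name, ecs_task_block, override, description, tags, storage_block, path, **kwargs):
    # Build a block-based deployment and register it with the API (apply=True)
    return Deployment.build_from_flow(
        flow=flow,
        name=deploy_name,
        infrastructure=ecs_task_block,
        infra_overrides=override,  # per-flow tweaks on top of the "ecs-infra" block
        description=description,
        tags=tags,
        work_queue_name="prefect-agent-queue",
        output=False,
        apply=True,
        storage=storage_block,
        path=path,
        **kwargs,
    )
```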
I found some more docs saying that workers are essentially the same as (or very similar to) what we have done, except that the base infrastructure config is wrapped into the work pool definition. That makes sense and is conceptually the same.
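To make the "base template plus per-deployment overrides" idea concrete, here is a simplified, hypothetical renderer. As we understand Prefect's worker model, a work pool's base job template has a `job_configuration` section containing `{{ variable }}` placeholders and a `variables` JSON schema whose defaults fill them, and a deployment's job variables (the worker-era counterpart of `infra_overrides`) replace those defaults. This is not Prefect's implementation: it only substitutes whole-value placeholders and drops fields left without a value, and the template below is a trimmed stand-in, not the real ECS worker template.

```python
import re
from typing import Any, Dict

# A value consisting of exactly one placeholder, e.g. "{{ cpu }}"
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")
_UNSET = object()  # marks a placeholder with neither a default nor an override


def render_job_configuration(template: Dict[str, Any], job_variables: Dict[str, Any]) -> Dict[str, Any]:
    """Fill a base job template's placeholders: pool defaults first, then deployment overrides."""
    properties = template.get("variables", {}).get("properties", {})
    # 1. Defaults declared in the work pool's variables schema
    values = {name: spec["default"] for name, spec in properties.items() if "default" in spec}
    # 2. Per-deployment job variables replace those defaults, key by key
    values.update(job_variables)

    def fill(node: Any) -> Any:
        if isinstance(node, dict):
            filled = {key: fill(value) for key, value in node.items()}
            # Omit fields whose placeholder ended up with no value at all
            return {key: value for key, value in filled.items() if value is not _UNSET}
        if isinstance(node, list):
            return [item for item in map(fill, node) if item is not _UNSET]
        if isinstance(node, str):
            match = PLACEHOLDER.fullmatch(node)
            if match:
                return values.get(match.group(1), _UNSET)
        return node

    return fill(template["job_configuration"])


template = {  # hypothetical, heavily trimmed ECS-style template
    "job_configuration": {
        "cluster": "{{ cluster }}",
        "task_definition": {"cpu": "{{ cpu }}", "memory": "{{ memory }}"},
        "command": "{{ command }}",
    },
    "variables": {
        "properties": {
            "cluster": {"type": "string", "default": "my-cluster"},
            "cpu": {"type": "integer", "default": 512},
            "memory": {"type": "integer", "default": 1024},
            "command": {"type": "string"},
        }
    },
}
print(render_job_configuration(template, {"memory": 4096}))
# {'cluster': 'my-cluster', 'task_definition': {'cpu': 512, 'memory': 4096}}
```

In this model, what used to be an `infra_overrides={"memory": 4096}`-style tweak becomes a job variable on the deployment, while the pool's defaults play the role of the `ecs-infra` block.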
I think we just need to define a base work pool config, figure out a way to spin it up from GitHub Actions, and then change a few lines of deployment code. Am I missing anything big?
Nate
04/16/2024, 1:14 PM
I don't think so! Most of your process should not need to change. This previous message might be helpful for you.
Stephen Lloyd
04/17/2024, 3:35 AM
Nate
04/17/2024, 4:01 AM
...`publish_as_work_pool` method on infra blocks is intended as a one-time-use method: it translates your infra block into a work pool that you can use going forward (as opposed to copying your infra block config over piecemeal to the analogous fields on the work pool yourself).
Infra blocks are deprecated and will eventually be removed from new versions of Prefect.
You can create work pools via the SDK if you want, although you can also create work pools dynamically by simply starting a corresponding worker, for example:
```shell
prefect worker start --pool my-k8s-pool --type kubernetes --base-job-template my-template.json
```
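One way to produce a `my-template.json` that carries over settings like those in our `ecs-infra` block is to start from the worker type's default base job template (saved as JSON) and overwrite the defaults in its `variables` section, leaving the `job_configuration` placeholders alone. A minimal sketch, assuming the template follows the `job_configuration`/`variables` layout; the helper names are hypothetical, and whether variables such as `cpu`, `memory`, or `cluster` exist depends on the actual template you start from.

```python
import json
from pathlib import Path
from typing import Any, Dict


def with_defaults(template: Dict[str, Any], defaults: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of a base job template whose variables have new default values."""
    patched = json.loads(json.dumps(template))  # deep copy; templates are plain JSON
    properties = patched["variables"]["properties"]
    for name, value in defaults.items():
        if name not in properties:
            # A variable missing from the schema would never reach job_configuration
            raise KeyError(f"base job template has no variable {name!r}")
        properties[name]["default"] = value
    return patched


def write_template(src: Path, dest: Path, defaults: Dict[str, Any]) -> None:
    """Read a template from `src`, set new defaults, and write the result to `dest`."""
    template = json.loads(src.read_text())
    dest.write_text(json.dumps(with_defaults(template, defaults), indent=2))
```

For example, `write_template(Path("ecs-default-template.json"), Path("my-template.json"), {"cpu": 512, "memory": 1024, "cluster": "my-cluster"})` (file names and values are placeholders) could run as a step in GitHub Actions before `prefect worker start --pool my-ecs-pool --type ecs --base-job-template my-template.json`; the same dict can also be passed as the template when creating the pool through the SDK. Raising on unknown names is a deliberate choice so that a typo fails the CI job instead of silently producing an ignored variable.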
Stephen Lloyd
04/18/2024, 8:24 AM
...`orchestration.create_work_pool` function, actually. I guess I didn't look closely enough. Now that I've looked at `WorkPoolCreate`, I see that I can pass the template in. Thanks so much.
I am having trouble calling `orchestration.get_client` (not sure what `httpx_settings` I should pass in), as well as using `PrefectClient` directly. Any insights that might be helpful?
Nate
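04/18/2024, 3:48 PM
```python
from prefect.client.orchestration import get_client

# httpx_settings is optional; by default the client uses your configured Prefect API settings
async with get_client() as client:  # run this inside an async function
    await client.hello()
    # etc.
```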