Ben Muller
11/01/2022, 6:09 AM
ECSTask that I have successfully deployed:
import sys
import prefect
from prefect import flow, task, get_run_logger
from utilities import AN_IMPORTED_MESSAGE
from prefect_aws.ecs import ECSTask
ecs_task_block = ECSTask.load("staging-test")
@task
def log_task(name):
    logger = get_run_logger()
    logger.info("Hello %s!", name)
    logger.info("Prefect Version = %s 🚀", prefect.__version__)
    logger.debug(AN_IMPORTED_MESSAGE)

@flow()
def log_flow(name: str):
    log_task(name)

if __name__ == "__main__":
    name = sys.argv[1]
    log_flow(name)
Our org uses 1.0 at present and we have never had to pass AWS credentials. We don't use credentials like this; we usually use roles that can be assumed, and I believe that was also the case in Prefect 1.0, if I remember correctly. All my agent has to do is assume a role (and it has access to whatever I need through the task role that I set up in the deployment).
So with these blocks all requiring credentials, I am wondering whether they are optional and would be picked up from the default AWS environment variables if I leave them blank, or whether I will need to configure some type of user access for a Prefect machine user?
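On the credentials question: when no keys are passed explicitly, boto3 falls back to its default credential chain, which includes environment variables and, on ECS, the task role. The sketch below is a rough, stdlib-only way to see which source would probably win in a given environment. The function name, the return labels, and the simplified ordering are assumptions made for illustration; the real chain has more providers (SSO, credential process, and so on).

```python
import os
from typing import Mapping, Optional


def likely_credential_source(env: Optional[Mapping[str, str]] = None) -> str:
    """Guess which source boto3's default chain would likely use.

    Hypothetical helper with a deliberately simplified ordering.
    """
    env = os.environ if env is None else env
    # Static keys exported in the environment are consulted first.
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        return "env-static-keys"
    # Web identity (for example IRSA on EKS) needs both variables set.
    if env.get("AWS_WEB_IDENTITY_TOKEN_FILE") and env.get("AWS_ROLE_ARN"):
        return "web-identity-role"
    # A named profile from the shared config/credentials files.
    if env.get("AWS_PROFILE"):
        return "named-profile"
    # ECS injects one of these when a task role is attached.
    if env.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") or env.get(
        "AWS_CONTAINER_CREDENTIALS_FULL_URI"
    ):
        return "ecs-task-role"
    return "shared-files-or-instance-metadata"


if __name__ == "__main__":
    print(likely_credential_source())
```

Running this inside the agent container should give a hint whether the task role is visible there before deciding to create a machine user.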
Secondly: I am trying a deployment with storage - but it is coming up with an error:
from flows.log_flow import log_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3
storage_block = S3(bucket_path="prefect-2-test")
deployment = Deployment.build_from_flow(
    flow=log_flow,
    name="log-simple",
    parameters={"name": "Marvin"},
    infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    work_queue_name="staging",
    storage=S3.load("staging-test-block"),
)
if __name__ == "__main__":
    storage_block.save("staging-test-block", overwrite=True)
    deployment.apply()
ERROR:
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/client.py", line 82, in create_client
self._register_s3_control_events(
TypeError: ClientCreator._register_s3_control_events() takes 2 positional arguments but 6 were given
Any ideas to help me overcome this?
Ben Muller
11/01/2022, 9:19 AM
Ben Muller
11/01/2022, 6:29 PM
Anna Geller
Ben Muller
11/01/2022, 6:55 PM
Anna Geller
Jeff Hale
11/01/2022, 8:27 PM
from logflow import log_it
from prefect.deployments import Deployment
from prefect.filesystems import S3
from time import sleep
storage_block = S3(bucket_path="my-first-bucket34")
storage_block.save("myawsblock3", overwrite=True)
deployment = Deployment.build_from_flow(
    flow=log_it,
    name="log_it",
    parameters={"name": "Marvin"},
    infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    work_queue_name="staging",
    storage=S3.load("myawsblock3"),
)
if __name__ == "__main__":
    deployment.apply()
Ben Muller
11/01/2022, 8:29 PM
Ben Muller
11/01/2022, 8:30 PM
Anna Geller
Anna Geller
Jeff Hale
11/01/2022, 8:33 PM
Ben Muller
11/01/2022, 8:40 PM
Ben Muller
11/01/2022, 8:43 PM
Ben Muller
11/01/2022, 8:46 PM
Ben Muller
11/01/2022, 9:09 PM
from flows.log_flow import log_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3
storage_block = S3(bucket_path="prefect-2-test")
storage_block.save("staging-test-block", overwrite=True)
deployment = Deployment.build_from_flow(
    flow=log_flow,
    name="log-simple",
    parameters={"name": "Marvin"},
    infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    work_queue_name="staging",
    storage=S3.load("staging-test-block"),
)
if __name__ == "__main__":
    deployment.apply()
And I am still hit with the same error
Ben Muller
11/01/2022, 9:10 PM
Traceback (most recent call last):
File "/Users/benmuller/code/prefect_two/deployments/log_flow.py", line 9, in <module>
deployment = Deployment.build_from_flow(
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 201, in coroutine_wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 152, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/deployments.py", line 724, in build_from_flow
await deployment.upload_to_storage(ignore_file=ignore_file)
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/deployments.py", line 572, in upload_to_storage
file_count = await self.storage.put_directory(
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/filesystems.py", line 481, in put_directory
return await self.filesystem.put_directory(
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/filesystems.py", line 358, in put_directory
self.filesystem.put_file(f, fpath, overwrite=True)
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/fsspec/asyn.py", line 111, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/fsspec/asyn.py", line 96, in sync
raise return_result
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/fsspec/asyn.py", line 53, in _runner
result[0] = await coro
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/s3fs/core.py", line 1056, in _put_file
await self._call_s3(
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/s3fs/core.py", line 331, in _call_s3
await self.set_session()
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/s3fs/core.py", line 507, in set_session
self._s3 = await s3creator.__aenter__()
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/session.py", line 26, in __aenter__
self._client = await self._coro
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/session.py", line 193, in _create_client
client = await client_creator.create_client(
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/client.py", line 82, in create_client
self._register_s3_control_events(
TypeError: ClientCreator._register_s3_control_events() takes 2 positional arguments but 6 were given
Anna Geller
Ben Muller
11/01/2022, 9:39 PM
Anna Geller
Anna Geller
Ben Muller
11/01/2022, 9:45 PM
Jeff Hale
11/01/2022, 9:48 PM
1. from flows.log_flow import log_flow
There might be a name shadowing thing going on here somewhere. I don't think so, but maybe. flows and log_flow are commonly used names. Namespacing should mean it's not an issue, but renaming might help.
2. You can pass the parameter for your flow when you create the deployment in build_from_flow(). I don't know if that is an issue, but it might be. So you don't need the following:
name = sys.argv[1]
Example and discussion of making deployments here.
Ben Muller
11/01/2022, 9:49 PM
Ben Muller
11/01/2022, 9:51 PM
Jeff Hale
11/01/2022, 9:51 PM
prefect version?
Ben Muller
11/01/2022, 9:53 PM
Version: 2.6.5
API version: 0.8.3
Python version: 3.10.6
Git commit: 9fc2658f
Built: Thu, Oct 27, 2022 2:24 PM
OS/Arch: darwin/x86_64
Profile: staging
Server type: cloud
Jeff Hale
11/01/2022, 9:57 PM
Ben Muller
11/01/2022, 9:58 PM
Jeff Hale
11/01/2022, 9:58 PM
Jeff Hale
11/01/2022, 9:58 PM
# Name     Version     Build     Channel
boto3      1.24.74     pypi_0    pypi
s3fs       2022.5.0    pypi_0    pypi
Ben Muller
11/01/2022, 9:59 PM
Name: s3fs
Version: 2022.10.0
Summary: Convenient Filesystem interface over S3
Home-page: http://github.com/fsspec/s3fs/
Author:
Author-email:
License: BSD
Location: /usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages
Requires: aiobotocore, aiohttp, fsspec
Required-by:
---
Name: boto3
Version: 1.25.5
Summary: The AWS SDK for Python
Home-page: https://github.com/boto/boto3
Author: Amazon Web Services
Author-email:
License: Apache License 2.0
Location: /usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages
Requires: botocore, jmespath, s3transfer
Required-by: prefect-aws
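The two listings above compare s3fs and boto3, but the failing frame in the traceback is inside aiobotocore, which s3fs requires and which in turn depends on botocore. A minimal sketch for dumping all the relevant versions side by side, plus whatever botocore requirement the installed aiobotocore declares, using only the standard library. The helper names and the package list are illustrative assumptions, not part of Prefect or any AWS library.

```python
from importlib import metadata
from typing import Dict, List, Optional

# Distributions involved in the S3 upload path discussed in this thread.
PACKAGES = ["prefect", "prefect-aws", "s3fs", "aiobotocore", "botocore", "boto3"]


def installed_versions(names: List[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def declared_requirements(name: str, dependency: str) -> List[str]:
    """Return the requirement strings of `name` that start with `dependency`."""
    try:
        requirements = metadata.requires(name) or []
    except metadata.PackageNotFoundError:
        return []
    prefix = dependency.lower()
    return [req for req in requirements if req.lower().startswith(prefix)]


if __name__ == "__main__":
    for package, version in installed_versions(PACKAGES).items():
        print(f"{package:12} {version or 'not installed'}")
    print("aiobotocore declares:", declared_requirements("aiobotocore", "botocore"))
```

If the installed botocore falls outside the range that aiobotocore declares, that mismatch would be a plausible explanation for a signature error like the one in the traceback.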
Ben Muller
11/01/2022, 9:59 PM
Jeff Hale
11/01/2022, 10:00 PM
Ben Muller
11/01/2022, 10:00 PM
Ben Muller
11/01/2022, 10:01 PM
Jeff Hale
11/01/2022, 10:01 PM
Ben Muller
11/01/2022, 10:01 PM
Ben Muller
11/01/2022, 10:01 PM
Jeff Hale
11/01/2022, 10:02 PM
Jeff Hale
11/01/2022, 10:03 PM
Ben Muller
11/01/2022, 10:03 PM
Jeff Hale
11/01/2022, 10:04 PM
Ben Muller
11/01/2022, 10:06 PM
Ben Muller
11/01/2022, 10:07 PM
2.4.0 and it downgraded it to 2.3.4
Ben Muller
11/01/2022, 10:10 PM
Jeff Hale
11/01/2022, 10:12 PM
Ben Muller
11/01/2022, 10:12 PM
Ben Muller
11/01/2022, 10:13 PM
"""
pip install prefect -U
pip install prefect-aws
prefect block register -m prefect_aws.ecs
"""
from prefect_aws.ecs import ECSTask
ecs = ECSTask(
    image="prefecthq/prefect:2.6.5-python3.9",  # example image
    cpu=256,
    memory=512,
    stream_output=True,
    configure_cloudwatch_logs=True,
    cluster="prefect-cluster",
    execution_role_arn="arn:aws:iam::XXX:role/prefect-execution-role",
    task_role_arn="arn:aws:iam::XXX:role/prefect-task-role",
    launch_type="FARGATE",
    vpc_id="vpc-XXX",
    env={
        "PYTHONPATH": "$PYTHONPATH:.",
        "AWS_RETRY_MODE": "adaptive",
        "AWS_MAX_ATTEMPTS": "100",
    },
    task_customizations=[
        {
            "op": "replace",
            "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp",
            "value": "DISABLED",
        },
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/subnets",
            "value": ["subnet-XX", "subnet-XXX", "subnet-XXX"],
        },
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
            "value": ["sg-XXX"],
        },
    ],
)
ecs.save("staging-test", overwrite=True)
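The task_customizations entries in the block above are JSON Patch operations applied to the ECS run-task request. To make their effect concrete, here is a stdlib-only sketch that applies the same three operations to a made-up request. The `apply_simple_patch` helper and the starting `request` dict are hypothetical, and the helper only handles `add` and `replace` on object paths; it is not the patch library that prefect-aws actually uses.

```python
import copy
from typing import Any, Dict, List


def apply_simple_patch(
    document: Dict[str, Any], operations: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """Apply 'add'/'replace' operations on object paths and return a patched copy.

    Simplified on purpose: no array indices, no '~' escapes, no other ops.
    """
    result = copy.deepcopy(document)
    for operation in operations:
        if operation["op"] not in ("add", "replace"):
            raise ValueError(f"unsupported op: {operation['op']}")
        *parents, leaf = operation["path"].lstrip("/").split("/")
        target = result
        for key in parents:
            target = target[key]  # parent objects must already exist
        if operation["op"] == "replace" and leaf not in target:
            raise KeyError(f"cannot replace missing path {operation['path']}")
        target[leaf] = operation["value"]
    return result


# Hypothetical starting request; the real one is built by prefect-aws.
request = {
    "networkConfiguration": {
        "awsvpcConfiguration": {
            "assignPublicIp": "ENABLED",
            "subnets": ["subnet-default"],
        }
    }
}

# Same operations as in the block definition above.
customizations = [
    {
        "op": "replace",
        "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp",
        "value": "DISABLED",
    },
    {
        "op": "add",
        "path": "/networkConfiguration/awsvpcConfiguration/subnets",
        "value": ["subnet-XX", "subnet-XXX", "subnet-XXX"],
    },
    {
        "op": "add",
        "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
        "value": ["sg-XXX"],
    },
]

patched = apply_simple_patch(request, customizations)

if __name__ == "__main__":
    print(patched["networkConfiguration"]["awsvpcConfiguration"])
```

Note that `add` on an existing object key overwrites it, which is why the subnets list ends up replaced rather than extended.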
Ben Muller
11/01/2022, 10:13 PM
infrastructure=ecs,
Ben Muller
11/01/2022, 10:23 PM
Ben Muller
11/01/2022, 10:24 PM
Ben Muller
11/01/2022, 10:24 PM
KeyError: "No class found for dispatch key 'ecs-task' in registry for type 'Block'."
Ben Muller
11/01/2022, 10:25 PM
prefect.agent - Failed to get infrastructure for flow run '1557a914-9f95-48f5-bea6-b7a470a7617a'.
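The KeyError says that the process loading the block has no class registered for `ecs-task`. One plausible cause, which is an assumption and not something this thread confirms, is that prefect-aws is not installed in the environment where the agent runs, so `ECSTask` is never imported and registered there. A quick stdlib-only check to run in that environment (the helper name is made up for illustration):

```python
import importlib.util
from typing import Dict, Iterable


def modules_available(names: Iterable[str]) -> Dict[str, bool]:
    """Report whether each top-level module can be found, without importing it."""
    availability: Dict[str, bool] = {}
    for name in names:
        try:
            availability[name] = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # find_spec can raise for broken or oddly initialised modules.
            availability[name] = False
    return availability


if __name__ == "__main__":
    for module, present in modules_available(["prefect", "prefect_aws"]).items():
        print(f"{module}: {'found' if present else 'MISSING'}")
```

If `prefect_aws` shows as missing on the agent, installing prefect-aws there would be the first thing to try.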
Jeff Hale
11/01/2022, 10:28 PM
pip install prefect-aws installs aiobotocore 2.4.0.
Ben Muller
11/01/2022, 10:33 PM
Ben Muller
11/01/2022, 10:34 PM
Ben Muller
11/01/2022, 10:34 PM
Jeff Hale
11/01/2022, 10:37 PM
Ben Muller
11/01/2022, 10:37 PM
Ben Muller
11/01/2022, 10:38 PM
prefecthq/prefect:2.6.5-python3.9
Jeff Hale
11/01/2022, 10:48 PM
Jeff Hale
11/01/2022, 10:50 PM
Ben Muller
11/01/2022, 10:53 PM
Mason Menges
11/01/2022, 11:08 PM
Ben Muller
11/02/2022, 2:06 AM