Ben Muller

11/01/2022, 6:09 AM
Hi Prefect, I am trying to get a working demo running with an all-Python implementation. Our org uses AWS; I have deployed an agent to an ECS service following some of the Prefect recipes here. I am now attempting to run a silly example flow from your docs with an ECSTask that I have successfully deployed:
import sys
import prefect
from prefect import flow, task, get_run_logger
from utilities import AN_IMPORTED_MESSAGE

from prefect_aws.ecs import ECSTask

ecs_task_block = ECSTask.load("staging-test")


@task
def log_task(name):
    logger = get_run_logger()
    logger.info("Hello %s!", name)
    logger.info("Prefect Version = %s 🚀", prefect.__version__)
    logger.debug(AN_IMPORTED_MESSAGE)


@flow()
def log_flow(name: str):
    log_task(name)


if __name__ == "__main__":
    name = sys.argv[1]
    log_flow(name)
Our org uses 1.0 at present and we have never had to pass AWS credentials; we don't use credentials like this, as we usually use roles that you can assume. I believe this was the case in Prefect 1.0, if I remember correctly. All my agent has to do is assume a role (and it has access to whatever I need via the task role that I set up in deployment). So with these blocks all requiring credentials, I am wondering: are they optional and would they be picked up from the default AWS environment variables if I leave them blank, or will I need to configure some type of user access for a Prefect machine user?
Secondly: I am trying a deployment with storage, but it is coming up with an error:
from flows.log_flow import log_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3

storage_block = S3(bucket_path="prefect-2-test")


deployment = Deployment.build_from_flow(
    flow=log_flow,
    name="log-simple",
    parameters={"name": "Marvin"},
    infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    work_queue_name="staging",
    storage=S3.load("staging-test-block"),
)

if __name__ == "__main__":
    storage_block.save("staging-test-block", overwrite=True)
    deployment.apply()
ERROR:
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/client.py", line 82, in create_client
    self._register_s3_control_events(
TypeError: ClientCreator._register_s3_control_events() takes 2 positional arguments but 6 were given
Any ideas to help me overcome this?
Answering part of my own question with https://docs.prefect.io/concepts/filesystems/#handling-credentials-for-cloud-object-storage-services But still not sure how to configure it and avoid the errors 🙏
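For reference, a minimal sketch of the credential-free setup those docs describe. This assumes the key fields on prefect.filesystems.S3 are optional and that, when left unset, the underlying s3fs client falls back to the standard AWS credential chain (environment variables, profile, or an assumed role):
from prefect.filesystems import S3

# No aws_access_key_id / aws_secret_access_key here: s3fs should pick up
# credentials from the environment or the task role at runtime.
storage_block = S3(bucket_path="prefect-2-test")
storage_block.save("staging-test-block", overwrite=True)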
Hi @Anna Geller, hoping this wasn't missed, have you ever seen this error before?

Anna Geller

11/01/2022, 6:36 PM
I have not, but I also never use Python Deployment objects; cool kids use the CLI 😎

Ben Muller

11/01/2022, 6:55 PM
Do they though? I feel like the whole Prefect selling point is that it's Python-compatible. I have noticed the docs are very much lacking for the Python-objects part of Prefect. What would be my best course of action to get help with this?

Anna Geller

11/01/2022, 8:03 PM
Python for business logic, YES; a deployment is just metadata config you send to the API, so this doesn't have to be in Python.

Jeff Hale

11/01/2022, 8:27 PM
Hi Ben. I modified your code to save the block outside the if __name__ == "__main__" block and it works. I think the module-level deployment code runs first, before the save inside that block, causing issues.
from logflow import log_it
from prefect.deployments import Deployment
from prefect.filesystems import S3


storage_block = S3(bucket_path="my-first-bucket34")
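# Saved at module level so the block already exists when S3.load runs below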
storage_block.save("myawsblock3", overwrite=True)

deployment = Deployment.build_from_flow(
    flow=log_it,
    name="log_it",
    parameters={"name": "Marvin"},
    infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    work_queue_name="staging",
    storage=S3.load("myawsblock3"),
)

if __name__ == "__main__":
    deployment.apply()
👍 2
:gratitude-thank-you: 2

Ben Muller

11/01/2022, 8:29 PM
Thank you @Jeff Hale!
@Anna Geller that's fair enough, but does Prefect disagree with this viewpoint? Otherwise why build out the capabilities for both options?

Anna Geller

11/01/2022, 8:30 PM
Prefect is not opinionated; Anna Geller is opinionated
😆 1
If you ask Prefect, I'll tell you: we have both options, use the one that you prefer. If you ask me, I'll tell you: the CLI is way easier to integrate with CI, and I prefer that.

Jeff Hale

11/01/2022, 8:33 PM
And just to be clear - I changed some variables in your code, so just follow the pattern, but copy-pasting my code won’t work.

Ben Muller

11/01/2022, 8:40 PM
Yep, thanks @Jeff Hale
I'll retort, @Anna Geller. If you have a repo of flows that the data scientists in your organisation use, who are used to Python but find CLI tools, YAML, and scripts overwhelming, and you want them to self-serve their own flows, Python wins. Writing a flow is super easy, so why should you have to wear a completely different 'hat' in order to deploy that flow?
Shouldn't the lines be less black and white than they are in your head, and actually adapt to what is suitable for each use case?
Hi @Jeff Hale I changed my code to this
from flows.log_flow import log_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3

storage_block = S3(bucket_path="prefect-2-test")

storage_block.save("staging-test-block", overwrite=True)

deployment = Deployment.build_from_flow(
    flow=log_flow,
    name="log-simple",
    parameters={"name": "Marvin"},
    infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    work_queue_name="staging",
    storage=S3.load("staging-test-block"),
)

if __name__ == "__main__":
    deployment.apply()
And I am still hit with the same error:
Traceback (most recent call last):
  File "/Users/benmuller/code/prefect_two/deployments/log_flow.py", line 9, in <module>
    deployment = Deployment.build_from_flow(
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 201, in coroutine_wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 152, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/deployments.py", line 724, in build_from_flow
    await deployment.upload_to_storage(ignore_file=ignore_file)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/deployments.py", line 572, in upload_to_storage
    file_count = await self.storage.put_directory(
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/filesystems.py", line 481, in put_directory
    return await self.filesystem.put_directory(
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/filesystems.py", line 358, in put_directory
    self.filesystem.put_file(f, fpath, overwrite=True)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/fsspec/asyn.py", line 111, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/fsspec/asyn.py", line 96, in sync
    raise return_result
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/fsspec/asyn.py", line 53, in _runner
    result[0] = await coro
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/s3fs/core.py", line 1056, in _put_file
    await self._call_s3(
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/s3fs/core.py", line 331, in _call_s3
    await self.set_session()
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/s3fs/core.py", line 507, in set_session
    self._s3 = await s3creator.__aenter__()
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/session.py", line 26, in __aenter__
    self._client = await self._coro
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/session.py", line 193, in _create_client
    client = await client_creator.create_client(
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/client.py", line 82, in create_client
    self._register_s3_control_events(
TypeError: ClientCreator._register_s3_control_events() takes 2 positional arguments but 6 were given

Anna Geller

11/01/2022, 9:33 PM
haha 😄 no different hats needed; in the end you need the CLI anyway to run this Python script. You can't avoid the CLI, and why would you?

Ben Muller

11/01/2022, 9:39 PM
because huge YAML files are 🤮

Anna Geller

11/01/2022, 9:45 PM
Oh I don't advocate for YAML, only for the terminal
Agree with the YAML sentiment

Ben Muller

11/01/2022, 9:45 PM
ok cool, I assumed you were advocating for your CI setup here
👍 1

Jeff Hale

11/01/2022, 9:48 PM
I’m trying to figure out the source of the error. A similar setup seems to work fine for me. Two ideas:
1. from flows.log_flow import log_flow
There might be a name-shadowing thing going on here somewhere. I don’t think so, but maybe; flows and log_flow are commonly used names. Namespacing should mean it’s not an issue, but renaming might help.
2. You can pass the parameter for your flow when you create the deployment in build_from_flow(). I don’t know if that is an issue, but it might be. So you don’t need the following:
name = sys.argv[1]
Example and discussion of making deployments here.
:gratitude-thank-you: 1

Ben Muller

11/01/2022, 9:49 PM
let me try quickly @Jeff Hale
nope, it is none of those issues

Jeff Hale

11/01/2022, 9:51 PM
what’s the result of prefect version?

Ben Muller

11/01/2022, 9:53 PM
Version:             2.6.5
API version:         0.8.3
Python version:      3.10.6
Git commit:          9fc2658f
Built:               Thu, Oct 27, 2022 2:24 PM
OS/Arch:             darwin/x86_64
Profile:             staging
Server type:         cloud
👍 1

Jeff Hale

11/01/2022, 9:57 PM
That’s almost exactly the same as mine. hmm

Ben Muller

11/01/2022, 9:58 PM
what is your botocore version?

Jeff Hale

11/01/2022, 9:58 PM
ha ha, thinking alike; looking at that and s3fs now
# Name                    Version                   Build  Channel
boto3                     1.24.74                  pypi_0    pypi
s3fs                      2022.5.0                 pypi_0    pypi

Ben Muller

11/01/2022, 9:59 PM
Name: s3fs
Version: 2022.10.0
Summary: Convenient Filesystem interface over S3
Home-page: http://github.com/fsspec/s3fs/
Author: 
Author-email: 
License: BSD
Location: /usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages
Requires: aiobotocore, aiohttp, fsspec
Required-by: 
---
Name: boto3
Version: 1.25.5
Summary: The AWS SDK for Python
Home-page: https://github.com/boto/boto3
Author: Amazon Web Services
Author-email: 
License: Apache License 2.0
Location: /usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages
Requires: botocore, jmespath, s3transfer
Required-by: prefect-aws
🤷

Jeff Hale

11/01/2022, 10:00 PM
I did see that s3fs made some changes in their changelog. Let me try upgrading and see what happens.
b

Ben Muller

11/01/2022, 10:00 PM
I will try downgrading!
THAT WAS THE ISSUE!!
🙌 1

Jeff Hale

11/01/2022, 10:01 PM
aiobotocore is the library in particular, it appears

Ben Muller

11/01/2022, 10:01 PM
damn you s3fs
🤣 1
ah ok

Jeff Hale

11/01/2022, 10:02 PM
I’ll raise it with our team that we’ll need to specify a dependency range (or adjust the current spec). The joy of 3rd-party open source library integrations!
Thanks for working through it! 🙂

Ben Muller

11/01/2022, 10:03 PM
thanks for your help!
👍 1

Jeff Hale

11/01/2022, 10:04 PM
What was your aiobotocore version?

Ben Muller

11/01/2022, 10:06 PM
Version: 2.3.4
sorry, it was 2.4.0 and it downgraded to 2.3.4
:thank-you: 1
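For reference, a small diagnostic sketch (an assumed approach, not something from this thread) to confirm which combination is installed; aiobotocore 2.4.0 was failing here and 2.3.4 worked:
import aiobotocore
import botocore
import s3fs

# s3fs 2022.10.0 pulled in aiobotocore 2.4.0, which raised the TypeError;
# downgrading s3fs brought aiobotocore back to 2.3.4.
print("s3fs:", s3fs.__version__)
print("aiobotocore:", aiobotocore.__version__)
print("botocore:", botocore.__version__)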
While I have you @Jeff Hale - the deployment now works and everything is in my cloud account, but when I go to run the flow it is pending for a few seconds and then fails with no logs or anything. What is the best way to debug something like this?

Jeff Hale

11/01/2022, 10:12 PM
Are you running an agent locally?
b

Ben Muller

11/01/2022, 10:12 PM
I have one as a service in ECS
"""
pip install prefect -U
pip install prefect-aws
prefect block register -m prefect_aws.ecs
"""
from prefect_aws.ecs import ECSTask

ecs = ECSTask(
    image="prefecthq/prefect:2.6.5-python3.9",  # example image
    cpu=256,
    memory=512,
    stream_output=True,
    configure_cloudwatch_logs=True,
    cluster="prefect-cluster",
    execution_role_arn="arn:aws:iam::XXX:role/prefect-execution-role",
    task_role_arn="arn:aws:iam::XXX:role/prefect-task-role",
    launch_type="FARGATE",
    vpc_id="vpc-XXX",
    env={
        "PYTHONPATH": "$PYTHONPATH:.",
        "AWS_RETRY_MODE": "adaptive",
        "AWS_MAX_ATTEMPTS": "100",
    },
    task_customizations=[
        {
            "op": "replace",
            "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp",
            "value": "DISABLED",
        },
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/subnets",
            "value": ["subnet-XX", "subnet-XXX", "subnet-XXX"],
        },
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
            "value": ["sg-XXX"],
        },
    ],
)
ecs.save("staging-test", overwrite=True)
then in the deployment I added the kwarg infrastructure=ecs
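i.e. roughly this shape (a sketch reusing the names from earlier, untested):
from flows.log_flow import log_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect_aws.ecs import ECSTask

deployment = Deployment.build_from_flow(
    flow=log_flow,
    name="log-simple",
    work_queue_name="staging",
    storage=S3.load("staging-test-block"),
    infrastructure=ECSTask.load("staging-test"),  # the ECSTask block saved above
)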
ahh, my agent must need to have the extra packages installed?
👍 1
I can see these logs on the agent
KeyError: "No class found for dispatch key 'ecs-task' in registry for type 'Block'."
prefect.agent - Failed to get infrastructure for flow run '1557a914-9f95-48f5-bea6-b7a470a7617a'.

Jeff Hale

11/01/2022, 10:28 PM
Were you able to save the block? I’m not an ECS pro, but pip install prefect-aws installs aiobotocore 2.4.0.

Ben Muller

11/01/2022, 10:33 PM
yeah the block is in the cloud UI
is prefect-aws in that image already?

Jeff Hale

11/01/2022, 10:37 PM
It’s not in prefecthq/prefect:2-latest

Ben Muller

11/01/2022, 10:37 PM
ok - I guess that might be an issue?
I use prefecthq/prefect:2.6.5-python3.9

Jeff Hale

11/01/2022, 10:48 PM
I have to run, Ben. But I’ve asked my ECS pros to take a look. 🙂
🙌 1
But an issue report on aiobotocore has been submitted here.

Ben Muller

11/01/2022, 10:53 PM
thanks @Jeff Hale - I will keep an eye out for a reply

Mason Menges

11/01/2022, 11:08 PM
Hey @Ben Muller, I think you hit the nail on the head a second ago. The prefect-aws collection isn't part of the base image by default, so when it tries to kick off the ECSTask in your flow run, the block type doesn't exist. You should be able to define a custom image that includes the prefect-aws collection for this to run 😄
🙌 3
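A sketch of that fix (the image URI is a made-up placeholder): build an image on top of prefecthq/prefect:2.6.5-python3.9 that also does pip install prefect-aws, push it to a registry, and point the saved ECSTask block at it; the agent environment needs prefect-aws installed too, per the block-registry error above:
from prefect_aws.ecs import ECSTask

# Hypothetical custom image: prefecthq/prefect:2.6.5-python3.9 plus
# `pip install prefect-aws`, so the ecs-task block type can be resolved.
ecs = ECSTask.load("staging-test")
ecs.image = "XXX.dkr.ecr.<region>.amazonaws.com/prefect-flows:2.6.5-aws"
ecs.save("staging-test", overwrite=True)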

Ben Muller

11/02/2022, 2:06 AM
That did the job, cheers @Mason Menges
👍 1
🙌 1