# prefect-community
Ben Muller:
Hi Prefect, I am trying to get a working demo running with an all-Python implementation. Our org uses AWS - I have deployed an agent to an ECS service following some of the Prefect recipes here. I am now attempting to run a simple example flow from your docs with an ECSTask block that I have successfully deployed:
import sys
import prefect
from prefect import flow, task, get_run_logger
from utilities import AN_IMPORTED_MESSAGE

from prefect_aws.ecs import ECSTask

ecs_task_block = ECSTask.load("staging-test")


@task
def log_task(name):
    logger = get_run_logger()
    logger.info("Hello %s!", name)
    logger.info("Prefect Version = %s 🚀", prefect.__version__)
    logger.debug(AN_IMPORTED_MESSAGE)


@flow()
def log_flow(name: str):
    log_task(name)


if __name__ == "__main__":
    name = sys.argv[1]
    log_flow(name)
Our org uses 1.0 at present and we have never had to pass AWS credentials - we don't use credentials like this, as we usually use roles that you can assume. I believe this was the case in Prefect 1.0, if I remember correctly. All my agent has to do is assume a role (and it has access to whatever I need via the task role that I set up in deployment). So with these blocks all requiring credentials, I am wondering: are they optional and would they be picked up from the default AWS environment variables if I leave them blank, or will I need to configure some type of user access for a Prefect machine user? Secondly: I am trying a deployment with storage, but it is coming up with an error:
from flows.log_flow import log_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3

storage_block = S3(bucket_path="prefect-2-test")


deployment = Deployment.build_from_flow(
    flow=log_flow,
    name="log-simple",
    parameters={"name": "Marvin"},
    infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    work_queue_name="staging",
    storage=S3.load("staging-test-block"),
)

if __name__ == "__main__":
    storage_block.save("staging-test-block", overwrite=True)
    deployment.apply()
ERROR:
File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/client.py", line 82, in create_client
    self._register_s3_control_events(
TypeError: ClientCreator._register_s3_control_events() takes 2 positional arguments but 6 were given
Any ideas to help me overcome this?
Answering part of my own question with https://docs.prefect.io/concepts/filesystems/#handling-credentials-for-cloud-object-storage-services - but I'm still not sure how to configure it and avoid the errors 🙏
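[Editor's note] To spell out the first question: when a block's credential fields are left blank, s3fs/boto3 fall back to botocore's standard credential chain, so an ECS task role or instance profile works without explicit keys. A minimal illustrative sketch of that chain (simplified and hand-written here, not botocore's actual implementation):

```python
# Sketch: the order in which boto3/botocore resolve AWS credentials when
# none are passed explicitly. An ECS task role shows up via the container
# credentials endpoint, so blocks with blank credential fields still work.
import os

# Simplified chain; botocore checks more sources than listed here.
CHAIN = [
    "explicit kwargs (aws_access_key_id / aws_secret_access_key)",
    "environment variables (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY)",
    "shared credentials file (~/.aws/credentials)",
    "AWS config file (~/.aws/config)",
    "assume-role / SSO providers",
    "ECS container credentials (AWS_CONTAINER_CREDENTIALS_RELATIVE_URI)",
    "EC2 instance metadata (IMDS)",
]

def ambient_credentials_available() -> bool:
    """Rough check whether ambient credentials are likely present."""
    return any(
        os.environ.get(var)
        for var in (
            "AWS_ACCESS_KEY_ID",
            "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI",  # set by ECS task roles
        )
    )

for i, source in enumerate(CHAIN, 1):
    print(f"{i}. {source}")
```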
Hi @Anna Geller, hoping this wasn't missed, have you ever seen this error before?
Anna Geller:
I have not, but I also never use Python Deployment objects - cool kids use the CLI 😎
Ben Muller:
Do they though? I feel like Prefect's whole selling point is that it's Python-native. I have noticed the docs are very much lacking for the Python-objects side of Prefect. What would be my best course of action to get help with this?
Anna Geller:
Python for business logic, yes - but a deployment is just metadata config you send to the API, so it doesn't have to be in Python.
Jeff Hale:
Hi Ben. I modified your code to save the block outside the if __name__ == "__main__" guard and it works. I think the module-level build_from_flow call runs before the save inside that guard, causing issues.
from logflow import log_it
from prefect.deployments import Deployment
from prefect.filesystems import S3
from time import sleep


storage_block = S3(bucket_path="my-first-bucket34")
storage_block.save("myawsblock3", overwrite=True)

deployment = Deployment.build_from_flow(
    flow=log_it,
    name="log_it",
    parameters={"name": "Marvin"},
    infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    work_queue_name="staging",
    storage=S3.load("myawsblock3"),
)

if __name__ == "__main__":
    deployment.apply()
👍 2
gratitude thank you 2
Ben Muller:
Thank you @Jeff Hale!
@Anna Geller that's fair enough, but does Prefect disagree with this viewpoint? Otherwise, why build out the capabilities for both options?
Anna Geller:
Prefect is not opinionated; Anna Geller is opinionated.
😆 1
If you ask Prefect, I'll tell you: we have both options, use the one you prefer. If you ask me, I'll tell you: the CLI is way easier to integrate with CI, and I prefer that.
Jeff Hale:
And just to be clear - I changed some variables in your code, so just follow the pattern, but copy-pasting my code won’t work.
Ben Muller:
Yep, thanks @Jeff Hale
I'll retort, @Anna Geller. If you have a repo of flows for the data scientists in your organisation, who are used to Python but find CLI tools, YAML, and scripts overwhelming, and you want them to self-serve their own flows, Python wins. Writing a flow is super easy, so why should you have to wear a completely different 'hat' in order to deploy that flow?
Shouldn't the lines be less black and white than they are in your head, and actually adapt to whatever suits the use case best?
Hi @Jeff Hale, I changed my code to this:
from flows.log_flow import log_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3

storage_block = S3(bucket_path="prefect-2-test")

storage_block.save("staging-test-block", overwrite=True)

deployment = Deployment.build_from_flow(
    flow=log_flow,
    name="log-simple",
    parameters={"name": "Marvin"},
    infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    work_queue_name="staging",
    storage=S3.load("staging-test-block"),
)

if __name__ == "__main__":
    deployment.apply()
And I am still hit with the same error:
Traceback (most recent call last):
  File "/Users/benmuller/code/prefect_two/deployments/log_flow.py", line 9, in <module>
    deployment = Deployment.build_from_flow(
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 201, in coroutine_wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 152, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/deployments.py", line 724, in build_from_flow
    await deployment.upload_to_storage(ignore_file=ignore_file)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/deployments.py", line 572, in upload_to_storage
    file_count = await self.storage.put_directory(
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/filesystems.py", line 481, in put_directory
    return await self.filesystem.put_directory(
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/prefect/filesystems.py", line 358, in put_directory
    self.filesystem.put_file(f, fpath, overwrite=True)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/fsspec/asyn.py", line 111, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/fsspec/asyn.py", line 96, in sync
    raise return_result
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/fsspec/asyn.py", line 53, in _runner
    result[0] = await coro
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/s3fs/core.py", line 1056, in _put_file
    await self._call_s3(
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/s3fs/core.py", line 331, in _call_s3
    await self.set_session()
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/s3fs/core.py", line 507, in set_session
    self._s3 = await s3creator.__aenter__()
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/session.py", line 26, in __aenter__
    self._client = await self._coro
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/session.py", line 193, in _create_client
    client = await client_creator.create_client(
  File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/client.py", line 82, in create_client
    self._register_s3_control_events(
TypeError: ClientCreator._register_s3_control_events() takes 2 positional arguments but 6 were given
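[Editor's note] For anyone hitting this later: the TypeError comes from a signature change between aiobotocore releases that s3fs calls into. A quick way to inspect the installed combination, using only the standard library (a diagnostic sketch, not a Prefect utility):

```python
# Sketch: print installed versions of the libraries involved in the
# `_register_s3_control_events` TypeError, handling absent packages.
from importlib.metadata import version, PackageNotFoundError

def report_versions(packages=("s3fs", "aiobotocore", "botocore", "boto3")):
    """Return a mapping of package name -> installed version (or a note)."""
    found = {}
    for pkg in packages:
        try:
            found[pkg] = version(pkg)
        except PackageNotFoundError:
            found[pkg] = "not installed"
    return found

for pkg, ver in report_versions().items():
    print(f"{pkg}: {ver}")
```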
Anna Geller:
haha 😄 no different hats needed - in the end you need the CLI anyway to run this Python script. You can't avoid the CLI, and why would you?
Ben Muller:
because huge YAML files are 🤮
Anna Geller:
Oh I don't advocate for YAML, only for the terminal
Agree with the YAML sentiment
Ben Muller:
ok cool - I assumed you were advocating for your CI setup here
👍 1
Jeff Hale:
I'm trying to figure out the source of the error. A similar setup seems to work fine for me. Two ideas:
1. There might be some name shadowing going on with
from flows.log_flow import log_flow
I don't think so, but maybe - flows and log_flow are popular names. Namespacing should mean it's not an issue, but renaming might help.
2. You can pass the parameter for your flow when you create the deployment in build_from_flow() - I don't know if that is the issue, but it might be. So you don't need the following:
name = sys.argv[1]
Example and discussion of making deployments here.
gratitude thank you 1
Ben Muller:
let me try quickly @Jeff Hale
nope, it is none of those issues
Jeff Hale:
What's the result of prefect version?
Ben Muller:
Version:             2.6.5
API version:         0.8.3
Python version:      3.10.6
Git commit:          9fc2658f
Built:               Thu, Oct 27, 2022 2:24 PM
OS/Arch:             darwin/x86_64
Profile:             staging
Server type:         cloud
👍 1
Jeff Hale:
That's almost exactly the same as mine. Hmm.
Ben Muller:
what is your botocore version?
Jeff Hale:
Ha ha - thinking alike; looking at that and s3fs now.
# Name                    Version                   Build  Channel
boto3                     1.24.74                  pypi_0    pypi
s3fs                      2022.5.0                 pypi_0    pypi
Ben Muller:
Name: s3fs
Version: 2022.10.0
Summary: Convenient Filesystem interface over S3
Home-page: http://github.com/fsspec/s3fs/
Author: 
Author-email: 
License: BSD
Location: /usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages
Requires: aiobotocore, aiohttp, fsspec
Required-by: 
---
Name: boto3
Version: 1.25.5
Summary: The AWS SDK for Python
Home-page: https://github.com/boto/boto3
Author: Amazon Web Services
Author-email: 
License: Apache License 2.0
Location: /usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages
Requires: botocore, jmespath, s3transfer
Required-by: prefect-aws
🤷
Jeff Hale:
I did see that s3fs made some changes in their changelog. Let me try upgrading and see what happens.
Ben Muller:
I will try downgrading!
THAT WAS THE ISSUE!!
🙌 1
Jeff Hale:
aiobotocore is the library in particular, it appears
Ben Muller:
damn you s3fs
🤣 1
ah ok
Jeff Hale:
I’ll raise it with our team that we’ll need to specify a dependency range (or adjust the current spec). The joy of 3rd-party open source library integrations!
Thanks for working through it! 🙂
Ben Muller:
thanks for your help!
👍 1
Jeff Hale:
What was your aiobotocore version?
Ben Muller:
Version: 2.3.4
sorry it was
2.4.0
and it downgraded it to
2.3.4
🙏 1
While I have you @Jeff Hale - the deployment now works and everything is in my cloud account, but when I go to run the flow it is pending for a few seconds and then fails with no logs or anything. What is the best way to debug something like this?
Jeff Hale:
Are you running an agent locally?
Ben Muller:
I have one as a service in ECS
"""
pip install prefect -U
pip install prefect-aws
prefect block register -m prefect_aws.ecs
"""
from prefect_aws.ecs import ECSTask

ecs = ECSTask(
    image="prefecthq/prefect:2.6.5-python3.9",  # example image
    cpu=256,
    memory=512,
    stream_output=True,
    configure_cloudwatch_logs=True,
    cluster="prefect-cluster",
    execution_role_arn="arn:aws:iam::XXX:role/prefect-execution-role",
    task_role_arn="arn:aws:iam::XXX:role/prefect-task-role",
    launch_type="FARGATE",
    vpc_id="vpc-XXX",
    env={
        "PYTHONPATH": "$PYTHONPATH:.",
        "AWS_RETRY_MODE": "adaptive",
        "AWS_MAX_ATTEMPTS": "100",
    },
    task_customizations=[
        {
            "op": "replace",
            "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp",
            "value": "DISABLED",
        },
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/subnets",
            "value": ["subnet-XX", "subnet-XXX", "subnet-XXX"],
        },
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
            "value": ["sg-XXX"],
        },
    ],
)
ecs.save("staging-test", overwrite=True)
then in the deployment I added the kwarg
infrastructure=ecs,
ahh, my agent must need the extra packages installed?
👍 1
I can see these logs on the agent
KeyError: "No class found for dispatch key 'ecs-task' in registry for type 'Block'."
prefect.agent - Failed to get infrastructure for flow run '1557a914-9f95-48f5-bea6-b7a470a7617a'.
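[Editor's note] A toy illustration of why this KeyError appears (this is NOT Prefect's actual code): block slugs resolve through a registry populated at import time, so if prefect_aws is never importable in the agent's environment, the 'ecs-task' key is never registered and the lookup fails.

```python
# Toy dispatch-key registry, mimicking the failure mode only.
REGISTRY = {}

def register_block(slug):
    """Decorator that records a class under its dispatch key."""
    def deco(cls):
        REGISTRY[slug] = cls
        return cls
    return deco

def lookup(slug):
    """Resolve a dispatch key; fails like the agent log when unregistered."""
    try:
        return REGISTRY[slug]
    except KeyError:
        raise KeyError(
            f"No class found for dispatch key {slug!r} in registry for type 'Block'."
        )

@register_block("s3")  # registered because core modules were imported
class S3Stub:
    pass

# prefect_aws never imported -> 'ecs-task' was never registered
try:
    lookup("ecs-task")
except KeyError as exc:
    print(exc)
```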
Jeff Hale:
Were you able to save the block? I’m not an ECS pro, but
pip install prefect-aws
installs aiobotocore 2.4.0.
Ben Muller:
yeah the block is in the cloud UI
is prefect-aws in that image already?
Jeff Hale:
It’s not in prefecthq/prefect:2-latest
Ben Muller:
ok - I guess that might be an issue?
I use
prefecthq/prefect:2.6.5-python3.9
Jeff Hale:
I have to run, Ben. But I’ve asked my ECS pros to take a look. 🙂
🙌 1
But a report of the aiobotocore issue was submitted here.
Ben Muller:
thanks @Jeff Hale - I will keep an eye out for a reply
Mason Menges:
Hey @Ben Muller, I think you hit the nail on the head a second ago - the prefect-aws collection isn't part of the base image by default, so when it tries to kick off the ECSTask in your flow, the block type doesn't exist. You should be able to define a custom image that includes the prefect-aws collection for this to run 😄
🙌 3
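[Editor's note] A minimal custom image along those lines might look like the following sketch. The base tag matches the one used earlier in this thread; everything else (pinning, extra packages) is an assumption, not an official image definition:

```dockerfile
# Base image matching the Prefect version used elsewhere in this thread
FROM prefecthq/prefect:2.6.5-python3.9

# Add the AWS collection so ECSTask blocks can be loaded at runtime;
# pinning to a tested version range is advisable given the aiobotocore issue above
RUN pip install prefect-aws
```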
Ben Muller:
That did the job, cheers @Mason Menges
👍 1
🙌 1