# ask-community
Sanil Khurana:
Hi guys, I am an SDE and my organization is trying to use Prefect along with ECS to run tasks more effectively. I have been facing an error for quite some time now and trying to debug it, and was wondering if someone can point me to something I may be missing. I am running a Prefect agent on an EC2 instance, which polls the Prefect server running on the same instance. I use a simple Python script on my local machine to register flows with the server, and the agent should pick up these flow runs and send them to an ECS cluster, where Fargate instances pick them up and start running them. Most of the flow works fine, except that the Fargate instance is not able to load the Flow metadata from S3 storage. The error I get is:
Failed to load and execute Flow's environment: UnpicklingError("invalid load key, '{'.")
I have configured the roles to have full S3 and ECS access. I have also checked the file on S3, and it gets uploaded fine:
{"flow": "gASVPgk....aFtdlHViLg==", "versions": {"cloudpickle": "2.0.0", "prefect": "0.15.6", "python": "3.7.10"}}
I think the only place it is going wrong is that the Fargate instance is not able to pick up the flow properly. Any idea what I may be missing? I really appreciate the help. Thanks in advance.
Kevin Kho:
Hey @Sanil Khurana, I would suspect this is related to a version mismatch (maybe cloudpickle). It looks like the EC2 instance may have different versions of Prefect or cloudpickle than the image. Or could you show me how you define the S3 storage (just remove sensitive information)? It could be that as well.
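For what it's worth, I think that specific error is what pickle raises when it is handed raw JSON bytes: if I remember the storage change correctly, Prefect 0.15.x wraps the pickled flow in the JSON envelope you showed, while an older Prefect on the image tries to unpickle the downloaded file directly. A minimal standalone sketch with plain pickle (not Prefect code) that reproduces the same message:
```python
# Standalone sketch: feeding JSON bytes to a loader that expects a raw pickle
# fails with exactly the error seen in the Fargate logs.
import pickle

json_wrapped = b'{"flow": "gASV...", "versions": {"prefect": "0.15.6"}}'

try:
    pickle.loads(json_wrapped)  # roughly what an older loader would do
except pickle.UnpicklingError as exc:
    print(exc)  # invalid load key, '{'.
```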
Sanil Khurana:
@Kevin Kho Thanks for replying. Here is the relevant info. For the EC2 instance running the agent and the server:
```
cloudpickle==2.0.0
prefect==0.15.6
```
Here is my Python script:
```python
import prefect
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect import task, Flow
import pandas as pd

RUN_CONFIG = ECSRun(
    task_role_arn="arn:aws:iam::***:role/ecsTaskExecutionRole",
    image="anisienia/prefect-pydata",
    memory=512,
    cpu=256,
)
STORAGE = S3(bucket="***")

@task
def say_hello():
    logger = prefect.context.get("logger")
    df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
    logger.info(f"Hello from Prefect {df}")

with Flow("s3_pandas", storage=STORAGE, run_config=RUN_CONFIG) as flow:
    say_hello()

flow.register(project_name="my_proj")
```
I think you may be correct. The image I am using was built 8 months ago, from Prefect version 0.14.1. Thanks for replying! I will take a look and check if that is the case.
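If that turns out to be it, my plan is to point the run config at an image built from the same Prefect version I register with. A rough sketch (the prefecthq/prefect tag below is an assumption on my side, and my custom image would need to be rebuilt from that base to keep pandas available):
```python
# Sketch: pin the run image to the Prefect/Python versions used at registration.
# The tag below is an assumption; a custom image rebuilt FROM this base
# (adding pandas) would work the same way.
RUN_CONFIG = ECSRun(
    task_role_arn="arn:aws:iam::***:role/ecsTaskExecutionRole",
    image="prefecthq/prefect:0.15.6-python3.7",
    memory=512,
    cpu=256,
)
```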
Kevin Kho:
Ah yeah, check that the registration version, agent version, and image version are all aligned.
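A quick way to compare is to print the versions in each environment, something like this (the docker run line in the comment assumes you can run the image locally):
```python
# Print the Prefect and cloudpickle versions in the current environment; run the
# same snippet where you register, where the agent runs, and inside the image.
import cloudpickle
import prefect

print(f"prefect=={prefect.__version__}, cloudpickle=={cloudpickle.__version__}")
# Inside the image, for example:
#   docker run --rm anisienia/prefect-pydata python -c \
#     "import prefect, cloudpickle; print(prefect.__version__, cloudpickle.__version__)"
```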
Sanil Khurana:
@Kevin Kho Thanks Kevin! It was a versioning issue after all. I fixed both Prefect's Docker image version and the Python version, and it is finally working.
Kevin Kho:
Nice work!