CA Lee

1 year ago
Hi all, using S3 for storage, ECS for execution, ECR for the image repo. Prefect version: 0.14.12, Python version: 3.8.8, Docker base image: prefecthq/prefect:0.14.12-python3.8
My flows are registered using CI (GitHub Actions) with the following shell command in a loop over my flows, to avoid re-registration of flows on each push (adapted from this guide):
prefect register flow --skip-if-flow-metadata-unchanged
I develop my tasks separately from my flows, as a Python package, which then gets built into a Docker image and pushed to ECR.
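For reference, the CLI flag maps onto the same idempotency-key mechanism that comes up later in this thread; a rough Python equivalent (the flow import path and project name are placeholders, not my actual setup) would be:
from flows.my_flow import flow  # hypothetical import of a flow object

flow.register(
    project_name="my-project",
    idempotency_key=flow.serialized_hash(),  # skip re-registration if flow metadata is unchanged
)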

    _Expected behavior_:

    Flow pulls the latest flow code from S3 (from the most recent push), which then runs the latest task code from ECR

    _Observed behavior_:

On each flow registration, a new file is uploaded to S3. However, on inspecting the flow logs, the flow does not pull the latest version of the flow code stored in S3. This was verified by looking at the slugified-current-timestamp of the S3 flow - an earlier version of the flow is being pulled instead. I have also pulled image:latest from ECR on my machine, dropped into a shell using a local run, and confirmed that my task code in the Docker container was updated, but the flow stored on S3 is not pulling the updated task code from ECR.
Storage:
STORAGE = S3(bucket="aws-ecs-flows")
Run config:
RUN_CONFIG = ECSRun(
    image="repository-url/image:latest",
    labels=['ecs']
)
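For completeness, here is roughly how these are wired into each flow (the flow name and task body below are placeholders, not my real code):
from prefect import Flow, task
from prefect.storage import S3
from prefect.run_configs import ECSRun

STORAGE = S3(bucket="aws-ecs-flows")
RUN_CONFIG = ECSRun(
    image="repository-url/image:latest",
    labels=["ecs"],
)

@task
def task_func():
    # placeholder task - the real task code lives in the package baked into the image
    ...

with Flow("example-flow", storage=STORAGE, run_config=RUN_CONFIG) as flow:
    task_func()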
Re-registering the flows without specifying --skip-if-flow-metadata-unchanged results in the expected behavior, but also leads to unwanted flow version bumps in the Prefect Cloud UI. The strange thing is that even though the tasks were developed (as a Docker container) separately from the flows, the flows don't pull the latest task code from the Docker image, even when the image tag latest is specified (the task code seems to stop at the version which identified that the flow metadata was unchanged, i.e. idempotency_key = flow.serialized_hash(), even if the task code was modified). Appreciate it if anyone has any insight on how to make this work for ECS! Kubernetes has a setting, imagePullPolicy: Always, which may solve this issue, but I'm not aware of anything similar for ECS.
Here is how I would replicate the error:
1. CI tool is set up to do the following:
   a. Build, tag, and push the Docker image from the Dockerfile to ECR
   b. Register flows to Prefect Cloud with the --skip-if-flow-metadata-unchanged arg, using S3 storage
2. On git push, the initial image is built and the flows are deployed to Prefect Cloud
3. Add an additional print statement to the task code that is built into the Docker image. Flows remain unchanged.
4. Git commit and push again to trigger CI
5. Run the flow in Prefect Cloud (which calls the task code from the Docker container) - the print statement does not appear
6. To verify the print statement has been built into the Docker image, on a local machine: docker pull image:latest, docker run -it image:latest, and inspect the code that was copied into the Docker container to see that the print statement appears
7. If the flow is deleted and re-registered, the print statement appears when the flow is run in Prefect Cloud
I've tested the above using KubernetesRun as well and ran into the same issue. I inspected the kube config file generated by prefect agent kubernetes install and found that there was a default setting, imagePullPolicy: Always. However, if the flow is not re-registered, the latest image is not pulled.
The error seems to come from how the tasks are defined, and subsequently imported in a flow. If you use a task decorator:
from prefect import task

@task
def task_func():
    # do something and return a result
    return ...
vs using a Task class:
from prefect import Task

class TaskFunc(Task):
    def run(self):
        # do something and return a result
        return ...
And your flow.py:
from prefect import Flow
from tasks import task_func

with Flow(...) as flow:
    task_func()

flow.register(project_name="xxx", idempotency_key=flow.serialized_hash())
The registered flow does not pick up any changes in the task_func code. If, however, you use a Task class, and your flow.py:
from prefect import Flow
from tasks import TaskFunc

task_func = TaskFunc()

with Flow(...) as flow:
    task_func()

flow.register(project_name="xxx", idempotency_key=flow.serialized_hash())
and you subsequently change your task func code, the flow will pick up the changes. This is the desired behavior: the flow code and metadata can remain unchanged on each subsequent flow registration, while changes to the task code are built and pushed to a container repository and then picked up by subsequent flow runs. Hoping someone can enlighten me on why this is happening, or how to make it work with @task decorators...
    Marko Jamedzija

    1 year ago
Hi, I'm experiencing the same problem as you. Did you manage to find a solution using decorators? Thanks! 🙂
CA Lee

1 year ago
Hello all, sorry for bumping this thread again - I haven't been able to find a solution to this and was wondering if anyone else had insight, as there is someone else having the same problem as me.
• Flow storage: S3
• Executor: ECS
• Image: ECR
The idea is to register the flow once, and then make changes to the packaged code inside the Docker image instead of the flow code.
flow.py - deployed to Prefect Cloud:
from prefect import task
from my_package import my_func

@task
def task_func():
    my_func()
my_package.py - built inside the Docker image:
def my_func():
    print('1')
If I subsequently change print('1') to print('2') in my_package.py and re-deploy the flow (which had no changes), the flow run still shows print('1'). If I rewrite my code to use a Task class instead, things work as expected (subsequent flow runs show the updated print('2') statement). Is there any way to achieve the same result while using @task decorators, instead of rewriting my package as Task classes?
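For comparison, the Task-class version that does pick up the changes looks roughly like this (the exact module layout here is my assumption):
tasks.py - built inside the Docker image:
from prefect import Task
from my_package import my_func

class TaskFunc(Task):
    def run(self):
        my_func()
flow.py - deployed to Prefect Cloud:
from prefect import Flow
from tasks import TaskFunc

task_func = TaskFunc()

with Flow("example-flow") as flow:
    task_func()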
Michael Adkins

1 year ago
    Hey @CA Lee -- can you try using script-based flow storage instead of pickle-based? https://docs.prefect.io/orchestration/flow_config/storage.html#script-based-storage
    i.e. change
    STORAGE = S3(bucket="aws-ecs-flows")
    to
    STORAGE = S3(bucket="aws-ecs-flows", stored_as_script=True)
    Marko Jamedzija

    1 year ago
Hello 🙂 I tried this, but after reading a bit through the docs I found out that the flow modules have to be uploaded there manually prior to using this option (stored_as_script). However, for GCS and S3 there's an option local_script_path, so it can be done automatically during registration (at least that's how I understood it), but not for Azure (which I use). So: 1. Did I understand this correctly? and 2. Will Azure have this option in the future as well? Thanks!
Michael Adkins

1 year ago
Yes, I believe you've understood this correctly, although I presume the local_script_path should be inferred from the context without you providing it. Azure appears to be missing this functionality, but it definitely could be added easily.
CA Lee

1 year ago
Dear @Michael Adkins, is there any way for local_script_path to be inferred automatically for S3 storage? If the arg is not passed when using stored_as_script=True, I run into this error:
ValueError: A `key` must be provided to show where flow `.py` file is stored in S3 or provide a `local_script_path` pointing to a local script that contains the flow.
To bypass the error, I could pass the absolute path of the flow.py into local_script_path during flow registration. To try to do this dynamically, I could do:
from pathlib import Path
local_script_path=str(Path(__file__).absolute())
However, if the flow is subsequently run using ECS, this error occurs during execution:
Failed to load and execute Flow's environment: NameError("name '__file__' is not defined")
I have managed to get S3 storage working, but the solution seems clunky (passing the dynamic file path using the following code):
import os, inspect

S3(
    bucket="my-bucket-name",
    local_script_path=os.path.abspath(inspect.stack()[0][1]),
    stored_as_script=True,
    key="my-key-name"
)
Appreciate any guidance at all, thank you.
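For anyone else hitting this, here's roughly what the whole flow file looks like with this approach (bucket, key, image, and flow name below are placeholders):
import os, inspect

from prefect import Flow, task
from prefect.storage import S3
from prefect.run_configs import ECSRun

from my_package import my_func

STORAGE = S3(
    bucket="my-bucket-name",
    key="my-key-name",
    stored_as_script=True,
    # path of this script, uploaded to S3 at registration time
    local_script_path=os.path.abspath(inspect.stack()[0][1]),
)

RUN_CONFIG = ECSRun(
    image="repository-url/image:latest",
    labels=["ecs"],
)

@task
def task_func():
    my_func()

with Flow("example-flow", storage=STORAGE, run_config=RUN_CONFIG) as flow:
    task_func()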
Michael Adkins

1 year ago
    Hey @CA Lee -- sorry I missed your last message. This is outside the scope of what I can help with here in Slack chat, we'll have to look into this more in depth. Thanks for exploring and sharing some solutions, I really appreciate it.
    @Marvin open "S3 storage should be able to infer local_script_path"