# ask-community
c
Hi all, using S3 for storage, ECS for execution, ECR for image repo. Prefect version: 0.14.12, Python version: 3.8.8, Docker base image: prefecthq/prefect:0.14.12-python3.8
My flows are registered using CI (GitHub Actions) with the following shell command, run in a loop over my flows to avoid re-registering flows on each push (adapted from this guide):
prefect register flow --skip-if-flow-metadata-unchanged
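A rough sketch of that CI step as a Python loop (not from the original message; the flows/ path, the project name, and the --file/--project flags are assumptions based on the 0.14 CLI, while --skip-if-flow-metadata-unchanged is the flag quoted above):
Copy code
# Hypothetical CI registration loop. Paths, the project name, and the
# --file/--project flags are assumptions; --skip-if-flow-metadata-unchanged
# is the flag from the message above.
import glob
import subprocess

for flow_file in glob.glob("flows/*.py"):
    subprocess.run(
        [
            "prefect", "register", "flow",
            "--file", flow_file,
            "--project", "my-project",
            "--skip-if-flow-metadata-unchanged",
        ],
        check=True,
    )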
I develop my tasks separately from my flows, as a Python package, which then gets built into a Docker image and pushed to ECR.
_Expected behavior_: the flow pulls the latest flow code from S3 (from the most recent push), which then runs the latest task code from ECR.
_Observed behavior_: on each flow registration, a new file is uploaded to S3. However, on inspecting the flow logs, the flow does not pull the latest version of the flow code stored in S3. This was verified by looking at the slugified-current-timestamp of the S3 flow - an earlier version of the flow is being pulled instead. I have also pulled image:latest from ECR on my machine, dropped into a shell using a local run, and confirmed that the task code in the Docker container was updated, but the flow stored on S3 is not picking up the updated task code from ECR.
Storage:
Copy code
STORAGE = S3(bucket="aws-ecs-flows")
Run config:
Copy code
RUN_CONFIG = ECSRun(
    image="repository-url/image:latest",
    labels=['ecs']
)
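For context, a minimal sketch of how these two objects are attached to the flow (the flow name and the task are illustrative, not from the original):
Copy code
# Minimal sketch; the flow name and the task are illustrative.
from prefect import Flow, task
from prefect.run_configs import ECSRun
from prefect.storage import S3

STORAGE = S3(bucket="aws-ecs-flows")
RUN_CONFIG = ECSRun(image="repository-url/image:latest", labels=["ecs"])

@task
def say_hello():
    print("hello")

with Flow("example-flow", storage=STORAGE, run_config=RUN_CONFIG) as flow:
    say_hello()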
Re-registering the flows without specifying --skip-if-flow-metadata-unchanged results in the expected behavior, but also leads to unwanted flow version bumps in the Prefect Cloud UI. The strange thing is that even though the tasks were developed (as a Docker image) separately from the flows, the flows don't pull the latest task code from the Docker image even when the latest image tag is specified - the task code seems to be stuck at the version from the registration where the flow metadata was identified as unchanged (i.e. idempotency_key = flow.serialized_hash()), even if the task code was modified since. I'd appreciate it if anyone has any insight on how to make this work for ECS! Kubernetes has a setting, imagePullPolicy: Always, which may solve this issue, but I'm not aware of anything similar for ECS.
Here is how I would replicate the error:
1. CI tool is set up to do the following:
   a. Build, tag, and push the Docker image from the Dockerfile to ECR
   b. Register flows to Prefect Cloud with the arg --skip-if-flow-metadata-unchanged, using S3 storage
2. On git push, the initial image is built and the flows are deployed to Prefect Cloud
3. Add an additional print statement to the task code that is built into the Docker image. Flows remain unchanged.
4. Git commit and push again to trigger CI
5. Run the flow in Prefect Cloud (which calls the task code from the Docker container) - the print statement does not appear
6. To verify that the print statement has been built into the Docker image, on a local machine run docker pull image:latest, then docker run -it image:latest, and inspect the code that was copied into the Docker container to see that the print statement appears
7. If the flow is deleted and re-registered, the print statement appears when the flow is run in Prefect Cloud
I've tested the above using KubernetesRun as well and ran into the same issue. I inspected the kube config file generated by prefect agent kubernetes install and found that there was a default setting: imagePullPolicy: Always. However, if the flow is not re-registered, the latest image is not pulled.
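For reference, the KubernetesRun config used for that test would look roughly like this (a sketch; the label value is an assumption):
Copy code
# Sketch of the equivalent KubernetesRun config; the label value is an assumption.
from prefect.run_configs import KubernetesRun

RUN_CONFIG = KubernetesRun(
    image="repository-url/image:latest",
    labels=["k8s"],
)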
The error seems to come from how the tasks are defined and subsequently imported into a flow. If you use a task decorator:
Copy code
from prefect import task

@task
def task_func():
    # do something
    return "something"
vs using a Task class:
Copy code
from prefect import Task

class TaskFunc(Task):
    def run(self):
        # do something
        return "something"
And your flow.py:
Copy code
from prefect import Flow
from tasks import task_func

with Flow("example-flow") as flow:  # flow name is illustrative
    task_func()

flow.register(project_name="xxx", idempotency_key=flow.serialized_hash())
The registered flow does not pick up any changes in the task_func code. If, however, you use a Task class, and your flow.py:
Copy code
from prefect import Flow
from tasks import TaskFunc

task_func = TaskFunc()

with Flow("example-flow") as flow:  # flow name is illustrative
    task_func()

flow.register(project_name="xxx", idempotency_key=flow.serialized_hash())
And if you subsequently change your task_func code, the flow will pick up the changes. This is the desired behavior: the flow code and metadata can remain unchanged on each subsequent flow registration, while you make changes to the task code, which is then built and pushed to a container repository, and the changes are picked up by subsequent flow runs. Hoping someone can enlighten me on why this is happening, or how to make it work with @task decorators...
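One way to see why registration gets skipped in both cases is to compare the idempotency key before and after editing the task body (a diagnostic sketch that is not from the thread; it assumes serialized_hash() reflects flow metadata rather than task source):
Copy code
# Diagnostic sketch (not from the thread): print the idempotency key, edit the
# body of task_func, and print it again. If the hash is unchanged,
# --skip-if-flow-metadata-unchanged will skip re-registration even though the
# task code differs.
from prefect import Flow
from tasks import task_func  # hypothetical module from the examples above

with Flow("example-flow") as flow:
    task_func()

print(flow.serialized_hash())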
m
Hi, I'm experiencing the same problem as you. Did you manage to find a solution while using decorators? Thanks! 🙂
c
Hello all, sorry for bumping this thread again - I haven't been able to find a solution to this and was wondering if anyone else had insight, as someone else is having the same problem as me.
• Flow storage: S3
• Executor: ECS
• Image: ECR
The idea is to register the flow once, and make changes to the packaged code inside the Docker image instead of to the flow code.
flow.py - deployed to Prefect Cloud
Copy code
from prefect import task
from my_package import my_func

@task
def task_func():
    my_func()
my_package.py - built inside the Docker image
Copy code
def my_func():
  print('1')
If I subsequently change print('1') to print('2') in my_package.py and re-deploy the flow (which had no changes), the flow run still shows print('1'). If I re-write my code to use a Task class instead, things work as expected (subsequent flow runs show the updated print('2') statement). Is there any way to achieve the same result while using @task decorators, instead of re-writing my package as Task classes?
z
Hey @CA Lee -- can you try using script-based flow storage instead of pickle-based? https://docs.prefect.io/orchestration/flow_config/storage.html#script-based-storage
i.e. change
STORAGE = S3(bucket="aws-ecs-flows")
to
STORAGE = S3(bucket="aws-ecs-flows", stored_as_script=True)
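A slightly fuller sketch of what that change could look like (the key and local_script_path values are illustrative; both parameters are confirmed later in this thread):
Copy code
# Sketch of script-based S3 storage. The bucket name is from the thread;
# the key and local_script_path values are illustrative.
from prefect.storage import S3

STORAGE = S3(
    bucket="aws-ecs-flows",
    stored_as_script=True,
    key="flows/my_flow.py",
    local_script_path="flows/my_flow.py",
)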
m
Hello 🙂 I tried this, but after reading a bit through the docs I found out that flow modules have to be uploaded there manually prior to using this option (stored_as_script). However, for GCS and S3 there's an option local_script_path so it can be done automatically during registration (at least that's how I understood it), but not for Azure (which I use). So:
1. Did I understand this correctly?
2. Will Azure have this option in the future as well?
Thanks!
z
Yes, I believe you've understood this correctly, although I presume the local_script_path should be inferred from the context without you providing it. Azure appears to be missing this functionality, but it definitely could be added easily.
👍 1
c
Dear @Zanie, is there any way for local_script_path to be inferred automatically? I am running into this error if I use stored_as_script=True for S3 storage:
Copy code
ValueError: A `key` must be provided to show where flow `.py` file is stored in S3 or provide a `local_script_path` pointing to a local script that contains the flow.
To bypass the error, I could pass the absolute path of flow.py into `local_script_path` during flow registration. To try to do this dynamically, I could do:
Copy code
from pathlib import Path

local_script_path = str(Path(__file__).absolute())
However, if the flow is subsequently run using ECS, this error occurs during execution:
Copy code
Failed to load and execute Flow's environment: NameError("name '__file__' is not defined")
Appreciate any guidance at all, thank you
Dear @Zanie, is there any way for the local_script_path to be inferred automatically for S3 storage? If the arg local_script_path is not passed, the below error is encountered:
Copy code
ValueError: A `key` must be provided to show where flow `.py` file is stored in S3 or provide a `local_script_path` pointing to a local script that contains the flow.
I have managed to get S3 storage working, but the solution seems clunky (passing the dynamic file path using the following code):
Copy code
import os, inspect

from prefect.storage import S3

STORAGE = S3(
    bucket="my-bucket-name",
    # resolve the absolute path of the file currently being registered
    local_script_path=os.path.abspath(inspect.stack()[0][1]),
    stored_as_script=True,
    key="my-key-name",
)
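For comparison, a possibly less clunky variant, under the assumption (from the script-based storage docs linked above) that CI uploads the flow file to the bucket itself, so only the key is needed:
Copy code
# Alternative sketch. Assumption: CI uploads the flow .py file to the bucket
# itself (e.g. with the AWS CLI or boto3), so no local_script_path is needed.
from prefect.storage import S3

STORAGE = S3(
    bucket="my-bucket-name",
    key="my-key-name",
    stored_as_script=True,
)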
z
Hey @CA Lee -- sorry I missed your last message. This is outside the scope of what I can help with here in Slack chat; we'll have to look into this more in depth. Thanks for exploring and sharing some solutions, I really appreciate it.
@Marvin open "S3 storage should be able to infer `local_script_path`"