CA Lee
03/14/2021, 4:42 AM
Prefect version: 0.14.12
Python version: 3.8.8
Docker base image: prefecthq/prefect:0.14.12-python3.8
My flows are registered using CI (GitHub Actions) with the following shell command, run in a loop over my flows to avoid re-registering each flow on every push (adapted from this guide):
prefect register flow --skip-if-flow-metadata-unchanged
I develop my tasks separately from my flows, as a Python package, which then gets built as a Docker container and pushed to ECR.
_Expected behavior_:
---------------------
Flow pulls the latest flow code from S3 (from the most recent push), which then runs the latest task code from ECR
_Observed behavior_:
----------------------
On each flow registration, a new file is uploaded to S3.
However, on inspecting flow logs, the flow does not pull the latest version of the flow code stored in S3. This was verified by looking at the slugified-current-timestamp of the S3 flow - an earlier version of the flow is being pulled instead.
I have also pulled image:latest from ECR on my machine, dropped into a shell with a local run, and confirmed that the task code in the Docker container was updated - yet the flow code stored on S3 still does not pick up the updated task code from ECR.
Storage:
STORAGE = S3(bucket="aws-ecs-flows")
Run config:
RUN_CONFIG = ECSRun(
    image="repository-url/image:latest",
    labels=['ecs']
)
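For illustration, a minimal sketch of how these pieces might sit together in a single flow file - the flow name and the inline task are placeholders, since in the setup described above the task code is imported from the package built into the Docker image:
from prefect import Flow, task
from prefect.storage import S3
from prefect.run_configs import ECSRun

STORAGE = S3(bucket="aws-ecs-flows")
RUN_CONFIG = ECSRun(
    image="repository-url/image:latest",
    labels=['ecs']
)

@task
def task_func():
    # placeholder - in the real setup this task lives in the package
    # that is built into the Docker image and imported here
    print("hello from the task")

with Flow("my-flow", storage=STORAGE, run_config=RUN_CONFIG) as flow:
    task_func()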
Re-registering the flows without specifying --skip-if-flow-metadata-unchanged results in the expected behavior, but also leads to unwanted flow version bumps in the Prefect Cloud UI.
The strange thing is that even though the tasks are developed (as a Docker container) separately from the flows, the flows don't pull the latest task code from the Docker image, even when the image tag latest is specified - the task code seems to stop at the version that was current when the flow metadata was identified as unchanged (i.e. idempotency_key = flow.serialized_hash()), even if the task code was modified afterwards.
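A sketch of the registration call this describes, with the metadata hash as the idempotency key so that Prefect Cloud skips the version bump when the serialized flow is unchanged (the project name is a placeholder):
flow.register(
    project_name="my-project",
    idempotency_key=flow.serialized_hash(),
)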
Would appreciate it if anyone has any insight on how to make this work for ECS! Kubernetes has a flag called imagePullPolicy: Always which may solve this issue, but I'm not aware of anything similar for ECS.
CA Lee
03/15/2021, 2:12 AM
1. Flows registered with --skip-if-flow-metadata-unchanged using S3 storage
2. On git push, initial image is built and flows are deployed to Prefect Cloud
3. Add an additional print statement to the task code which is built into the Docker image (see the sketch after this list). Flows remain unchanged.
4. Git commit and push again to trigger CI
5. Run flow in Prefect Cloud (which calls the task code from the Docker container) - print statement does not appear
6. To verify the print statement has been built into the Docker image: on a local machine, docker pull image:latest, docker run -it image:latest, and inspect the code that was copied into the Docker container to see that the print statement appears
7. If the flow is deleted and re-registered, the print statement appears when the flow is run in Prefect Cloud
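A sketch of the kind of change made in step 3 - the package layout and names are illustrative, not taken from the thread:
# my_package/tasks.py, built into the Docker image and imported by the flow
from prefect import task

@task
def task_func():
    print("extra debug line added in step 3")  # never shows up in the Cloud run logs
    return "something"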
CA Lee
03/16/2021, 8:38 AM
I tried KubernetesRun as well and ran into the same issue.
I inspected the kube config file generated by prefect agent kubernetes install and found that there was a default setting: imagePullPolicy: Always
However, if the flow is not re-registered, the latest image is not pulled.
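For reference, a sketch of the equivalent run config presumably used for that Kubernetes test - the image URL is reused from above and the label is assumed:
from prefect.run_configs import KubernetesRun

RUN_CONFIG = KubernetesRun(
    image="repository-url/image:latest",
    labels=['k8s']
)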
CA Lee
03/17/2021, 8:17 AM
Using the @task decorator:
@task
def task_func():
    # do something
    return something
vs using a Task class:
class TaskFunc(Task):
    def run(self):
        # do something
        return something
And your flow.py:
from tasks import task_func

with Flow(...) as flow:
    task_func()

flow.register(project_name="xxx", idempotency_key=flow.serialized_hash())
The registered flow does not pick up any changes in the task_func code.
If, however, you use a Task class, and your flow.py:
from tasks import TaskFunc

task_func = TaskFunc()

with Flow(...) as flow:
    task_func()

flow.register(project_name="xxx", idempotency_key=flow.serialized_hash())
and you subsequently change your task func code, the flow will pick up the changes.
This is the desired behavior: the flow code and metadata remain unchanged on each subsequent registration, while changes to the task code are built and pushed to the container repository and then picked up by subsequent flow runs.
Hoping someone can enlighten me on why this is happening, or how to make it work with @task decorators...
Marko Jamedzija
04/06/2021, 10:02 AM
CA Lee
04/08/2021, 10:19 AM
flow.py - deployed to Prefect Cloud:
from my_package import my_func

@task
def task_func():
    my_func()

my_package.py - built inside the Docker image:
def my_func():
    print('1')
If I subsequently change print('1') to print('2') in my_package.py and re-deploy the flow (which had no changes), the flow run would still show print('1').
If I re-write my code to use a Task class instead, things work as expected (subsequent flow runs will show the updated print('2') statement).
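For comparison, a sketch of the Task-class variant described as working, stitched together from the snippets above (module names unchanged, flow name assumed):
# tasks.py - built inside the Docker image
from prefect import Task
from my_package import my_func

class TaskFunc(Task):
    def run(self):
        my_func()

# flow.py - deployed to Prefect Cloud
from prefect import Flow
from tasks import TaskFunc

task_func = TaskFunc()

with Flow("my-flow") as flow:
    task_func()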
Is there any way to achieve the same result, while using @task decorators instead of re-writing my package as Task classes?
Zanie
Try changing
STORAGE = S3(bucket="aws-ecs-flows")
to
STORAGE = S3(bucket="aws-ecs-flows", stored_as_script=True)
Marko Jamedzija
04/08/2021, 5:05 PM
… (stored_as_script).
However, for GCS and S3 there's an option local_script_path, so it can be done automatically during registration (at least that's how I understood it), but not for Azure (which I use).
So: 1. Did I understand this correctly, and 2. Will Azure have this option in the future as well? Thanks!
Zanie
local_script_path should be inferred from the context without you providing it. Azure appears to be missing this functionality but it definitely could be added easily.
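For Azure as it stands, one manual route consistent with the above would be to upload the flow script yourself (e.g. in a CI step) and point the storage at the existing blob - a sketch with hypothetical container and blob names:
from prefect.storage import Azure

STORAGE = Azure(
    container="flows",          # hypothetical container
    blob_name="my-flow.py",     # hypothetical blob, uploaded by a separate CI step
    stored_as_script=True,
)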
CA Lee
05/03/2021, 2:44 AM
Is there any way for local_script_path to be inferred automatically?
I am running into this error if I use stored_as_script=True for S3 storage:
ValueError: A `key` must be provided to show where flow `.py` file is stored in S3 or provide a `local_script_path` pointing to a local script that contains the flow.
To bypass the error, I could pass the absolute path of the flow.py into `local_script_path` during flow registration.
To try to do this dynamically, I could do:
from pathlib import Path
local_script_path=str(Path(__file__).absolute())
However, if the flow is subsequently run using ECS, this error occurs during execution:
Failed to load and execute Flow's environment: NameError("name '__file__' is not defined")
Appreciate any guidance at all, thank you
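One possible direction, sketched here rather than confirmed in the thread: guard the __file__ lookup so it only applies where __file__ exists (e.g. when the flow file is run directly during registration), and rely on an explicit key at run time - the key name is hypothetical and this has not been verified against ECS:
import os
from prefect.storage import S3

try:
    # defined when this script is executed directly, e.g. during registration in CI
    LOCAL_SCRIPT_PATH = os.path.abspath(__file__)
except NameError:
    # undefined when the stored script is exec'd by the agent at flow run time
    LOCAL_SCRIPT_PATH = None

STORAGE = S3(
    bucket="aws-ecs-flows",
    stored_as_script=True,
    local_script_path=LOCAL_SCRIPT_PATH,
    key="flows/my-flow.py",  # hypothetical key pointing at the uploaded script
)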
CA Lee
05/11/2021, 2:30 AM
Is there any way for local_script_path to be inferred automatically for S3 storage?
If the arg local_script_path is not passed, the below error is encountered:
ValueError: A `key` must be provided to show where flow `.py` file is stored in S3 or provide a `local_script_path` pointing to a local script that contains the flow.
I have managed to get S3 storage working, but the solution seems clunky (pass the dynamic file path using the following code):
import os, inspect

S3(
    bucket="my-bucket-name",
    # inspect.stack()[0][1] is the filename of the current frame, i.e. the path
    # of this flow script - used here to avoid the __file__ NameError above
    local_script_path=os.path.abspath(inspect.stack()[0][1]),
    stored_as_script=True,
    key="my-key-name"
)
Zanie
Marvin
05/11/2021, 2:59 PM