CA Lee
03/17/2021, 1:11 AM
from prefect import task, Task

# Example 1
class Example(Task):
    def run(self, **kwargs):
        pass  # do something

# Example 2
@task
def example():
    pass  # do something
In particular, how do I force a rebuild of the second task (the one using the decorator)? I'm running into issues running flows on ECS / ECR, where the task does not get rebuilt when it uses the decorator format.
nicholas
CA Lee
03/17/2021, 1:26 AM
from prefect import Flow
from tasks import Example

task_result = Example()

# S3 and ECSRun are shorthand here for configured storage / run config objects
with Flow("flow-name", storage=S3, run_config=ECSRun) as flow:
    run_task_result = task_result()

flow.register(project_name="project", idempotency_key=flow.serialized_hash())
task.py
from prefect import Task

class Example(Task):
    def run(self, **kwargs):
        pass  # do something
Dockerfile:
FROM prefecthq/prefect:0.14.12-python3.8
RUN mkdir -p /prefect/tasks/
COPY tasks/ /prefect/tasks/
ENV PYTHONPATH="${PYTHONPATH}:/prefect/tasks/"
WORKDIR /prefect
I have a CI process (also tested locally) that builds, tags, and pushes a Docker image to a repo.
Everything runs fine with the three-file setup above.
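For context on the `idempotency_key` passed to `flow.register` above, here is a minimal sketch of the general idea, assuming a toy in-memory registry. The names `registry`, `metadata_hash`, and `register` are hypothetical and written only for illustration; they are not Prefect APIs. A registration whose key matches the previous one is skipped, so a new version appears only when the hashed metadata changes.

```python
import hashlib
import json

registry = {}  # hypothetical in-memory store: flow name -> (key, version)


def metadata_hash(metadata):
    """Deterministic SHA-256 over JSON-serializable flow metadata."""
    payload = json.dumps(metadata, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def register(name, metadata):
    """Bump the version only when the idempotency key changes."""
    key = metadata_hash(metadata)
    old_key, version = registry.get(name, (None, 0))
    if key != old_key:
        version += 1
        registry[name] = (key, version)
    return version


meta = {"name": "flow-name", "tasks": ["example"]}
first = register("flow-name", meta)
second = register("flow-name", dict(meta))  # same metadata: no new version
third = register("flow-name", {**meta, "tasks": ["example", "extra"]})
print(first, second, third)
```

In this toy, anything that is not part of the hashed metadata cannot trigger a new version. Whether that matters in a real setup depends on what the real hash covers.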
If, however, I rewrite the task using decorator syntax:
task.py
from prefect import task

@task
def example():
    pass  # do something
    ...
    # make some changes
If I make any changes to the decorated task, running a flow from Prefect Cloud does not pick them up, even after a new container has been built from the changed code.
If I use the Task class, the code changes are reflected.
I suspect the Task class is built at runtime and the decorated task is not. I am more inclined to use decorators, though, as they simplify my development.
Any insight into how to reflect code changes using decorators would be appreciated!
nicholas
Example()()
- if you’re calling the same line with a decorated task instead, there is no class instantiation step, which means your line task_result = Example() is actually calling the run method of your task.
CA Lee
03/17/2021, 1:43 AM
example?
Or should my question be - Is there any way to instantiate a Prefect Task using the task decorator instead of using the Task class?
flow.py
from prefect import Flow
from tasks import example

with Flow(...) as flow:
    example()

task.py
from prefect import task

@task
def example():
    pass  # do something
nicholas
task_result = example?
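The distinction discussed in this thread can be sketched without Prefect itself. The block below is a simplified stand-in, not Prefect's implementation: `Task`, `FunctionTask`, and `task` are toy versions written for illustration. It shows that a `Task` subclass has to be instantiated before use, whereas a function wrapped by a `task`-style decorator is already an instance, so the bare name `example` plays the same role as `Example()`.

```python
class Task:
    """Toy stand-in for a task base class (not Prefect's real Task)."""

    def run(self, **kwargs):
        raise NotImplementedError

    def __call__(self, **kwargs):
        # In this toy, calling an instance simply invokes run().
        # Prefect's real __call__ instead binds the task to the active Flow.
        return self.run(**kwargs)


class FunctionTask(Task):
    """Toy wrapper that turns a plain function into a Task instance."""

    def __init__(self, fn):
        self.fn = fn
        self.name = fn.__name__

    def run(self, **kwargs):
        return self.fn(**kwargs)


def task(fn):
    # The decorator returns an *instance*, so no extra instantiation is needed.
    return FunctionTask(fn)


class Example(Task):
    def run(self, **kwargs):
        return "class-based result"


@task
def example():
    return "decorator-based result"


class_task = Example()    # the class must be instantiated first
decorated_task = example  # already an instance; no call needed to create it

print(isinstance(class_task, Task), isinstance(decorated_task, Task))
print(class_task(), decorated_task())
```

Under these assumptions, the decorated equivalent of `task_result = Example()` is `task_result = example`, with no parentheses.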