Tom Klein
12/16/2021, 5:06 PM
[...] Dockerfile [...])
• check if the dedicated container for it exists. if it does, remove it.
• create a new container for it
• run the container with some command
• wait for the run to finish
• log the results
• remove the container
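The steps listed above can be sketched in plain Python, independent of Prefect. This is a minimal sketch, assuming the local `docker` CLI is available; `run_image_once`, `docker_cli`, and the injectable `sh` runner are hypothetical names that do not appear in the thread. The runner is injectable so the sequence can be exercised without Docker installed.

```python
import subprocess
from typing import Callable, List, Sequence, Tuple

# A runner takes an argv list and returns (exit_code, combined_output).
Runner = Callable[[Sequence[str]], Tuple[int, str]]


def docker_cli(argv: Sequence[str]) -> Tuple[int, str]:
    """Default runner (hypothetical helper): shells out to the local docker binary."""
    proc = subprocess.run(list(argv), capture_output=True, text=True)
    return proc.returncode, proc.stdout + proc.stderr


def run_image_once(name: str, image: str, command: List[str],
                   sh: Runner = docker_cli) -> Tuple[int, str]:
    """Run `command` in a fresh container called `name`; return (exit_code, logs)."""
    # 1. Remove a leftover container with the same name, if one exists.
    exists, _ = sh(["docker", "container", "inspect", name])
    if exists == 0:
        sh(["docker", "rm", "-f", name])
    # 2. Create the container, then 3. start it.
    code, out = sh(["docker", "create", "--name", name, image, *command])
    if code != 0:
        raise RuntimeError(f"docker create failed: {out}")
    sh(["docker", "start", name])
    try:
        # 4. Block until the container exits; `docker wait` prints its exit code.
        _, status = sh(["docker", "wait", name])
        # 5. Collect the logs so the caller can log them.
        _, logs = sh(["docker", "logs", name])
    finally:
        # 6. Always remove the container afterwards.
        sh(["docker", "rm", name])
    return int(status.strip()), logs
```

Wrapped in a single custom task or split across several tasks, the same sequence applies; the split only changes how much of it is visible from the outside.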
ideally, i'd like to reach a point where i could just point Prefect at a local or remote code repository that contains a Dockerfile and have it basically just turn it into a single task (or, alternatively, point it at a pre-built image on ECR and the rest would be the same).
do you think it would make more sense to turn this into a custom task, or into a sort of parameterized generic flow that i can incorporate into other flows using create_flow_run?
my issues with making the generic flow are that it cannot easily be shared with the community, it requires registering such a flow, and it requires naming all the tasks dynamically to make it clear (from the outside) what it's doing (and even then, i can only cause the tasks to be named dynamically, not the flow). E.g. if this image does some data preprocessing for some model, the flow name would still be something like run docker, which doesn't do much to indicate that it's actually running this specific image...
the problem with creating a custom task is that i could not re-use the existing task-library Docker tasks...
thoughts? 🤔
Kevin Kho
[...] run method. It just treats it like normal Python instead of a Task
Tom Klein
12/16/2021, 5:10 PM
Kevin Kho
[...] create_flow_run and the subflow can utilize the tasks in the task library and you get observability into each one
Kevin Kho
Tom Klein
12/16/2021, 5:11 PM
[...] run_docker flow runs all over the place and it would be difficult (from outside) to know what it's actually running until i delve into the params etc.
Tom Klein
12/16/2021, 5:12 PM
Kevin Kho
[...] RenameFlowRun task to change the name, and then you can use the Python underneath by calling the .run() inside a state handler so that it gets used before the first task runs
Tom Klein
12/16/2021, 5:18 PM
Kevin Kho
[...] prefect.context.parameters [...]
Anna Geller
from prefect import Flow, task
from prefect.tasks.prefect import RenameFlowRun
import prefect


# Prefect 1.x calls state handlers as handler(obj, old_state, new_state).
def rename_handler(obj, old_state, new_state):
    # Rename the flow run as soon as it enters a Running state.
    if new_state.is_running():
        param = prefect.context.parameters.get("your_parameter_name")
        RenameFlowRun().run(flow_run_name=f"new_name_{param}")
    return new_state


@task
def first_task():
    return 1


with Flow("test-flow", state_handlers=[rename_handler]) as flow:
    first_task()
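
For a generic run_docker flow like the one discussed in this thread, the string passed as flow_run_name could be built from the run's parameters so the run is identifiable from the outside. A minimal sketch in plain Python, assuming hypothetical parameter names `image` and `command` (neither is named in the thread) and a hypothetical helper `build_run_name`:

```python
from typing import Any, Mapping


def build_run_name(parameters: Mapping[str, Any], prefix: str = "run_docker") -> str:
    """Build a descriptive flow run name from flow parameters.

    `image` and `command` are hypothetical parameter names.
    """
    image = str(parameters.get("image", "unknown-image"))
    # Keep only the last path segment of the image reference,
    # e.g. "registry.example.com/team/prep:1.2" becomes "prep:1.2".
    short_image = image.rsplit("/", 1)[-1]
    command = parameters.get("command") or []
    if isinstance(command, str):
        command = command.split()
    # Use just the first word of the command to keep names short.
    suffix = f"-{command[0]}" if command else ""
    return f"{prefix}-{short_image}{suffix}"
```

Inside a state handler, the result of `build_run_name(prefect.context.parameters)` could then replace the hard-coded f-string.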