Tom Klein
12/22/2021, 11:51 AM
Anna Geller
Tom Klein
12/22/2021, 12:57 PM
`input` or `output` interfaces, and the Prefect docker tasks don’t add in that functionality either when they wrap them
our flows can have many other tasks besides the ML task itself; my team lead suggested wrapping only the ML tasks in containers, not necessarily everything around them (e.g. pulling from Snowflake, sending results to some other service)
what i’m trying to verify is that if a task lives in a docker container, then - as it’s currently designed - Prefect offers only a low-level interaction with that Docker container and offers no functional interaction with whatever happens inside it
Anna Geller
Tom Klein
12/22/2021, 2:22 PM
`KubernetesRun` jobs - might contain too many dependencies?
i.e. it needs to contain by default for example a docker daemon in order to be able to run `docker` tasks, etc. - is that the case?
(cause they were wondering if we’d need to define a different image per flow in order to save resources)
or are the dependencies dynamically inferred somehow?
Anna Geller
from prefect.run_configs import KubernetesRun
flow.run_config = KubernetesRun(env={"EXTRA_PIP_PACKAGES": "scikit-learn matplotlib"})
Tom Klein
12/22/2021, 2:46 PM
`docker daemon`
(from what you’re saying and from the docs i understand that the general recommendation is to use dedicated images for flows based on the dependencies they need)
but i’m still curious about whether every image that inherits from the default prefect image would also run a docker daemon
— and suddenly i realized (from when i was working on the POC) that you said the image doesn’t actually run docker at all and instead only interfaces with it using the socket (and relies on there being some daemon running on the machine)
so, in general - if we wanted to run `Docker` tasks (e.g. pull image, run container, wait on container, etc.) - we’d have to launch the daemon on the image (used by the flow) ourselves, right?
Tom Klein
12/22/2021, 3:34 PM
`PullImage` task never actually invoked in the flow in the example code?
Anna Geller
Tom Klein
12/22/2021, 4:12 PM
`image` isn’t a reference to a task? (rather than the result of one)?
and doesn’t it need to be executed itself (like `create_container` is invoked with parentheses, `create_container(…)`, in the body of the `flow` block)?
Tom Klein
12/22/2021, 4:15 PM
Anna Geller
Anna Geller
Tom Klein
12/22/2021, 4:19 PM
`CreateContainer(...)` yields a task reference which can then be placed in a var named `create_container`
(this isn’t the result of executing the task, it’s just a reference to the task that was just defined)
while for `PullImage` - doing the exact same thing supposedly immediately yields a result, even though it seems to be executed outside of the `flow` block?
i feel like i’m missing something about how Prefect derives the actual DAG flow from the `create_container(image)` invocation, i.e. - that it implicitly understands it needs to execute some unnamed task that was created using `PullImage(…)` even though it’s never explicitly invoked within the flow block….
Tom Klein
12/22/2021, 4:21 PM
pull_image = PullImage(...)
with Flow(...) as flow:
    image = pull_image()
    container_id = create_container(image)
    ...
be 100% equivalent?
Anna Geller
from prefect import Flow
from prefect.tasks.docker import (
    CreateContainer,
    GetContainerLogs,
    PullImage,
    StartContainer,
    WaitOnContainer,
)
from prefect.triggers import always_run

pull_image = PullImage(
    docker_server_url="tcp://localhost:2375",
    repository="prefecthq/prefect",
    tag="latest",
)
create_container = CreateContainer(
    docker_server_url="tcp://localhost:2375",
    image_name="prefecthq/prefect:latest",
    command='''python -c "from prefect import Flow; f = Flow('empty'); f.run()"''',
)
start_container = StartContainer(docker_server_url="tcp://localhost:2375")
wait_on_container = WaitOnContainer(docker_server_url="tcp://localhost:2375")
# We pass `trigger=always_run` here so the logs will always be retrieved, even
# if upstream tasks fail
get_logs = GetContainerLogs(
    docker_server_url="tcp://localhost:2375", trigger=always_run
)
with Flow("Docker sidecar example") as flow:
    # Create and start the docker container
    container_id = create_container(pull_image)
    started = start_container(container_id=container_id)
    # Once the docker container has started, wait until it's completed and get the status
    status_code = wait_on_container(container_id=container_id, upstream_tasks=[started])
    # Once the status code has been retrieved, retrieve the logs
    logs = get_logs(container_id=container_id, upstream_tasks=[status_code])
flow.visualize()  # 1.
with Flow("Docker sidecar example") as flow:
    # Create and start the docker container
    image = pull_image()
    container_id = create_container(image)
    started = start_container(container_id=container_id)
    # Once the docker container has started, wait until it's completed and get the status
    status_code = wait_on_container(container_id=container_id, upstream_tasks=[started])
    # Once the status code has been retrieved, retrieve the logs
    logs = get_logs(container_id=container_id, upstream_tasks=[status_code])
flow.visualize()  # 2.
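As a rough mental model of why the two variants above should end up with the same graph, here is a toy sketch in plain Python. It is not Prefect's implementation: the `Task` and `Flow` classes below are simplified, hypothetical stand-ins, and the call returns the task itself, which is a simplification of what the real library does. The idea it illustrates: calling a task inside the `with` block registers it with the active flow, and any task passed as an argument is registered as an upstream dependency too, even if it was never called explicitly.

```python
from typing import List, Optional, Tuple


class Flow:
    """Toy flow: records tasks and (upstream, downstream) edges."""

    _active: Optional["Flow"] = None  # flow whose `with` block is currently open

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: List["Task"] = []
        self.edges: List[Tuple[str, str]] = []

    def __enter__(self) -> "Flow":
        Flow._active = self
        return self

    def __exit__(self, *exc_info) -> None:
        Flow._active = None

    def add_task(self, task: "Task") -> None:
        if task not in self.tasks:
            self.tasks.append(task)


class Task:
    """Toy task: calling it only records graph structure, nothing runs."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __call__(self, *upstream: "Task") -> "Task":
        flow = Flow._active
        if flow is None:
            raise RuntimeError("call tasks inside a `with Flow(...)` block")
        flow.add_task(self)
        for dep in upstream:
            # A task passed as an argument is registered implicitly,
            # even if it was never called inside the block.
            flow.add_task(dep)
            flow.edges.append((dep.name, self.name))
        return self  # simplification made for this toy model


pull_image = Task("PullImage")
create_container = Task("CreateContainer")

with Flow("implicit") as flow_1:
    create_container(pull_image)  # pull_image is never called here

with Flow("explicit") as flow_2:
    image = pull_image()
    create_container(image)

print(flow_1.edges == flow_2.edges)
```

In both toy flows the only edge is `PullImage` to `CreateContainer`, which mirrors what the two `flow.visualize()` calls are meant to show.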
Tom Klein
12/22/2021, 6:19 PM
Anna Geller
Tom Klein
12/22/2021, 8:59 PM
Tom Klein
12/22/2021, 9:50 PM
`RunNamespacedJob`?
(assuming i could just launch the same image exactly as a job in k8s instead of using the sidecar?)
Anna Geller
Tom Klein
12/24/2021, 9:49 AM
`KubernetesPodOperator` allows for “native” [XCOM] communication to and from the k8s pod via a json in the container)
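roughly, the pattern looks like this (a toy sketch in plain Python, not Airflow's or Prefect's actual code; `write_result`, `read_result` and the `return.json` file name are made up for illustration, and a temp directory stands in for a volume shared with the container):

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict

RESULT_FILE = "return.json"  # hypothetical file name agreed on by both sides


def write_result(shared_dir: Path, result: Dict[str, Any]) -> Path:
    """Container side: dump the task's result to a JSON file."""
    path = shared_dir / RESULT_FILE
    path.write_text(json.dumps(result))
    return path


def read_result(shared_dir: Path) -> Dict[str, Any]:
    """Orchestrator side: load whatever the container left behind."""
    return json.loads((shared_dir / RESULT_FILE).read_text())


with tempfile.TemporaryDirectory() as tmp:
    shared = Path(tmp)  # stands in for a directory mounted into the container
    write_result(shared, {"rows_scored": 3, "status": "ok"})
    result = read_result(shared)

print(result)
```

so the code inside the container only has to write a file, and the downstream task gets a normal Python dict back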
i wish there was a similar functionality for `Docker` or `Kubernetes` run tasks 🤔
Anna Geller