#prefect-server

Ray Tang

04/18/2022, 3:25 PM
Hi all, I am trying to follow this Medium post to create a dockerized flow: https://medium.com/the-prefect-blog/the-simple-guide-to-productionizing-data-workflows-with-docker-31a5aae67c0a
I can now dockerize the flow and push it to our image repository, but I don't know how to register it so that it runs on a Docker agent. I have a Docker agent running with `prefect agent docker start --show-flow-logs`.
I tried running `prefect register --project test-project --path hello_word_dockerized.py`, but the registration failed because pandas is not installed on the Docker agent's host (and I don't intend to install it there, because we want an isolated Docker image per flow):
```python
import os

import prefect
from prefect import task, Flow
from prefect.run_configs import DockerRun
from prefect.storage import Local
import pandas  # top-level import: evaluated at registration time, so pandas must be installed where `prefect register` runs

def get_logger():
    from prefect.utilities.logging import get_logger
    import logging
    import sys

    logger = get_logger()
    logger.setLevel('INFO')
    log_stream = logging.StreamHandler(sys.stdout)
    logger.addHandler(log_stream)

    return logger

logger = get_logger()

@task(log_stdout=True)
def hello_world():
    logger.info("Hello, World (Dockerized)")

with Flow("Hello World Dockerized") as flow:
    hello_world()

IMAGE_NAME = "prefect-hello-world-dockerized"
IMAGE_TAG = "latest"

print(os.path.basename(__file__))  # debug: show which file is being evaluated
# Script storage: at run time the flow is loaded from this path (inside the image)
flow.storage = Local(path="/opt/prefect/flows/hello_word_dockerized.py", stored_as_script=True)

# Run inside this image; picked up only by an agent that has all of these labels
flow.run_config = DockerRun(image=f"{IMAGE_NAME}:{IMAGE_TAG}", labels=["uat", "docker"])
```

Kevin Kho

04/18/2022, 3:27 PM
Hey @Ray Tang, you do need the packages installed locally to register, because registration evaluates the Python script. One way around this is to run the import inside a task, so that it is deferred to execution time. You still need to register the flow, and then you'll be able to run it on the Docker agent.
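In plain Python terms, an `import` inside a function body runs only when the function is called, not when the file is loaded. A minimal sketch of that behaviour, using a deliberately missing module name (`pandas_not_installed_here` is hypothetical and stands in for a package that exists only in the flow's image). In a Prefect 1.x flow the function would also carry `@task`; the decorator does not change when the body runs.

```python
def hello_world() -> str:
    # Deferred import: resolved only when the function body runs (e.g. inside the
    # flow's Docker image), not when the file is evaluated for registration.
    import pandas_not_installed_here  # hypothetical stand-in for pandas

    return pandas_not_installed_here.__name__


# Defining the function does not import anything yet, so evaluating this file succeeds.
print(callable(hello_world))  # True

# The missing package only matters once the function is actually called.
try:
    hello_world()
except ModuleNotFoundError as exc:
    print(f"failed only at call time: No module named {exc.name!r}")
```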

Ray Tang

04/18/2022, 3:37 PM
@Kevin Kho I can do that. But in some cases the tasks live in a shared module, and I need to import them at the top level (so I can't defer the import). What should we do then?

Kevin Kho

04/18/2022, 3:40 PM
Then you really need the registration environment to have them installed, or you need to wrap the imports in a try-except.
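The try-except idea can be packaged as a small helper. A minimal sketch, assuming the shared module is only needed when tasks actually run; `optional_import` and `flow_common_not_installed` are hypothetical names. This only keeps evaluation from crashing: anything defined in the missing module is still absent at registration time, so the flow cannot be built from tasks that live in it.

```python
import importlib
from types import ModuleType
from typing import Optional


def optional_import(module_name: str) -> Optional[ModuleType]:
    """Return the imported module, or None if it is not installed here."""
    try:
        return importlib.import_module(module_name)
    except ModuleNotFoundError as exc:
        # Only swallow the error if *this* module (or a parent package) is missing;
        # if it exists but one of its own dependencies is missing, re-raise.
        missing = exc.name or ""
        if module_name == missing or module_name.startswith(missing + "."):
            return None
        raise


# Hypothetical shared module that exists only inside the flow's image.
flow_common = optional_import("flow_common_not_installed")
print(flow_common is None)  # True where the module is absent
```

Re-raising for other missing modules is a deliberate choice: a bare `except ImportError` would also hide a broken dependency inside a module that *is* installed.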
Also, why would importing it inside a task not make it global? I think the import will persist for the other tasks.
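On the narrower Python question: an `import` inside a function binds the name only in that function's local scope, but the loaded module is cached in `sys.modules`, so importing it again in another task within the same Python process is cheap and does not reload it. Whether different tasks share a process depends on the executor (a Dask executor, for example, may run tasks in other worker processes), so treat this as a sketch of the single-process case. `colorsys` is just an arbitrary small stdlib module.

```python
import sys


def first_task() -> float:
    import colorsys  # binds a local name; the module object is cached in sys.modules
    return colorsys.rgb_to_hsv(1.0, 0.0, 0.0)[0]


def second_task() -> bool:
    # The name 'colorsys' is not visible here unless this task imports it too.
    return "colorsys" in globals()


first_task()
print(second_task())              # False: the name did not leak into module scope
print("colorsys" in sys.modules)  # True: the module stays cached for this process
```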

Ray Tang

04/18/2022, 3:51 PM
We don't want to install any packages globally, because we don't want to deal with dependency-conflict messes, and we want the flexibility to upgrade some dependencies in individual flows for POCs.
So, based on this conversation, I will need a registration environment for registering the flows, and a Docker agent for running them.
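One way to get that registration environment without installing anything on the host is to run `prefect register` inside the flow's own image. A minimal sketch that only builds the `docker run` command, assuming the image has the Prefect CLI installed and can reach your Prefect Server API; network and connection settings are left to the `env` argument, and `docker_register_command` is an illustrative name, not a Prefect API. The image name and flow path are the ones shown in this thread.

```python
import shlex
from typing import Dict, List, Optional


def docker_register_command(
    image: str,
    project: str,
    flow_path: str,
    env: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Build a `docker run` command that registers a flow from inside its own image.

    `prefect register` evaluates the flow file, so running it in the image means
    the flow's dependencies (e.g. pandas) only need to be installed there.
    `env` is for whatever settings the container needs to reach the Prefect
    Server API; the variable names are left to the caller (assumption).
    """
    cmd = ["docker", "run", "--rm"]
    for key, value in (env or {}).items():
        cmd += ["-e", f"{key}={value}"]
    cmd += [image, "prefect", "register", "--project", project, "--path", flow_path]
    return cmd


cmd = docker_register_command(
    image="prefect-hello-world-dockerized:latest",
    project="test-project",
    flow_path="/opt/prefect/build/hello_world_dockerized/flows/hello_word_dockerized.py",
)
print(shlex.join(cmd))
# To actually run it: subprocess.run(cmd, check=True)
```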

Kevin Kho

04/18/2022, 3:55 PM
Yes, you do need the registration environment, unless you use `try-except` instead, because `prefect register …` opens the Python file and evaluates it to create that Flow object.
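A rough stdlib analogy of that evaluation step (Prefect's own loader, e.g. `extract_flow_from_file`, differs in detail): execute the whole file, then pick the flow object out of the resulting namespace. Every top-level statement runs first, so a missing top-level import stops evaluation before any flow exists. `extract_objects`, `FLOW_TEMPLATE`, the stand-in `Flow` class and `pandas_not_installed_here` are all hypothetical.

```python
import runpy
import tempfile
from pathlib import Path


def extract_objects(path: str, type_name: str = "Flow") -> list:
    """Execute a Python file and return the objects whose class is named `type_name`."""
    namespace = runpy.run_path(path)  # runs EVERY top-level statement, imports included
    return [obj for obj in namespace.values() if type(obj).__name__ == type_name]


FLOW_TEMPLATE = """\
{first_line}


class Flow:  # hypothetical stand-in for prefect.Flow
    def __init__(self, name):
        self.name = name


flow = Flow("Hello World Dockerized")
"""

with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp, "good_flow.py")
    good.write_text(FLOW_TEMPLATE.format(first_line="import json"))
    print([f.name for f in extract_objects(str(good))])  # ['Hello World Dockerized']

    bad = Path(tmp, "bad_flow.py")
    bad.write_text(FLOW_TEMPLATE.format(first_line="import pandas_not_installed_here"))
    try:
        extract_objects(str(bad))
    except ModuleNotFoundError as exc:
        print(f"evaluation stopped before any Flow existed: {exc.name}")
```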

Ray Tang

04/18/2022, 4:00 PM
So in this case, given that the module `flow_common` is not available during registration, will Prefect register the flow correctly?
```python
from prefect import Flow

with Flow("Hello World Dockerized") as flow:
    try:
        from flow_common import hello_world_task
        hello_world_task()
    except ImportError:
        pass
```

Kevin Kho

04/18/2022, 4:24 PM
Ah, this will not work, because the task code really needs to exist at registration time.
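Why the try-except does not help here can be sketched without Prefect: the `with Flow(...)` block records whichever tasks are called inside it, so if the shared import fails and the error is swallowed, the object that gets registered simply lacks those tasks. A minimal stand-in, assuming a flow context that collects tasks the way described; `FlowSketch`, `local_task` and `flow_common_not_installed` are hypothetical.

```python
class FlowSketch:
    """Hypothetical stand-in for a flow context that records tasks called inside it."""

    current = None  # the flow whose `with` block is executing right now

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: list = []

    def __enter__(self) -> "FlowSketch":
        FlowSketch.current = self
        return self

    def __exit__(self, *exc_info) -> bool:
        FlowSketch.current = None
        return False  # never suppress exceptions


def local_task() -> None:
    # Calling a task inside the `with` block adds it to the active flow.
    FlowSketch.current.tasks.append("local_task")


with FlowSketch("Hello World Dockerized") as flow:
    local_task()
    try:
        # Hypothetical shared module, missing from the registration environment.
        from flow_common_not_installed import hello_world_task
        hello_world_task()
    except ImportError:
        pass  # the failure is silently swallowed...

# ...so the flow that would be registered simply lacks the shared task.
print(flow.tasks)  # ['local_task']
```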

Anurag Bajpai

04/19/2022, 6:07 AM
You could make a flow that registers your flows. That way, the registration inside that flow always runs in a base image that can contain your shared modules. Something like this:
```python
import datetime as dt

from prefect import Flow, Parameter, task
from prefect.run_configs import DockerRun
from prefect.utilities.storage import extract_flow_from_file


@task(checkpoint=False, log_stdout=True, max_retries=3, retry_delay=dt.timedelta(seconds=10))
def register_flow(path, project_name):
    # Evaluate the flow file (in whatever environment this task runs) and register it
    target_flow = extract_flow_from_file(path)
    target_flow.register(project_name=project_name)


with Flow("Register Flow", run_config=DockerRun(image='your-image-here')) as flow:
    flow_path = Parameter("flow_path")
    project_name = Parameter("project_name")
    register_flow(flow_path, project_name)
```

Ray Tang

04/19/2022, 8:18 AM
Thanks @Anurag Bajpai, we are looking into this.
We have copied the flow file into the Docker image, and I can see it when using `docker run`:
```console
$ docker run -it prefect-hello-world-dockerized:latest /bin/sh
# cd /opt/prefect/build/hello_world_dockerized/flows/
# ls
hello_word_dockerized.py  register_flow.py
```
But when we run register_flow.py (the one provided by @Anurag Bajpai), it throws `FileNotFoundError: [Errno 2] No such file or directory:` and we wonder whether `DockerRun` is running at all 🤔 (because when we pass a bad image name, no error is thrown).
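A `FileNotFoundError` only says the path did not exist for the process that ran the code, so it can help to fail early with some context about where that process is. A minimal stdlib sketch; `check_flow_file` is a hypothetical helper, and `/.dockerenv` is a marker file Docker commonly (not guaranteed) creates inside containers, used here only as a hint. It also rejects directories, since `extract_flow_from_file` reads a single flow file, while the `--flow_path` used later in this thread points at a directory.

```python
import os
import socket
from pathlib import Path


def check_flow_file(flow_path: str) -> Path:
    """Return the flow file path, or raise with details about where the lookup ran."""
    path = Path(flow_path)
    where = (
        f"host={socket.gethostname()!r}, cwd={os.getcwd()!r}, "
        f"looks_like_docker={Path('/.dockerenv').exists()}"
    )
    if not path.exists():
        raise FileNotFoundError(f"{flow_path!r} not found ({where})")
    if path.is_dir():
        candidates = sorted(p.name for p in path.glob("*.py"))
        raise IsADirectoryError(
            f"{flow_path!r} is a directory; expected one flow file, e.g. {candidates} ({where})"
        )
    return path
```

Calling it at the top of the registration task makes the error message say which host and working directory it actually ran in.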

Kevin Kho

04/19/2022, 1:35 PM
If you go to the UI, I think you should be able to see the registered RunConfig.

Ray Tang

04/19/2022, 3:06 PM
I have modified @Anurag Bajpai's code a bit, aiming to make it a reusable flow for future deployments: https://gist.github.com/raytangRT/3ee77bdfccac80e317e45d3c430402cd
The problem is that when I call it with
`python register_flow.py --project_name demo-project-dockerized/hello-world-dockerized --flow_path /opt/prefect/build/hello_world_dockerized/flows/ --image_name aprefect-hello-world-dockerized:latest`
it doesn't throw an error, even with an incorrect image_name.

Anurag Bajpai

04/19/2022, 3:51 PM
I don't think `flow.run()` uses the flow's run_config by default; @Kevin Kho can correct me if I'm wrong.

Kevin Kho

04/19/2022, 3:53 PM
Yes, that's right. `flow.run()` does not respect RunConfig; only agent-backed runs do. But I think Ray is registering?
Ah, just saw the gist. Yes, `flow.run()` uses the executor but not the RunConfig.