Jawaad Mahmood
12/22/2021, 7:08 PM
from prefect import task, Flow
from prefect.executors import LocalExecutor
from prefect.run_configs import DockerRun
from prefect.storage import Docker
import docker
with Flow("some_flow") as flow:
do_something
docker_client = docker.DockerClient()
docker_client.login(username=<env_user>,password=<env_pass>)
flow.storage = Docker(
registry_url='registry.hub.docker.com/repository/docker/<user>/<repo>'
,image_name='<some_flow>'
,files={
<origin path>:<dest path>
}
,python_dependencies = ['pandas','numpy','prefect']
,env_vars={
"PYTHONPATH": "$PYTHONPATH:assets/:root/:data/:image"
}
,base_image='python:3.7.3'
)
flow.run_config = DockerRun(labels=['my-label']
)
flow.executor = LocalExecutor()
flow.register(project_name="some_project")
Anna Geller
12/22/2021, 7:36 PM
with Flow("some_flow") as flow:
do_something
docker_client = docker.DockerClient()
docker_client.login(username=<env_user>,password=<env_pass>)
flow.storage = Docker(
registry_url='registry.hub.docker.com/repository/docker/<user>/<repo>'
,image_name='<some_flow>'
,files={
<origin path>:<dest path>
}
,python_dependencies = ['pandas','numpy','prefect']
,env_vars={
"PYTHONPATH": "$PYTHONPATH:assets/:root/:data/:image"
}
,base_image='python:3.7.3'
)
flow.run_config = DockerRun(labels=['my-label']
)
flow.executor = LocalExecutor()
flow.register(project_name="some_project")
or:
with Flow(
"some_flow",
run_config=DockerRun(labels=["my-label"]),
storage=Docker(
registry_url="registry.hub.docker.com/repository/docker/<user>/<repo>",
image_name="<some_flow>",
python_dependencies=["pandas", "numpy", "prefect"],
env_vars={"PYTHONPATH": "$PYTHONPATH:assets/:root/:data/:image"},
base_image="python:3.7.3",
),
) as flow:
pass
3. Lastly, within your Flow constructor you can only construct a DAG, i.e. it can’t contain any non-task code. Within the “with Flow” block you should only be calling tasks, so you can remove the lines below from your Flow block and instead log in to Docker from the terminal, in the same terminal session in which you register your flow:
docker_client = docker.DockerClient()
docker_client.login(username=<env_user>,password=<env_pass>)
Jawaad Mahmood
12/22/2021, 8:11 PM
Anna Geller
12/22/2021, 8:16 PM
Jawaad Mahmood
12/23/2021, 1:06 AM
from prefect import task, Flow
from prefect.executors import LocalExecutor
from prefect.run_configs import DockerRun
from prefect.storage import Docker
import docker
with Flow("some_flow") as flow:
do_something
flow.storage = Docker(
registry_url='<user>'
,image_name='<repo>'
,image_tag='<some_flow>'
,files={
<origin path>:<dest path>
}
,python_dependencies = ['pandas','numpy','prefect']
,env_vars={
"PYTHONPATH": "$PYTHONPATH:assets/:root/:data/:image"
}
,base_image='python:3.7.3'
)
flow.run_config = DockerRun(labels=['my-label']
)
flow.executor = LocalExecutor()
flow.register(project_name="some_project")
Anna Geller
12/23/2021, 10:46 AM