David Charles
03/23/2022, 11:23 AM
flows package and leverage shared stuff in common.
.                       # <-- repo root
├── common
│   ├── __init__.py
│   ├── config.py       # Reusable config
│   ├── schedules       # Reusable schedules
│   │   ├── __init__.py
│   │   └── simple_schedule.py
│   └── tasks           # Reusable tasks
│       └── __init__.py
└── flows
    ├── __init__.py
    └── simple_flow
        ├── __init__.py
        └── src
            ├── __init__.py
            ├── __main__.py
            ├── config.py
            ├── core.py
            ├── flow.py
            └── tasks.py
flow.py looks like this:
from prefect import Flow, Parameter
from prefect.storage import GitLab

from flows.simple_flow.src import config
from flows.simple_flow.src.tasks import (
    get_source,
    decode_source,
    store_decoded_data
)

# just to prove import works
from common.schedules.simple_schedule import schedule_daily

data_source_url = Parameter("data_source_url", default=config.data_source_url)

storage = GitLab(
    host="https://private-gitlab-host.com",
    repo="the-repo",
    path="flows/simple_flow/src/flow.py",
    ref="main",
    access_token_secret="TOKEN_SECRET",
)

with Flow(name="simple_flow", storage=storage) as flow:
    data = get_source(data_source_url=data_source_url)
    decoded_data = decode_source(data=data)
    store_decoded_data(decoded_data=decoded_data)

if __name__ == "__main__":
    flow.register(project_name="default", add_default_labels=False)
This all works fine running a Prefect core server locally with a local agent. We’ve deployed into AWS and have an ECS agent. I have updated my local ~/.prefect/config.toml as follows:
backend = "server"

[server]
host = "https://our-aws-prefect-apollo.domain.com"
port = "443"
endpoint = "${server.host}:${server.port}"

    [server.ui]
    endpoint = "https://our-aws-prefect-ui.domain.com"
I register the flow from a local machine using the Python interpreter:
>>> from flows.simple_flow.src.flow import flow
>>> from prefect.run_configs import UniversalRun
>>> flow.name
'simple_flow'
>>> flow.storage
<Storage: GitLab>
>>> flow.run_config = UniversalRun(labels=["dev"])
>>> flow.register(project_name="default", labels=["dev"])
Flow URL: https://our-aws-prefect-ui.domain.com/main/flow/53ae2776-36d1-4bed-8f9a-87ce95fad866
 └── ID: 182b65b6-5ad1-42c4-98ea-eac767b3b867
 └── Project: default
 └── Labels: ['dev']
'182b65b6-5ad1-42c4-98ea-eac767b3b867'
This registers without error, but when I try to execute the flow I see this in the flow logs:
Failed to load and execute Flow's environment: KeyError("'__name__' not in globals")
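For what it's worth, this exact message can be reproduced in plain Python without Prefect: a relative import inside source that is run via exec() with a globals dict that has no __name__ raises it. The sketch below only illustrates that mechanism. It assumes, without confirmation from the logs above, that something similar happens when the flow file is loaded from storage, and the source string is a made-up stand-in rather than the real flow.py.

```python
import warnings

# Hypothetical stand-in for a flow file that contains a relative import.
source = "from . import tasks"


def load_like_a_script(code: str) -> dict:
    """Run `code` with a bare globals dict (no __name__, __package__ or __spec__)."""
    namespace: dict = {}
    with warnings.catch_warnings():
        # Python may emit an ImportWarning before failing; silence it for the demo.
        warnings.simplefilter("ignore")
        exec(code, namespace)
    return namespace


try:
    load_like_a_script(source)
    error = None
except KeyError as exc:
    # The import machinery cannot resolve the parent package of "."
    error = exc

print(repr(error))
```

If this is what is happening, absolute imports of an installed package avoid this particular failure mode, although the package still has to be importable wherever the flow runs.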
flow.py references? (in my example, the config.py, core.py and tasks.py modules in the same package as flow.py, as well as stuff in the top-level common package)
Kevin Kho
David Charles
03/23/2022, 1:14 PM
Kyle McChesney
03/23/2022, 2:07 PM
David Charles
03/23/2022, 2:46 PM
Tomer Cagan
03/24/2022, 10:14 AM
Kevin Kho
git clone and pip install -e .
Kyle McChesney
03/24/2022, 2:08 PM
David Charles
04/01/2022, 3:45 PM
storages module that has a get_storage callable. It returns an appropriate storage based on environment, so for local flow execution it defaults to Local(), but we have options to add other storages down the line. When we register a flow (e.g. in a CI pipeline) it gets a Docker storage that’s been instantiated with dockerfile, registry_url, etc. for the docker build and push that ensues.
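A rough sketch of what such a get_storage callable could look like. To keep it runnable without Prefect installed, Local and Docker here are small stand-in classes, not the real prefect.storage ones. The FLOW_ENV and DOCKER_REGISTRY_URL variable names, the "ci" value, and the default registry host are all made up for illustration.

```python
import os
from dataclasses import dataclass
from typing import Optional, Union


# Stand-ins for prefect.storage.Local and prefect.storage.Docker, so the sketch
# runs on its own; a real project would import the Prefect classes instead.
@dataclass
class Local:
    pass


@dataclass
class Docker:
    dockerfile: str
    registry_url: str


Storage = Union[Local, Docker]


def get_storage(flow_name: str, environment: Optional[str] = None) -> Storage:
    """Return a storage for `flow_name` based on the environment.

    FLOW_ENV and DOCKER_REGISTRY_URL are hypothetical variable names.
    `flow_name` is unused here, but a per-flow setup would likely need it.
    """
    env = environment or os.environ.get("FLOW_ENV", "local")
    if env == "local":
        # Default for local flow execution.
        return Local()
    if env == "ci":
        # Registration from a CI pipeline: build and push a Docker image.
        return Docker(
            dockerfile="Dockerfile",
            registry_url=os.environ.get("DOCKER_REGISTRY_URL", "registry.example.com"),
        )
    raise ValueError(f"Unknown environment: {env!r}")
```

With this shape, flow.py would call get_storage("simple_flow") and never hard-code a storage; adding another backend later means adding one more branch.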
Main issue encountered doing this from a mono-repo was that we wanted a single “parameterised” Dockerfile (i.e. using ARG flow_name) so we can build per-flow deps. However, there seems to be a bug in prefect/storage/docker.py where build_kwargs is incorrectly passed into the Python Docker client. I’ve raised this issue: https://github.com/PrefectHQ/prefect/issues/5630
Kevin Kho