Ben Welsh
01/30/2022, 10:48 PM
Anna Geller
01/30/2022, 11:36 PM
Ben Welsh
01/31/2022, 1:09 AM
Anna Geller
01/31/2022, 9:28 AM
prefect run -p path/to/flow.py
Ben Welsh
# 01/31/2022, 12:55 PM
def get_storage(env: str = "production"):
    """Get the storage method used by the flow.

    Only the storage object for the requested environment is constructed, so
    selecting 'development' never instantiates the Docker storage config.

    Args:
        env (str): the environment where the flow is running. Options are
            'development' and 'production'.

    Returns:
        A Prefect Storage instance: ``Local`` for 'development', ``Docker``
        for 'production'.

    Raises:
        KeyError: if ``env`` is not one of the supported options. The message
            lists the valid choices.
    """
    logger = prefect.context.get("logger")
    # `.get` can return None when no logger is in context; don't crash on it.
    if logger is not None:
        logger.debug(f"Loading {env} storage method")

    def _development():
        # Build the local-laptop storage.
        return Local(
            # NOTE(review): the two options below are commented out, so this
            # Local storage uses its defaults rather than ./flow.py as a script.
            # path="./flow.py",
            # stored_as_script=True,
            # presumably drops host-specific default labels — confirm in the
            # Prefect Local storage docs.
            add_default_labels=False,
        )

    def _production():
        # Build the Docker image storage.
        # An image containing the flow's code, as well as our Python dependencies,
        # will be compiled when `pipenv run prefect register` is run and then
        # uploaded to our repository on Google Artifact Registry.
        return Docker(
            registry_url="us-west2-docker.pkg.dev",
            image_name="big-local-news-267923/warn-prefect-flow/warn-act-notices-etl-flow",
            python_dependencies=["warn-scraper"],
        )

    factories = {
        "development": _development,
        "production": _production,
    }
    try:
        factory = factories[env]
    except KeyError:
        # Keep KeyError for backward compatibility, but say what was expected.
        raise KeyError(
            f"Unknown storage env {env!r}; expected one of {sorted(factories)}"
        ) from None
    return factory()
with prefect.Flow(
    "WARN Act Notices ETL",
    # Storage is chosen by the PREFECT_FLOW_ENV environment variable
    # ('development' or 'production'; defaults to 'production').
    storage=get_storage(os.getenv("PREFECT_FLOW_ENV", "production")),
    run_config=UniversalRun(
        env={
            # Print logs from our dependencies
            "PREFECT__LOGGING__EXTRA_LOGGERS": "['warn',]",
            # Print debugging level code from Prefect
            "PREFECT__LOGGING__LEVEL": "DEBUG",
        },
        # Labels attached to this flow's run config
        labels=["etl"],
    ),
    executor=DaskExecutor(),
) as flow:
    # Get logger
    logger = prefect.context.get("logger")
    # NOTE(review): this statement runs whenever this module is executed (e.g.
    # when the file is loaded to build/register the flow), not inside a task
    # on each flow run. Move it into a task if per-run logging is wanted.
    logger.info("Running WARN Act Notices ETL flow")

    # Get the list of all scrapers. `required=True` is intentionally omitted:
    # in Prefect 1.x a required Parameter ignores its default, so the default
    # scraper list would never be used. Note the default itself is computed
    # when this module executes.
    scraper_list = prefect.Parameter(
        "scrapers", default=warn.utils.get_all_scrapers()
    )
    delete = prefect.Parameter("delete", default=False)

    # If the delete option has been set, then delete
    # (behavior is defined by get_scrape_runner, which is not shown here).
    runner = get_scrape_runner(delete)

    # Map `scrape` across our sources, passing the same runner to every call.
    scrape.map(prefect.unmapped(runner), scraper_list)
Anna Geller
01/31/2022, 1:07 PM
prefect.context.get("running_with_backend")
# "running_with_backend": true,