Alon Barad
05/09/2023, 10:24 AM — quotes_flow.py
from prefect import flow, get_run_logger
from prefect.blocks.system import Secret
from common.blocks.mongodb import MongoDB
from common.subflows.crawl.crawl import crawl
from flows.quotes.parse import parse
@flow(name="quotes", flow_run_name="quotes-page-{page_number}")
def quotes_flow(run_id: str, page_number: int) -> None:
    """Run the configured crawl and/or parse steps for one quotes page.

    The run configuration is looked up by ``run_id`` in the ``run_configs``
    collection of the ``configs`` database. Its ``actions`` entry decides
    which steps execute: ``CRAWL`` triggers ``crawl`` and ``PARSE`` triggers
    ``parse``, both on the same page URL. A step whose action is absent is
    skipped and the skip is logged.

    Args:
        run_id: Value matched against the ``run_id`` field of the run config.
        page_number: Page number interpolated into the quotes.toscrape.com
            URL (and into the flow run name via the decorator).

    Raises:
        ValueError: If no run config is found for ``run_id``.
        KeyError: If the run config has no ``actions`` entry (assuming the
            config is a dict-like document).
    """
    logger = get_run_logger()

    conn_str = Secret.load("database-connection-string").get()
    database = MongoDB(conn_str=conn_str)
    run_config = database.find_one(
        database="configs",
        collection="run_configs",
        query={"run_id": run_id},
    )
    # NOTE(review): assumes find_one returns None when nothing matches, as
    # pymongo's find_one does -- confirm against the project's MongoDB block.
    if run_config is None:
        raise ValueError(f"No run config found for run_id={run_id!r}")

    actions = run_config["actions"]
    # Plain URL: the angle brackets in the pasted source were chat-client
    # auto-link markup, not part of the address.
    formatted_url = f"https://quotes.toscrape.com/page/{page_number}/"

    if "CRAWL" in actions:
        crawl(url=formatted_url)
    else:
        logger.info("Skipping crawl step")

    if "PARSE" in actions:
        parse(url=formatted_url)
    else:
        logger.info("Skipping parse step")
if __name__ == "__main__":
    # Script entry point: run the flow once for page 1 with a hard-coded
    # run id.
    quotes_flow(run_id="3227", page_number=1)