https://prefect.io logo
Title
a

Alon Barad

05/09/2023, 10:24 AM
quotes_flow.py
from prefect import flow, get_run_logger
from prefect.blocks.system import Secret

from common.blocks.mongodb import MongoDB
from common.subflows.crawl.crawl import crawl
from flows.quotes.parse import parse


@flow(name="quotes", flow_run_name="quotes-page-{page_number}")
def quotes_flow(run_id: str, page_number: int) -> None:
    """Run the configured crawl and/or parse steps for one quotes page.

    The flow loads the ``database-connection-string`` Secret block, looks up
    the document whose ``run_id`` matches in the ``run_configs`` collection
    of the ``configs`` database, and reads its ``"actions"`` entry to decide
    which steps to execute for the page URL.

    Args:
        run_id: Value matched against the ``run_id`` field of the run config.
        page_number: Page number interpolated into the quotes.toscrape.com
            URL (and into the flow run name).

    Raises:
        ValueError: If the run-config lookup yields ``None``.
        KeyError: If the run config has no ``"actions"`` entry.
    """
    logger = get_run_logger()
    conn_str = Secret.load('database-connection-string').get()
    database = MongoDB(conn_str=conn_str)

    run_config = database.find_one(
        database="configs",
        collection="run_configs",
        query={
            "run_id": run_id,
        },
    )

    # NOTE(review): presumably find_one returns None when nothing matches
    # (as pymongo does) -- confirm against the MongoDB block. Fail with a
    # clear message instead of an opaque "NoneType is not subscriptable".
    if run_config is None:
        raise ValueError(f"No run config found for run_id={run_id!r}")

    # Presumably a collection of action names such as "CRAWL" / "PARSE";
    # only membership is tested below.
    actions = run_config["actions"]

    # Plain URL: the surrounding angle brackets in the pasted snippet were
    # chat-client link markup, not part of the address.
    formatted_url = f"https://quotes.toscrape.com/page/{page_number}/"

    if 'CRAWL' in actions:
        crawl(url=formatted_url)
    else:
        logger.info("Skipping crawl step")

    if 'PARSE' in actions:
        parse(url=formatted_url)
    else:
        logger.info("Skipping parse step")


if __name__ == "__main__":
    # Ad-hoc local run: execute the flow once for run "3227", first page.
    quotes_flow(run_id="3227", page_number=1)