Nico Neumann

    Nico Neumann

    1 month ago
    Code Contest I am currently exploring the prefect 2.0 features (blocks, async, deployments, ui, schedules, etc.) and thought it would be interesting to do it in a fun way. This flow fetches random quotes from the internet using async/await and combines them with a random cat gif. The result is then send to a Slack channel using Prefect Slack Block. The user can then vote which one they like the most, of course counting starts at 0. The flow is scheduled to run every day through Prefect schedules. Currently it is only possible to send text to Slack, that’s why I provided a pull request which allows to send e.g. images or gifs using Slack blocks: https://github.com/PrefectHQ/prefect/pull/6200
    import asyncio
    
    import aiohttp
    from prefect import flow, task
    from prefect.blocks.notifications import SlackWebhook
    from prefect.logging import get_run_logger
    
    
    @task(retries=3)
    async def fetch_random_quotes(
        session: aiohttp.ClientSession, url: str, amount: int
    ) -> str:
        tasks = []
        for _ in range(amount):
            task = asyncio.create_task(fetch_random_quote.fn(session, url=url))
            tasks.append(task)
        return await asyncio.gather(*tasks)
    
    
    @task(retries=3)
    async def fetch_random_quote(session: aiohttp.ClientSession, url: str) -> str:
        logger = get_run_logger()
        async with session.get(url) as response:
            if response.status != 200:
                logger.error(f"Failed to fetch quote: {response.status_code}")
                response.raise_for_status()
    
            response_data = await response.json()
            text = response_data["content"]
            author = response_data["author"]
            quote = f'"{text}" - {author}'
            <http://logger.info|logger.info>(f"Fetched quote: {quote}")
            return quote
    
    
    @task(retries=3)
    async def send_slack_notification(blocks: list[dict]) -> None:
        slack_webhook_block = await SlackWebhook.load("slack-fun-channel")
        await slack_webhook_block.notify("Daily fun", blocks=blocks)
    
    
    @flow(
        name="Slack Fun Flow",
        description="""A flow that sends a random quote combined with a random cat gif to a Slack channel
        using a saved Slack webhook. The users can then vote which one they like the most""",
    )
    async def slack_fun_flow(
        cat_url: str = "<https://cataas.com/cat/gif/says>",
        quote_url: str = "<https://api.quotable.io/random>",
        amount: int = 3,
    ):
        logger = get_run_logger()
        <http://logger.info|logger.info>("Starting slack fun flow")
    
        async with aiohttp.ClientSession() as session:
            quotes = await fetch_random_quotes(session, quote_url, amount)
    
            blocks = [
                {
                    "type": "image",
                    "title": {"type": "plain_text", "text": quote},
                    "image_url": f"{cat_url}/vote for {i}",
                    "alt_text": "",
                }
                for i, quote in enumerate(quotes)
            ]
    
            await send_slack_notification(blocks)
    
    
    if __name__ == "__main__":
        asyncio.run(slack_fun_flow(amount=3))
    Chris Reuter

    Chris Reuter

    1 month ago
    Awesome submission @Nico Neumann!! Thanks for the PR as well πŸ™