
Nico Neumann

08/03/2022, 2:57 PM
Code Contest: I am currently exploring the Prefect 2.0 features (blocks, async, deployments, UI, schedules, etc.) and thought it would be interesting to do it in a fun way. This flow fetches random quotes from the internet using async/await and combines each one with a random cat gif. The result is then sent to a Slack channel using the Prefect Slack block. Users can then vote for the one they like the most; of course, counting starts at 0. The flow is scheduled to run every day through Prefect schedules. Currently it is only possible to send text to Slack, which is why I opened a pull request that allows sending, e.g., images or gifs using Slack blocks: https://github.com/PrefectHQ/prefect/pull/6200
🐱 4
:catjam: 3
🎉 6
🙌 3
πŸ‘ 3
:thank-you: 2
:party-parrot: 5
import asyncio

import aiohttp
from prefect import flow, task
from prefect.blocks.notifications import SlackWebhook
from prefect.logging import get_run_logger


@task(retries=3)
async def fetch_random_quotes(
    session: aiohttp.ClientSession, url: str, amount: int
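) -> list[str]:
    # Call the undecorated function via .fn so the requests run concurrently
    # inside this one task.
    quote_tasks = []
    for _ in range(amount):
        quote_task = asyncio.create_task(fetch_random_quote.fn(session, url=url))
        quote_tasks.append(quote_task)
    return await asyncio.gather(*quote_tasks)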


@task(retries=3)
async def fetch_random_quote(session: aiohttp.ClientSession, url: str) -> str:
    logger = get_run_logger()
    async with session.get(url) as response:
        if response.status != 200:
            logger.error(f"Failed to fetch quote: {response.status}")
            response.raise_for_status()

        response_data = await response.json()
        text = response_data["content"]
        author = response_data["author"]
        quote = f'"{text}" - {author}'
        logger.info(f"Fetched quote: {quote}")
        return quote


@task(retries=3)
async def send_slack_notification(blocks: list[dict]) -> None:
    slack_webhook_block = await SlackWebhook.load("slack-fun-channel")
    await slack_webhook_block.notify("Daily fun", blocks=blocks)


@flow(
    name="Slack Fun Flow",
    description="""A flow that sends a random quote combined with a random cat gif to a Slack channel
    using a saved Slack webhook. The users can then vote which one they like the most""",
)
async def slack_fun_flow(
    cat_url: str = "https://cataas.com/cat/gif/says",
    quote_url: str = "https://api.quotable.io/random",
    amount: int = 3,
):
    logger = get_run_logger()
    logger.info("Starting slack fun flow")

    async with aiohttp.ClientSession() as session:
        quotes = await fetch_random_quotes(session, quote_url, amount)

        blocks = [
            {
                "type": "image",
                "title": {"type": "plain_text", "text": quote},
                "image_url": f"{cat_url}/vote for {i}",
                "alt_text": "",
            }
            for i, quote in enumerate(quotes)
        ]

        await send_slack_notification(blocks)


if __name__ == "__main__":
    asyncio.run(slack_fun_flow(amount=3))
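
For anyone who wants to try the block-building step without Prefect or network access, here is a minimal sketch of it as a plain function. The name `build_image_blocks`, the percent-encoding of the caption, and the non-empty `alt_text` are assumptions for illustration and are not part of the flow above.

```python
from urllib.parse import quote


def build_image_blocks(quotes: list[str], cat_url: str) -> list[dict]:
    """Build one Slack image block per quote, numbered from 0 for voting."""
    blocks = []
    for i, text in enumerate(quotes):
        # Assumption: percent-encode the caption so the spaces in
        # "vote for 0" are safe inside a URL path.
        caption = quote(f"vote for {i}")
        blocks.append(
            {
                "type": "image",
                "title": {"type": "plain_text", "text": text},
                "image_url": f"{cat_url}/{caption}",
                # Assumption: a descriptive alt text instead of an empty string.
                "alt_text": f"Cat gif for option {i}",
            }
        )
    return blocks


if __name__ == "__main__":
    demo = build_image_blocks(
        ['"Hello" - A', '"World" - B'], "https://cataas.com/cat/gif/says"
    )
    for block in demo:
        print(block["image_url"])
```

The returned list has the same shape as the `blocks` list built inside `slack_fun_flow`, so it could be handed to `send_slack_notification` in the same way.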

Chris Reuter

08/03/2022, 3:09 PM
Awesome submission @Nico Neumann!! Thanks for the PR as well 🙏
:thank-you: 1