Nico Neumann
08/03/2022, 2:57 PMimport asyncio
import aiohttp
from prefect import flow, task
from prefect.blocks.notifications import SlackWebhook
from prefect.logging import get_run_logger
@task(retries=3)
async def fetch_random_quotes(
session: aiohttp.ClientSession, url: str, amount: int
) -> str:
tasks = []
for _ in range(amount):
task = asyncio.create_task(fetch_random_quote.fn(session, url=url))
tasks.append(task)
return await asyncio.gather(*tasks)
@task(retries=3)
async def fetch_random_quote(session: aiohttp.ClientSession, url: str) -> str:
logger = get_run_logger()
async with session.get(url) as response:
if response.status != 200:
logger.error(f"Failed to fetch quote: {response.status_code}")
response.raise_for_status()
response_data = await response.json()
text = response_data["content"]
author = response_data["author"]
quote = f'"{text}" - {author}'
<http://logger.info|logger.info>(f"Fetched quote: {quote}")
return quote
@task(retries=3)
async def send_slack_notification(blocks: list[dict]) -> None:
slack_webhook_block = await SlackWebhook.load("slack-fun-channel")
await slack_webhook_block.notify("Daily fun", blocks=blocks)
@flow(
name="Slack Fun Flow",
description="""A flow that sends a random quote combined with a random cat gif to a Slack channel
using a saved Slack webhook. The users can then vote which one they like the most""",
)
async def slack_fun_flow(
cat_url: str = "<https://cataas.com/cat/gif/says>",
quote_url: str = "<https://api.quotable.io/random>",
amount: int = 3,
):
logger = get_run_logger()
<http://logger.info|logger.info>("Starting slack fun flow")
async with aiohttp.ClientSession() as session:
quotes = await fetch_random_quotes(session, quote_url, amount)
blocks = [
{
"type": "image",
"title": {"type": "plain_text", "text": quote},
"image_url": f"{cat_url}/vote for {i}",
"alt_text": "",
}
for i, quote in enumerate(quotes)
]
await send_slack_notification(blocks)
if __name__ == "__main__":
asyncio.run(slack_fun_flow(amount=3))
Chris Reuter
08/03/2022, 3:09 PM