VIPIN SOMA
09/16/2025, 3:52 PMMarvin
09/16/2025, 3:52 PMMarvin
09/16/2025, 3:54 PMfrom prefect.blocks.system import Secret
Secret(value="YOUR_TENANT_ID").save("ms-tenant-id", overwrite=True)
Secret(value="YOUR_CLIENT_ID").save("ms-client-id", overwrite=True)
Secret(value="YOUR_CLIENT_SECRET").save("ms-client-secret", overwrite=True)
Flow code:
```
import os
import base64
from pathlib import Path
import requests
import msal
from prefect import flow, task
from prefect.blocks.system import Secret
GRAPH_SCOPE = ["https://graph.microsoft.com/.default"]
GRAPH_BASE = "https://graph.microsoft.com/v1.0"
@task(retries=3, retry_delay_seconds=30)
def get_graph_token(tenant_id: str, client_id: str, client_secret: str) -> str:
authority = f"https://login.microsoftonline.com/{tenant_id}"
app = msal.ConfidentialClientApplication(
client_id=client_id,
authority=authority,
client_credential=client_secret,
)
result = app.acquire_token_for_client(scopes=GRAPH_SCOPE)
if "access_token" not in result:
raise RuntimeError(f"Failed to get token: {result.get('error')}: {result.get('error_description')}")
return result["access_token"]
@task
def find_latest_message_with_attachments(access_token: str, mailbox: str) -> str:
url = f"{GRAPH_BASE}/users/{mailbox}/messages"
params = {
"$filter": "hasAttachments eq true",
"$orderby": "receivedDateTime desc",
"$top": "1",
"$select": "id,subject,receivedDateTime"
}
headers = {"Authorization": f"Bearer {access_token}"}
r = requests.get(url, headers=headers, params=params, timeout=30)
r.raise_for_status()
items = r.json().get("value", [])
if not items:
raise RuntimeError("No messages with attachments found.")
return items[0]["id"]
@task
def download_attachments(access_token: str, mailbox: str, message_id: str, download_dir: str = "./downloads") -> list[str]:
Path(download_dir).mkdir(parents=True, exist_ok=True)
url = f"{GRAPH_BASE}/users/{mailbox}/messages/{message_id}/attachments"
params = {"$filter": "isInline eq false"}
headers = {"Authorization": f"Bearer {access_token}"}
r = requests.get(url, headers=headers, params=params, timeout=30)
r.raise_for_status()
saved = []
for att in r.json().get("value", []):
# Only FileAttachment has contentBytes; ItemAttachment will notMarvin
09/16/2025, 3:54 PM