Gustavo Fonseca
10/18/2025, 6:18 PMMarvin
10/18/2025, 6:18 PMMarvin
10/18/2025, 6:21 PM
from prefect.blocks.system import Secret
# Create and save once (e.g., in a setup script or REPL)
Secret(value="YOUR_API_KEY").save("my-api-key")
# Load and use in your flow
from prefect import flow
from prefect.blocks.system import Secret
@flow
def my_flow():
api_key = Secret.load("my-api-key").get()
# use api_key safely here
if __name__ == "__main__":
my_flow()
Prefer external managers when you can (AWS Secrets Manager, GCP Secret Manager, Vault) via Prefect integrations:
- prefect-aws
- prefect-gcp
- prefect-snowflake
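For example, a minimal sketch assuming the AwsSecret block from the prefect-aws integration (the block name my-aws-secret is an example you would have created beforehand; the exact import path and read_secret() call are assumptions about prefect-aws, not something shown earlier in this thread):
```python
# Hedged sketch: read a secret stored in AWS Secrets Manager through prefect-aws.
# Assumes `pip install prefect-aws` and an AwsSecret block named "my-aws-secret".
from prefect import flow
from prefect_aws.secrets_manager import AwsSecret  # assumed import path

@flow
def use_external_secret():
    # Load the saved block, then fetch the value from AWS Secrets Manager.
    api_key = AwsSecret.load("my-aws-secret").read_secret()
    # use api_key here without printing or logging it
    return bool(api_key)
```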
On "why other people can see what I ask you?"
- If you're chatting in a public Slack channel (like the Prefect Community Slack), everyone in that channel can see your messages. For privacy, send me a direct message or move to a private channel with only the people you want included. And regardless, never paste real secrets into Slack.
If you'd like, tell me which variables you're currently using for keys and I can help you migrate them to Secret blocks.
Gustavo Fonseca
10/18/2025, 6:36 PMMarvin
10/18/2025, 6:36 PMMarvin
10/18/2025, 6:41 PM
The error means your database has task_run rows where flow_run_id is NULL, but the migration is trying to enforce NOT NULL on that column. This typically happens when an older DB/schema has inconsistent data and we run a newer migration.
First, a couple quick checks:
- Make sure no Prefect server/API process is running while you reset (stop any running prefect server start or API containers).
- Confirm which DB you're targeting and that you're resetting the right one:
uv run prefect config view --show-sources
Look for server.database.connection_url and home. If it's SQLite, you'll see a sqlite+aiosqlite:///... path to a .db file under $PREFECT_HOME (by default under your home directory).
Ways to fix
Option A - Nuke-and-recreate (SQLite and you don't need the data)
1) Stop Prefect server/API.
2) Back up and delete the SQLite file reported by server.database.connection_url.
3) Recreate:
uv run prefect server database reset -y
Then start the server again.
Option B - Clean bad rows and retry (if you need to keep the rest of the data)
- For SQLite (replace with the actual path you saw in config view):
sqlite3 /path/to/your.db "DELETE FROM task_run WHERE flow_run_id IS NULL;"
- For Postgres (replace with your DSN/connection and schema if needed):
psql "$PREFECT_SERVER_DATABASE_CONNECTION_URL" -c "DELETE FROM task_run WHERE flow_run_id IS NULL;"
Then:
uv run prefect server database reset -y
Option C - Ensure the reset is targeting the right DB
If you manage multiple environments, force the target explicitly:
PREFECT_SERVER_DATABASE_CONNECTION_URL="sqlite+aiosqlite:///absolute/path/to/your.db" uv run prefect server database reset -y
If the error persists, please share:
- The output of:
uv run prefect version
uv run prefect config view --show-sources --show-defaults
- Whether you're on SQLite or Postgres
- Whether any Prefect server/API process was running when you ran the reset
References:
- Settings and profiles
- Settings reference
Gustavo Fonseca
10/18/2025, 6:45 PMMarvin
10/18/2025, 6:45 PMMarvin
10/18/2025, 6:48 PM
uv run prefect config view --show-sources --show-secrets | grep DATABASE_CONNECTION_URL
You should see something like:
PREFECT_API_DATABASE_CONNECTION_URL='sqlite+aiosqlite:///.../.prefect/prefect.db'
Note: If you're using Docker to run the server, that path lives inside the container's filesystem unless you've mounted a volume.
Nuke and recreate safely
1) Stop the server (Ctrl+C if it's running in your terminal, or stop the process/container).
2) Delete the SQLite file shown by the command above (example for default):
rm -f ~/.prefect/prefect.db
3) Recreate/reset the DB:
uv run prefect server database reset -y
4) Start the server again:
uv run prefect server start
If you'd rather not hunt for the file, you can force a fresh DB by explicitly setting the URL to a new path and running reset:
PREFECT_API_DATABASE_CONNECTION_URL="sqlite+aiosqlite:///$HOME/.prefect/prefect.db" uv run prefect server database reset -y
If anything's unclear (OS, Docker vs local, etc.), let me know and I'll tailor the exact commands.
References:
- Settings and profiles: Docs
- Settings reference (PREFECT_HOME, API DB settings): Reference
Gustavo Fonseca
10/18/2025, 7:16 PMMarvin
10/18/2025, 7:16 PMMarvin
10/18/2025, 7:18 PM
Could you share:
- Your Prefect version (uv run prefect version)
- How you're running the server (the exact command, e.g., uv run prefect server start)
- A minimal snippet of the flow/tasks from the 102 lesson
In the meantime, here are the most common causes and fixes:
1) UI state is stale
- The Task Runs table does not always auto-refresh. Hard refresh the page and toggle "Live updates" on the run page.
- Make sure your Python session is pointed at the same API URL you're viewing in the browser (PREFECT_API_URL=http://127.0.0.1:4200/api if you're truly local).
2) Server was started without services or the server wasn't restarted cleanly
- Ensure you're not using --no-services.
- Stop the server completely, then start it again:
# stop any running server
# then start it cleanly
uv run prefect server start
3) Local database got into a weird state (especially if you reset while the server was running)
- Yes - stop the server first. Then nuke and recreate:
# stop server first
uv run prefect config view --show-sources --show-secrets | grep DATABASE_CONNECTION_URL
# delete the sqlite file that command shows (typically ~/.prefect/prefect.db)
rm -f ~/.prefect/prefect.db
uv run prefect server database reset -y
uv run prefect server start
4) The second task wasn't actually scheduled/executed (common call patterns)
- In Prefect 3.x, either call tasks directly (sequential) or use .submit() and then .result() for concurrency. Examples:
Direct calls (simple and correct):
from prefect import flow, task
@task
def plus_one(x):
return x + 1
@task
def times_two(x):
return x * 2
@flow
def demo():
a = plus_one(1)
b = times_two(a)
return b
if __name__ == "__main__":
demo()
Concurrent pattern (don't forget to resolve results):
@flow
def demo_concurrent():
a_f = plus_one.submit(1)
b_f = times_two.submit(a_f)
b = b_f.result() # ensure completion
return b
If you share your snippet, I can confirm the pattern. Also, since you've done DB resets while the server was running, I'd strongly recommend the "stop server -> delete DB file -> reset -> start server" sequence above and then re-run the flow.
Docs for reference:
- Flows and tasks in Prefect 3.x: Docs
Gustavo Fonseca
10/18/2025, 7:19 PMGustavo Fonseca
10/18/2025, 7:19 PMGustavo Fonseca
10/18/2025, 7:19 PMMarvin
10/18/2025, 7:19 PMMarvin
10/18/2025, 7:20 PM
fetch_weather then save_weather.
Given your earlier DB reset issues and that you're seeing "Pending" in the list but "Finished in state Completed()" inside the details, this sounds like a UI/state index mismatch caused by the local server/database getting into a weird state.
Do this to get to a clean slate and verify:
1) Stop the local server
- Make sure no prefect server start is running. If you're using Docker, stop the container.
2) Find and delete the local SQLite DB
- Show the exact DB URL (it's masked by default):
uv run prefect config view --show-sources --show-secrets | grep DATABASE_CONNECTION_URL
- Delete the SQLite file shown there (typically ~/.prefect/prefect.db), e.g.:
rm -f ~/.prefect/prefect.db
3) Reset and start the server cleanly
uv run prefect server database reset -y
uv run prefect server start
Make sure you're not using --no-services.
4) Ensure your Python session points at your local API
export PREFECT_API_URL="http://127.0.0.1:4200/api"
5) Run your flow with logging captured
from prefect import flow, task
import httpx
@task(log_prints=True)
def fetch_weather(lat: float, lon: float):
base_url = "https://api.open-meteo.com/v1/forecast"
temps = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
forecasted_temp = float(temps.json()["hourly"]["temperature_2m"][0])
print(f"Forecasted temp C: {forecasted_temp} degrees")
return forecasted_temp
@task(log_prints=True)
def save_weather(temp: float):
print(f"Saving {temp} to weather.csv")
with open("weather.csv", "w+") as w:
w.write(str(temp))
return "Successfully wrote temp"
@flow(log_prints=True)
def pipeline(lat: float = 38.9, lon: float = -77.0):
temp = fetch_weather(lat, lon)
result = save_weather(temp)
return result
if __name__ == "__main__":
pipeline()
6) In the UI
- Hard refresh the browser and toggle "Live updates" on the run page.
- You should see both tasks in the Graph tab and both in the Task Runs list as Completed.
If the second task still doesn't appear in the Graph:
- Please share:
- uv run prefect version
- Confirmation you started with uv run prefect server start (no extra flags)
- A screenshot of the run page (Graph tab + Task Runs tab)
- Also, confirm you're looking at the same API your Python is pointing to (Local UI at http://127.0.0.1:4200 and PREFECT_API_URL set to http://127.0.0.1:4200/api).
Gustavo Fonseca
10/18/2025, 7:33 PMMarvin
10/18/2025, 7:33 PMGustavo Fonseca
10/18/2025, 7:33 PMMarvin
10/18/2025, 7:33 PMMarvin
10/18/2025, 7:34 PM
time.sleep(1) keeps your process alive just a bit longer, giving those background services time to flush updates so the UI graph catches up. Without the delay, the run ends quickly and the UI can momentarily show stale state in the list/graph, even though the task's detail page shows Completed.
You don't need the sleep - here are better options
- Refresh and enable Live updates on the run page; give it a few seconds after the run completes.
- Make sure you start the server with services enabled (default). Avoid --no-services.
- Start it cleanly:
uv run prefect server start
- For faster local feedback, reduce the flush interval:
PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=1 uv run prefect server start
- Optional: enable task print logging so you see output in the UI immediately:
@task(log_prints=True)
def save_weather(temp: float):
print(f"Saving {temp}...")
...
Why the mismatch between "Pending" list vs. "Completed" details?
- The list/graph views depend on aggregated artifacts/events that are flushed on a schedule.
- The task detail view queries the task run state directly, which is usually up-to-date first.
- If you recently reset the DB while the server was running, that can also exacerbate temporary inconsistencies.
If you still see oddities after a clean server start, please share:
- uv run prefect version
- How you're starting the server (the exact command)
- A quick screenshot of the run Graph tab vs. Task Runs tab
Docs:
- Flows and tasks: Flows and tasks
- Settings reference (event persister settings): Settings reference
Gustavo Fonseca
10/18/2025, 7:37 PMGustavo Fonseca
10/18/2025, 7:37 PMMarvin
10/18/2025, 7:37 PMMarvin
10/18/2025, 7:40 PM
PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=1 is a valid env var you can set.
What to set
- Flush interval (seconds): PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL (default 5)
- Batch size: PREFECT_API_SERVICES_EVENT_PERSISTER_BATCH_SIZE (default 20)
- Enable/disable event persister: PREFECT_API_SERVICES_EVENT_PERSISTER_ENABLED (default True)
- Event stream out to UI: PREFECT_API_EVENTS_STREAM_OUT_ENABLED (default True)
Two ways to apply it
Option A - Per run (environment variable)
PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=1 uv run prefect server start
Option B - Persist to your Prefect profile (CLI)
uv run prefect config set PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=1
uv run prefect config view --show-sources
# restart the server to pick up the new setting
uv run prefect server start
Notes
- You can also use the SERVER-prefixed variants (e.g., PREFECT_SERVER_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL); the API-prefixed ones work well for local prefect server start.
- Lowering flush interval increases write frequency; great for local dev, but keep defaults in prod unless you have a reason.
Settings reference: Settings reference
Gustavo Fonseca
10/18/2025, 7:43 PMGustavo Fonseca
10/18/2025, 7:45 PMMarvin
10/18/2025, 7:45 PMMarvin
10/18/2025, 7:46 PM
prefect config set ... writes the setting to your current Prefect profile and persists it on disk.
- Location: $PREFECT_HOME/profiles.toml (by default ~/.prefect/profiles.toml).
- It does NOT reset when you close your terminal or kill the CLI; it remains until you change/unset it or switch profiles.
- You can confirm the source:
uv run prefect config view --show-sources
uv run prefect profile ls
- To unset or change later:
uv run prefect config unset PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL -y
# or set a new value
uv run prefect config set PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=5
What it actually configures
- PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL controls how often the serverâs Event Persister flushes pending events to the database (in seconds).
- The Event Persister batches events (task/flow state changes, logs, artifacts) and flushes when:
- either the batch size is reached (PREFECT_API_SERVICES_EVENT_PERSISTER_BATCH_SIZE, default 20)
- or the flush interval elapses (default 5s; you set it to 1s)
- Lowering the interval makes the UI reflect updates faster, at the cost of more frequent DB writes. It's a good tweak for local dev.
- This is a server-side setting; restart the server to pick up changes:
uv run prefect server start
Tips and notes
- The setting is saved in the active profile only. If you switch profiles (e.g., uv run prefect profile use another-profile) or run in a container without your local profiles, you won't see the same behavior unless you set it there too.
- You can also use the SERVER-prefixed variant; both are accepted:
- PREFECT_SERVER_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL
- Full settings reference: Settings reference
- Profiles and settings overview: Settings and profiles
Gustavo Fonseca
10/18/2025, 7:55 PMMarvin
10/18/2025, 7:55 PMMarvin
10/18/2025, 7:55 PM
uv run prefect config unset PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL -y
Then restart your server to apply the change:
uv run prefect server start
Verify it's gone (and back to the default of 5 seconds):
uv run prefect config view --show-sources
Notes:
- This removes the setting from your current profile (stored in ~/.prefect/profiles.toml by default).
- If you had also exported it as an environment variable in your shell, unset that too:
unset PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL
Docs:
- Settings and profiles
- Settings reference
Gustavo Fonseca
10/18/2025, 7:58 PMGustavo Fonseca
10/18/2025, 7:58 PMMarvin
10/18/2025, 7:58 PMMarvin
10/18/2025, 8:00 PM
Set PREFECT_LOGGING_LOG_PRINTS if you want plain print() output to show up as Prefect logs. It applies both locally and in Cloud; it's not Cloud-specific.
Key points:
- Default is False. By default, print() output is not routed through Prefect logging.
- You can enable it globally with the setting:
- Temporary (env var): PREFECT_LOGGING_LOG_PRINTS=true
- Persisted (profile): uv run prefect config set PREFECT_LOGGING_LOG_PRINTS=true
- Or enable per-flow/task with the decorator option, which overrides the global:
- @flow(log_prints=True) and/or @task(log_prints=True)
If you're already seeing INFO logs, that's from Prefect's logger. Set log_prints only if you also want your print() statements to appear alongside those logs in the UI.
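For instance, a minimal sketch with the per-flow option (the flow name and message are just examples):
```python
# Sketch: with log_prints enabled, this print() shows up as an INFO log
# on the flow run in the UI alongside Prefect's own logs.
from prefect import flow

@flow(log_prints=True)
def greet(name: str = "world"):
    print(f"Hello, {name}!")  # captured as a Prefect log line

if __name__ == "__main__":
    greet()
```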
Docs:
- Add logging
- Settings reference
Gustavo Fonseca
10/18/2025, 8:19 PM
{
"lat": [],
"lon": []
}
But when I click on Task Runs and select a task and click on the parameters it shows for save_weather:
{
"temp": [
{
"inputType": "task_run",
"id": "0199f8ee-4053-7689-b7ac-67156b4c3528"
}
]
Giving my code:
from prefect import flow, task
import httpx
import time
@task
def fetch_weather(lat: float, lon: float):
base_url = "https://api.open-meteo.com/v1/forecast/"
temps = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
forecasted_temp = float(temps.json()["hourly"]["temperature_2m"][0])
print(f"Forecasted temp C: {forecasted_temp} degrees")
return forecasted_temp
@task
def save_weather(temp: str):
with open("weather.csv", "w+") as w:
w.write(str(temp))
return "Successfully wrote temp"
@flow
def pipeline(lat: float = 38.9, lon: float = -77.0):
temp = fetch_weather(lat, lon)
result = save_weather(temp)
return result
if __name__ == "__main__":
pipeline()
I was expecting to see the values received by fetch_weather for lat and lon, and a string for save_weather.
Why is that?
This is what it shows on the flow parameters:
{
"lat": 38.9,
"lon": -77
}
Marvin
10/18/2025, 8:20 PMMarvin
10/18/2025, 8:23 PM
What you're seeing in the task run "parameters" view is input provenance (lineage), not literal values.
- save_weather.temp shows:
{
"temp": [
{
"inputType": "task_run",
"id": "<upstream task run id>"
}
]
}
Because temp came from the output of the fetch_weather task run. That's lineage: a task_run -> temp.
- fetch_weather.lat and fetch_weather.lon show empty arrays:
{
"lat": [],
"lon": []
}
Because those were plain primitives from your flow parameters with no trackable upstream dependency. Prefect does not dump literal values there; it only records provenance. In some cases you may see inputType: "parameter" or inputType: "constant" entries, but the UI does not display raw values for task arguments by default (partly for privacy/perf reasons).
If you want to see the actual values at the task level:
- Log them explicitly (best for local dev):
from prefect import task, flow
@task(log_prints=True)
def fetch_weather(lat: float, lon: float):
print(f"lat={lat}, lon={lon}")
...
- Or create an artifact with the inputs/outputs you care about:
from prefect import task
from prefect.artifacts import create_markdown_artifact
@task
def save_weather(temp: float):
    create_markdown_artifact(markdown=f"Temp: {temp}")
    ...
References:
- UI task run schema (task_inputs provenance): UI schemas
- Flow/task concepts: Flows and tasks
Gustavo Fonseca
10/18/2025, 8:38 PMMarvin
10/18/2025, 8:38 PMMarvin
10/18/2025, 8:39 PM
Option A - Create it from the UI (no code):
- List available block types:
uv run prefect block type ls
- Generate a creation link for Secret:
uv run prefect block create secret
- Click the URL, fill Name = secret-block, Value = chave, Save.
- Verify:
uv run prefect block inspect secret/secret-block
Option B - One-liner from the shell (Python via uv)
- Do the same thing you wrote in code but as a CLI one-liner:
uv run python -c "from prefect.blocks.system import Secret; Secret(value='chave').save('secret-block')"
- Verify:
uv run prefect block ls
uv run prefect block inspect secret/secret-block
Use in flows:
from prefect.blocks.system import Secret
api_key = Secret.load("secret-block").get()
Notes:
- The secret value will be encrypted at rest and masked in the UI/CLI.
- Be mindful of shell history if you put secrets directly in command lines.
Docs:
- Blocks concept
- Store secrets
Gustavo Fonseca
10/18/2025, 9:49 PMGustavo Fonseca
10/18/2025, 9:49 PMMarvin
10/18/2025, 9:49 PMMarvin
10/18/2025, 9:53 PM
@task(persist_result=True) tells Prefect to serialize the task's return value and write it to result storage. What that means in practice:
What is stored
- The returned object (your pandas DataFrame) is serialized and written to a file in your configured result storage.
- By default, thatâs the local filesystem under ~/.prefect/storage/, using a UUID-like key.
- The Prefect database stores only metadata (storage key, serializer, block id), not the actual bytes.
How it's serialized
- Default serializer is cloudpickle (i.e., pickled via cloudpickle).
- You can change the serializer or storage per-task/flow, or globally (e.g., to S3/GCS/Azure via blocks).
- Note: pickled objects are Python/pandas-version dependent; prefer smaller/JSON-serializable results when possible.
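A minimal sketch of overriding both per task (the block name, basepath, and task are just examples; the block slug form for result_storage assumes the block was saved first):
```python
# Sketch: persist a task's result as JSON into a specific local directory
# instead of the default cloudpickle + ~/.prefect/storage location.
from prefect import task
from prefect.filesystems import LocalFileSystem

# Run once (e.g., in a setup script) to create the storage block;
# "dev-results" and the basepath are example values.
LocalFileSystem(basepath="/tmp/prefect-results").save("dev-results", overwrite=True)

@task(
    persist_result=True,
    result_serializer="json",                        # JSON instead of cloudpickle
    result_storage="local-file-system/dev-results",  # saved block slug
)
def summarize(n: int) -> dict:
    return {"count": n, "squared": n * n}
```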
How to load it back (recommended)
- Let Prefect do the unpickling for you. Fetch the task runâs state and ask Prefect to return the Python object:
import asyncio
from prefect.client.orchestration import get_client
from prefect.results import get_state_result
async def load_latest_my_task_result():
async with get_client() as client:
# get the most recent run of 'my_task'
trs = await client.read_task_runs(
task_run_filter={"name": {"any_": ["my_task"]}},
limit=1,
)
state = trs[0].state
df = await get_state_result(state) # fetches from storage and deserializes
return df
df = asyncio.run(load_latest_my_task_result())
print(df)
Manual unpickle (not recommended, but possible)
- If you really want to read the bytes yourself, you need the storage key and the serializer used.
- Typically the file will be at ~/.prefect/storage/<storage_key>. Then:
import os, cloudpickle
path = os.path.expanduser("~/.prefect/storage/<storage_key>")
with open(path, "rb") as f:
df = cloudpickle.load(f)
Caveats:
- The exact on-disk layout and metadata handling are internal details; using get_state_result keeps you insulated from changes and works for remote storage too.
- Large pickled results can be slow/heavy; consider writing big artifacts (like parquet/CSV to object storage) and returning just a reference/URI.
Useful settings and knobs:
- Default result storage block: set globally or per task/flow (result_storage=...)
- Default serializer: PREFECT_RESULTS_DEFAULT_SERIALIZER
- Persist by default (all tasks): PREFECT_TASKS_DEFAULT_PERSIST_RESULT=true
Docs:
- Results overview: https://docs-3.prefect.io/v3/advanced/results
- Serializers API: https://docs-3.prefect.io/v3/api-ref/python/prefect-serializers
- Filesystems/blocks: https://docs-3.prefect.io/v3/api-ref/python/prefect-filesystems
Gustavo Fonseca
10/18/2025, 11:08 PMGustavo Fonseca
10/18/2025, 11:15 PMMarvin
10/18/2025, 11:15 PMMarvin
10/18/2025, 11:18 PM
from prefect.blocks.notifications import SlackWebhook
@flow
def my_flow():
SlackWebhook.load("my-slack").notify("Flow finished!")
- Use deployment-level triggers in prefect.yaml (these are not Automations, but can react to events like schedule/bad states for a deployment).
- Roll your own "automation" in Python by listening for events via API or by adding guards in flows/tasks.
References:
- Automations (Cloud): Creating automations
- Notifications blocks: Notification blocks
- Deployment triggers: Deployment triggers
Gustavo Fonseca
10/19/2025, 12:18 AMMarvin
10/19/2025, 12:18 AMMarvin
10/19/2025, 12:22 AM
Run locally (no deployment needed):
from my_flow_module import my_flow
my_flow() # ad-hoc run
# or
from prefect import flow
@flow
def my_flow(...):
...
my_flow.serve(name="local") # local scheduler, no deployments needed
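If you also want a schedule while iterating locally, serve accepts cron/interval arguments - a minimal sketch (the flow, name, and cron expression are examples):
```python
# Sketch: serve the flow locally on a schedule; no work pool or worker required.
from prefect import flow

@flow
def my_flow():
    ...

if __name__ == "__main__":
    # lightweight local scheduler; the cron expression is an example (hourly)
    my_flow.serve(name="local-hourly", cron="0 * * * *")
```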
Create a deployment from GitHub (public repo):
from prefect import flow
from prefect.storage import GitRepository
# Option A: direct URL string
flow.from_source(
source="https://github.com/owner/repo",
entrypoint="path/to/flow_file.py:my_flow",
).deploy(
name="my-deployment",
work_pool_name="process-pool" # local process worker
)
# Option B: explicit GitRepository (choose branch/commit)
repo = GitRepository(
url="https://github.com/owner/repo",
branch="main", # or commit_sha="abc123"
)
flow.from_source(repo, "path/to/flow_file.py:my_flow").deploy(
name="my-deployment",
work_pool_name="process-pool"
)
Private repositories
- Use a token via GitCredentials or a Secret block. Example:
from prefect.storage import GitRepository, GitCredentials
from prefect.blocks.system import Secret
# Save your token once
Secret(value="ghp_xxx").save("github-token")
creds = GitCredentials(access_token=Secret.load("github-token").get())
repo = GitRepository(url="https://github.com/owner/private-repo", credentials=creds)
flow.from_source(repo, "flows/my_flow.py:my_flow").deploy(
name="secure-deployment",
work_pool_name="process-pool"
)
Run it
- Start a local work pool and worker to pick up deployments:
uv run prefect work-pool create process-pool --type process
uv run prefect worker start -p process-pool
- Then kick off a run from the UI or:
uv run prefect deployment run "my-flow/my-deployment"
Notes
- No GitHub storage block needed; do not use deprecated 2.x patterns like Deployment.build_from_flow() or prefect deployment build.
- For purely local iteration, you can skip deployments and Git entirely; flow.serve() is super convenient for local schedules.
- Docs:
- Store flow code: Store flow code
- Deploy flows: Deployment versioning
- GitHub integration notes: prefect-github
Gustavo Fonseca
10/19/2025, 12:47 AMMarvin
10/19/2025, 12:47 AMMarvin
10/19/2025, 12:48 AM
1) Check which API you're pointed at:
uv run prefect config view --show-sources
Make sure PREFECT_API_URL is your local server if that's where you created the deployment (e.g. http://127.0.0.1:4200/api).
2) Verify the deployment actually exists
uv run prefect deployment ls
uv run prefect deployment inspect "pipeline/my-first-managed-deployment"
If inspect returns "not found", the deployment doesn't exist in this API. Since you "nuked" the DB earlier, any existing deployments were deleted - you'll need to recreate them.
3) Create (or recreate) the deployment
- In Python:
from prefect import flow
@flow
def pipeline(lat: float = 38.9, lon: float = -77.0):
...
# run once to create the deployment
pipeline.deploy(
name="my-first-managed-deployment",
work_pool_name="process-pool",
)
- Or via CLI from your project root:
uv run prefect deploy ./path/to/file.py:pipeline --name my-first-managed-deployment
(You'll still need a work pool/worker; see next step.)
4) Ensure a worker is running for the target work pool
uv run prefect work-pool create process-pool --type process
uv run prefect worker start -p process-pool
5) Call run_deployment with the exact name
Your code is fine:
from prefect import flow
from prefect.deployments import run_deployment
@flow
def run_deployment_from_flow():
run_deployment(
name="pipeline/my-first-managed-deployment",
parameters={"lat": 1, "lon": 2},
)
Tip: If you prefer, you can pass the deployment UUID instead of the "flow-name/deployment-name" string.
If this still errors, please share:
- Output of uv run prefect deployment ls
- Output of uv run prefect config view --show-sources
- How you created the deployment (UI, Python, or CLI)
Docs:
- Store flow code for deployments
- Deployment/versioning guide
Gustavo Fonseca
10/19/2025, 2:15 AMGustavo Fonseca
10/19/2025, 2:15 AMMarvin
10/19/2025, 2:15 AMMarvin
10/19/2025, 2:16 AM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
import httpx
@task(retries=2, retry_delay_seconds=5, timeout_seconds=60)
def fetch_api1():
return httpx.get("https://api.example.com/a").json()
@task(retries=2, retry_delay_seconds=5, timeout_seconds=60)
def fetch_api2():
return httpx.get("https://api.example.com/b").json()
@task(retries=2, retry_delay_seconds=5)
def download_file(url: str) -> str:
# stream to disk; donât return large bytes
path = f"data/{url.split('/')[-1]}"
with httpx.stream("GET", url) as r:
r.raise_for_status()
with open(path, "wb") as f:
for chunk in r.iter_bytes():
f.write(chunk)
return path # return the path, not file contents
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def etl(urls: list[str]):
a1_f = fetch_api1.submit()
a2_f = fetch_api2.submit()
file_fs = [download_file.submit(u) for u in urls]
a1 = a1_f.result()
a2 = a2_f.result()
paths = [f.result() for f in file_fs]
return a1, a2, paths
if __name__ == "__main__":
etl(["https://example.com/large1.bin", "https://example.com/large2.bin"])
When to use separate flows (and optionally run them in parallel)
- If API A and API B should be deployed/scheduled separately or run on different workers:
from prefect import flow
from prefect.deployments import run_deployment
@flow
def orchestrator():
# kick off two independent deployed flows in parallel
fr1 = run_deployment(name="api-a-flow/prod")
fr2 = run_deployment(name="api-b-flow/prod")
# optionally wait / poll if you need their results
This launches distinct flow runs that can execute concurrently across workers/machines.
Additional tips
- IO-bound work (HTTP, S3/GCS downloads): ThreadPoolTaskRunner is ideal.
- CPU-bound work (parsing, compression): ProcessPoolTaskRunner is better (see the sketch after this list).
- Avoid returning large payloads; write to storage and return a reference/path.
- Add retries/timeouts to network tasks.
- Persisting results: only persist small/structured outputs; for large files, persist the file path or a URI.
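For the CPU-bound case, a minimal sketch (assumes a Prefect 3.x release that ships ProcessPoolTaskRunner; the task and inputs are examples):
```python
# Sketch: run CPU-heavy tasks in separate processes so they don't block each
# other on the GIL. Task functions must be module-level (picklable).
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def crunch(n: int) -> int:
    # stand-in for CPU-bound work (parsing, compression, hashing, ...)
    return sum(i * i for i in range(n))

@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def cpu_pipeline():
    futures = [crunch.submit(n) for n in (10_000, 20_000, 30_000)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    cpu_pipeline()
```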
References:
- Flows and tasks: Docs
- Run flows in local processes: Guide
- Results best practices: Results
Gustavo Fonseca
10/19/2025, 2:21 AMMarvin
10/19/2025, 2:21 AMMarvin
10/19/2025, 2:21 AM
Pattern A: parallel tasks with ThreadPoolTaskRunner
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
import httpx
@task(retries=2, retry_delay_seconds=5, timeout_seconds=60)
def api_a():
return httpx.get("<https://api.example.com/a>").json()
@task(retries=2, retry_delay_seconds=5, timeout_seconds=60)
def api_b():
return httpx.get("<https://api.example.com/b>").json()
@task(retries=2, retry_delay_seconds=5)
def big_file(url: str) -> str:
path = f"data/{url.split('/')[-1]}"
with httpx.stream("GET", url) as r:
r.raise_for_status()
with open(path, "wb") as f:
for chunk in r.iter_bytes():
f.write(chunk)
return path
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def pipeline(urls: list[str]):
a_f = api_a.submit()
b_f = api_b.submit()
file_fs = [big_file.submit(u) for u in urls]
a = a_f.result()
b = b_f.result()
paths = [f.result() for f in file_fs]
return a, b, paths
Pattern B: async fan-out inside a task (useful when each "main task" needs many concurrent calls)
import asyncio, httpx
from prefect import task
@task
async def download_many(urls: list[str]) -> list[str]:
limits = httpx.Limits(max_connections=10)
timeout = httpx.Timeout(60)
sem = asyncio.Semaphore(5)
async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
async def fetch(u):
async with sem:
path = f"data/{u.split('/')[-1]}"
async with client.stream("GET", u) as r:
r.raise_for_status()
with open(path, "wb") as f:
async for chunk in r.aiter_bytes():
f.write(chunk)
return path
return await asyncio.gather(*[fetch(u) for u in urls])
Guidance:
- It's fine to combine both (parallel tasks + internal async), but be mindful of total concurrency so you don't overwhelm the APIs or your machine.
- Use retries/timeouts on network tasks.
- For CPU-bound work (parsing/compression), prefer ProcessPoolTaskRunner.
- Return paths/handles for large files instead of the raw bytes.
If you share a quick outline of your three APIs and the download counts, I can suggest concrete max_workers, timeouts, and concurrency caps.
Gustavo Fonseca
10/19/2025, 5:54 PMMarvin
10/19/2025, 5:54 PMMarvin
10/19/2025, 5:55 PM
utils/io.py
from pathlib import Path
import httpx
def ensure_dir(path: str | Path) -> Path:
p = Path(path)
p.mkdir(parents=True, exist_ok=True)
return p
def download_stream(url: str, dest_path: str | Path, timeout: float = 60.0) -> str:
dest_path = Path(dest_path)
ensure_dir(dest_path.parent)
with httpx.stream("GET", url, timeout=timeout) as r:
r.raise_for_status()
with open(dest_path, "wb") as f:
for chunk in r.iter_bytes():
f.write(chunk)
return str(dest_path)
tasks/tse.py
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import download_stream
# Replace these with your real endpoints
TSE_ENDPOINTS = {
    "tse_a": "https://example.com/tse/a.csv",
    "tse_b": "https://example.com/tse/b.csv",
    "tse_c": "https://example.com/tse/c.csv",
}
@task(retries=2, retry_delay_seconds=5, timeout_seconds=120, log_prints=True)
def download_tse_endpoint(name: str, url: str, out_dir: str = "data/tse") -> str:
logger = get_run_logger()
dest = Path(out_dir) / f"{name}.csv"
logger.info(f"Downloading TSE endpoint '{name}' -> {dest}")
return download_stream(url, dest)
tasks/congresso/base.py
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import download_stream
# Replace with your real manifest URL
CONGRESSO_MANIFEST_URL = "https://example.com/congresso/manifest.json"
@task(retries=2, retry_delay_seconds=5, timeout_seconds=120, log_prints=True)
def download_manifest(out_dir: str = "data/congresso") -> str:
logger = get_run_logger()
dest = Path(out_dir) / "manifest.json"
logger.info(f"Downloading Congresso manifest -> {dest}")
return download_stream(CONGRESSO_MANIFEST_URL, dest)
tasks/congresso/subtask_a.py
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import download_stream
# Replace these with your real endpoints
CONGRESSO_A_ENDPOINTS = {
    "a_1": "https://example.com/congresso/a_1.json",
    "a_2": "https://example.com/congresso/a_2.json",
    "a_3": "https://example.com/congresso/a_3.json",
}
@task(retries=2, retry_delay_seconds=5, timeout_seconds=120, log_prints=True)
def download_congresso_a_endpoint(name: str, url: str, manifest_path: str, out_dir: str = "data/congresso/a") -> str:
# Use manifest_path if you need to read metadata for requests
logger = get_run_logger()
dest = Path(out_dir) / f"{name}.json"
logger.info(f"[A] Downloading {name} using manifest {manifest_path} -> {dest}")
return download_stream(url, dest)
tasks/congresso/subtask_b.py
```
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import download_stream
# Replace with your real B endpoints that must run sequentially
CONGRESSO_B_ENDPOINTS = [
"https://example.com/congresso/b_1.parquet",
"https://example.com/congresso/b_2.parquet",
]
@task(retries=2, retry_delay_seconds=5, timeout_seconds=600, log_prints=True)
def download_congresso_b_sequential(manifest_path: str, out_dir: str = "data/congresso/b") -> list[str]:
    logger = get_run_logger()
    results = []
    for i, url in enumerate(CONGRESSO_B_ENDPOINTS, start=1):
        dest = Path(out_dir) / f"b_{i}.parquet"
        logger.info(f"[B] Downloading {url} -> {dest}")
        results.append(download_stream(url, dest))
    return results
Marvin
10/19/2025, 5:55 PM
flows/pipeline.py
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from tasks.tse import TSE_ENDPOINTS, download_tse_endpoint
from tasks.congresso.base import download_manifest
from tasks.congresso.subtask_a import CONGRESSO_A_ENDPOINTS, download_congresso_a_endpoint
from tasks.congresso.subtask_b import download_congresso_b_sequential
@flow(task_runner=ThreadPoolTaskRunner(max_workers=16), log_prints=True)
def pipeline():
logger = get_run_logger()
logger.info("Starting pipeline")
# TSE: three endpoints in parallel
tse_fs = [download_tse_endpoint.submit(name, url) for name, url in TSE_ENDPOINTS.items()]
# Congresso: first download prerequisite (manifest)
manifest_path = download_manifest()
# Then A and B in parallel:
# - A: three endpoints in parallel
a_fs = [download_congresso_a_endpoint.submit(name, url, manifest_path) for name, url in CONGRESSO_A_ENDPOINTS.items()]
# - B: must run sequentially, so a single task that loops internally
b_f = download_congresso_b_sequential.submit(manifest_path)
# Gather results
tse_paths = [f.result() for f in tse_fs]
a_paths = [f.result() for f in a_fs]
b_paths = b_f.result()
logger.info(f"TSE files: {tse_paths}")
logger.info(f"Congresso A files: {a_paths}")
logger.info(f"Congresso B files: {b_paths}")
return {"tse": tse_paths, "congresso_a": a_paths, "congresso_b": b_paths}
if __name__ == "__main__":
pipeline()
Why this matches your requirements
- One flow orchestrates everything.
- TSE endpoints run in parallel.
- Congresso downloads the manifest first, then runs Subtask A (parallel) and Subtask B (sequential) at the same time.
- All downloads stream to disk; tasks return file paths, not large payloads.
- ThreadPoolTaskRunner is ideal for IO-bound HTTP work.
Tuning and tips
- Control parallelism by adjusting `max_workers` in ThreadPoolTaskRunner.
- Add rate limiting if you need it later (Prefect concurrency limits or a simple `asyncio.Semaphore` inside a task if you switch to async); see the sketch after this list.
- Put secrets (API keys) in Secret blocks and read them in tasks - avoid Variables for sensitive data.
- For production, consider persisting only small results and store big files in object storage; return URIs instead of local paths.
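For the rate-limiting tip above, a minimal sketch using Prefect's global concurrency limits (the limit name "api-calls" is an example and must exist first, e.g. created with `prefect gcl create api-calls --limit 5`; the task is illustrative, not part of the pipeline above):
```python
# Sketch: throttle how many downloads hit the API at once using a Prefect
# global concurrency limit. The "api-calls" limit name is an assumption and
# must be created beforehand.
import httpx
from prefect import task
from prefect.concurrency.sync import concurrency

@task(retries=2, retry_delay_seconds=5)
def polite_download(url: str) -> str:
    with concurrency("api-calls", occupy=1):  # holds one slot while downloading
        resp = httpx.get(url, timeout=60)
        resp.raise_for_status()
        return resp.text
```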
Run it
- Start your local server (optional if you just want local logs/observability):
uv run prefect server start
- Execute the flow:
uv run python flows/pipeline.py
- Or serve/schedule locally:
uv run python -c "from flows.pipeline import pipeline; pipeline.serve(name='local-pipeline')"
```
Gustavo Fonseca
10/19/2025, 6:18 PMGustavo Fonseca
10/19/2025, 6:18 PMMarvin
10/19/2025, 6:19 PMMarvin
10/19/2025, 6:19 PM
Keep download_stream for big CSVs, and add a couple of small JSON helpers so you don't buffer large payloads unnecessarily or return huge objects from tasks.
Here's a minimal set of utilities I recommend:
- download_stream(url, dest_path): for large CSVs or binaries
- fetch_json(url): for a single small JSON you want in memory
- save_json(url, dest_path): for a single JSON saved to disk
- fetch_json_many_async(urls, out_dir, concurrency): for many small JSONs in parallel with an async client
- write_ndjson(records, dest_path): if you prefer aggregating many small JSONs into one NDJSON file
utils/io.py
from __future__ import annotations
import asyncio
import json
from pathlib import Path
import hashlib
import httpx
def ensure_dir(path: str | Path) -> Path:
p = Path(path)
p.mkdir(parents=True, exist_ok=True)
return p
def download_stream(url: str, dest_path: str | Path, timeout: float = 60.0) -> str:
dest_path = Path(dest_path)
ensure_dir(dest_path.parent)
with httpx.stream("GET", url, timeout=timeout) as r:
r.raise_for_status()
with open(dest_path, "wb") as f:
for chunk in r.iter_bytes():
f.write(chunk)
return str(dest_path)
def fetch_json(url: str, timeout: float = 30.0) -> dict:
with httpx.Client(timeout=timeout) as client:
r = client.get(url)
r.raise_for_status()
return r.json()
def save_json(url: str, dest_path: str | Path, timeout: float = 30.0) -> str:
dest_path = Path(dest_path)
ensure_dir(dest_path.parent)
with httpx.Client(timeout=timeout) as client:
r = client.get(url)
r.raise_for_status()
data = r.json()
with open(dest_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False)
return str(dest_path)
def write_ndjson(records: list[dict], dest_path: str | Path) -> str:
dest_path = Path(dest_path)
ensure_dir(dest_path.parent)
with open(dest_path, "w", encoding="utf-8") as f:
for rec in records:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
return str(dest_path)
async def fetch_json_many_async(
urls: list[str],
out_dir: str | Path | None = None,
concurrency: int = 10,
timeout: float = 30.0,
) -> list[str] | list[dict]:
"""
- If out_dir is provided, saves each JSON to a file and returns list of file paths.
- Otherwise returns list of parsed dicts in memory.
"""
sem = asyncio.Semaphore(concurrency)
limits = httpx.Limits(max_connections=max(concurrency, 10))
timeout_cfg = httpx.Timeout(timeout)
ensure_dir(out_dir) if out_dir else None
async def one(u: str):
async with sem:
async with httpx.AsyncClient(limits=limits, timeout=timeout_cfg) as client:
r = await client.get(u)
r.raise_for_status()
data = r.json()
if out_dir:
# deterministic filename by URL hash
name = hashlib.sha1(u.encode()).hexdigest() + ".json"
path = Path(out_dir) / name
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False)
return str(path)
else:
return data
return await asyncio.gather(*[one(u) for u in urls])
Examples of using these in Prefect tasks
- TSE large CSVs: keep streaming to disk (your current use of download_stream is perfect).
- Congresso single small JSON: use save_json if you want a file, or fetch_json if you need the content in-memory downstream.
- Congresso many small JSONs: use an async task that calls fetch_json_many_async with a concurrency cap.
tasks/congresso/subtask_a.py (parallel small JSONs)
```
from prefect import task
from utils.io import fetch_json_many_async
@task(retries=2, retry_delay_seconds=5)
async def download_many_congresso_a(urls: list[str], out_dir: str = "data/congresso/a") -> list[str]:Marvin
10/19/2025, 6:19 PM
tasks/congresso/base.py (single JSON manifest)
from prefect import task
from utils.io import save_json
@task(retries=2, retry_delay_seconds=5)
def download_manifest(url: str, out_dir: str = "data/congresso") -> str:
return save_json(url, dest_path=f"{out_dir}/manifest.json", timeout=60.0)
```
Why split like this
- You avoid holding big payloads in memory; tasks return paths.
- Async client + concurrency for "many small JSONs" is efficient and controllable.
- You can tune concurrency easily to respect rate limits.
General guidance
- Keep using ThreadPoolTaskRunner at the flow level for IO-bound tasks; async tasks will still work fine inside.
- Add retries/timeouts on tasks (as shown) and keep per-request timeouts reasonable.
- If you later need to aggregate many JSONs, prefer writing NDJSON files with write_ndjson or store them in object storage and return URIs.
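As a small usage sketch tying those helpers together (the helper names come from the utils/io.py module above; the URLs and file path are whatever you pass in):
```python
# Sketch: pull many small JSON payloads in memory, then aggregate them into a
# single NDJSON file using the helpers defined in utils/io.py above.
from prefect import task
from utils.io import fetch_json_many_async, write_ndjson

@task(retries=2, retry_delay_seconds=5)
async def aggregate_small_jsons(urls: list[str], dest: str = "data/aggregated.ndjson") -> str:
    # out_dir=None keeps the parsed dicts in memory instead of writing per-URL files
    records = await fetch_json_many_async(urls, out_dir=None, concurrency=10)
    return write_ndjson(records, dest)  # one JSON object per line
```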
If you share some endpoint counts and typical sizes, I can suggest good defaults for concurrency and timeouts for each group.
Gustavo Fonseca
10/20/2025, 12:59 AMMarvin
10/20/2025, 12:59 AMMarvin
10/20/2025, 1:01 AM
config/settings.toml
[tse]
base_url = "https://api.tse.example"
stream_timeout = 120
[congresso]
base_url = "https://api.congresso.example"
json_timeout = 30
json_concurrency = 10
[runner]
max_workers = 16
config/loader.py
import os, json
from pathlib import Path
from dataclasses import dataclass
from typing import Any
from prefect.variables import Variable
try:
import tomllib # Python 3.11+
except ModuleNotFoundError:
import tomli as tomllib # pip install tomli for 3.10-
@dataclass
class Settings:
tse_base_url: str
tse_stream_timeout: float
congresso_base_url: str
congresso_json_timeout: float
congresso_json_concurrency: int
runner_max_workers: int
def _read_toml(path: str | Path) -> dict[str, Any]:
p = Path(path)
if not p.exists():
return {}
with open(p, "rb") as f:
return tomllib.load(f)
def _get_var(name: str) -> str | None:
# Prefect Variables override if set (non-sensitive only)
try:
v = Variable.get(name)
return None if v in ("", None) else v
except Exception:
return None
def load_settings(config_path: str | Path = "config/settings.toml") -> Settings:
cfg = _read_toml(config_path)
def pick(var_name: str, env_name: str, default: Any) -> Any:
# priority: Prefect Variable -> ENV -> TOML -> default
v = _get_var(var_name)
if v is not None:
return v
if (e := os.getenv(env_name)) is not None:
return e
return default
return Settings(
tse_base_url=str(pick("TSE_BASE_URL", "MYAPP_TSE_BASE_URL", cfg.get("tse", {}).get("base_url", ""))),
tse_stream_timeout=float(pick("TSE_STREAM_TIMEOUT", "MYAPP_TSE_STREAM_TIMEOUT", cfg.get("tse", {}).get("stream_timeout", 120))),
congresso_base_url=str(pick("CONGRESSO_BASE_URL", "MYAPP_CONGRESSO_BASE_URL", cfg.get("congresso", {}).get("base_url", ""))),
congresso_json_timeout=float(pick("CONGRESSO_JSON_TIMEOUT", "MYAPP_CONGRESSO_JSON_TIMEOUT", cfg.get("congresso", {}).get("json_timeout", 30))),
congresso_json_concurrency=int(pick("CONGRESSO_JSON_CONCURRENCY", "MYAPP_CONGRESSO_JSON_CONCURRENCY", cfg.get("congresso", {}).get("json_concurrency", 10))),
runner_max_workers=int(pick("RUNNER_MAX_WORKERS", "MYAPP_RUNNER_MAX_WORKERS", cfg.get("runner", {}).get("max_workers", 16))),
)
flows/pipeline.py (using the settings)
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings
@flow
def pipeline():
s = load_settings()
# You can pass these into tasks or construct runners with them
print("Using:", s)
if __name__ == "__main__":
s = load_settings()
pipeline.with_options(task_runner=ThreadPoolTaskRunner(max_workers=s.runner_max_workers))()
Using Prefect Variables for quick overrides (non-sensitive)
uv run prefect variable set TSE_BASE_URL https://api.tse.example
uv run prefect variable set RUNNER_MAX_WORKERS 32
Unset to fall back to env/TOML/defaults:
uv run prefect variable unset RUNNER_MAX_WORKERS -y
Where to put secrets
- Use Secret blocks:
```
from prefect.blocks.system import Secret
Secret(value="your-token").save("tse-token")
# later
token = Secret.load("tse-token").get()
Marvin
10/20/2025, 1:01 AMGustavo Fonseca
10/20/2025, 2:15 AMMarvin
10/20/2025, 2:15 AMMarvin
10/20/2025, 2:17 AM
Variable.get(...) is sync-compatible: its static type is "Sync[T]", which expands to "T | Coroutine[Any, Any, T]".
- For Prefect Variables, T is StrictVariableValue (a union of JSON-like primitives: str | int | float | bool | None).
- Your function is annotated to return str | None, but the type checker sees "StrictVariableValue | Coroutine[...]" coming back - not assignable to str | None. It also flags that the value might be a float/bool, not a string.
Does it actually return a coroutine?
- Only in async contexts. If you call Variable.get inside an async function, you should await it.
- In sync code, it returns the value directly. The coroutine bit is in the type to support both contexts.
Two clean ways to resolve
Option A â Return the correct Prefect type and convert later
from typing import Optional, cast
from prefect.variables import Variable
from prefect.types import StrictVariableValue
def get_var(name: str) -> Optional[StrictVariableValue]:
# Cast removes the Sync[...] union for type-checkers
v = cast(StrictVariableValue, Variable.get(name, default=None))
return v
Then when you need a string/float/int, convert explicitly:
def get_var_str(name: str) -> str | None:
v = get_var(name)
return None if v is None else str(v)
def get_var_int(name: str, default: int) -> int:
v = get_var(name)
if v is None:
return default
if isinstance(v, bool):
return int(v)
if isinstance(v, (int, float, str)):
return int(v)
raise TypeError(f"Unsupported type for int: {type(v)}")
Option B - Support async usage
If you're calling this inside async code, make the helper async and await:
from typing import Optional
from prefect.variables import Variable
from prefect.types import StrictVariableValue
async def aget_var(name: str) -> Optional[StrictVariableValue]:
return await Variable.get(name, default=None)
async def aget_var_str(name: str) -> str | None:
v = await aget_var(name)
return None if v is None else str(v)
Why casting helps
- Prefect annotates many functions with a Sync[T] alias (T | Coroutine[..., T]) to be accurate in both sync/async contexts.
- In purely sync helpers, using cast(StrictVariableValue, Variable.get(...)) tells the type checker "treat this like the sync branch," removing the coroutine from the union.
Also note
- Prefect Variables can hold numbers and booleans, not just strings. If you want all config as strings, coerce with `str(v)`; if you want typed config, convert thoughtfully (like the int/float helpers above).
- If you truly want your helper to return only strings, reflect that in both the conversion and the annotation.
Docs:
- Variables: https://docs-3.prefect.io/v3/concepts/variables
If you share which exact fields you want as str/int/float, I can sketch small typed getters you can drop into your loader.
Gustavo Fonseca
10/20/2025, 2:32 AMGustavo Fonseca
10/20/2025, 2:32 AMMarvin
10/20/2025, 2:32 AMMarvin
10/20/2025, 2:32 AM
1) Use Variable.get(..., default=None) instead of try/except in the helper:
from typing import Optional, cast
from prefect.variables import Variable
from prefect.types import StrictVariableValue
def _get_var(name: str) -> Optional[StrictVariableValue]:
v = cast(StrictVariableValue, Variable.get(name, default=None))
return None if v in ("", None) else v
- Note: Variable.get is "sync-compatible", so the cast is just to silence the type checker about the potential coroutine type in async contexts.
- If you call this from async code, create an async variant and await it.
2) Make sure your Settings casts match your dataclass annotations
You currently have:
- congresso_json_timeout annotated as float, but you cast with int(...). Either:
- change to float(...) OR
- change the dataclass field to int
Example with consistent types
return Settings(
runner_max_workers=int(pick("RUNNER_MAX_WORKERS","MYAPP_RUNNER_MAX_WORKERS", cfg.get("runner", {}).get("max_workers", 16))),
tse_base_url=str(pick("TSE_BASE_URL", "MYAPP_TSE_BASE_URL", cfg.get("tse", {}).get("base_url", ""))),
tse_stream_timeout=float(pick("TSE_STREAM_TIMEOUT", "MYAPP_TSE_STREAM_TIMEOUT", cfg.get("tse", {}).get("stream_timeout", 120))),
congresso_base_url=str(pick("CONGRESSO_BASE_URL","MYAPP_CONGRESSO_BASE_URL", cfg.get("congresso", {}).get("base_url", ""))),
congresso_json_timeout=float(pick("CONGRESSO_JSON_TIMEOUT","MYAPP_CONGRESSO_JSON_TIMEOUT", cfg.get("congresso", {}).get("json_timeout", 30))),
congresso_json_concurrency=int(pick("CONGRESSO_JSON_CONCURRENCY","MYAPP_CONGRESSO_JSON_CONCURRENCY", cfg.get("congresso", {}).get("json_concurrency", 10))),
)
Optional: typed pick helpers to centralize coercion and error messages
def pick_str(var, env, default):
v = _get_var(var);
if v is not None: return str(v)
e = os.getenv(env)
return e if e is not None else str(default)
def pick_int(var, env, default):
v = _get_var(var)
if v is not None:
if isinstance(v, bool): # avoid True->1 surprises
raise TypeError(f"{var} should be int, got bool")
return int(v)
e = os.getenv(env)
if e is not None: return int(e)
return int(default)
def pick_float(var, env, default):
v = _get_var(var)
if v is not None:
if isinstance(v, bool):
raise TypeError(f"{var} should be float, got bool")
return float(v)
e = os.getenv(env)
if e is not None: return float(e)
return float(default)
About "it says it can return a coroutine"
- Prefect marks some APIs as sync-compatible: their type is T | Coroutine[..., T].
- In sync code, they return T directly; in async code, you must await them.
- Your helper is sync, so using cast(...) as shown is the right way to tell the type checker you're using the sync branch.
Bottom line
- Keep explicit conversions when building Settings; it's the safest way to normalize mixed sources.
- Fix the float/int mismatch for congresso_json_timeout.
- Use Variable.get(..., default=None) to avoid try/except and keep the code tidy.
Docs for Variables: Variables
Gustavo Fonseca
10/20/2025, 2:42 AMGustavo Fonseca
10/20/2025, 2:42 AMMarvin
10/20/2025, 2:43 AMMarvin
10/20/2025, 2:43 AM
config/settings.toml
[tse]
base_url = "https://api.tse.example"
stream_timeout = 120
[congresso]
base_url = "https://api.congresso.example"
json_timeout = 30
json_concurrency = 10
[runner]
max_workers = 16
config/loader.py
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
try:
import tomllib # Python 3.11+
except ModuleNotFoundError:
import tomli as tomllib # pip install tomli for 3.10
@dataclass
class Settings:
# TSE
tse_base_url: str
tse_stream_timeout: float = 120.0
# CONGRESSO
congresso_base_url: str
congresso_json_timeout: float = 30.0
congresso_json_concurrency: int = 10
# Runner
runner_max_workers: int = 16
def _read_toml(path: str | Path) -> dict[str, Any]:
p = Path(path)
if not p.exists():
raise FileNotFoundError(f"Config file not found: {p.resolve()}")
with open(p, "rb") as f:
return tomllib.load(f)
def load_settings(config_path: str | Path = "config/settings.toml") -> Settings:
cfg = _read_toml(config_path)
# Required keys (fail fast with clear errors)
try:
tse_cfg = cfg["tse"]
congresso_cfg = cfg["congresso"]
except KeyError as e:
raise ValueError(f"Missing section in TOML: {e}. Expected [tse], [congresso].") from e
try:
tse_base_url = str(tse_cfg["base_url"])
congresso_base_url = str(congresso_cfg["base_url"])
except KeyError as e:
raise ValueError(f"Missing required key: {e}. Ensure base_url is set.") from e
# Optional keys with defaults from the dataclass
tse_stream_timeout = float(tse_cfg.get("stream_timeout", Settings.tse_stream_timeout))
congresso_json_timeout = float(congresso_cfg.get("json_timeout", Settings.congresso_json_timeout))
congresso_json_concurrency = int(congresso_cfg.get("json_concurrency", Settings.congresso_json_concurrency))
runner_max_workers = int(cfg.get("runner", {}).get("max_workers", Settings.runner_max_workers))
return Settings(
tse_base_url=tse_base_url,
tse_stream_timeout=tse_stream_timeout,
congresso_base_url=congresso_base_url,
congresso_json_timeout=congresso_json_timeout,
congresso_json_concurrency=congresso_json_concurrency,
runner_max_workers=runner_max_workers,
)
Using it in your flow
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings
@flow
def pipeline():
s = load_settings()
# pass settings to tasks as needed
return {"runner": s.runner_max_workers}
if __name__ == "__main__":
s = load_settings()
pipeline.with_options(task_runner=ThreadPoolTaskRunner(max_workers=s.runner_max_workers))()
Notes and best practices
- Keep secrets out of TOML; use Prefect Secret blocks for tokens/keys and read them in code when needed.
- If you want environment-specific configs later, you can add files like config/settings.dev.toml and pass the path to load_settings(...) without changing any other code (see the sketch below).
- TOML preserves types, but still coerce to float/int at load time to be explicit and future-proof.
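For the environment-specific configs mentioned above, a minimal sketch is to pick the file via an environment variable (MYAPP_CONFIG is a hypothetical name, following the MYAPP_ prefix used earlier in this thread):
```python
# Sketch: choose which TOML file to load based on an environment variable,
# falling back to the default path. MYAPP_CONFIG is a hypothetical variable.
import os
from config.loader import load_settings

settings = load_settings(os.getenv("MYAPP_CONFIG", "config/settings.toml"))
```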
If you want stricter validation (e.g., URL format checks), I can show a pydantic-based loader too.
Gustavo Fonseca
10/20/2025, 3:21 AMGustavo Fonseca
10/20/2025, 3:21 AMMarvin
10/20/2025, 3:21 AMMarvin
10/20/2025, 3:22 AM
1) Your snippet contains Slack link formatting around the logger call:
<http://logger.info|logger.info>("Iniciando pipeline")
- That <http://logger.info|logger.info> text is Slack's link syntax and will cause a syntax/type error in Python; it should be plain logger.info("Iniciando pipeline").
2) with_options "unknown" is a typing/usage mismatch
- In Prefect 3, @flow returns a Flow object and with_options(...) is valid. Some linters/type-checkers still think it's a plain function. At runtime it works if pipeline is actually decorated with @flow.
- Also, in your Settings dataclass the field is likely runner_max_workers (not max_workers). Using s.max_workers would raise an attribute error.
Two clean patterns
Pattern A - Compute runner from TOML, then with_options at call time
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings
@flow(log_prints=True) # no runner here; we'll set it at call time
def pipeline():
logger = get_run_logger()
logger.info("Iniciando pipeline")
if __name__ == "__main__":
s = load_settings()
runner = ThreadPoolTaskRunner(max_workers=s.runner_max_workers)
pipeline.with_options(task_runner=runner)()
If your type checker still complains that with_options doesn't exist, it's safe to add a cast or ignore:
from typing import cast
from prefect.flows import Flow
cast(Flow, pipeline).with_options(task_runner=runner)()
# or: pipeline.with_options(task_runner=runner)() # type: ignore[attr-defined]
Pattern B - Use TOML at import time and set the runner in the decorator
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings
s = load_settings()
runner = ThreadPoolTaskRunner(max_workers=s.runner_max_workers)
@flow(task_runner=runner, log_prints=True)
def pipeline():
logger = get_run_logger()
logger.info("Iniciando pipeline")
if __name__ == "__main__":
pipeline()
This is simplest if you're fine loading config at import time.
Quick checklist
- Make sure runner_max_workers in your TOML/Settings matches what you use in code.
- Keep the logger line as plain Python.
- with_options is the right way to override options per call in Prefect 3.
Docs: Flows and tasks
Gustavo Fonseca
10/20/2025, 3:30 AMMarvin
10/20/2025, 3:30 AMMarvin
10/20/2025, 3:30 AM
1) Keep the logger line as plain Python:
logger.info("Iniciando pipeline")
2) Pylance "No overloads for __call__" is a type-checker false positive
In Prefect 3, passing a TaskRunner instance in the decorator is valid, but some editors' type stubs don't recognize it. The code will run. You can either suppress the warning or use a pattern that Pylance is happier with.
Clean working examples
A) Keep runner in the decorator (runtime works; may need a type-ignore)
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings
s = load_settings()
runner = ThreadPoolTaskRunner(max_workers=s.runner_max_workers) # ensure field name matches Settings
@flow(task_runner=runner, log_prints=True) # mypy/pylance may complain, but this runs
def pipeline():
logger = get_run_logger()
logger.info("Iniciando pipeline")
if __name__ == "__main__":
pipeline()
If your editor still complains, add:
@flow(task_runner=runner, log_prints=True) # type: ignore[call-arg]
B) Move the runner to with_options at call time (usually quiets the linter)
from typing import cast
from prefect import flow, get_run_logger
from prefect.flows import Flow
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings
@flow(log_prints=True)
def pipeline():
logger = get_run_logger()
logger.info("Iniciando pipeline")
if __name__ == "__main__":
s = load_settings()
runner = ThreadPoolTaskRunner(max_workers=s.runner_max_workers)
cast(Flow, pipeline).with_options(task_runner=runner)()
Common gotchas to check
- Make sure your Settings field is runner_max_workers (not max_workers) if that's how you defined it.
- Ensure youâre on Prefect 3.x:
uv run prefect version
- Don't include Slack formatting in code blocks you paste into your editor.
Docs:
- Flows and tasks: Flows and tasksGustavo Fonseca
10/21/2025, 1:43 AMMarvin
10/21/2025, 1:43 AMMarvin
10/21/2025, 1:44 AMfrom __future__ import annotations
from pathlib import Path
import json
import httpx
def ensure_dir(path: str | Path) -> Path:
p = Path(path)
p.mkdir(parents=True, exist_ok=True)
return p
def filename_from_url(url: str) -> str:
# naive: last path segment
return url.rstrip("/").split("/")[-1] or "download.bin"
def download_stream(url: str, dest_path: str | Path, timeout: float = 120.0) -> str:
dest_path = Path(dest_path)
ensure_dir(dest_path.parent)
with httpx.stream("GET", url, timeout=timeout) as r:
r.raise_for_status()
with open(dest_path, "wb") as f:
for chunk in r.iter_bytes():
f.write(chunk)
return str(dest_path)
def fetch_json(url: str, timeout: float = 30.0) -> dict:
with httpx.Client(timeout=timeout) as client:
r = client.get(url)
r.raise_for_status()
return r.json()
def save_json(url: str, dest_path: str | Path, timeout: float = 60.0) -> str:
data = fetch_json(url, timeout=timeout)
dest_path = Path(dest_path)
ensure_dir(dest_path.parent)
with open(dest_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False)
return str(dest_path)
tasks/tse.py
- One task per endpoint; keep the list of endpoints here
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import download_stream, filename_from_url
# Fill this with your 30 URLs
TSE_ENDPOINTS: list[str] = [
# "https://.../file1.zip",
# ...
]
@task(retries=2, retry_delay_seconds=5, timeout_seconds=600, log_prints=True)
def download_tse_zip(url: str, out_dir: str = "data/tse") -> str:
logger = get_run_logger()
name = filename_from_url(url)
dest = Path(out_dir) / name
logger.info(f"TSE: downloading {url} -> {dest}")
return download_stream(url, dest)
tasks/congresso/legislatura.py
- Small JSON that feeds Deputados/Frentes
from prefect import task, get_run_logger
from utils.io import fetch_json
# Replace with real endpoint
LEGISLATURA_URL = "https://api.congresso.example/legislatura"
@task(retries=2, retry_delay_seconds=5, timeout_seconds=60, log_prints=True)
def fetch_legislatura() -> dict:
logger = get_run_logger()
logger.info("Congresso: fetching Legislatura")
return fetch_json(LEGISLATURA_URL, timeout=60.0)
tasks/congresso/deputados.py
- Depends on Legislatura; save to disk or return JSON. Below: save to disk.
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import save_json
# Template; build the actual URL from legislatura fields
def deputados_url(legislatura: dict) -> str:
# e.g., f"<https://api.congresso.example/deputados?legislatura={legislatura['id']}>"
return "<https://api.congresso.example/deputados>"
@task(retries=2, retry_delay_seconds=5, timeout_seconds=120, log_prints=True)
def fetch_deputados(legislatura: dict, out_dir: str = "data/congresso") -> str:
logger = get_run_logger()
url = deputados_url(legislatura)
dest = Path(out_dir) / "deputados.json"
logger.info(f"Congresso: fetching Deputados from {url} -> {dest}")
return save_json(url, dest, timeout=60.0)
tasks/congresso/frentes.py
- Also depends on Legislatura; run in parallel with Deputados
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import save_json
# Template; build the actual URL from legislatura fieldsMarvin
10/21/2025, 1:44 AMflows/pipeline.py
- Orchestrate everything; TSE downloads run in parallel; Legislatura first, then Deputados and Frentes in parallel
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from tasks.tse import TSE_ENDPOINTS, download_tse_zip
from tasks.congresso.legislatura import fetch_legislatura
from tasks.congresso.deputados import fetch_deputados
from tasks.congresso.frentes import fetch_frentes
@flow(task_runner=ThreadPoolTaskRunner(max_workers=16), log_prints=True)
def pipeline():
logger = get_run_logger()
logger.info("Starting pipeline")
# TSE: fan out ~30 endpoints in parallel
tse_fs = [download_tse_zip.submit(url) for url in TSE_ENDPOINTS]
# Congresso: fetch Legislatura first (dependency)
legislatura = fetch_legislatura()
# Then Deputados and Frentes in parallel (both depend on Legislatura)
deputados_f = fetch_deputados.submit(legislatura)
frentes_f = fetch_frentes.submit(legislatura)
# Gather results
tse_paths = [f.result() for f in tse_fs]
deputados_path = deputados_f.result()
frentes_path = frentes_f.result()
logger.info(f"TSE files: {len(tse_paths)}")
logger.info(f"Deputados -> {deputados_path}")
logger.info(f"Frentes -> {frentes_path}")
return {
"tse": tse_paths,
"congresso_deputados": deputados_path,
"congresso_frentes": frentes_path,
}
if __name__ == "__main__":
pipeline()
Notes and best practices
- Keep big files streamed to disk; return file paths from tasks.
- Keep small JSON in-memory only when you need it downstream; otherwise save to disk (as above).
- Add retries/timeouts to all network tasks (already shown).
- Control concurrency with ThreadPoolTaskRunner(max_workers=...). Even with "no request limit", a sane cap (e.g., 10-20) prevents local resource exhaustion.
- Keep URLs and per-API assembly logic near each task module (as shown with deputados_url/frentes_url) so dependency wiring stays obvious.
- If any endpoints need auth, load credentials from Secret blocks inside the task.
If you drop in your real URLs/parameterization (especially how Legislatura shapes Deputados/Frentes URLs), I can fill those helper functions precisely.Gustavo Fonseca
10/22/2025, 12:16 AMGustavo Fonseca
10/22/2025, 12:16 AMMarvin
10/22/2025, 12:16 AMMarvin
10/22/2025, 12:21 AM1) Parameterize the flow with a subset filter (e.g., an only list) and use native Python filtering so a new run can execute just a subset.
- Combine with task retries for transient failures.
from datetime import timedelta
from prefect import flow, task
from prefect.cache_policies import INPUTS
@task(
retries=2,
retry_delay_seconds=10,
cache_policy=INPUTS,
cache_expiration=timedelta(days=1),
)
def call_endpoint(name: str):
# call your endpoint here
...
@flow
def sync_all(endpoints: list[str], only: list[str] | None = None):
targets = [e for e in endpoints if not only or e in only]
# Either map or loop; both are fine
call_endpoint.map(targets)
- First run:
sync_all(endpoints=ALL_ENDPOINTS)
- Re-run only failing ones:
sync_all(endpoints=ALL_ENDPOINTS, only=["/users", "/orders"])
2) Use result caching to avoid re-running successful work
- With the CachePolicy above, when you re-run the flow with the same inputs, Prefect loads cached results for tasks that already succeeded and re-executes only the ones that need work.
- If you want to force re-execution of a task even if there's a cache, set refresh_cache=True on the task (in the decorator) temporarily, or adjust your cache key policy.
- Docs: Result caching
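For instance, a temporary recovery run could bypass the cache just for the subset you re-run (a minimal sketch; call_endpoint is the task from option 1 above):
from prefect import flow

@flow
def recover(failed: list[str]):
    # refresh_cache=True recomputes and overwrites any cached result for these calls
    call_endpoint.with_options(refresh_cache=True).map(failed)

# e.g. recover(failed=["/users", "/orders"])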
3) Subflows per unit of work
- Wrap each endpoint in its own subflow, then call just the ones you want in a subsequent run.
from prefect import flow
@flow
def sync_one(endpoint: str):
call_endpoint(endpoint)
@flow
def sync_all(endpoints: list[str], only: list[str] | None = None):
for e in endpoints:
if not only or e in only:
sync_one(e)
4) Retries for transient errors
- Configure retries on tasks so transient issues are automatically retried during the same run.
- Docs: Task retries
Do I need a deployment?
- No. You can just call the flow function with the only parameter for ad-hoc re-runs.
- Deployments are optional but handy if you want to trigger these filtered re-runs from the UI/CLI on demand or on a schedule. If you already have a deployment, you can pass parameters via CLI:
prefect deployment run my-flow/my-deployment -p only='["/users","/orders"]' --watch
- Deployments overview: Deployments
UI note
- There's no one-click "retry this single task" after a flow run completes. The idiom is to start a new run that targets the subset of work you want, using parameters and caching to avoid redoing everything.
If you can share how you're structuring the endpoints today (e.g., a list you map over vs. individual tasks), I can tailor the pattern and code to your setup.Gustavo Fonseca
10/22/2025, 12:25 AMGustavo Fonseca
10/22/2025, 12:26 AMMarvin
10/22/2025, 12:26 AMMarvin
10/22/2025, 12:30 AMYou can set both on the @task decorator:
- name= sets the static task name (the node label)
- task_run_name= sets the dynamic run name for each invocation (including mapped runs)
Examples
1) Static task name + dynamic run name from parameters
from prefect import task, flow
@task(
name="Download TSE endpoint",
task_run_name="Download {endpoint}" # uses the task parameter names
)
def download_tse_endpoint(endpoint: str):
...
@flow
def sync(endpoints: list[str]):
# Each mapped run is named "Download <endpoint>"
download_tse_endpoint.map(endpoints)
2) Set/override per call with with_options
download_tse_endpoint.with_options(
task_run_name="Download {endpoint}"
).map(endpoints)
3) Use a callable for complex naming
def run_name(params: dict) -> str:
ep = params.get("endpoint", "unknown")
return f"Download {ep}"
@task(task_run_name=run_name)
def download_tse_endpoint(endpoint: str):
...
Notes
- The placeholders in task_run_name refer to the task's parameter names (top-level fields).
- Prefect may append a short suffix for uniqueness (e.g., -252). You can't remove it, but putting your informative part first makes names clear in the UI.
- You can also set flow run names similarly: @flow(flow_run_name="Sync {date}").
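As a quick illustration of the flow-level counterpart (a minimal sketch; the placeholder must match a flow parameter name):
from datetime import date
from prefect import flow

@flow(flow_run_name="sync-{run_date}")
def sync(run_date: date):
    # each run appears in the UI as e.g. "sync-2025-10-22"
    ...

if __name__ == "__main__":
    sync(run_date=date(2025, 10, 22))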
References (implementation details)
- resolve_custom_task_run_name: source
- Task engine setting custom names: source
- Task decorator definition: source
If you share your current task signature (parameter names), I can suggest an exact task_run_name string for your case.Gustavo Fonseca
10/22/2025, 12:52 AMMarvin
10/22/2025, 12:52 AMMarvin
10/22/2025, 12:53 AMUse task_run_name to format the run name from your name parameter so each run shows the endpoint key in the UI.
Option A: set it on the task decorator
from pathlib import Path
from prefect import task, get_run_logger
@task(
name="Fetch TSE endpoint", # static node label
task_run_name="fetch_tse_{name}", # dynamic run name per call
retries=5, retry_delay_seconds=5, timeout_seconds=120, log_prints=True
)
def fetch_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
logger = get_run_logger()
dest = Path(out_dir) / f"{name}.csv"
logger.info(f"Fazendo download do endpoint TSE '{name}' -> {dest}")
return download_stream(url, dest)
Then map over your dict so name and url align:
from prefect import flow, unmapped
@flow
def sync_all():
names, urls = zip(*TSE_ENDPOINTS.items()) # preserves pairing
fetch_tse.map(name=names, url=urls, out_dir=unmapped("data/tse"))
Option B: set it at call-site with with_options (no change to decorator)
from prefect import unmapped
names, urls = zip(*TSE_ENDPOINTS.items())
fetch_tse.with_options(task_run_name="fetch_tse_{name}").map(
name=names, url=urls, out_dir=unmapped("data/tse")
)
Notes
- The placeholders in task_run_name must match your task's parameter names (name, url, etc.). Top-level keys only.
- Prefect may append a short suffix for uniqueness (e.g., -252), which is expected.
- You can also use a callable for complex formatting:
def rn(params: dict) -> str:
return f"fetch_tse_{params['name']}"
fetch_tse = fetch_tse.with_options(task_run_name=rn)
Relevant internals if you're curious:
- Task decorator signature (supports task_run_name): source
- Name resolution: resolve_custom_task_run_nameGustavo Fonseca
10/22/2025, 2:04 AMMarvin
10/22/2025, 2:04 AMMarvin
10/22/2025, 2:05 AMKeep your flow where it is (e.g., pipeline in flows/pipeline.py) and reference that entrypoint with the CLI.
Suggested setup
- Keep main.py for local dev runs only (your if __name__ == "__main__": pipeline() is fine). The deployment won't use `main.py`; it imports your flow directly from flows/pipeline.py.
Create a deployment from project root
# From the root of your repo (where ./flows/pipeline.py exists)
prefect deploy ./flows/pipeline.py:pipeline -n pipeline-dev
Start a worker to pick up runs
prefect worker start -p <your-work-pool>
Trigger the deployment
# Run now and watch logs
prefect deployment run pipeline/pipeline-dev --watch
# Pass parameters if needed (example):
prefect deployment run pipeline/pipeline-dev -p date_today='"2025-10-21"'
Notes
- The ENTRYPOINT format is path/to/file.py:flow_function_name (so: ./flows/pipeline.py:pipeline).
- You can create multiple deployments for the same flow (e.g., dev/prod, different work pools, schedules).
- If you ever want to define a deployment in code (e.g., from a remote repo), use the Prefect 3.x pattern:
from prefect import flow
flow.from_source(
"<https://github.com/owner/repo>",
entrypoint="flows/pipeline.py:pipeline",
).deploy(name="pipeline-dev", work_pool_name="<pool-name>")
Docs: Deployments in Prefect 3.x
If you share your work pool name and how you plan to schedule it, I can give you an exact prefect deploy command tailored to your setup.Gustavo Fonseca
10/22/2025, 2:07 AMMarvin
10/22/2025, 2:07 AMMarvin
10/22/2025, 2:08 AMHere's a step-by-step to deploy and run your pipeline flow in Prefect 3.x.
Assumptions:
- Your flow is defined at ./flows/pipeline.py as @flow def pipeline(...).
- You want to run it on a local Process worker (default/simple option).
Step 1) Create a work pool (once)
prefect work-pool create "process-pool" --type process
Step 2) Create the deployment from your project root
- The entrypoint format is path/to/file.py:flow_function_name
prefect deploy ./flows/pipeline.py:pipeline -n pipeline-dev
The CLI will prompt you for a work pool if not provided; choose process-pool.
Step 3) Start a worker to pick up runs
prefect worker start -p process-pool
Step 4) Run the deployment (and watch logs)
prefect deployment run pipeline/pipeline-dev --watch
Pass parameters (e.g., your date_today flow parameter) if needed:
prefect deployment run pipeline/pipeline-dev -p date_today='"2025-10-22"' --watch
Notes:
- You don't deploy `main.py`; deployments target the flow entrypoint directly (./flows/pipeline.py:pipeline). Keep main.py for local dev runs if you like.
- Your ThreadPoolTaskRunner in the flow is fine; it will be honored when the flow runs from a deployment.
- You can add a schedule later with --cron, --interval, etc., or via the UI.
Docs:
- Deployments: Prefect 3.x deployments
- Work pools and workers: Work pools and workersGustavo Fonseca
10/22/2025, 2:24 AMMarvin
10/22/2025, 2:24 AMMarvin
10/22/2025, 2:28 AMfrom flows.pipeline import pipeline
if __name__ == "__main__":
# Creates a deployment and starts a local runner that polls for runs
pipeline.serve(
name="deploy-1",
# optional: schedule it
# interval=3600, # run every hour
# or cron="0 7 * * *",
)
Then start it:
uv run src/main.py
Notes:
- serve() blocks and keeps running. It registers the deployment and polls for scheduled/manual runs.
- To run it immediately without a schedule, trigger it from another terminal or the UI:
prefect deployment run pipeline/deploy-1 --watch
- No worker/work pool required for serve; the process running serve executes the runs.
- By default, the deployment pauses on shutdown; if you want it to stay active after the process ends, set pause_on_shutdown=False.
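Putting those notes together, a minimal serve sketch (the interval and pause_on_shutdown values here are just examples):
from flows.pipeline import pipeline

if __name__ == "__main__":
    pipeline.serve(
        name="deploy-1",
        interval=3600,            # schedule a run every hour
        pause_on_shutdown=False,  # keep the deployment active after this process exits
    )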
Option B: Classic deployment + worker
- Create a work pool and deployment, then start a worker:
prefect work-pool create process-pool --type process
prefect deploy ./flows/pipeline.py:pipeline -n deploy-1
prefect worker start -p process-pool
prefect deployment run pipeline/deploy-1 --watch
Common gotchas
- Calling pipeline() in main.py executes the flow directly and bypasses deployments. Use pipeline.serve(...) (Option A) or create a deployment via CLI (Option B).
- If you put if __name__ == "__main__": pipeline.serve(...) in pipeline.py, you must run that file directly for it to execute; running main.py won't trigger pipeline.py's main block.
- If serve is running but "nothing happens," you likely haven't scheduled or triggered a run yet. Use a schedule in serve() or prefect deployment run ....
Docs
- Serve flows (embedded runner): Run flows in local processes
- Create deployments: Create deployments
If you tell me which option you prefer (serve vs worker), I'll tailor the exact commands and file changes for your setup.Gustavo Fonseca
10/22/2025, 11:23 PMMarvin
10/22/2025, 11:23 PMMarvin
10/22/2025, 11:24 PM- Map the string from your TOML to a cache policy object (e.g., the INPUTS constant or a CachePolicy() instance)
- Use the mapped object in your @task(cache_policy=...)
Example
TOML (e.g., config.toml)
[tse]
retries = 5
retry_delay = 5
timeout = 120
cache_policy = "INPUTS" # or "NONE", "DEFAULT", "FLOW_PARAMETERS", "TASK_SOURCE", "RUN_ID"
cache_expiration_seconds = 86400 # optional, if you also set cache_expiration
Config loader mapping
from datetime import timedelta
from prefect.cache_policies import (
CachePolicy,
INPUTS,
NONE, # aka NO_CACHE in some contexts
DEFAULT,
TASK_SOURCE,
FLOW_PARAMETERS,
RUN_ID,
)
_CACHE_POLICY_MAP = {
"INPUTS": INPUTS,
"NONE": NONE,
"NO_CACHE": NONE,
"DEFAULT": DEFAULT,
"TASK_SOURCE": TASK_SOURCE,
"FLOW_PARAMETERS": FLOW_PARAMETERS,
"RUN_ID": RUN_ID,
}
def coerce_cache_policy(policy_name: str | None):
if policy_name is None:
return None # means "NotSet" if you don't pass cache_policy
if isinstance(policy_name, str):
key = policy_name.strip().upper()
if key in _CACHE_POLICY_MAP:
return _CACHE_POLICY_MAP[key]
raise ValueError(f"Unknown cache_policy '{policy_name}'. "
f"Expected one of: {', '.join(_CACHE_POLICY_MAP)}")
# You could also allow pre-built CachePolicy objects here if needed
raise TypeError(f"cache_policy must be a string or None, got {type(policy_name)}")
Use in your settings/model
class TSEConfig(BaseModel):
RETRIES: int
RETRY_DELAY: int
TIMEOUT: int
CACHE_POLICY: str | None = None
CACHE_EXPIRATION_SECONDS: int | None = None
@property
def cache_policy_obj(self):
return coerce_cache_policy(self.CACHE_POLICY)
@property
def cache_expiration_td(self):
return timedelta(seconds=self.CACHE_EXPIRATION_SECONDS) if self.CACHE_EXPIRATION_SECONDS else None
Task usage
APP_SETTINGS = loader.load_config()
@task(
retries=APP_SETTINGS.TSE.RETRIES,
retry_delay_seconds=APP_SETTINGS.TSE.RETRY_DELAY,
timeout_seconds=APP_SETTINGS.TSE.TIMEOUT,
log_prints=True,
cache_policy=APP_SETTINGS.TSE.cache_policy_obj,
cache_expiration=APP_SETTINGS.TSE.cache_expiration_td,
task_run_name="fetch_tse_{name}",
)
def extract_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
logger = get_run_logger()
dest = Path(out_dir) / f"{name}.csv"
logger.info(f"Fazendo download do endpoint TSE '{name}' -> {dest}")
return download_stream(url, dest)
Notes
- task(cache_policy=...) expects a CachePolicy or one of the provided constants (e.g., INPUTS). Mapping strings from TOML to those objects is the safest approach.
- If you need a fully custom CachePolicy (e.g., specific key_storage or isolation_level), put the parameters in TOML and build a CachePolicy(...) in your loader when cache_policy = "CUSTOM".
- Caching docs (for more detail): Result caching in Prefect 3.x
If you share your loader's structure, I can fold the mapping logic directly into it.Gustavo Fonseca
10/22/2025, 11:36 PMMarvin
10/22/2025, 11:36 PMMarvin
10/22/2025, 11:37 PMHave the task return a small reference (e.g., "data/tse/candidatos_2018.csv" or "s3://bucket/key.csv"), not the bytes.
- Keep cache_policy=INPUTS so identical inputs skip re-downloads. Consider adding cache_expiration to bound staleness.
- Ensure results are persisted so the cache can be reused across runs. Either:
- set persist_result=True on the task, or
- enable globally with PREFECT_RESULTS_PERSIST_BY_DEFAULT=true.
- Make the file location durable:
- If you run everything on one machine (serve or a single worker), a local path is fine.
- If you run on multiple machines/workers, store the file in shared storage (e.g., S3/GCS) and return that URI so downstream steps can access it anywhere.
Practical pattern
from datetime import timedelta
from pathlib import Path
from prefect import task, get_run_logger
from prefect.cache_policies import INPUTS
@task(
retries=APP_SETTINGS.TSE.RETRIES,
retry_delay_seconds=APP_SETTINGS.TSE.RETRY_DELAY,
timeout_seconds=APP_SETTINGS.TSE.TIMEOUT,
log_prints=True,
persist_result=True, # make cache reusable across runs
cache_policy=INPUTS, # cache by inputs (name, url, out_dir)
cache_expiration=timedelta(days=30), # optional TTL for freshness
task_run_name="fetch_tse_{name}",
)
def extract_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
logger = get_run_logger()
dest = Path(out_dir) / f"{name}.csv"
# Local idempotency guard: if file already exists, skip network download
if dest.exists() and dest.stat().st_size > 0:
logger.info(f"Arquivo já existe, reutilizando: {dest}")
return str(dest)
logger.info(f"Fazendo download do endpoint TSE '{name}' -> {dest}")
return download_stream(url, dest)
Optional: smarter cache busting
- If the remote file can change at the same URL, you can include a version signal (e.g., ETag or Last-Modified) in the cache key via cache_key_fn. That way a changed upstream invalidates the cache.
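A minimal sketch of that idea, assuming the server exposes ETag or Last-Modified headers (the HEAD request and hashing scheme here are illustrative, not a Prefect requirement):
import hashlib
import httpx
from prefect import task

def tse_cache_key(context, parameters) -> str:
    # Combine the URL with a version signal from the server so a changed upstream busts the cache
    url = parameters["url"]
    try:
        resp = httpx.head(url, timeout=10.0)
        version = resp.headers.get("ETag") or resp.headers.get("Last-Modified") or ""
    except httpx.HTTPError:
        version = ""  # fall back to a URL-only key if the HEAD request fails
    return hashlib.sha256(f"{url}:{version}".encode()).hexdigest()

@task(cache_key_fn=tse_cache_key, persist_result=True)
def extract_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
    ...  # same body as shown above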
Trade-offs to be aware of
- A cached success does not verify the file still exists on disk. If the file gets deleted, the cached run will still "succeed." The existence check above mitigates this.
- If you use multiple workers, a local path returned from one machine won't exist on another. Prefer object storage for the actual file and return the object URI.
Docs
- Result caching
- Result persistence and storage
If you share whether you're running on a single machine or multiple workers (and whether you have S3/GCS), I can tailor the storage/result settings exactly.Gustavo Fonseca
10/22/2025, 11:56 PMMarvin
10/22/2025, 11:56 PMMarvin
10/22/2025, 11:57 PMUse task_run_name with your name parameter so each run shows the key in the UI.
Add task_run_name="extract_tse_{name}" to your task decorator:
from pathlib import Path
from prefect import task, get_run_logger
from datetime import timedelta
from utils.io import download_stream
from config.loader import load_config, CACHE_POLICY_MAP
APP_SETTINGS = load_config()
@task(
retries=APP_SETTINGS.TSE.RETRIES,
retry_delay_seconds=APP_SETTINGS.TSE.RETRY_DELAY,
timeout_seconds=APP_SETTINGS.TSE.TIMEOUT,
log_prints=True,
cache_policy=CACHE_POLICY_MAP[APP_SETTINGS.TSE.CACHE_POLICY],
cache_expiration=timedelta(days=APP_SETTINGS.TSE.CACHE_EXPIRATION),
task_run_name="extract_tse_{name}", # <- dynamic run name per invocation
)
def extract_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
logger = get_run_logger()
dest = Path(out_dir) / f"{name}.csv"
logger.info(f"Fazendo download do endpoint TSE '{name}' -> {dest}")
return download_stream(url, dest)
Then, when you call/submit the task like:
# inside your flow
for key, url in TSE_ENDPOINTS.items():
extract_tse.submit(name=key, url=url)
each task run will appear as extract_tse_<key>-<suffix>, e.g., extract_tse_candidatos_2018-34b.
Notes
- The placeholders in task_run_name must match the task's parameter names (name, url, ...). Here {name} is the dict key you're passing.
- Prefect appends a short suffix (e.g., -34b) for uniqueness; that's expected.
- If you prefer to set this at call time (without changing the decorator), you can do:
extract_tse.with_options(task_run_name="extract_tse_{name}").submit(name=key, url=url)
For complex formatting/truncation, you can use a callable:
def rn(params: dict) -> str:
raw = params["name"]
short = raw[:60]
return f"extract_tse_{short}"
extract_tse = extract_tse.with_options(task_run_name=rn)Gustavo Fonseca
10/23/2025, 12:05 AMMarvin
10/23/2025, 12:05 AMMarvin
10/23/2025, 12:06 AMfrom datetime import date, timedelta
import re
CURRENT_YEAR = date.today().year
def extract_year_from_name(name: str) -> int | None:
# e.g., "candidatos_2018", "redes_sociais_2024_SP"
m = re.search(r"\b(20\d{2})\b", name)
return int(m.group(1)) if m else None
def cache_opts_for_year(year: int | None):
if year is None:
# no year in name -> default behavior
return {}
if year < CURRENT_YEAR:
# cache past years
return {
"cache_policy": CACHE_POLICY_MAP["INPUTS"],
"cache_expiration": timedelta(days=365*5),
"persist_result": True,
}
else:
# do not cache current year
return {
"cache_policy": CACHE_POLICY_MAP["NO_CACHE"],
# OR: "refresh_cache": True,
}
# inside your flow
for name, url in TSE_ENDPOINTS.items():
year = extract_year_from_name(name)
opts = cache_opts_for_year(year)
extract_tse.with_options(**opts).submit(name=name, url=url)
Pattern 2: Pass year as an explicit parameter
If you already know the year, add it to the task signature to avoid parsing from the name:
@task(
# defaults here are fine â weâll override per call
cache_policy=CACHE_POLICY_MAP[APP_SETTINGS.TSE.CACHE_POLICY],
cache_expiration=timedelta(days=APP_SETTINGS.TSE.CACHE_EXPIRATION),
task_run_name="extract_tse_{name}",
)
def extract_tse(name: str, url: str, year: int | None = None, out_dir: str = "data/tse") -> str:
...
Then:
for name, url in TSE_ENDPOINTS.items():
year = extract_year_from_name(name)
opts = cache_opts_for_year(year)
extract_tse.with_options(**opts).submit(name=name, url=url, year=year)
Notes
- NO_CACHE vs refresh_cache:
- NO_CACHE cleanly disables caching for that call.
- refresh_cache=True forces bypass even if a cache exists; useful if you want to "always re-download" for the current year.
- Keep returning a small value (the file path/URI), not the file bytes.
- Add a local existence guard to skip re-downloads regardless of Prefect cache:
- if the file already exists and is non-empty, just return the path.
- If you run on multiple machines, store files in shared/object storage and return the URI.
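To make the NO_CACHE vs refresh_cache distinction concrete (a minimal sketch; the endpoint name and URL are made up):
from prefect import flow
from prefect.cache_policies import NO_CACHE

@flow
def demo_cache_modes():
    # current-year endpoint: never read or write a cache entry for this call
    extract_tse.with_options(cache_policy=NO_CACHE).submit(
        name="candidatos_2026", url="https://example.org/candidatos_2026.zip"
    )
    # alternative: keep caching on, but force this call to recompute and overwrite the cache
    extract_tse.with_options(refresh_cache=True).submit(
        name="candidatos_2026", url="https://example.org/candidatos_2026.zip"
    )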
Config suggestion (TOML)
- You can keep your global defaults and still override per-call:
[tse]
cache_policy = "INPUTS" # default for past years
cache_expiration = 3650 # 10 years
no_cache_policy = "NO_CACHE" # used for current year
Then in loader:
def cache_opts_for_year(year):
if year is None or year < CURRENT_YEAR:
return {
"cache_policy": CACHE_POLICY_MAP[APP_SETTINGS.TSE.CACHE_POLICY],
"cache_expiration": timedelta(days=APP_SETTINGS.TSE.CACHE_EXPIRATION),
"persist_result": True,
}
else:
return {
"cache_policy": CACHE_POLICY_MAP[APP_SETTINGS.TSE.NO_CACHE_POLICY],
}
Docs: result caching in Prefect 3.x
- Result caching
If you share your naming scheme for all endpoints, I can provide an exact extract_year_from_name tailored to your keys.Gustavo Fonseca
10/23/2025, 12:33 AMMarvin
10/23/2025, 12:33 AMMarvin
10/23/2025, 12:38 AM- If you're using serve: because APP_SETTINGS = load_config() runs at import time, changes in TOML won't be picked up until you restart the serve process. Pattern: load config at run time (inside the flow) instead of at import, or restart serve after changing config.
- If you're using deploy + worker: each flow run starts in a fresh process, so if your code reads the TOML at run time, new runs will pick up the change without redeploy/restart. If you bake config into a Docker image, you'll need to rebuild or mount the config.
Best-practice patterns to avoid restarts/redeploys
1) Load config at runtime, not at import
- Move APP_SETTINGS = load_config() into your flow function (or a small helper that's called at the start of each run). This ensures each run reads the latest config.
- Avoid baking config into decorators. In your code, cache_policy=CACHE_POLICY_MAP[APP_SETTINGS.TSE.CACHE_POLICY] is evaluated at import time and "freezes" that value. Prefer to set task options per call with with_options(...).
2) Make cache policy dynamic per run
- Remove cache-related arguments from the decorator and set them at submit time, using the config you load at the start of the flow.
from datetime import timedelta
from prefect import task, flow
from prefect.cache_policies import INPUTS, NO_CACHE
@task(
retries=5,
retry_delay_seconds=5,
timeout_seconds=120,
log_prints=True,
task_run_name="extract_tse_{name}",
)
def extract_tse(name: str, url: str, out_dir: str="data/tse") -> str:
...
@flow
def pipeline(...):
settings = load_config() # read TOML per run
cache_policy = CACHE_POLICY_MAP[settings.TSE.CACHE_POLICY] # e.g. "NO_CACHE" -> NO_CACHE
cache_exp = timedelta(days=settings.TSE.CACHE_EXPIRATION)
for name, url in TSE_ENDPOINTS.items():
extract_tse.with_options(
cache_policy=cache_policy,
cache_expiration=cache_exp,
persist_result=True, # if you want cross-run reuse
).submit(name=name, url=url)
- Now you can change the TOML and the next run will use the new values, even with a running worker. For flow.serve, this also works as long as you read config in the flow, not at import.
3) Use flow parameters to override per run
- Expose caching knobs as flow parameters and set them when you trigger a run. No redeploy needed for per-run overrides:
@flow
def pipeline(cache_policy: str = "NO_CACHE", cache_expiration_days: int = 90):
cache_policy_obj = CACHE_POLICY_MAP[cache_policy]
cache_exp = timedelta(days=cache_expiration_days)
...
Trigger with:
prefect deployment run pipeline/deploy-1 -p cache_policy='"INPUTS"' -p cache_expiration_days=365 --watch
4) Use Prefect Variables or job variables for runtime config
- Prefect Variables: set in UI or via API, then read at runtime. Change anytime; no redeploy.
from prefect.variables import Variable
policy_name = Variable.get("TSE_CACHE_POLICY", default="NO_CACHE")
exp_days = int(Variable.get("TSE_CACHE_EXPIRATION_DAYS", default="90"))
Docs: Prefect Variables
- Job variables: pass env vars per run that your loader reads:
prefect deployment run pipeline/deploy-1 \
-jv TSE_CACHE_POLICY='"INPUTS"' \
-jv TSE_CACHE_EXPIRATION_DAYS=365 \
--watch
Docs: Customize job variables
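A sketch of the loader side, assuming you want environment variables (like the ones passed above) to override the TOML defaults; the variable names are just examples:
import os
from datetime import timedelta

def resolve_cache_settings(toml_policy: str, toml_expiration_days: int) -> tuple[str, timedelta]:
    # environment variables win over the TOML values when present
    policy = os.environ.get("TSE_CACHE_POLICY", toml_policy)
    days = int(os.environ.get("TSE_CACHE_EXPIRATION_DAYS", toml_expiration_days))
    return policy, timedelta(days=days)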
When you DO need a restart or redeploy
- serve(): restart the process if config is loaded at import time or if you changed code. If you move config reads inside the flow, you don't need to restart for TOML changes.Marvin
10/23/2025, 12:38 AM- If you change task options in the decorator itself (e.g., @task(cache_policy=...) in code), that's a code change. For worker-based deployments, new runs pick it up automatically if you're running from source. For Docker/K8s images or pinned-source deployments, rebuild/redeploy.
- If your deployment pins parameters or a specific image/revision, changing those defaults requires a new deployment version.
In short: refactor to read config at run time and set task options with with_options(...) or flow parameters. Then you won't need to restart the deployment to apply caching changes.Gustavo Fonseca
10/23/2025, 2:16 AMprocess_limit
Maximum number of processes a runner will execute in parallel.
Type: `integer`
Default: `5`
TOML dotted key path: `runner.process_limit`
Supported environment variables: `PREFECT_RUNNER_PROCESS_LIMIT`
Setting this on my prefect.toml will make my task runner in my flow:
@flow(
task_runner=ThreadPoolTaskRunner(max_workers=16), # type: ignore
log_prints=True
)
follow what's defined in the toml?Marvin
10/23/2025, 2:16 AMMarvin
10/23/2025, 2:16 AMShort answer: no. Setting runner.process_limit does not change your flow's ThreadPoolTaskRunner(max_workers=16).
What each knob controls
- runner.process_limit (setting or PREFECT_RUNNER_PROCESS_LIMIT): caps how many flow runs the embedded Runner will execute in parallel when you use flow.serve(...). It's about concurrent flow runs, not tasks within a flow. Docs: process_limit
- `ThreadPoolTaskRunner(max_workers=...)`: controls task-level concurrency inside a single flow run. This is set by your code and is not overridden by runner.process_limit.
Implications
- If you're using flow.serve, you can have:
- up to runner.process_limit concurrent flow runs in that one serve process
- and inside each flow run, up to max_workers concurrent tasks (your 16)
- If you're using worker-based deployments (prefect deploy + worker), runner.process_limit does not apply. Workers typically execute one flow run per process; scale by running more workers or using work pool concurrency limits.
If you want to make max_workers configurable via TOML, read it in your own config and pass it to the task runner:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
APP_SETTINGS = load_config()
@flow(
task_runner=ThreadPoolTaskRunner(
max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS # from your TOML
),
log_prints=True,
)
def pipeline(...):
...
Useful links
- Runner (serve) behavior: Run flows in local processes
- process_limit setting: Runner process_limitGustavo Fonseca
10/23/2025, 2:26 AMGustavo Fonseca
10/23/2025, 2:26 AMMarvin
10/23/2025, 2:26 AMMarvin
10/23/2025, 2:31 AMRefresh (recompute) the cache: set refresh_cache=True on the task (decorator or via with_options) to bypass and overwrite any existing cache for that task.
- Example:
extract_tse.with_options(refresh_cache=True).submit(name=..., url=...)
- Globally for a run:
- Set env var PREFECT_TASKS_REFRESH_CACHE=true for the process running your flow. All tasks will refresh their cache.
Disable caching
- Per-task call:
- Use cache_policy=NO_CACHE via with_options(...) for a specific run:
from prefect.cache_policies import NO_CACHE
extract_tse.with_options(cache_policy=NO_CACHE).submit(...)
- Globally:
- Set PREFECT_TASKS_DEFAULT_NO_CACHE=true or PREFECT_TASKS_DISABLE_CACHING=true in the environment to disable caching across tasks.
Expire or invalidate by configuration
- cache_expiration: Reduce TTL so caches expire sooner:
extract_tse.with_options(cache_expiration=timedelta(days=1)).submit(...)
- Change inputs or code: With the DEFAULT/INPUTS policy, changing task inputs or the taskâs source code will produce a new cache key automatically.
Delete stored cache/results
- Local default result store is typically at ~/.prefect/storage/ (can be changed via PREFECT_LOCAL_STORAGE_PATH). Deleting entries here removes stored results and cache records. This is the blunt instrument: safe, but it removes everything.
- If you configured a custom CachePolicy(key_storage=...) or remote result storage (S3/GCS), clear the corresponding keys/objects there to invalidate caches.
- There's no built-in CLI like prefect cache clear. If needed, you can script deletion.
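If you do go the scripted route, a small sketch that clears old entries from the default local result store (adjust the path if you changed PREFECT_LOCAL_STORAGE_PATH):
import time
from pathlib import Path

storage = Path.home() / ".prefect" / "storage"
cutoff = time.time() - 7 * 24 * 3600  # keep the last 7 days
for entry in storage.glob("*"):
    if entry.is_file() and entry.stat().st_mtime < cutoff:
        entry.unlink()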
Operational tips
- If you're using flow.serve, remember it's a long-running process. Environment variable changes apply when you restart that process.
- For worker-based deployments, each run is a fresh process. Env vars and code changes take effect on the next run.
- Prefer targeted invalidation (refresh_cache or NO_CACHE) over deleting the entire store unless you truly want to wipe all caches.
Docs for reference
- Caching concepts: Caching
- Advanced caching: Advanced caching
- Results and storage: Results
If you tell me whether you're running locally with serve or via workers, and where your results are stored (local vs S3/GCS), I can give you exact commands/steps to clear them.Gustavo Fonseca
10/24/2025, 1:40 AMMarvin
10/24/2025, 1:40 AMMarvin
10/24/2025, 1:43 AMYou don't need get_run_logger() if you're happy using print() and you've enabled print-capture correctly. With log_prints on, print() output is sent to Prefect logs.
Key points:
- Your current code uses logger.info(...). That does require get_run_logger(). The log_prints setting does not create a logger; it only captures `print()`/stdout/stderr.
- If you want to drop get_run_logger(), replace the log line with print(...) and keep log_prints enabled.
Fix your prefect.toml (TOML booleans are unquoted and lowercase):
[logging]
level = "INFO"
log_prints = true
If you leave it as log_prints = "True", it won't enable print capture.
When to prefer each:
- Use get_run_logger() when you want structured, Prefect-aware logs (correct run/task context, log levels, works even if log_prints is off).
- Use print() if you just want simple messages and have log_prints = true.
Per-task override is also available:
@task(log_prints=True)
def my_task():
print("this will be captured even if global log_prints is false")
More details:
- Add logging to flows/tasks: Add logging
- Advanced logging (levels, extra loggers for thirdâparty libraries): Advanced logging customization
- Settings reference (including logging.log_prints): Settings referenceGustavo Fonseca
10/24/2025, 2:06 AMMarvin
10/24/2025, 2:06 AMMarvin
10/24/2025, 2:13 AMBecause fetch_json_many_async is async, you must either await it or run it via a sync bridge. You have two good options:
Option A â Make the task async and await inside
from pathlib import Path
from typing import cast
from prefect import task, get_run_logger
from utils.io import fetch_json_many_async, save_ndjson
from config.loader import load_config
APP_SETTINGS = load_config()
def frentes_membros_urls(frentes_ids: list[str]) -> list[str]:
return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids]
@task(
retries=APP_SETTINGS.CAMARA.RETRIES,
retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
async def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
logger = get_run_logger()
urls = frentes_membros_urls(frentes_ids)
logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes")
jsons = await fetch_json_many_async(
urls,
concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
timeout=APP_SETTINGS.CAMARA.TIMEOUT,
follow_pagination=True,
)
dest = Path(out_dir)
return save_ndjson(cast(list[dict], jsons), dest)
How to call it:
- In an async flow, you can either:
- Run concurrently: fut = extract_frentes_membros.submit(...); result = fut.result()
- Or directly await: result = await extract_frentes_membros(...)
Option B â Keep the task sync and bridge the coroutine
from pathlib import Path
from typing import cast
from prefect import task, get_run_logger
from prefect.utilities.asyncutils import run_coro_as_sync
from utils.io import fetch_json_many_async, save_ndjson
from config.loader import load_config
APP_SETTINGS = load_config()
def frentes_membros_urls(frentes_ids: list[str]) -> list[str]:
return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids]
@task(
retries=APP_SETTINGS.CAMARA.RETRIES,
retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
logger = get_run_logger()
urls = frentes_membros_urls(frentes_ids)
logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes")
jsons = run_coro_as_sync(
fetch_json_many_async(
urls,
concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
timeout=APP_SETTINGS.CAMARA.TIMEOUT,
follow_pagination=True,
)
)
dest = Path(out_dir)
return save_ndjson(cast(list[dict], jsons), dest)
Notes:
- Don't call the async function without awaiting/bridging; it will just return a coroutine and not execute.
- You do not need to set anything special when calling the task from the flow. Use .submit() (returns a PrefectFuture; call .result() to get the value) or call the task directly depending on whether your flow is async and whether you want concurrency.
References:
- Tasks and async patterns: Tasks concept
- Running work concurrently with submit/result: Run work concurrently
- Async utilities (run_coro_as_sync): Async utilities APIGustavo Fonseca
10/24/2025, 2:15 AMMarvin
10/24/2025, 2:15 AMMarvin
10/24/2025, 2:19 AMfrom pathlib import Path
from typing import cast
from prefect import task, get_run_logger
from prefect.utilities.asyncutils import run_coro_as_sync
from utils.io import fetch_json_many_async, save_ndjson
from config.loader import load_config
APP_SETTINGS = load_config()
def frentes_membros_urls(frentes_ids: list[str]) -> list[str]:
return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids]
@task(
retries=APP_SETTINGS.CAMARA.RETRIES,
retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
logger = get_run_logger()
urls = frentes_membros_urls(frentes_ids)
logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes")
# Bridge the async coroutine into this sync task
jsons = run_coro_as_sync(
fetch_json_many_async(
urls,
concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
timeout=APP_SETTINGS.CAMARA.TIMEOUT,
follow_pagination=True,
)
)
dest = Path(out_dir)
return save_ndjson(cast(list[dict], jsons), dest)
And here's how you'd call it from your existing pipeline (unchanged call pattern):
# ... your imports and flow definition
@flow(
task_runner=ThreadPoolTaskRunner(max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS), # type: ignore
log_prints=True,
)
def pipeline(
date: date = datetime.now().date(),
refresh_cache: bool = False,
):
logger = get_run_logger()
logger.info("Iniciando pipeline")
# TSE in parallel
tse_fs = [
# NOTE: pass function parameters to .submit; use with_options() only for task OPTIONS
extract_tse.submit(name, url, refresh_cache=refresh_cache)
for name, url in TSE_ENDPOINTS.items()
]
# CONGRESSO
legislatura = extract_legislatura(date)
deputados_f = extract_deputados.submit(legislatura)
frentes_f = extract_frentes.submit(legislatura)
# If you have frentes_ids available, submit the async-backed task as usual
# frentes_ids = ... # derive this from your data
# frentes_membros_f = extract_frentes_membros.submit(frentes_ids)
# Gather results
tse_paths = [f.result() for f in tse_fs]
deputados_path = deputados_f.result()
frentes_path = frentes_f.result()
# frentes_membros_path = frentes_membros_f.result()
logger.info(f"Arquivos TSE: {len(tse_paths)}")
logger.info(f"Deputados -> {deputados_path}")
logger.info(f"Frentes -> {frentes_path}")
# logger.info(f"Frentes Membros -> {frentes_membros_path}")
return {
"tse": tse_paths,
"congresso_deputados": deputados_path,
"congresso_frentes": frentes_path,
# "congresso_frentes_membros": frentes_membros_path,
}
Why this works:
- The task remains synchronous (so your sync flow + .submit/.result pattern is unchanged).
- The async I/O is handled inside the task by run_coro_as_sync, which safely runs the coroutine.
Two small tips:
- Use .submit(..., refresh_cache=refresh_cache) to pass function parameters. with_options() is for task options (name, retries, tags, timeout, etc.), not function args.
- If you decide to convert the flow to async later, you can also make the task async and await internally; then call it with .submit() or await task(...) from the async flow.
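For reference, the async-flow variant of that call could look like this (a sketch, assuming extract_frentes_membros has been made async as in Option A; the IDs are placeholders):
from prefect import flow

@flow
async def pipeline_async():
    frentes_ids = ["55032", "55033"]  # hypothetical IDs
    # awaiting the async task runs it in-line and returns its value
    membros_path = await extract_frentes_membros(frentes_ids)
    return membros_path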
Docs:
- Running work concurrently: Run work concurrentlyMarvin
10/24/2025, 2:19 AMGustavo Fonseca
10/24/2025, 2:25 AMGustavo Fonseca
10/24/2025, 2:25 AMMarvin
10/24/2025, 2:25 AMMarvin
10/24/2025, 2:27 AMfrom prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import resolve_futures_to_results
@flow(
task_runner=ThreadPoolTaskRunner(max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS), # type: ignore
log_prints=True,
)
def pipeline(date: date = datetime.now().date(), refresh_cache: bool = False):
logger = get_run_logger()
logger.info("Iniciando pipeline")
# TSE in parallel
tse_fs = [
extract_tse.submit(name, url, refresh_cache=refresh_cache)
for name, url in TSE_ENDPOINTS.items()
]
# CONGRESSO
legislatura = extract_legislatura(date)
deputados_f = extract_deputados.submit(legislatura)
frentes_f = extract_frentes.submit(legislatura)
# If your task accepts the resolved frentes output, you can pass the future;
# Prefect will resolve it as an upstream dependency
frentes_membros_f = extract_frentes_membros.submit(frentes_f)
# Resolve everything at once, preserving structure
results = resolve_futures_to_results({
"tse": tse_fs,
"congresso_deputados": deputados_f,
"congresso_frentes": frentes_f,
"congresso_frentes_membros": frentes_membros_f,
})
logger.info(f"Arquivos TSE: {len(results['tse'])}")
logger.info(f"Deputados -> {results['congresso_deputados']}")
logger.info(f"Frentes -> {results['congresso_frentes']}")
logger.info(f"Frentes Membros -> {results['congresso_frentes_membros']}")
return results
Notes:
- Prefer .submit(...) for tasks you want to run concurrently. Passing a future as an argument is fine; Prefect wires the dependency and resolves it before the downstream task runs.
- resolve_futures_to_results works with nested structures, so you can add more futures without growing boilerplate.
Refs:
- Futures helpers: Run work concurrently
- API: prefect.futuresGustavo Fonseca
10/24/2025, 2:34 AMGustavo Fonseca
10/24/2025, 2:34 AMMarvin
10/24/2025, 2:34 AMMarvin
10/24/2025, 2:35 AMfrom pathlib import Path
from typing import cast
from prefect import task, get_run_logger
from prefect.futures import PrefectFuture
@task(...)
def extract_frentes_membros(
frentes_ids: list[str] | PrefectFuture[list[str]],
out_dir: str | Path = "data/camara",
) -> str:
...
Then this is type-checker happy:
frentes_membros_f = extract_frentes_membros.submit(frentes_f)
2) Cast at the call site (minimal change)
from typing import Any, cast
frentes_membros_f = extract_frentes_membros.submit(cast(Any, frentes_f))
3) Ignore the warning inline (PyLance/Pyright)
frentes_membros_f = extract_frentes_membros.submit(frentes_f) # pyright: ignore[reportArgumentType]
And to reduce the repeated .result() calls, resolve everything at once with structure-preserving helpers:
from prefect.futures import resolve_futures_to_results
results = resolve_futures_to_results({
"tse": tse_fs,
"congresso_deputados": deputados_f,
"congresso_frentes": frentes_f,
"congresso_frentes_membros": frentes_membros_f,
})
References:
- Run work concurrently
- prefect.futures API
- Tasks conceptGustavo Fonseca
10/24/2025, 2:38 AMMarvin
10/24/2025, 2:38 AMMarvin
10/24/2025, 2:39 AM- You can pass a PrefectFuture as an argument to another task. At runtime, Prefect resolves the future before running the downstream task, so your call extract_frentes_membros.submit(frentes_f) is functionally correct.
- The error you see is just static type checking (PyLance) complaining because your task parameter is annotated as list[str] while you're passing a PrefectFuture[list[str]].
Three ways to quiet the type checker
Pick one â the behavior at runtime is the same.
1) Allow the task to accept the future type in its signature
from prefect.futures import PrefectFuture
@task(...)
def extract_frentes_membros(
frentes_ids: list[str] | PrefectFuture[list[str]],
out_dir: str | Path = "data/camara",
) -> str:
...
2) Cast at the call site (keeps task signature "pure")
from typing import Any, cast
frentes_membros_f = extract_frentes_membros.submit(cast(Any, frentes_f))
3) Ignore the warning inline
frentes_membros_f = extract_frentes_membros.submit(frentes_f) # pyright: ignore[reportArgumentType]
Important fix: your async call inside the task
fetch_json_many_async is async; in a synchronous task you must bridge it. Use `run_coro_as_sync`:
from pathlib import Path
from typing import cast
from prefect import task, get_run_logger
from prefect.utilities.asyncutils import run_coro_as_sync
from utils.io import fetch_json_many_async, save_ndjson
from config.loader import load_config
APP_SETTINGS = load_config()
def frentes_membros_urls(frentes_ids: list[str]) -> list[str]:
return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids]
@task(
retries=APP_SETTINGS.CAMARA.RETRIES,
retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
logger = get_run_logger()
urls = frentes_membros_urls(frentes_ids)
logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes")
jsons = run_coro_as_sync(
fetch_json_many_async(
urls,
concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
timeout=APP_SETTINGS.CAMARA.TIMEOUT,
follow_pagination=True,
)
)
dest = Path(out_dir)
return save_ndjson(cast(list[dict], jsons), dest)
Clean up your flow call sites
- Don't use with_options to pass function arguments. Use `.submit(..., refresh_cache=refresh_cache)`; with_options is for task options (name, retries, tags, etc.), not function args.
- To avoid repetitive .result() calls, resolve everything at once and keep your code DRY.
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import resolve_futures_to_results
@flow(
task_runner=ThreadPoolTaskRunner(max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS), # type: ignore
log_prints=True,
)
def pipeline(date: date = datetime.now().date(), refresh_cache: bool = False):
logger = get_run_logger()
<http://logger.info|logger.info>("Iniciando pipeline")
# TSE in parallel
tse_fs = [
extract_tse.submit(name, url, refresh_cache=refresh_cache)
for name, url in TSE_ENDPOINTS.items()
]
# CONGRESSO
legislatura = extract_legislatura(date)
deputados_f = extract_deputados.submit(legislatura)
frentes_f = extract_frentes.submit(legislatura)
# Passing a future to another task is fine; Prefect resolves it
frentes_membros_f = extract_frentes_membros.submit(frentes_f)
# Resolve everything at once (preserves structure)
results = resolve_futures_to_results({
"tse": tse_fs,
"congresso_deputados": deputados_f,
"congresso_frentes": frentes_f,
"congresso_frentes_membros": frentes_membros_f,
})
return resultsMarvin
10/24/2025, 2:39 AM- Async utilities (run_coro_as_sync): Async utilities API
- Tasks concept: Tasks conceptGustavo Fonseca
10/24/2025, 2:48 AMGustavo Fonseca
10/24/2025, 2:48 AMMarvin
10/24/2025, 2:48 AMMarvin
10/24/2025, 2:50 AMfrom pathlib import Path
from typing import cast
from prefect import task, get_run_logger
from utils.io import fetch_json_many_async, save_ndjson
from config.loader import load_config
APP_SETTINGS = load_config()
def frentes_membros_urls(frentes_ids: list[str]) -> list[str]:
return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids]
@task(
retries=APP_SETTINGS.CAMARA.RETRIES,
retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
async def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
logger = get_run_logger()
urls = frentes_membros_urls(frentes_ids)
logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes")
jsons = await fetch_json_many_async(
urls,
concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
timeout=APP_SETTINGS.CAMARA.TIMEOUT,
follow_pagination=True,
)
dest = Path(out_dir)
return save_ndjson(cast(list[dict], jsons), dest)
How to call it:
- Keep your current sync flow and use concurrency:
- frentes_membros_f = extract_frentes_membros.submit(frentes_f) # pass the upstream future; Prefect wires dependencies and resolves it at runtime
- Later, get the value with .result() or use resolve_futures_to_results(...)
- Or convert the flow to async and await directly:
- frentes_ids = await extract_frentes(legislatura) then result = await extract_frentes_membros(frentes_ids)
- Or still use .submit() for concurrency and collect later with .result()
Docs:
- Add logging: Add logging
- Concurrency patterns: Run work concurrently
2) "PrefectFuture[list[str]] is not iterable" in frentes_membros_urls
Don't add PrefectFuture to the helper's signature. Prefect resolves upstream futures before calling your task function, so inside the task you will receive a concrete list[str]. Keep your helper typed strictly:
def frentes_membros_urls(frentes_ids: list[str]) -> list[str]:
return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids]
If PyLance complains at the flow call site when you pass a future to the task, use one of these to quiet the type checker:
- Cast at call site:
from typing import Any, cast
frentes_membros_f = extract_frentes_membros.submit(cast(Any, frentes_f))
- Or ignore inline:
frentes_membros_f = extract_frentes_membros.submit(frentes_f) # pyright: ignore[reportArgumentType]
3) "No overloads for submit match" when calling extract_tse.submit(name, url, refresh_cache=refresh_cache)
This is a PyLance typing limitation with Prefect's Task/submit wrappers, not a runtime problem. Two options:
- Cast the task to Any at the call site (simple and common with static checkers):
from typing import Any, cast
tse_fs = [
cast(Any, extract_tse).submit(name, url, refresh_cache=refresh_cache)
for name, url in TSE_ENDPOINTS.items()
]
- Or ensure your task's function signature matches exactly what you're passing (names and types). If it does, the cast above is still the pragmatic fix for PyLance.
Bonus: resolving futures without repetitive .result()
from prefect.futures import resolve_futures_to_results
results = resolve_futures_to_results({
"tse": tse_fs,
"congresso_deputados": deputados_f,
"congresso_frentes": frentes_f,
"congresso_frentes_membros": frentes_membros_f,
})
References:Marvin
10/24/2025, 2:50 AMGustavo Fonseca
10/24/2025, 3:06 AMMarvin
10/24/2025, 3:06 AMMarvin
10/24/2025, 3:08 AMfrom typing import Any
from prefect import get_run_logger
from prefect.exceptions import MissingContextError
def _get_prefect_logger_or_none() -> Any | None:
try:
return get_run_logger()
except MissingContextError:
return None
async def fetch_json_many_async(
urls: list[str],
out_dir: str | Path | None = None,
concurrency: int = 10,
timeout: float = 30.0,
follow_pagination: bool = True,
logger: Any | None = None, # <- NEW
) -> list[str] | list[dict]:
"""
- If out_dir is provided, save each JSON to a file and return the list of file paths
- Otherwise, return the list of dicts in memory
"""
logger = logger or _get_prefect_logger_or_none()
def log(msg: str):
if logger:
logger.info(msg)
else:
print(msg)
sem = asyncio.Semaphore(concurrency)
limits = httpx.Limits(max_connections=max(concurrency, 10))
timeout_cfg = httpx.Timeout(timeout)
ensure_dir(out_dir) if out_dir else None
processed_urls = set()
results: list[str] | list[dict] = []
async def one(u: str):
if u in processed_urls:
log(f"Skipping duplicate URL: {u}")
return []
processed_urls.add(u)
log(f"Downloading {u}")
async with sem:
async with httpx.AsyncClient(limits=limits, timeout=timeout_cfg) as client:
r = await client.get(u)
r.raise_for_status()
data = r.json()
if out_dir:
name = hashlib.sha1(u.encode()).hexdigest() + ".json"
path = Path(out_dir) / name
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False)
current_result = str(path)
log(f"Downloaded {u} -> {current_result}")
else:
current_result = data
log(f"Downloaded {u}")
additional_results = []
if follow_pagination and isinstance(data, dict) and "links" in data:
links = {link["rel"]: link["href"] for link in data.get("links", []) if isinstance(link, dict)}
if links.get("self") and links.get("last") and links["self"] != links["last"] and "next" in links:
next_url = links["next"]
log(f"Following pagination: {u} -> next {next_url}")
additional_results = await one(next_url)
if isinstance(additional_results, list):
return [current_result] + additional_results
else:
return [current_result]
tasks = [one(u) for u in urls]
nested_results = await asyncio.gather(*tasks)
for item in nested_results:
if isinstance(item, list):
results.extend(item)
else:
results.append(item)
log(f"Finished: {len(results)} items")
return results
Usage from your task:
@task(log_prints=False, ...) # log_prints not needed when using logger
def my_task(...):
logger = get_run_logger()
jsons = run_coro_as_sync(fetch_json_many_async(urls, out_dir=None, logger=logger))
Option B â Just use print and capture via log_prints
- If you prefer prints, turn on print capture:
- Globally in prefect.toml:
[logging]
level = "INFO"
log_prints = true
- Or per task: @task(log_prints=True)
- Then add print lines in the same spots:
print(f"Downloading {u}")
print(f"Downloaded {u} -> {current_result}")
print(f"Following pagination: {u} -> next {next_url}")
print(f"Finished: {len(results)} items")
Notes:Marvin
10/24/2025, 3:08 AMGustavo Fonseca
10/25/2025, 9:51 PMMarvin
10/25/2025, 9:51 PMMarvin
10/25/2025, 9:54 PMprefect.artifacts.create_link_artifact. It stores the link as markdown so it renders as a clickable link in the UI. The function returns the artifact's UUID.
Example
from prefect import flow
from prefect.artifacts import create_link_artifact
@flow
def demo():
create_link_artifact(
link="<https://dash.mycompany.com/reports/123>",
link_text="Open KPI dashboard", # optional; defaults to the URL if omitted
key="kpi-dashboard-link", # optional; lowercase letters, numbers, dashes
description="Daily KPI dashboard for 2025-10-25",
)
demo()
How it differs from other artifact types
- LinkArtifact: renders a single clickable link.
- MarkdownArtifact: arbitrary markdown content.
- TableArtifact: tabular data that renders as a table.
- ImageArtifact: displays an image from a public URL.
- ProgressArtifact: shows progress (0-100) and is the only type you update in place.
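For quick reference, a minimal sketch of creating each of the other types inside a flow; the creation helpers come from prefect.artifacts, while the keys and contents here are placeholders:
from prefect import flow
from prefect.artifacts import (
    create_markdown_artifact,
    create_table_artifact,
    create_image_artifact,
    create_progress_artifact,
)

@flow
def artifact_types_demo():
    # Markdown: arbitrary formatted text
    create_markdown_artifact(
        markdown="## Nightly report\nAll checks passed.",
        key="nightly-report",  # hypothetical key
    )
    # Table: a list[dict] renders as rows
    create_table_artifact(
        table=[{"name": "candidatos_2022", "rows": 12345}],
        key="load-summary",  # hypothetical key
    )
    # Image: must be a publicly reachable URL
    create_image_artifact(
        image_url="https://example.com/chart.png",  # placeholder URL
        key="kpi-chart",
    )
    # Progress: the only type you can update in place later
    create_progress_artifact(progress=0, key="batch-progress")

if __name__ == "__main__":
    artifact_types_demo()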
Docs and API
- Concepts: Artifacts concept
- How-to guide: Create and use artifacts
- API reference: prefect.artifacts API
If you share where you want to link out to (e.g., S3 objects, BI dashboards, Snowflake worksheets), I can tailor an example.Gustavo Fonseca
10/25/2025, 10:51 PMMarvin
10/25/2025, 10:51 PMGustavo Fonseca
10/25/2025, 10:52 PMMarvin
10/25/2025, 10:52 PMMarvin
10/25/2025, 10:54 PMprogress_artifact_id is None or not a UUID returned by create_progress_artifact/acreate_progress_artifact, so your if progress_artifact_id block never runs.
- Created outside a Prefect run with no key: artifacts without a key only show on the associated flow/task run's Artifacts tab. If there's no run context, it won't be visible anywhere. Add a key to see it on the global Artifacts page.
- Checking the wrong UI page:
- Without a key: see Flow Run -> Artifacts tab
- With a key: see the global Artifacts page
- Wrong type or version mismatch: make sure you're using a Progress artifact in Prefect 3.x and updating it with the correct function.
Known-good pattern
from prefect import flow
from prefect.artifacts import acreate_progress_artifact, aupdate_progress_artifact
@flow
async def main(urls):
# 1) Create the progress artifact and capture its UUID
progress_id = await acreate_progress_artifact(
progress=0,
description="Starting downloadsâŠ",
# Optional: add a key to also see it on the global Artifacts page
key="camara-download-progress"
)
# 2) Pass the UUID to your async helper
results = await fetch_json_many_async(
urls=urls,
concurrency=10,
timeout=30.0,
follow_pagination=True,
progress_artifact_id=progress_id,
)
# 3) Final update to 100%
await aupdate_progress_artifact(
artifact_id=progress_id,
progress=100,
description="Downloads complete"
)
Notes for your function
- Your if progress_artifact_id and len(urls) > 0: guard means no updates happen if the ID is falsy. Confirm you're passing the UUID returned by create/acreate.
- In an async function, prefer aupdate_progress_artifact to avoid blocking, though using the sync version won't prevent it from showing up.
- If this runs outside a flow/task, set a key on creation; otherwise it won't be visible in the UI.
Docs
- Concepts: Artifacts concept
- How-to: Create and use artifacts
- API: prefect.artifacts API
If you can share how you set progress_artifact_id (the creation snippet) and where you're looking in the UI, I can pinpoint the exact issue.Gustavo Fonseca
10/25/2025, 11:00 PMMarvin
10/25/2025, 11:00 PMMarvin
10/25/2025, 11:02 PMextract_frentes_membros task, then go to its Artifacts tab.
- Or give the artifact a key when creating it, then check the global Artifacts page.
2) Make sure youâre creating and updating the same Progress artifact
- Your pattern is right: create a Progress artifact at the start of the task, pass its UUID into your helper, and call update on that UUID.
- Since your task and helper are async, I recommend using the async API to avoid blocking and ensure the UI gets timely updates.
Suggested changes
from prefect import task, get_run_logger
from prefect.artifacts import acreate_progress_artifact, aupdate_progress_artifact
@task(
retries=APP_SETTINGS.CAMARA.RETRIES,
retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
async def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
logger = get_run_logger()
# Create the progress artifact (optionally add a key to also see it on the global Artifacts page)
progress_artifact = await acreate_progress_artifact(
progress=0.0,
description="Download de membros de frentes da CĂąmara",
# key="frentes-membros-progress" # optional
)
<http://logger.info|logger.info>(f"Progress artifact id: {progress_artifact}")
urls = frentes_membros_urls(frentes_ids)
<http://logger.info|logger.info>(f"Congresso: buscando Membros de {len(urls)} Frentes")
jsons = await fetch_json_many_async(
urls=urls,
concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
timeout=APP_SETTINGS.CAMARA.TIMEOUT,
follow_pagination=True,
progress_artifact_id=progress_artifact,
)
# Final update to 100% is a nice touch
await aupdate_progress_artifact(
artifact_id=progress_artifact,
progress=100.0,
description="Downloads concluĂdos"
)
dest = Path(out_dir) / "frentes_membros.ndjson"
return save_ndjson(cast(list[dict], jsons), dest)
And in your helper, use the async updater:
from prefect.artifacts import aupdate_progress_artifact
# ...
if progress_artifact_id and len(urls) > 0:
async with update_lock:
downloaded_urls += 1
await aupdate_progress_artifact(
artifact_id=progress_artifact_id,
progress=(downloaded_urls / len(urls)) * 100
)
Quick checklist
- Confirm you're opening the task run's details page and checking the Artifacts tab.
- Log the artifact UUID right after creation to ensure it's not None.
- Try adding a key to the artifact on creation; then check the global Artifacts page to confirm it exists.
- Ensure your update_progress_artifact calls are actually executed (e.g., urls length > 0, no early returns).
Docs
- Concepts: Artifacts concept
- How-to: Create and use artifacts
- API: prefect.artifacts API
If you can share a screenshot of where you're looking in the UI (task run details vs. expanded row) and your Prefect version, I can confirm precisely what you should see.Gustavo Fonseca
10/25/2025, 11:08 PMMarvin
10/25/2025, 11:08 PMMarvin
10/25/2025, 11:09 PM<coroutine object acreate_progress_artifact at ...> - that happens when acreate_progress_artifact(...) wasn't awaited.
Fix
- Await the async creation and updates, and pass the UUID to your helper.
Correct pattern
from uuid import UUID
from prefect.artifacts import acreate_progress_artifact, aupdate_progress_artifact
@task(...)
async def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
logger = get_run_logger()
progress_id = await acreate_progress_artifact(
progress=0.0,
description="Download de membros de frentes da CĂąmara",
key="frentes-membros-progress"
)
<http://logger.info|logger.info>(f"Progress artifact id: {progress_id} (type={type(progress_id)})")
assert isinstance(progress_id, UUID)
urls = frentes_membros_urls(frentes_ids)
<http://logger.info|logger.info>(f"Congresso: buscando Membros de {len(urls)} Frentes")
jsons = await fetch_json_many_async(
urls=urls,
concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
timeout=APP_SETTINGS.CAMARA.TIMEOUT,
follow_pagination=True,
progress_artifact_id=progress_id, # <- pass the UUID
)
await aupdate_progress_artifact(
artifact_id=progress_id,
progress=100.0,
description="Downloads concluĂdos"
)
dest = Path(out_dir) / "frentes_membros.ndjson"
return save_ndjson(cast(list[dict], jsons), dest)
And inside your helper:
from prefect.artifacts import aupdate_progress_artifact
# ...
if progress_artifact_id and len(urls) > 0:
async with update_lock:
downloaded_urls += 1
await aupdate_progress_artifact(
artifact_id=progress_artifact_id,
progress=(downloaded_urls / len(urls)) * 100
)
Also, make sure you're looking at:
- Task run details -> Artifacts tab (without a key), or
- Global Artifacts page (since you added a key).
Docs for reference:
- API: prefect.artifacts API
- How-to: Create and use artifacts
If you still hit 404 after this, paste the exact lines where you create the artifact and log its value/type. That'll confirm whether it's a UUID or not.Gustavo Fonseca
10/25/2025, 11:24 PMMarvin
10/25/2025, 11:24 PMMarvin
10/25/2025, 11:28 PMaupdate_progress_artifact on meaningful increments (e.g., every N items or every 1-2s) so the UI has time to poll and show visible changes.
- Confirm you're looking in the task run's Artifacts tab or the global Artifacts page (since you added a key).
- Make sure you're using the async updater and awaiting it:
- progress_id = await acreate_progress_artifact(...)
- await aupdate_progress_artifact(artifact_id=progress_id, progress=...)
Minimal repro you can run to observe UI behavior
from prefect import flow
from prefect.artifacts import acreate_progress_artifact, aupdate_progress_artifact
import asyncio
@flow
async def progress_ui_test():
aid = await acreate_progress_artifact(progress=0, key="ui-progress-test", description="UI progress test")
for i in range(0, 101, 10):
await aupdate_progress_artifact(artifact_id=aid, progress=i, description=f"{i}%")
await asyncio.sleep(2) # allow the UI to poll between updates
progress_ui_test()
- Open the global Artifacts page and watch the artifact with key ui-progress-test. If the circle doesn't move unless you refresh, it confirms the polling/refresh behavior.
If it still looks stuck after trying the above
- Share these details and I can dig deeper or file an issue:
- Prefect version (prefect version)
- Prefect Cloud or self-hosted (and UI URL/port - I see 127.0.0.1:4200)
- Which exact page/view shows the stuck circle (global Artifacts list, task run Artifacts tab, run summary, etc.)
- Browser and version
- Whether throttling updates (e.g., every 1-2s) changes the behavior
References
- Artifacts how-to: Create and use artifacts
- API reference: prefect.artifacts APIGustavo Fonseca
10/25/2025, 11:45 PMMarvin
10/25/2025, 11:45 PMMarvin
10/25/2025, 11:46 PMimport time
last_sent = 0.0
# inside your async update block
now = time.monotonic()
if (now - last_sent) >= 2.0 or downloaded_urls == len(urls):
last_sent = now
await aupdate_progress_artifact(
artifact_id=progress_artifact_id,
progress=(downloaded_urls / len(urls)) * 100
)
- Hard refresh the page (Shift + Reload) or navigate away/back if you need to see changes immediately.
- Confirm you're on the latest 3.x; improvements land regularly. Run prefect version.
If you want to double-check the backend is updating (it is), you can hit the API directly:
# by artifact id
curl http://127.0.0.1:4200/api/artifacts/<ARTIFACT_ID>
# by key (list latest first)
curl "http://127.0.0.1:4200/api/artifacts?key=frentes-membros-progress&sort=desc"
API refs: Read artifacts, Update artifact.
If the circle still never advances without a manual refresh on the run details page, that's likely a UI limitation/bug in that specific view. If you can share:
- Prefect version (prefect version)
- UI build (self-host URL/port, e.g., 127.0.0.1:4200)
- Which exact page is "stuck" (run details -> task bars vs Artifacts tab vs global Artifacts)
- Browser and version
...I can open an issue with the UI team on your behalf.
Also - what's your other question? Happy to help.Gustavo Fonseca
10/25/2025, 11:47 PMGustavo Fonseca
10/25/2025, 11:47 PMMarvin
10/25/2025, 11:47 PMMarvin
10/25/2025, 11:48 PMGustavo Fonseca
10/26/2025, 12:22 AMfrom pathlib import Path
from typing import cast
from uuid import UUID
from prefect import task, get_run_logger
from prefect.artifacts import create_progress_artifact
from datetime import timedelta
from utils.io import download_stream
from utils.br_data import BR_STATES, ELECTIONS_YEARS
from config.loader import load_config, CACHE_POLICY_MAP
APP_SETTINGS = load_config()
# BUILD SOCIAL MEDIA ENDPOINTS PER STATE
REDES_SOCIAIS_ENDPOINTS = {
f"redes_sociais_{year}_{state}": f"{APP_SETTINGS.TSE.BASE_URL}consulta_cand/rede_social_candidato_{year}_{state}.zip"
for state in BR_STATES
for year in ELECTIONS_YEARS
}
TSE_ENDPOINTS = {
"candidatos_2018": f"{APP_SETTINGS.TSE.BASE_URL}consulta_cand/consulta_cand_2018.zip",
"candidatos_2022": f"{APP_SETTINGS.TSE.BASE_URL}consulta_cand/consulta_cand_2022.zip",
"prestaca_contas_2018": f"{APP_SETTINGS.TSE.BASE_URL}prestacao_contas/prestacao_de_contas_eleitorais_candidatos_2018.zip",
"prestacao_contas_2022": f"{APP_SETTINGS.TSE.BASE_URL}prestacao_contas/prestacao_de_contas_eleitorais_candidatos_2022.zip",
"resultado_eleicao_2018": f"{APP_SETTINGS.TSE.BASE_URL}votacao_candidato_munzona/votacao_candidato_munzona_2018.zip",
"resultado_eleicao_2022": f"{APP_SETTINGS.TSE.BASE_URL}votacao_candidato_munzona/votacao_candidato_munzona_2022.zip"
}
TSE_ENDPOINTS = TSE_ENDPOINTS | REDES_SOCIAIS_ENDPOINTS
@task(
task_run_name="extract_tse_{name}",
retries=APP_SETTINGS.TSE.RETRIES,
retry_delay_seconds=APP_SETTINGS.TSE.RETRY_DELAY,
timeout_seconds=APP_SETTINGS.TSE.TIMEOUT,
cache_policy=CACHE_POLICY_MAP[APP_SETTINGS.TSE.CACHE_POLICY],
cache_expiration=timedelta(days=APP_SETTINGS.TSE.CACHE_EXPIRATION)
)
def extract_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
logger = get_run_logger()
progress_id = create_progress_artifact(
progress=0.0,
description=f"Download do arquivo {name}, do TSE"
)
dest = Path(out_dir) / f"{name}.zip"
logger.info(f"Fazendo download do endpoint TSE '{url}' -> {dest}")
dest_path = download_stream(url, dest, unzip=True, progress_artifact_id=cast(UUID, progress_id))
return dest_path
# TSE: ~30 endpoints in parallel
tse_fs = [
    cast(Any, extract_tse).with_options(refresh_cache=refresh_cache).submit(name, url)
    for name, url in TSE_ENDPOINTS.items()
]
Above is a function for which I want to create an aggregated artifact covering all of these downloads done in parallel. How can I combine that data into a Prefect artifact, or update an artifact table that has already been created?Marvin
10/26/2025, 12:22 AMMarvin
10/26/2025, 12:24 AMfrom prefect import flow
from prefect.futures import resolve_futures_to_results
from prefect.artifacts import create_table_artifact
@flow
def pipeline(...):
# submit tasks
tse_fs = [
extract_tse.with_options(refresh_cache=refresh_cache).submit(name, url)
for name, url in TSE_ENDPOINTS.items()
]
# resolve results to paths (or whatever you want in the table)
results = resolve_futures_to_results(tse_fs)
# build a table: list[dict] works nicely
table = [
{"name": name, "url": url, "path": path}
for (name, url), path in zip(TSE_ENDPOINTS.items(), results)
]
# create/update the aggregate table artifact (new version each run)
create_table_artifact(
table=table,
key="tse-downloads-summary",
description="Resumo dos downloads TSE desta execução"
)
return results
Option B - Stream partials and republish new versions
- If you want the table to evolve during the run, you can publish a new table artifact with the same key as chunks complete. This does NOT update in place; it creates new versions and the UI will show the latest version.
- Throttle how often you republish (e.g., every N completions or every few seconds) to avoid spamming.
Notes and limitations
- Table, Markdown, Link, and Image artifacts are immutable; re-creating with the same key creates a new version.
- Only Progress artifacts can be updated in place (update_progress_artifact or aupdate_progress_artifact).
- To retrieve the latest version later, you can query by key (e.g., Artifact.get(key="tse-downloads-summary")) if you need it downstream.
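If you need that summary downstream, here is a minimal sketch of reading it back by key; it assumes Artifact.get works as mentioned above, and the exact attributes on the returned object should be verified against your Prefect version:
from prefect.artifacts import Artifact

# Returns the latest artifact saved under this key, or None if none exists.
latest = Artifact.get(key="tse-downloads-summary")
if latest:
    # Inspect the payload; for table artifacts the rows live in `data`
    # (attribute name is an assumption -- verify in your environment).
    print(latest.data)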
If you want a row-per-file download with live-ish updates, I recommend Option A for simplicity. If you really want incremental visibility, do a periodic republish:
from prefect.artifacts import create_table_artifact
from time import monotonic
last_pub = 0.0
def maybe_publish(partial_rows):
global last_pub
now = monotonic()
if now - last_pub >= 5.0 or len(partial_rows) == TOTAL:
create_table_artifact(
table=partial_rows,
key="tse-downloads-summary",
description="Resumo parcial dos downloads TSE"
)
last_pub = now
If you share what columns you want in the summary (e.g., name, url, destination path, bytes, duration, success/error), I can draft the exact table building code.Gustavo Fonseca
10/26/2025, 12:42 AMMarvin
10/26/2025, 12:42 AMMarvin
10/26/2025, 12:42 AMfrom prefect.artifacts import acreate_table_artifact
# ...
artifact_id = await acreate_table_artifact(
key="frentes_membros",
table=artifact_data,
description="Total de membros encontrados nas frentes."
)
- Or keep the sync name but await it in async code:
from prefect.artifacts import create_table_artifact
# ...
artifact_id = await create_table_artifact(
key="frentes_membros",
table=artifact_data,
description="Total de membros encontrados nas frentes."
)
Tip
- Table artifacts are immutable; calling (a)create_table_artifact again with the same key creates a new version, not an in-place update.
- Keep using the async variants consistently inside async tasks: you already do this for progress artifacts, so mirroring that for table artifacts is a good practice.
Docs
- How-to: Create and use artifacts
- API: prefect.artifacts APIGustavo Fonseca
10/26/2025, 12:55 PMMarvin
10/26/2025, 12:55 PMMarvin
10/26/2025, 12:56 PMdeputados_f = extract_deputados.submit(legislatura)
assiduidade_f = extract_assiduidade_deputados.submit(deputados_f, legislatura)
This creates a dependency: assiduidade waits for deputados to finish, then starts. It will run in parallel with all other branches that don't depend on deputados.
2) If you want per-id parallelism for assiduidade
Tasks cannot schedule other tasks (only flows can). To fan out per-id work, use a subflow or resolve the list in the flow, then submit per-id tasks.
Option A: Subflow that fans out
- Make a small subflow that takes the list of IDs and submits per-id tasks. Then call that subflow with the future; Prefect will start the subflow as soon as deputados finishes.
from prefect import flow, task
from prefect.futures import resolve_futures_to_results
@task
def extract_assiduidade_for_one(dep_id: str, legislatura: int) -> dict:
# ... compute one deputado's assiduidade ...
return {"id": dep_id, "ok": True}
@flow
def extract_assiduidade_deputados(ids: list[str], legislatura: int) -> list[dict]:
fs = [extract_assiduidade_for_one.submit(i, legislatura) for i in ids]
return resolve_futures_to_results(fs)
# In your main flow:
deputados_f = extract_deputados.submit(legislatura)
assiduidade_f = extract_assiduidade_deputados.submit(deputados_f, legislatura)
- This runs "right after deputados finishes" and in parallel with other tasks already running.
Option B: Await the future in the flow, then submit per-id tasks
- Since your flow is async, you can await just that future to get the list and immediately fan out. Other tasks submitted earlier keep running in parallel.
deputados_ids = await deputados_f.result() # or just await deputados_f if you prefer
assiduidade_fs = [extract_assiduidade_for_one.submit(i, legislatura) for i in deputados_ids]
Which should you choose?
- If you want assiduidade as a single aggregated task, keep your original pattern with .submit(deputados_f, legislatura).
- If you want per-id parallelism, use the subflow pattern (Option A) or await-and-fan-out (Option B). Don't call .submit inside a task; it won't schedule child tasks.
Notes
- Your ThreadPoolTaskRunner will run these tasks concurrently as they are submitted.
- Avoid calling .result() on futures too early in the flow unless you intend to gate subsequent scheduling on that completion. In your case, awaiting just deputados_f to fan out its dependents is perfectly fine; everything else already submitted keeps running.
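Related to the first note, a minimal sketch of where that concurrency is configured; ThreadPoolTaskRunner and max_workers are real Prefect 3.x names, while the demo task is made up:
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def double(x: int) -> int:
    return x * 2

# Submitted tasks run on this thread pool; max_workers caps how many
# execute at the same time.
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def pool_demo() -> list[int]:
    futures = [double.submit(i) for i in range(20)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    print(pool_demo())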
If you share the signature of extract_assiduidade_deputados (what it expects and returns), I can tailor the exact code for either single-task or per-id parallel patterns.Gustavo Fonseca
10/26/2025, 1:04 PMMarvin
10/26/2025, 1:04 PMMarvin
10/26/2025, 1:05 PMfrom pathlib import Path
from typing import Sequence, cast
from uuid import UUID
from prefect import task, get_run_logger
from prefect.artifacts import (
acreate_progress_artifact,
aupdate_progress_artifact,
)
from selectolax.parser import HTMLParser
import re
from utils.io import fetch_html_many_async, save_ndjson
from config.loader import load_config
APP_SETTINGS = load_config()
def assiduidade_urls(deputados_ids: Sequence[str], leg_years: Sequence[int]) -> list[str]:
return [
f"{APP_SETTINGS.CAMARA.PORTAL_BASE_URL}deputados/{dep_id}/presenca-plenario/{year}"
for dep_id in deputados_ids
for year in leg_years
]
@task(
retries=APP_SETTINGS.CAMARA.RETRIES,
retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
async def extract_assiduidade_deputados(
deputados_ids: list[str],
legislatura: dict,
out_dir: str | Path = "data/camara/assiduidade",
) -> str:
logger = get_run_logger()
# Get years safely
leg_years = legislatura.get("dados", {}).get("anosPassados", [])
if not isinstance(leg_years, list):
leg_years = []
# Normalize IDs to strings
dep_ids = [str(i) for i in deputados_ids]
progress_id = await acreate_progress_artifact(
progress=0.0,
description="Progresso do download da assiduidade de deputados",
)
logger.info(f"Progress artifact id: {progress_id}")
urls = assiduidade_urls(dep_ids, cast(list[int], leg_years))
logger.info(f"Câmara: buscando assiduidade de {len(dep_ids)} deputados dos anos {leg_years}")
htmls = await fetch_html_many_async(
urls=urls,
concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
timeout=APP_SETTINGS.CAMARA.TIMEOUT,
progress_artifact_id=progress_id,
)
await aupdate_progress_artifact(
artifact_id=progress_id,
progress=100.0,
description="Downloads concluĂdos",
)
href_pattern = re.compile(r"<https://www>\.camara\.leg\.br/deputados/\d+")
id_ano_pattern = r"/deputados/(?P<id>\d+)\?.*ano=(?P<ano>\d+)"
json_results = []
for html in htmls:
tree = HTMLParser(cast(str, html))
for link in tree.css("a"):
href = link.attributes.get("href", "") or ""
if not isinstance(href, str):
logger.warning(f"O href {href} nĂŁo Ă© string")
continue
if href_pattern.match(href):
match = re.search(id_ano_pattern, href)
if match:
deputado_id = int(match.group("id"))
ano = int(match.group("ano"))
json_results.append({"deputado_id": deputado_id, "ano": ano, "html": html})
else:
logger.warning("NĂŁo foram encontrados dados suficientes na pĂĄgina HTML")
dest = Path(out_dir) / "assiduidade.ndjson"
dest_path = save_ndjson(json_results, dest)
return dest_path
At the call site
- Keep your pipeline code passing the future directly; Prefect will resolve it. If Pylance complains at the submit line, silence it with a cast just at the call site (only for the linter; runtime is fine):
assiduidade_f = extract_assiduidade_deputados.submit(cast(list[str], deputados_f), legislatura)
# or, more broadly:Marvin
10/26/2025, 1:05 PMextract_deputados task is annotated to return list[str], so the future is PrefectFuture[list[str]]. Then use a small helper subflow to fan out per-id if needed (we discussed patterns earlier).
Why not accept PrefectFuture in the task signature?
- Tasks should receive materialized values. Prefect handles resolving futures automatically before invoking your task. Including PrefectFuture in the type hints confuses static type checkers and your own code, as you saw when you tried to use the union in places that expect concrete types.
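For example, a minimal end-to-end typing sketch; the task names mirror yours, but the bodies and return values are placeholders:
from typing import cast
from prefect import flow, task
from prefect.futures import PrefectFuture

@task
def extract_deputados(legislatura: dict) -> list[str]:
    # placeholder body; in your code this returns the deputy ids
    return ["1", "2"]

@task
def extract_assiduidade_deputados(deputados_ids: list[str], legislatura: dict) -> str:
    # placeholder body; Prefect hands this the materialized list, not the future
    return f"processed {len(deputados_ids)} ids"

@flow
def pipeline(legislatura: dict) -> str:
    deputados_f: PrefectFuture[list[str]] = extract_deputados.submit(legislatura)
    # Runtime: Prefect resolves the future before the task body runs.
    # The cast only exists to satisfy Pylance at the call site.
    assiduidade_f = extract_assiduidade_deputados.submit(
        cast(list[str], deputados_f), legislatura
    )
    return assiduidade_f.result()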
If you want, paste the signature and return type of extract_deputados and I'll align the types end-to-end to make Pylance happy at both the definition and submit sites.Gustavo Fonseca
10/26/2025, 1:17 PMMarvin
10/26/2025, 1:17 PMMarvin
10/26/2025, 1:22 PM@task(
retries=APP_SETTINGS.CAMARA.RETRIES,
retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
timeout_seconds=APP_SETTINGS.CAMARA.TASK_TIMEOUT_SECONDS, # e.g., 900
)
...and use a different setting for HTTP per-request timeout:
htmls = await fetch_html_many_async(
urls=urls,
concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
timeout=APP_SETTINGS.CAMARA.HTTP_TIMEOUT_SECONDS, # e.g., 20-60
progress_artifact_id=progress_id,
)
2) Override per submission
assiduidade_f = extract_assiduidade_deputados.with_options(
timeout_seconds=900 # or None to disable
).submit(deputados_f, legislatura)
3) Split the work into smaller tasks
- Chunk the URLs and submit multiple tasks (e.g., 200 URLs per task). Each task finishes faster and stays below the timeout.
- Or use a subflow to fan out and collect results, as discussed earlier.
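A rough sketch of the chunking approach; chunked and fetch_chunk are hypothetical helpers, and 200 is an arbitrary chunk size:
from prefect import flow, task

def chunked(items: list[str], size: int) -> list[list[str]]:
    # split a list into consecutive chunks of at most `size` items
    return [items[i:i + size] for i in range(0, len(items), size)]

@task(timeout_seconds=900)
def fetch_chunk(urls: list[str]) -> list[str]:
    # placeholder: download this chunk and return the output paths
    return [f"data/{i}.json" for i, _ in enumerate(urls)]

@flow
def fan_out(urls: list[str]) -> list[str]:
    futures = [fetch_chunk.submit(chunk) for chunk in chunked(urls, 200)]
    paths: list[str] = []
    for f in futures:
        paths.extend(f.result())
    return paths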
Helpful sanity checks
- Log the number of URLs and your concurrency:
- total_urls = len(deputados_ids) * len(leg_years)
- With concurrency C and average per-request time T seconds, the lower bound is roughly ceil(total_urls / C) * T. Make sure your task timeout comfortably exceeds that (see the sketch after this list).
- Verify your years extraction is correct; the current line assumes a list at dados:
# safer:
leg_years = (legislatura.get("dados") or {}).get("anosPassados") or []
# ensure ints
leg_years = [int(y) for y in leg_years]
Using [...][0] will throw if dados isn't a list.
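A quick sketch of that back-of-the-envelope estimate, referenced from the sanity-check list above; all numbers are placeholders:
import math

total_urls = 513 * 4           # e.g., deputies x years (placeholder values)
concurrency = 10               # your CAMARA.CONCURRENCY
avg_seconds_per_request = 1.5  # measure this; placeholder here

lower_bound = math.ceil(total_urls / concurrency) * avg_seconds_per_request
print(f"~{lower_bound:.0f}s minimum; set timeout_seconds well above this")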
Summary
- This is a Prefect task timeout. Increase timeout_seconds (or set to None) for the task, and keep a separate smaller HTTP timeout.
- Consider chunking work or fanning out if total runtime is large.Gustavo Fonseca
10/26/2025, 4:51 PMMarvin
10/26/2025, 4:51 PMMarvin
10/26/2025, 4:53 PMfrom datetime import date
from prefect import task
from prefect.utilities.hashing import hash_objects
def assiduidade_cache_key(context, params) -> str | None:
# Expect params to include legislatura (dict) and deputados_ids (list[str])
today_year = date.today().year
leg_years = (params.get("legislatura") or {}).get("dados", {}).get("anosPassados") or []
dep_ids = params.get("deputados_ids") or []
# If any requested year is current year, skip caching this run entirely
# (You could be more granular if you split per-year work into separate tasks)
if any(int(y) == today_year for y in leg_years):
return None
# Otherwise, cache by the combination of dep_ids and leg_years
# Normalize to strings and a stable ordering
key = hash_objects(sorted(map(str, dep_ids)), sorted(int(y) for y in leg_years))
return f"assiduidade:{key}"
@task(
retries=APP_SETTINGS.CAMARA.RETRIES,
retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
timeout_seconds=1800,
cache_key_fn=assiduidade_cache_key,
cache_expiration=timedelta(days=30), # adjust to taste
)
async def extract_assiduidade_deputados(...):
...
Option B: Define a CachePolicy (reusable)
from datetime import date, timedelta
from prefect.cache_policies import CachePolicy
from prefect.utilities.hashing import hash_objects
def assiduidade_cache_key(context, params) -> str | None:
today_year = date.today().year
leg_years = (params.get("legislatura") or {}).get("dados", {}).get("anosPassados") or []
dep_ids = params.get("deputados_ids") or []
if any(int(y) == today_year for y in leg_years):
return None
key = hash_objects(sorted(map(str, dep_ids)), sorted(int(y) for y in leg_years))
return f"assiduidade:{key}"
ASSIDUIDADE_CACHE = CachePolicy.from_cache_key_fn(
assiduidade_cache_key,
# optional: key_storage=..., isolation_level=..., lock_manager=...
)
@task(
retries=APP_SETTINGS.CAMARA.RETRIES,
retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
timeout_seconds=1800,
cache_policy=ASSIDUIDADE_CACHE,
cache_expiration=timedelta(days=30),
)
async def extract_assiduidade_deputados(...):
...
Granular per-year caching (optional but cleaner)
- Since you fetch multiple years in one task, skipping cache for "any current year present" disables cache entirely for that run.
- If you want past years to be cached and only the current year to be fetched, split the work per-year:
- Have the task accept a single year
- Submit one task per year
- Use cache_key_fn that keys by year; return None when year == current_year.
Example:
@task(cache_key_fn=lambda _, p: None if p["year"] == date.today().year else f"assiduidade:{p['year']}", cache_expiration=timedelta(days=180))
async def extract_assiduidade_year(deputados_ids: list[str], year: int) -> str:
# do the same logic but for one year
...
# In your flow after you have dep_ids and leg_years:
fs = [extract_assiduidade_year.submit(dep_ids, y) for y in leg_years]
This gives you the best of both worlds: current-year runs fresh; past years reuse cache and donât re-download.
A few extra correctness tweaks for your current code
- Safer access to years:
leg_years = (legislatura.get("dados") or {}).get("anosPassados") or []
leg_years = [int(y) for y in leg_years]
Your [0] indexing will crash if dados isn't a list.
10/26/2025, 4:53 PMGustavo Fonseca
10/26/2025, 10:01 PMMarvin
10/26/2025, 10:01 PMMarvin
10/26/2025, 10:01 PMcache_key_fn=lambda _, year: None if year == date.today().year else f"assiduidade:{year}"
Here, "year" is actually the entire params dict, so Prefect tries to use a cache key like:
"assiduidade:{'deputados_ids': [...], 'legislatura_year': 2022, 'out_dir': '...'}"
That string contains braces, colons, commas, etc. Prefect stores cache data under ~/.prefect/storage/<cache_key>, so on Windows this becomes an invalid directory name and you get WinError 123.
Fix: read the year from the params dict and return a filesystem-safe key. Also, prefer hashing inputs if you want the cache to reflect the full input set.
Example (robust and Windows-safe)
from datetime import date, timedelta
from prefect.utilities.hashing import hash_objects
def assiduidade_cache_key(_, params) -> str | None:
# pull year from params dict
year = int(params["legislatura_year"])
# skip caching for the current year
if year == date.today().year:
return None
# Include deputies list in the key so cache changes if inputs change
dep_ids = [str(i) for i in params.get("deputados_ids", [])]
deps_hash = hash_objects(sorted(dep_ids)) or "noids"
# Return a short, filesystem-safe key (avoid :, {}, [], spaces, etc.)
return f"assiduidade-{year}-{deps_hash}"
@task(
retries=APP_SETTINGS.CAMARA.RETRIES,
retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
timeout_seconds=1800,
cache_key_fn=assiduidade_cache_key,
cache_expiration=timedelta(days=180),
)
async def extract_assiduidade_deputados(
deputados_ids: list[str],
legislatura_year: int,
out_dir: str | Path = "data/camara",
) -> str:
...
If you truly want to cache only by year (regardless of which IDs you pass), you can simplify:
@task(
...,
cache_key_fn=lambda _, p: None if int(p["legislatura_year"]) == date.today().year
else f"assiduidade-{int(p['legislatura_year'])}",
cache_expiration=timedelta(days=180)
)
Note the lambda's second arg is p (the params dict), and the key avoids colons/braces.
Why this is necessary
- Prefect stores cache entries under a directory named by your key. Keys must be filesystem-safe, especially on Windows.
- The cache_key_fn must accept (context, params_dict). Returning None skips caching for that run.
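If you want an extra guard, a small sketch of sanitizing whatever key you compute before returning it from cache_key_fn; this helper is not part of Prefect:
import re

def safe_cache_key(raw: str, max_len: int = 100) -> str:
    # keep only characters that are safe in a directory name on Windows/Linux
    cleaned = re.sub(r"[^A-Za-z0-9._-]+", "-", raw).strip("-")
    return cleaned[:max_len] or "empty-key"

# e.g., safe_cache_key("assiduidade:{'year': 2022}") -> "assiduidade-year-2022"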
Related docs
- Caching concepts: Caching concepts
- Cache policies API: prefect.cache_policies
- Hash helpers: prefect.utilities.hashing
If you want, I can also add a small validation that logs the computed cache key on task start, so you can quickly confirm it looks clean and behaves as expected.Gustavo Fonseca
10/26/2025, 10:08 PMMarvin
10/26/2025, 10:08 PMMarvin
10/26/2025, 10:08 PMimport os
from pathlib import Path
import json
def save_ndjson(records: list[dict], dest_path: str | Path) -> str:
dest_path = Path(dest_path)
dest_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = dest_path.with_suffix(dest_path.suffix + ".tmp")
with open(tmp_path, "w", encoding="utf-8") as f:
for rec in records:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
# Atomic replace on the same filesystem
os.replace(tmp_path, dest_path)
return str(dest_path)
Use unique filenames per task
# inside extract_assiduidade_deputados(...)
dest = Path(out_dir) / f"assiduidade_{legislatura_year}.ndjson"
dest_path = save_ndjson(json_results, dest)
return dest_path
Merge step (downstream)
import os, shutil
from pathlib import Path
def merge_ndjson(inputs: list[str | Path], dest: str | Path) -> str:
dest = Path(dest)
dest.parent.mkdir(parents=True, exist_ok=True)
tmp = dest.with_suffix(dest.suffix + ".tmp")
with open(tmp, "w", encoding="utf-8") as out:
for p in inputs:
p = Path(p)
if not p.exists():
continue
with open(p, "r", encoding="utf-8") as f:
shutil.copyfileobj(f, out)
os.replace(tmp, dest)
return str(dest)
In your flow
# submit one task per year (what you're already doing)
assiduidade_fs = [
extract_assiduidade_deputados.submit(cast(Any, deputados_f), ano)
for ano in anos_passados
]
# later, merge all NDJSONs deterministically
from prefect.futures import resolve_futures_to_results
paths = resolve_futures_to_results(assiduidade_fs)
final_path = merge_ndjson(paths, Path("data/camara") / "assiduidade.ndjson")
# create a single aggregate artifact here if desired
from prefect.artifacts import create_table_artifact
# build an aggregated table across years, then:
# create_table_artifact(key="assiduidade", table=agg_rows, description="...")
If you really must write to a single file from multiple tasks
- Open in append mode with a file lock. This is still risky across processes and can interleave lines if not locked properly. A lock file (e.g., using the filelock package) can coordinate, but the aggregate step is cleaner:
from filelock import FileLock
def append_ndjson(records: list[dict], dest_path: str | Path):
dest_path = Path(dest_path)
dest_path.parent.mkdir(parents=True, exist_ok=True)
lock = FileLock(str(dest_path) + ".lock")
with lock:
with open(dest_path, "a", encoding="utf-8") as f:
for rec in records:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
About artifacts
- You're calling acreate_table_artifact(key="assiduidade") inside each per-year task. That will publish multiple versions with the same key; the latest one wins in the UI.
- Options:
- Key per year (e.g., assiduidade-2022, assiduidade-2023), or
- Move table creation to the merge/aggregate step and publish a single combined table once.
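For the combined-table route, a minimal sketch that reads the merged NDJSON and publishes one table artifact; it assumes each record carries the ano field your task writes, and the column names are illustrative:
import json
from collections import Counter
from pathlib import Path
from prefect import task
from prefect.artifacts import create_table_artifact

@task
def publish_assiduidade_summary(ndjson_path: str | Path) -> None:
    # count records per year in the merged file; assumes each line has an "ano" field
    counts: Counter[int] = Counter()
    with open(ndjson_path, "r", encoding="utf-8") as f:
        for line in f:
            rec = json.loads(line)
            counts[rec.get("ano", 0)] += 1
    create_table_artifact(
        key="assiduidade",
        table=[{"ano": year, "registros": n} for year, n in sorted(counts.items())],
        description="Assiduidade de deputados (todas as legislaturas processadas)",
    )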
This will keep your writes safe and your artifacts consistent while tasks run in parallel. Want me to refactor your pipeline snippet to include the merge step and a single aggregate artifact?Gustavo Fonseca
10/26/2025, 10:40 PMMarvin
10/26/2025, 10:40 PMMarvin
10/26/2025, 10:41 PMimport os, json
from pathlib import Path
from uuid import uuid4
def save_ndjson(records: list[dict], dest_path: str | Path) -> str:
dest_path = Path(dest_path)
dest_path.parent.mkdir(parents=True, exist_ok=True)
# unique tmp file in the same directory to keep replace atomic
tmp_path = dest_path.with_suffix(dest_path.suffix + f".{os.getpid()}.{uuid4().hex}.tmp")
try:
with open(tmp_path, "w", encoding="utf-8") as f:
for rec in records:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
os.replace(tmp_path, dest_path) # atomic swap on same filesystem
finally:
# best-effort cleanup if something went wrong before replace
if tmp_path.exists():
try:
tmp_path.unlink()
except OSError:
pass
return str(dest_path)
Notes
- Keep the tmp file on the same filesystem/drive as dest; atomic replace semantics depend on that.
- On Windows/NTFS, os.replace is atomic, but replacing a file that another process has open in exclusive mode can raise an error. Generally fine for standard usage.
- For truly concurrent append-by-many-writers, use an append function with a file lock (e.g., filelock). The aggregate-then-merge pattern you're using is safer and simpler.
So, no adverse impact for typical usage - it just makes writes safer. If you have any readers that depended on reading lines while the file was being written, they'll now need to wait until the write completes.Gustavo Fonseca
10/26/2025, 10:52 PMMarvin
10/26/2025, 10:52 PMMarvin
10/26/2025, 10:53 PMfrom datetime import date, timedelta
def assiduidade_cache_key(_, p) -> str | None:
year = int(p["legislatura_year"])
if year == date.today().year:
return None # skip cache for current year
return f"assiduidade-{year}" # short, filesystem-safe key
@task(
retries=APP_SETTINGS.CAMARA.RETRIES,
retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
timeout_seconds=1800,
cache_key_fn=assiduidade_cache_key,
cache_expiration=timedelta(days=180),
)
async def extract_assiduidade_deputados(...):
...
2) Write a unique NDJSON per year inside the task
- Right now each per-year task writes to the same "assiduidade.ndjson", so the last writer wins. Return a per-year file and merge later (you already have merge_ndjson downstream).
Inside the task
dest = Path(out_dir) / f"assiduidade_{legislatura_year}.ndjson"
dest_path = save_ndjson(json_results, dest)
return dest_path
3) Don't block the flow before submitting other branches
- You call resolve_futures_to_results(assiduidade_fs) before submitting frentes/frentes_membros. That pauses scheduling of those tasks until assiduidade is fully done.
- Submit frentes/frentes_membros first, then resolve assiduidade when you actually need the paths (or make merge a task).
Example reordering
# submit assiduidade tasks
assiduidade_fs = [
extract_assiduidade_deputados.submit(cast(Any, deputados_f), ano)
for ano in anos_passados
]
# submit independent branch BEFORE waiting
frentes_f = extract_frentes.submit(legislatura)
frentes_membros_f = extract_frentes_membros.submit(cast(Any, frentes_f))
# now resolve and merge
paths = resolve_futures_to_results(assiduidade_fs)
final_path = merge_ndjson(paths, Path("data/camara") / "assiduidade.ndjson")
Optional: make merge_ndjson a @task so it shows up in the UI and runs on the task runner:
@task
def merge_ndjson_task(paths: list[str], dest: str | Path) -> str:
return merge_ndjson(paths, dest)
merged_f = merge_ndjson_task.submit(assiduidade_fs, Path("data/camara") / "assiduidade.ndjson")
4) Artifacts
- If you publish a table artifact per-year inside the task, use a per-year key to avoid overwriting versions immediately:
await acreate_table_artifact(
key=f"assiduidade-{legislatura_year}",
table=artifact_data,
description=f"Assiduidade de deputados ({legislatura_year})",
)
- If you want one combined table, move table creation to the merge/aggregate step instead.
5) Safer anos_passados extraction
- Your pipeline uses legislatura.get("dados", [])[0] which assumes a list. If dados is a dict, this will raise.
dados = (legislatura.get("dados") or {})
anos_passados = dados.get("anosPassados", [])
6) Optional: force-refresh current-year when needed
- You can bypass cache per-run by setting refresh_cache at submit:
extract_assiduidade_deputados.with_options(refresh_cache=refresh_cache).submit(...)
Quick checklist
- Unique per-year filenames in the task: yes
- Cache key uses params dict and is filesystem-safe: yes
- Submit independent branches before resolving futures: yes
- Table artifacts either per-year key or created once in aggregate: choose one
- Safer anos_passados access: yes
References
- Caching: Caching concepts
- Artifacts: Artifacts how-toGustavo Fonseca
10/26/2025, 11:02 PMMarvin
10/26/2025, 11:02 PMMarvin
10/26/2025, 11:03 PMimport os, json
from pathlib import Path
from uuid import uuid4
def save_ndjson(records: list[dict], dest_path: str | Path) -> str:
dest_path = Path(dest_path)
dest_path.parent.mkdir(parents=True, exist_ok=True)
# unique tmp in same directory (same filesystem) to keep replace atomic
tmp_path = dest_path.with_name(dest_path.name + f".{os.getpid()}.{uuid4().hex}.tmp")
try:
with open(tmp_path, "w", encoding="utf-8") as f:
for rec in records:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
os.replace(tmp_path, dest_path) # atomic swap on same volume
finally:
# best-effort cleanup if something went wrong before replace
if tmp_path.exists():
try:
tmp_path.unlink()
except OSError:
pass
return str(dest_path)
Fix 2 - Ensure you don't spawn duplicate writers
- Deduplicate years before submitting tasks:
- anos_passados = sorted(set(anos_passados))
- If you run multiple flow runs concurrently (or task retries can overlap), make the output path unique per run to avoid cross-run conflicts, e.g., include flow run id or a timestamp:
- assiduidade_{year}_{flow_run_id}.ndjson
- Then merge downstream into a canonical file.
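A minimal sketch of building a per-run filename with prefect.runtime; flow_run.id is a real runtime attribute, while per_run_dest and the filename pattern are just examples:
from pathlib import Path
from prefect.runtime import flow_run

def per_run_dest(out_dir: str | Path, year: int) -> Path:
    # flow_run.id is populated inside a running flow/task; fall back to "local"
    # when calling this function outside of a Prefect run.
    run_id = flow_run.id or "local"
    return Path(out_dir) / f"assiduidade_{year}_{run_id}.ndjson"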
Fix 3 - Keep one writer per final file
- You're already writing per-year and merging later - that's good.
- Double-check that each per-year task writes to assiduidade_{year}.ndjson (not all to the same filename).
- If you ever need multiple writers to append to a single file, use a file lock and append mode instead, but the aggregate-then-merge pattern is simpler and safer.
Optional diagnostics
- Log the tmp_path and dest before the replace; if the error repeats, check for duplicate submissions for the same year in logs.
- After the write block, add assert Path(tmp_path).exists() to catch early if the tmp is missing.
This change is drop-in: it won't affect other callers except that files will "appear" only when fully written (safer) and tmp files are unique per writer.Gustavo Fonseca
10/26/2025, 11:09 PMMarvin
10/26/2025, 11:09 PMMarvin
10/26/2025, 11:09 PMimport os, json
from pathlib import Path
def save_ndjson(records: list[dict], dest_path: str | Path) -> str:
dest_path = Path(dest_path)
dest_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = dest_path.with_suffix(dest_path.suffix + ".tmp")
with open(tmp_path, "w", encoding="utf-8") as f:
for rec in records:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
os.replace(tmp_path, dest_path) # atomic on same filesystem
return str(dest_path)
More robust (avoid tmp collisions when multiple writers target the same dest)
import os, json
from pathlib import Path
from uuid import uuid4
def save_ndjson(records: list[dict], dest_path: str | Path) -> str:
dest_path = Path(dest_path)
dest_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = dest_path.with_name(dest_path.name + f".{os.getpid()}.{uuid4().hex}.tmp")
try:
with open(tmp_path, "w", encoding="utf-8") as f:
for rec in records:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
os.replace(tmp_path, dest_path)
finally:
if tmp_path.exists():
try:
tmp_path.unlink()
except OSError:
pass
return str(dest_path)
Notes
- Keep tmp in the same directory as dest so os.replace stays atomic.
- This pattern doesn't serialize concurrent writers; it just prevents partial files. If multiple tasks write the same dest concurrently, last-writer-wins. Use unique filenames per task (e.g., per-year) and merge later, as you're doing.
- The frentes.ndjson path in your error suggests another task may still be writing to a single shared filename; ensure each parallel task writes a unique file or you coordinate writes.