@Marvin is it safe to store api keys on Prefe...
# marvin-ai
g
@Marvin Is it safe to store API keys in Prefect Variables? Also, why can other people see what I ask you? 😿
m
✅ thought for 182.7 seconds
Short answer: No. Don’t store API keys in Prefect Variables.
- In Prefect 3.x, Variables are for non-sensitive config only. They’re plain strings and visible to anyone with read access to your workspace. Variables docs
- For secrets, use a Secret block (or an external secret manager). Secret block values are encrypted at rest and retrieved only when needed.
- Prefect Cloud: secrets are encrypted and stored in a separate, tightly controlled project. Security ‱ Shared responsibility model
- Prefect OSS: secrets are encrypted at rest by the server. How to store secrets ‱ Blocks concept
Example (create and use a Secret block):
Copy code
from prefect.blocks.system import Secret

# Create and save once (e.g., in a setup script or REPL)
Secret(value="YOUR_API_KEY").save("my-api-key")

# Load and use in your flow
from prefect import flow
from prefect.blocks.system import Secret

@flow
def my_flow():
    api_key = Secret.load("my-api-key").get()
    # use api_key safely here

if __name__ == "__main__":
    my_flow()
Prefer external managers when you can (AWS Secrets Manager, GCP Secret Manager, Vault) via Prefect integrations (a plain-boto3 sketch is at the end of this message):
- prefect-aws
- prefect-gcp
- prefect-snowflake
On “why other people can see what I ask you?”
- If you’re chatting in a public Slack channel (like the Prefect Community Slack), everyone in that channel can see your messages. For privacy, send me a direct message or move to a private channel with only the people you want included. And regardless, never paste real secrets into Slack.
If you’d like, tell me which variables you’re currently using for keys and I can help you migrate them to Secret blocks.
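Back on the external-manager option above: if you'd rather skip Prefect blocks entirely, you can also read the key inside a task straight from the manager. Here is a minimal sketch using plain boto3 against AWS Secrets Manager (this is not the prefect-aws integration itself; the secret name my-app/api-key and your AWS credentials are assumptions):
Copy code
import boto3
from prefect import flow, task

@task
def get_api_key() -> str:
    # Fetch the value at runtime so it never lives in Prefect config or Variables
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId="my-app/api-key")  # hypothetical secret name
    return response["SecretString"]

@flow
def use_external_secret():
    api_key = get_api_key()
    # use api_key here; avoid printing or logging it

if __name__ == "__main__":
    use_external_secret()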
g
@Marvin Im getting an error trying to reset my database using this command: uv run prefect server database reset The error says: sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) NOT NULL constraint failed: _alembic_tmp_task_run.flow_run_id [SQL: INSERT INTO _alembic_tmp_task_run (id, created, updated, name, state_type, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, task_key, dynamic_key, cache_key, cache_expiration, task_version, empirical_policy, task_inputs, tags, flow_run_id, state_id, state_name, state_timestamp, flow_run_run_count) SELECT task_run.id, task_run.created, task_run.updated, task_run.name, task_run.state_type, task_run.run_count, task_run.expected_start_time, task_run.next_scheduled_start_time, task_run.start_time, task_run.end_time, task_run.total_run_time, task_run.task_key, task_run.dynamic_key, task_run.cache_key, task_run.cache_expiration, task_run.task_version, task_run.empirical_policy, task_run.task_inputs, task_run.tags, task_run.flow_run_id, task_run.state_id, task_run.state_name, task_run.state_timestamp, task_run.flow_run_run_count FROM task_run] (Background on this error at: https://sqlalche.me/e/20/gkpj) An exception occurred.
m
✅ thought for 248.8 seconds
Thanks for the traceback — that’s an Alembic migration failing because there are existing task_run rows where flow_run_id is NULL, but the migration is trying to enforce NOT NULL on that column. This typically happens when an older DB/schema has inconsistent data and we run a newer migration.
First, a couple quick checks:
- Make sure no Prefect server/API process is running while you reset (stop any running prefect server start or API containers).
- Confirm which DB you're targeting and that you're resetting the right one:
Copy code
uv run prefect config view --show-sources
Look for server.database.connection_url and home. If it's SQLite, you'll see a sqlite+aiosqlite:///... path to a .db file under $PREFECT_HOME (by default under your home directory).
Ways to fix
Option A — Nuke-and-recreate (SQLite and you don't need the data)
1) Stop Prefect server/API.
2) Back up and delete the SQLite file reported by server.database.connection_url.
3) Recreate:
Copy code
uv run prefect server database reset -y
Then start the server again.
Option B — Clean bad rows and retry (if you need to keep the rest of the data)
- For SQLite (replace with the actual path you saw in config view):
Copy code
sqlite3 /path/to/your.db "DELETE FROM task_run WHERE flow_run_id IS NULL;"
- For Postgres (replace with your DSN/connection and schema if needed):
Copy code
psql "$PREFECT_SERVER_DATABASE_CONNECTION_URL" -c "DELETE FROM task_run WHERE flow_run_id IS NULL;"
Then:
Copy code
uv run prefect server database reset -y
Option C — Ensure the reset is targeting the right DB
If you manage multiple environments, force the target explicitly:
Copy code
PREFECT_SERVER_DATABASE_CONNECTION_URL="sqlite+aiosqlite:///absolute/path/to/your.db" uv run prefect server database reset -y
If the error persists, please share:
- The output of:
Copy code
uv run prefect version
uv run prefect config view --show-sources --show-defaults
- Whether you're on SQLite or Postgres
- Whether any Prefect server/API process was running when you ran the reset
References:
- Settings and profiles
- Settings reference
g
@Marvin When I run the following command:
uv run prefect config view --show-sources
I get this response:
PREFECT_PROFILE='local'
PREFECT_API_URL='http://127.0.0.1:4200/api' (from profile)
So, no database path set. And yes, I want to nuke the database. But what is the path to it? Yesterday I did many resets on the DB while the server was running. Should I stop the server first?
m
✅ thought for 190.7 seconds
Short answers
- Yes — stop the server before resetting or deleting the DB. Running resets while the server is running can leave the DB in a bad state.
- By default, the local DB is a SQLite file under your Prefect home directory. If you haven't customized anything, it's typically at ~/.prefect/prefect.db
Find the exact DB path
- The DB URL is masked by default in config view. Show it explicitly:
Copy code
uv run prefect config view --show-sources --show-secrets | grep DATABASE_CONNECTION_URL
You should see something like:
Copy code
PREFECT_API_DATABASE_CONNECTION_URL='sqlite+aiosqlite:///.../.prefect/prefect.db'
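If grep shows nothing because the value is still at its default, you can also print the resolved settings from Python. A small sketch, assuming the Setting objects in prefect.settings are importable in your Prefect 3 version:
Copy code
# Print where Prefect keeps local state and which DB URL it resolves to.
# Assumption: these Setting objects exist in your Prefect version; .value() returns the resolved value.
from prefect.settings import PREFECT_HOME, PREFECT_API_DATABASE_CONNECTION_URL

print("PREFECT_HOME:", PREFECT_HOME.value())
print("DB URL:", PREFECT_API_DATABASE_CONNECTION_URL.value())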
Note: If you're using Docker to run the server, that path lives inside the container's filesystem unless you've mounted a volume.
Nuke and recreate safely
1) Stop the server (Ctrl+C if it's running in your terminal, or stop the process/container).
2) Delete the SQLite file shown by the command above (example for default):
Copy code
rm -f ~/.prefect/prefect.db
3) Recreate/reset the DB:
Copy code
uv run prefect server database reset -y
4) Start the server again:
Copy code
uv run prefect server start
If you’d rather not hunt for the file, you can force a fresh DB by explicitly setting the URL to a new path and running reset:
Copy code
PREFECT_API_DATABASE_CONNECTION_URL="sqlite+aiosqlite:///$HOME/.prefect/prefect.db" uv run prefect server database reset -y
If anything's unclear (OS, Docker vs local, etc.), let me know and I'll tailor the exact commands.
References:
- Settings and profiles: Docs
- Settings reference (PREFECT_HOME, API DB settings): Reference
g
@Marvin I got this code from Prefect's 102 course. I did everything the instructor did, but inspecting the UI I see that there's no second task in the chart; the second task is marked with status Pending in the Task Runs menu inside Runs. But when I click on the pending task I get the log Finished in state Completed(). I'm running locally.
m
✅ thought for 116.6 seconds
Got it — a few things can cause that "pending in list, but logs say Completed" mismatch. Can you share:
- Your Prefect version (run: uv run prefect version)
- How you're running the server (the exact command, e.g., uv run prefect server start)
- A minimal snippet of the flow/tasks from the 102 lesson
In the meantime, here are the most common causes and fixes:
1) UI state is stale
- The Task Runs table does not always auto-refresh. Hard refresh the page and toggle "Live updates" on the run page.
- Make sure your Python session is pointed at the same API URL you're viewing in the browser (PREFECT_API_URL=http://127.0.0.1:4200/api if you're truly local).
2) Server was started without services or the server wasn't restarted cleanly
- Ensure you're not using --no-services.
- Stop the server completely, then start it again:
Copy code
# stop any running server
# then start it cleanly
uv run prefect server start
3) Local database got into a weird state (especially if you reset while the server was running)
- Yes — stop the server first. Then nuke and recreate:
Copy code
# stop server first
uv run prefect config view --show-sources --show-secrets | grep DATABASE_CONNECTION_URL
# delete the sqlite file that command shows (typically ~/.prefect/prefect.db)
rm -f ~/.prefect/prefect.db
uv run prefect server database reset -y
uv run prefect server start
4) The second task wasn't actually scheduled/executed (common call patterns)
- In Prefect 3.x, either call tasks directly (sequential) or use .submit() and then .result() for concurrency. Examples:
Direct calls (simple and correct):
Copy code
from prefect import flow, task

@task
def plus_one(x): 
    return x + 1

@task
def times_two(x): 
    return x * 2

@flow
def demo():
    a = plus_one(1)
    b = times_two(a)
    return b

if __name__ == "__main__":
    demo()
Concurrent pattern (don’t forget to resolve results):
Copy code
@flow
def demo_concurrent():
    a_f = plus_one.submit(1)
    b_f = times_two.submit(a_f)
    b = b_f.result()  # ensure completion
    return b
If you share your snippet, I can confirm the pattern. Also, since you've done DB resets while the server was running, I'd strongly recommend the "stop server → delete DB file → reset → start server" sequence above and then re-run the flow.
Docs for reference:
- Flows and tasks in Prefect 3.x: Docs
g
@Marvin I forgot the code, here's my code:
from prefect import flow, task
import httpx

@task
def fetch_weather(lat: float, lon: float):
    base_url = "https://api.open-meteo.com/v1/forecast/"
    temps = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    forecasted_temp = float(temps.json()["hourly"]["temperature_2m"][0])
    print(f"Forecasted temp C: {forecasted_temp} degrees")
    return forecasted_temp

@task
def save_weather(temp: float):
    with open("weather.csv", "w+") as w:
        w.write(str(temp))
    return "Successfully wrote temp"

@flow
def pipeline(lat: float = 38.9, lon: float = -77.0):
    temp = fetch_weather(lat, lon)
    result = save_weather(temp)
    return result

if __name__ == "__main__":
    pipeline()
m
✅ thought for 20.5 seconds
Thanks for sharing the code — that looks correct and should produce two task nodes in the run graph: fetch_weather then save_weather.
Given your earlier DB reset issues and that you're seeing "Pending" in the list but "Finished in state Completed()" inside the details, this sounds like a UI/state index mismatch caused by the local server/database getting into a weird state.
Do this to get to a clean slate and verify:
1) Stop the local server
- Make sure no prefect server start is running. If you're using Docker, stop the container.
2) Find and delete the local SQLite DB
- Show the exact DB URL (it's masked by default):
Copy code
uv run prefect config view --show-sources --show-secrets | grep DATABASE_CONNECTION_URL
- Delete the SQLite file shown there (typically ~/.prefect/prefect.db), e.g.:
Copy code
rm -f ~/.prefect/prefect.db
3) Reset and start the server cleanly
Copy code
uv run prefect server database reset -y
uv run prefect server start
Make sure you're not using --no-services.
4) Ensure your Python session points at your local API
Copy code
export PREFECT_API_URL="http://127.0.0.1:4200/api"
5) Run your flow with logging captured
Copy code
from prefect import flow, task
import httpx

@task(log_prints=True)
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast>"
    temps = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    forecasted_temp = float(temps.json()["hourly"]["temperature_2m"][0])
    print(f"Forecasted temp C: {forecasted_temp} degrees")
    return forecasted_temp

@task(log_prints=True)
def save_weather(temp: float):
    print(f"Saving {temp} to weather.csv")
    with open("weather.csv", "w+") as w:
        w.write(str(temp))
    return "Successfully wrote temp"

@flow(log_prints=True)
def pipeline(lat: float = 38.9, lon: float = -77.0):
    temp = fetch_weather(lat, lon)
    result = save_weather(temp)
    return result

if __name__ == "__main__":
    pipeline()
6) In the UI
- Hard refresh the browser and toggle "Live updates" on the run page.
- You should see both tasks in the Graph tab and both in the Task Runs list as Completed.
If the second task still doesn't appear in the Graph, please share:
- uv run prefect version
- Confirmation you started with uv run prefect server start (no extra flags)
- A screenshot of the run page (Graph tab + Task Runs tab)
- Also, confirm you're looking at the same API your Python is pointing to (local UI at http://127.0.0.1:4200 and PREFECT_API_URL set to http://127.0.0.1:4200/api).
g
@Marvin i have "solved" the issue by putting a sleep on the save task: from prefect import flow, task import httpx import time @task def fetch_weather(lat: float, lon: float): base_url = "https://api.open-meteo.com/v1/forecast/" temps = httpx.get( base_url, params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"), ) forecasted_temp = float(temps.json()["hourly"]["temperature_2m"][0]) print(f"Forecasted temp C: {forecasted_temp} degrees") return forecasted_temp @task def save_weather(temp: str): with open("weather.csv", "w+") as w: w.write(str(temp)) return "Successfully wrote temp" @flow def pipeline(lat: float = 38.9, lon: float = -77.0): temp = fetch_weather(lat, lon) result = save_weather(temp) time.sleep(1) return result if name == "__main__": pipeline() Heres the debug log: 163205.322 | DEBUG | prefect.profiles - Using profile 'local' 163206.128 | INFO | prefect - Starting temporary server on http://127.0.0.1:8730 See https://docs.prefect.io/v3/concepts/server#how-to-guides for more information on running a dedicated Prefect server. 163209.204 | DEBUG | prefect.task_runner.threadpool - Starting task runner 163209.208 | INFO | Flow run 'fiery-puffin' - Beginning flow run 'fiery-puffin' for flow 'pipeline' 163209.210 | DEBUG | prefect.utilities.urls - No URL found for the Prefect UI, and no default base path provided. 163209.210 | DEBUG | Flow run 'fiery-puffin' - Executing flow 'pipeline' for flow run 'fiery-puffin'... 163209.465 | DEBUG | prefect.client - Connecting to API at http://127.0.0.1:8730/api/ 163209.466 | DEBUG | prefect.client - Connecting to API at http://127.0.0.1:8730/api/ 163209.471 | DEBUG | Task run 'fetch_weather-c88' - Created task run 'fetch_weather-c88' for task 'fetch_weather' 163209.472 | DEBUG | Task run 'fetch_weather-c88' - Executing task 'fetch_weather' for task run 'fetch_weather-c88'... 163209.726 | DEBUG | prefect.events.clients - Reconnecting websocket connection. 163209.727 | DEBUG | prefect.events.clients - Opening websocket connection. 163209.729 | DEBUG | prefect.events.clients - Pinging to ensure websocket connected. 163209.730 | DEBUG | prefect.events.clients - Pong received. Websocket connected. 163209.730 | DEBUG | prefect.events.clients - Resending 0 unconfirmed events. 163209.730 | DEBUG | prefect.events.clients - Finished resending unconfirmed events. 163209.730 | DEBUG | prefect.client - Connecting to API at http://127.0.0.1:8730/api/ 163209.736 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Emitting event id=0199f8ce-d87a-77bd-9594-b39dd867341c. 163209.736 | DEBUG | prefect.events.clients - Added event id=0199f8ce-d87a-77bd-9594-b39dd867341c to unconfirmed events list. There are now 1 unconfirmed events. 163209.736 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Emit reconnection attempt 0. 163209.737 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Sending event id=0199f8ce-d87a-77bd-9594-b39dd867341c. 163209.737 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Checkpointing event id=0199f8ce-d87a-77bd-9594-b39dd867341c. 163209.738 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Emitting event id=0199f8ce-d87f-75e4-8753-4d604c26cc26. 163209.738 | DEBUG | prefect.events.clients - Added event id=0199f8ce-d87f-75e4-8753-4d604c26cc26 to unconfirmed events list. There are now 2 unconfirmed events. 163209.738 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Emit reconnection attempt 0. 
163209.738 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Sending event id=0199f8ce-d87f-75e4-8753-4d604c26cc26. 163209.739 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Checkpointing event id=0199f8ce-d87f-75e4-8753-4d604c26cc26. Forecasted temp C: 13.3 degrees 163210.419 | INFO | Task run 'fetch_weather-c88' - Finished in state Completed() 163210.420 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Emitting event id=0199f8ce-dc33-7034-ae5b-2ade75cf3652. 163210.421 | DEBUG | prefect.events.clients - Added event id=0199f8ce-dc33-7034-ae5b-2ade75cf3652 to unconfirmed events list. There are now 3 unconfirmed events. 163210.421 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Emit reconnection attempt 0. 163210.421 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Sending event id=0199f8ce-dc33-7034-ae5b-2ade75cf3652. 163210.422 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Checkpointing event id=0199f8ce-dc33-7034-ae5b-2ade75cf3652. 163210.611 | DEBUG | prefect.client - Connecting to API at http://127.0.0.1:8730/api/ 163210.613 | DEBUG | Task run 'save_weather-6fe' - Created task run 'save_weather-6fe' for task 'save_weather' 163210.613 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Emitting event id=0199f8ce-dcf4-71bf-92ee-47d6942fdd80. 163210.614 | DEBUG | Task run 'save_weather-6fe' - Executing task 'save_weather' for task run 'save_weather-6fe'... 163210.615 | DEBUG | prefect.events.clients - Added event id=0199f8ce-dcf4-71bf-92ee-47d6942fdd80 to unconfirmed events list. There are now 4 unconfirmed events. 163210.615 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Emit reconnection attempt 0. 163210.616 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Sending event id=0199f8ce-dcf4-71bf-92ee-47d6942fdd80. 163210.616 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Checkpointing event id=0199f8ce-dcf4-71bf-92ee-47d6942fdd80. 163210.618 | INFO | Task run 'save_weather-6fe' - Finished in state Completed() 163210.621 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Emitting event id=0199f8ce-dcf6-7182-80d9-662db1c8de5c. 163210.621 | DEBUG | prefect.events.clients - Added event id=0199f8ce-dcf6-7182-80d9-662db1c8de5c to unconfirmed events list. There are now 5 unconfirmed events. 163210.621 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Emit reconnection attempt 0. 163210.622 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Sending event id=0199f8ce-dcf6-7182-80d9-662db1c8de5c. 163210.622 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Checkpointing event id=0199f8ce-dcf6-7182-80d9-662db1c8de5c. 163210.623 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Emitting event id=0199f8ce-dcfa-7e6f-a42f-7c68e935dd1c. 163210.624 | DEBUG | prefect.events.clients - Added event id=0199f8ce-dcfa-7e6f-a42f-7c68e935dd1c to unconfirmed events list. There are now 6 unconfirmed events. 163210.625 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Emit reconnection attempt 0. 163210.625 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Sending event id=0199f8ce-dcfa-7e6f-a42f-7c68e935dd1c. 163210.625 | DEBUG | prefect.events.clients - EventsClient(id=1727037201712): Checkpointing event id=0199f8ce-dcfa-7e6f-a42f-7c68e935dd1c. 
163211.628 | DEBUG | prefect.task_runner.threadpool - Stopping task runner 163211.629 | INFO | Flow run 'fiery-puffin' - Finished in state Completed() 163211.634 | INFO | prefect - Stopping temporary server on http://127.0.0.1:8730
m
Your message was too long, here's your message at the allowed limit: i have solved the issue by putting a sleep on the save task ...
g
@Marvin i have "solved" the issue by putting a sleep on the save task: from prefect import flow, task import httpx import time @task def fetch_weather(lat: float, lon: float): base_url = "https://api.open-meteo.com/v1/forecast/" temps = httpx.get( base_url, params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"), ) forecasted_temp = float(temps.json()["hourly"]["temperature_2m"][0]) print(f"Forecasted temp C: {forecasted_temp} degrees") return forecasted_temp @task def save_weather(temp: str): with open("weather.csv", "w+") as w: w.write(str(temp)) return "Successfully wrote temp" @flow def pipeline(lat: float = 38.9, lon: float = -77.0): temp = fetch_weather(lat, lon) result = save_weather(temp) time.sleep(1) return result if name == "__main__": pipeline() Why is that?
m
✅ thought for 25.7 seconds
Great question — the sleep "fix" is a clue.
What's happening
- Your flow and both tasks are actually finishing. The "Pending" you see briefly is a UI/state propagation lag.
- Locally, the server persists events and artifacts (like the run graph) via background services that batch/flush on an interval. By default, the event persister flushes every ~5 seconds.
- Adding time.sleep(1) keeps your process alive just a bit longer, giving those background services time to flush updates so the UI graph catches up. Without the delay, the run ends quickly and the UI can momentarily show stale state in the list/graph, even though the task's detail page shows Completed.
You don't need the sleep — here are better options:
- Refresh and enable Live updates on the run page; give it a few seconds after the run completes.
- Make sure you start the server with services enabled (default). Avoid --no-services.
- Start it cleanly:
Copy code
uv run prefect server start
- For faster local feedback, reduce the flush interval:
Copy code
PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=1 uv run prefect server start
- Optional: enable task print logging so you see output in the UI immediately:
Copy code
@task(log_prints=True)
def save_weather(temp: float):
    print(f"Saving {temp}")
    ...
Why the mismatch between "Pending" list vs. "Completed" details?
- The list/graph views depend on aggregated artifacts/events that are flushed on a schedule.
- The task detail view queries the task run state directly, which is usually up-to-date first.
- If you recently reset the DB while the server was running, that can also exacerbate temporary inconsistencies.
If you still see oddities after a clean server start, please share:
- uv run prefect version
- How you're starting the server (the exact command)
- A quick screenshot of the run Graph tab vs. Task Runs tab
Docs:
- Flows and tasks: Flows and tasks
- Settings reference (event persister settings): Settings reference
g
@Marvin I have solved the database reset problem; it's working normally, I have nuked it. In the UI I don't see any Live updates toggle. Can I set this via the CLI? PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=1 — is this an env variable that I can set?
m
✅ thought for 143.0 seconds
Short answers
- There isn't a "Live updates" toggle in the Prefect 3 UI.
- Yes, you can control update frequency via settings. And yes, PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=1 is a valid env var you can set.
What to set
- Flush interval (seconds): PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL (default 5)
- Batch size: PREFECT_API_SERVICES_EVENT_PERSISTER_BATCH_SIZE (default 20)
- Enable/disable event persister: PREFECT_API_SERVICES_EVENT_PERSISTER_ENABLED (default True)
- Event stream out to UI: PREFECT_API_EVENTS_STREAM_OUT_ENABLED (default True)
Two ways to apply it
Option A — Per run (environment variable)
Copy code
PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=1 uv run prefect server start
Option B — Persist to your Prefect profile (CLI)
Copy code
uv run prefect config set PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=1
uv run prefect config view --show-sources
# restart the server to pick up the new setting
uv run prefect server start
Notes
- You can also use the SERVER-prefixed variants (e.g., PREFECT_SERVER_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL); the API-prefixed ones work well for local prefect server start.
- Lowering the flush interval increases write frequency; great for local dev, but keep defaults in prod unless you have a reason.
Settings reference: Settings reference
g
@Marvin I have used uv run prefect config set PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=1 and it worked. I'm on Prefect 3. When I set this variable, how is it stored? When I kill the CLI will it reset too? Also, explain to me what it actually configures.
m
✅ thought for 51.8 seconds
Great questions!
How your change is stored
- prefect config set ... writes the setting to your current Prefect profile and persists it on disk.
- Location: $PREFECT_HOME/profiles.toml (by default ~/.prefect/profiles.toml).
- It does NOT reset when you close your terminal or kill the CLI; it remains until you change/unset it or switch profiles.
- You can confirm the source:
Copy code
uv run prefect config view --show-sources
uv run prefect profile ls
- To unset or change later:
Copy code
uv run prefect config unset PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL -y
# or set a new value
uv run prefect config set PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=5
What it actually configures
- PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL controls how often the server's Event Persister flushes pending events to the database (in seconds).
- The Event Persister batches events (task/flow state changes, logs, artifacts) and flushes when:
  - either the batch size is reached (PREFECT_API_SERVICES_EVENT_PERSISTER_BATCH_SIZE, default 20)
  - or the flush interval elapses (default 5s; you set it to 1s)
- Lowering the interval makes the UI reflect updates faster, at the cost of more frequent DB writes. It's a good tweak for local dev.
- This is a server-side setting; restart the server to pick up changes:
Copy code
uv run prefect server start
Tips and notes
- The setting is saved in the active profile only. If you switch profiles (e.g., uv run prefect profile use another-profile) or run in a container without your local profiles, you won't see the same behavior unless you set it there too.
- You can also use the SERVER-prefixed variant; both are accepted: PREFECT_SERVER_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL
- Full settings reference: Settings reference
- Profiles and settings overview: Settings and profiles
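For reference, after that config set your profiles.toml will look roughly like this (the profile name and other values are illustrative):
Copy code
active = "local"

[profiles.local]
PREFECT_API_URL = "http://127.0.0.1:4200/api"
PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL = "1"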
g
@Marvin uv run prefect config set PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL=1 how can i remove this variable via CLI?
m
✅ thought for 9.7 seconds
Use unset to remove it from your active Prefect profile:
Copy code
uv run prefect config unset PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL -y
Then restart your server to apply the change:
Copy code
uv run prefect server start
Verify it’s gone (and back to the default of 5 seconds):
Copy code
uv run prefect config view --show-sources
Notes:
- This removes the setting from your current profile (stored in ~/.prefect/profiles.toml by default).
- If you had also exported it as an environment variable in your shell, unset that too:
Copy code
unset PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL
Docs:
- Settings and profiles
- Settings reference
g
@Marvin Ok, got it. About the env variable PREFECT_LOGGING_LOG_PRINTS = "True": running locally I already get all the INFO logs. Do I need to set this only if I'm running on the cloud?
m
✅ thought for 106.4 seconds
You only need PREFECT_LOGGING_LOG_PRINTS if you want plain print() output to show up as Prefect logs. It applies both locally and in Cloud; it's not Cloud-specific.
Key points:
- Default is False. By default, print() output is not routed through Prefect logging.
- You can enable it globally with the setting:
  - Temporary (env var): PREFECT_LOGGING_LOG_PRINTS=true
  - Persisted (profile): uv run prefect config set PREFECT_LOGGING_LOG_PRINTS=true
- Or enable per-flow/task with the decorator option, which overrides the global: @flow(log_prints=True) and/or @task(log_prints=True)
If you're already seeing INFO logs, that's from Prefect's logger. Set log_prints only if you also want your print() statements to appear alongside those logs in the UI.
Docs:
- Add logging
- Settings reference
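To make the difference concrete, here is a small sketch: the logger line reaches the UI either way, while the print line only does when log_prints is enabled (the flow and task names are just examples):
Copy code
from prefect import flow, task, get_run_logger

@task(log_prints=True)  # routes print() from this task into Prefect's logs
def describe(value: int):
    logger = get_run_logger()
    logger.info("Always captured: value=%s", value)  # Prefect logger: shows up regardless
    print(f"Captured only because log_prints is enabled: {value}")

@flow
def logging_demo():
    describe(42)

if __name__ == "__main__":
    logging_demo()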
g
@Marvin Ok, I see the difference between the two now. Another question about the UI: when I select a run I can click on the Parameters tab and I see my parameters. This is in fetch_weather:
Copy code
{
  "lat": [],
  "lon": []
}
But when I click on Task Runs, select a task, and click on the parameters, it shows this for save_weather:
{
  "temp": [
    {
      "inputType": "task_run",
      "id": "0199f8ee-4053-7689-b7ac-67156b4c3528"
    }
  ]
}
Given my code:
from prefect import flow, task
import httpx
import time

@task
def fetch_weather(lat: float, lon: float):
    base_url = "https://api.open-meteo.com/v1/forecast/"
    temps = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    forecasted_temp = float(temps.json()["hourly"]["temperature_2m"][0])
    print(f"Forecasted temp C: {forecasted_temp} degrees")
    return forecasted_temp

@task
def save_weather(temp: str):
    with open("weather.csv", "w+") as w:
        w.write(str(temp))
    return "Successfully wrote temp"

@flow
def pipeline(lat: float = 38.9, lon: float = -77.0):
    temp = fetch_weather(lat, lon)
    result = save_weather(temp)
    return result

if __name__ == "__main__":
    pipeline()
I was expecting to get the values received by fetch_weather for lat and lon, and a string for save_weather. Why is that? This is what it shows in the flow parameters:
Copy code
{
  "lat": 38.9,
  "lon": -77
}
m
✅ thought for 204.7 seconds
You're seeing two different things:
- Flow run "Parameters" tab: shows the actual values you passed to the flow (e.g., lat=38.9, lon=-77). That's the true parameter payload of the flow run.
- Task run "Parameters" panel: shows provenance (task_inputs), not raw argument values. It answers "where did this input come from?", not "what was its literal value?"
What you're seeing is expected:
- save_weather.temp shows:
Copy code
{
  "temp": [
    {
      "inputType": "task_run",
      "id": "<upstream task run id>"
    }
  ]
}
Because temp came from the output of the fetch_weather task run. That's lineage: a task_run -> temp.
- fetch_weather.lat and fetch_weather.lon show empty arrays:
Copy code
{
  "lat": [],
  "lon": []
}
Because those were plain primitives from your flow parameters with no trackable upstream dependency. Prefect does not dump literal values there; it only records provenance. In some cases you may see inputType: "parameter" or inputType: "constant" entries, but the UI does not display raw values for task arguments by default (partly for privacy/perf reasons).
If you want to see the actual values at the task level:
- Log them explicitly (best for local dev):
Copy code
from prefect import task, flow

@task(log_prints=True)
def fetch_weather(lat: float, lon: float):
    print(f"lat={lat}, lon={lon}")
    ...
- Or create an artifact with the inputs/outputs you care about:
Copy code
from prefect import task
from prefect.artifacts import create_markdown_artifact

@task
def save_weather(temp: float):
    create_markdown_artifact(markdown=f"Temp: {temp}", key="saved-temp")
    ...
References:
- UI task run schema (task_inputs provenance): UI schemas
- Flow/task concepts: Flows and tasks
g
@Marvin
from prefect.blocks.system import Secret

my_secret_block = Secret(value="chave")
my_secret_block.save(name="secret-block")
How do I do this in the CLI?
m
✅ thought for 61.6 seconds
Short answer: there isn't a Prefect CLI subcommand to set a Secret's value inline. The CLI's "create" command just prints a link to the UI. You have two practical options:
Option A — Create via UI link
- List block types and find the slug:
Copy code
uv run prefect block type ls
- Generate a creation link for Secret:
Copy code
uv run prefect block create secret
- Click the URL, fill in Name = secret-block, Value = chave, and Save.
- Verify:
Copy code
uv run prefect block inspect secret/secret-block
Option B — One-liner from the shell (Python via uv)
- Do the same thing you wrote in code but as a CLI one-liner:
Copy code
uv run python -c "from prefect.blocks.system import Secret; Secret(value='chave').save('secret-block')"
- Verify:
Copy code
uv run prefect block ls
uv run prefect block inspect secret/secret-block
Use in flows:
Copy code
from prefect.blocks.system import Secret
api_key = Secret.load("secret-block").get()
Notes:
- The secret value will be encrypted at rest and masked in the UI/CLI.
- Be mindful of shell history if you put secrets directly in command lines.
Docs:
- Blocks concept
- Store secrets
g
@Marvin About persisting results:
from prefect import flow, task
import pandas as pd

@task(persist_result=True)
def my_task():
    df = pd.DataFrame(dict(a=[2, 3], b=[4, 5]))
    return df

@flow
def my_flow():
    res = my_task()

if __name__ == "__main__":
    my_flow()
What data am I storing here? And I heard the data is pickled. How can I unpickle it?
m
✅ thought for 236.9 seconds
Great question. In Prefect 3, @task(persist_result=True) tells Prefect to serialize the task's return value and write it to result storage. What that means in practice:
What is stored
- The returned object (your pandas DataFrame) is serialized and written to a file in your configured result storage.
- By default, that's the local filesystem under ~/.prefect/storage/, using a UUID-like key.
- The Prefect database stores only metadata (storage key, serializer, block id), not the actual bytes.
How it's serialized
- Default serializer is cloudpickle (i.e., pickled via cloudpickle).
- You can change the serializer or storage per-task/flow, or globally (e.g., to S3/GCS/Azure via blocks).
- Note: pickled objects are Python/pandas-version dependent; prefer smaller/JSON-serializable results when possible.
How to load it back (recommended)
- Let Prefect do the unpickling for you. Fetch the task run's state and ask Prefect to return the Python object:
Copy code
import asyncio
from prefect.client.orchestration import get_client
from prefect.results import get_state_result

async def load_latest_my_task_result():
    async with get_client() as client:
        # get the most recent run of 'my_task'
        trs = await client.read_task_runs(
            task_run_filter={"name": {"any_": ["my_task"]}},
            limit=1,
        )
        state = trs[0].state
        df = await get_state_result(state)  # fetches from storage and deserializes
        return df

df = asyncio.run(load_latest_my_task_result())
print(df)
Manual unpickle (not recommended, but possible)
- If you really want to read the bytes yourself, you need the storage key and the serializer used.
- Typically the file will be at ~/.prefect/storage/<storage_key>. Then:
Copy code
import os, cloudpickle

path = os.path.expanduser("~/.prefect/storage/<storage_key>")
with open(path, "rb") as f:
    df = cloudpickle.load(f)
Caveats:
- The exact on-disk layout and metadata handling are internal details; using get_state_result keeps you insulated from changes and works for remote storage too.
- Large pickled results can be slow/heavy; consider writing big artifacts (like parquet/CSV to object storage) and returning just a reference/URI.
Useful settings and knobs:
- Default result storage block: set globally or per task/flow (result_storage=...)
- Default serializer: PREFECT_RESULTS_DEFAULT_SERIALIZER
- Persist by default (all tasks): PREFECT_TASKS_DEFAULT_PERSIST_RESULT=true
Docs:
- Results overview: https://docs-3.prefect.io/v3/advanced/results
- Serializers API: https://docs-3.prefect.io/v3/api-ref/python/prefect-serializers
- Filesystems/blocks: https://docs-3.prefect.io/v3/api-ref/python/prefect-filesystems
g
@Marvin Are Automations accessible running locally?
m
✅ thought for 161.9 seconds
Short answer: Automations are a Prefect Cloud feature — they're not available when running the OSS server locally.
What you can do locally instead
- Send notifications from your flows using notification blocks (works in OSS):
  - Slack: SlackWebhook
  - Email: SendgridEmail or Apprise (SMTP)
  - Teams, PagerDuty, Opsgenie, etc.
Example:
Copy code
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

@flow
def my_flow():
    SlackWebhook.load("my-slack").notify("Flow finished!")
- Use deployment-level triggers in prefect.yaml (these are not Automations, but can react to events like schedule/bad states for a deployment).
- Roll your own "automation" in Python by listening for events via the API or by adding guards in flows/tasks (see the state-hook sketch below).
References:
- Automations (Cloud): Creating automations
- Notifications blocks: Notification blocks
- Deployment triggers: Deployment triggers
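For the "roll your own" route, flow state hooks work on the local server too. A minimal sketch (the my-slack block name is carried over from the example above and is an assumption):
Copy code
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

def notify_on_failure(flow, flow_run, state):
    # Prefect calls this hook when the flow run finishes in a failed state
    SlackWebhook.load("my-slack").notify(f"{flow_run.name} failed: {state.message}")

@flow(on_failure=[notify_on_failure])
def my_flow():
    raise ValueError("boom")  # simulate a failure to trigger the hook

if __name__ == "__main__":
    my_flow()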
g
@Marvin About flow code storage: it has an option to use GitHub. I have a small project in mind, only using the local version of Prefect. Do I need something specific to connect to my GitHub? How can this be helpful?
m
✅ thought for 267.4 seconds
You don't need anything special to "connect" Prefect to GitHub, even when running locally. In Prefect 3, you point a deployment at a repo using flow.from_source(...). This replaces the old GitHub storage blocks from 2.x.
Why it's helpful (even for small, local projects)
- Reproducibility: your deployments always pull the exact code/commit you specify.
- Easy updates: push to Git, redeploy to pick up changes.
- Works with private repos via credentials; public repos need nothing extra.
Basic patterns
Run code locally without Git (pure local dev):
- Just call your flow or use serve:
Copy code
from my_flow_module import my_flow
my_flow()                 # ad-hoc run
# or
from prefect import flow

@flow
def my_flow():
    ...

my_flow.serve(name="local")   # local scheduler, no deployments needed
Create a deployment from GitHub (public repo):
Copy code
from prefect import flow
from prefect.storage import GitRepository

# Option A: direct URL string
flow.from_source(
    source="<https://github.com/owner/repo>",
    entrypoint="path/to/flow_file.py:my_flow",
).deploy(
    name="my-deployment",
    work_pool_name="process-pool"   # local process worker
)

# Option B: explicit GitRepository (choose branch/commit)
repo = GitRepository(
    url="<https://github.com/owner/repo>",
    branch="main",   # or commit_sha="abc123"
)
flow.from_source(repo, "path/to/flow_file.py:my_flow").deploy(
    name="my-deployment",
    work_pool_name="process-pool"
)
Private repositories
- Use a token via GitCredentials or a Secret block. Example:
Copy code
from prefect.storage import GitRepository, GitCredentials
from prefect.blocks.system import Secret

# Save your token once
Secret(value="ghp_xxx").save("github-token")

creds = GitCredentials(access_token=Secret.load("github-token").get())
repo = GitRepository(url="<https://github.com/owner/private-repo>", credentials=creds)

flow.from_source(repo, "flows/my_flow.py:my_flow").deploy(
    name="secure-deployment",
    work_pool_name="process-pool"
)
Run it
- Start a local work pool and worker to pick up deployments:
Copy code
uv run prefect work-pool create process-pool --type process
uv run prefect worker start -p process-pool
- Then kick off a run from the UI or:
Copy code
uv run prefect deployment run "my-flow/my-deployment"
Notes
- No GitHub storage block needed; do not use deprecated 2.x patterns like Deployment.build_from_flow() or prefect deployment build.
- For purely local iteration, you can skip deployments and Git entirely; flow.serve() is super convenient for local schedules (see the sketch below).
- Docs:
  - Store flow code: Store flow code
  - Deploy flows: Deployment versioning
  - GitHub integration notes: prefect-github
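For example, a local schedule with no deployment infrastructure at all; a sketch assuming a pipeline flow like the one from earlier in this thread:
Copy code
from prefect import flow

@flow
def pipeline(lat: float = 38.9, lon: float = -77.0):
    ...

if __name__ == "__main__":
    # Runs a lightweight scheduler in this process; cron/interval are serve() options
    pipeline.serve(name="local-weather", cron="*/15 * * * *")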
g
@Marvin Why is it giving me errors when I try to execute?
from prefect import flow
from prefect.deployments import run_deployment

@flow
def run_deployment_from_flow():
    print("Running deployment from a flow")
    run_deployment(
        name="pipeline/my-first-managed-deployment",
        parameters={"lat": 1, "lon": 2}
    )
    return

if __name__ == "__main__":
    run_deployment_from_flow()
The error:
214640.996 | INFO | Flow run 'malachite-clam' - Finished in state Failed('Flow run encountered an exception: ObjectNotFound: None') Traceback (most recent call last): File "C:\Projetos\magnetron\.venv\Lib\site-packages\prefect\client\orchestration\_deployments\client.py", line 1024, in read_deployment_by_name response = await self.request( ^^^^^^^^^^^^^^^^^^^ ...<6 lines>... ) ^
m
✅ thought for 84.1 seconds
ObjectNotFound means Prefect can't find a deployment with the name you gave in the API you're connected to.
Checklist to fix:
1) Confirm you're pointed at the right API (local vs Cloud)
Copy code
uv run prefect config view --show-sources
Make sure PREFECT_API_URL is your local server if that's where you created the deployment (e.g. http://127.0.0.1:4200/api).
2) Verify the deployment actually exists
Copy code
uv run prefect deployment ls
uv run prefect deployment inspect "pipeline/my-first-managed-deployment"
If inspect returns "not found", the deployment doesn't exist in this API. Since you "nuked" the DB earlier, any existing deployments were deleted — you'll need to recreate them.
3) Create (or recreate) the deployment
- In Python:
Copy code
from prefect import flow

@flow
def pipeline(lat: float = 38.9, lon: float = -77.0):
    ...

# run once to create the deployment
pipeline.deploy(
    name="my-first-managed-deployment",
    work_pool_name="process-pool",
)
- Or via CLI from your project root:
Copy code
uv run prefect deploy ./path/to/file.py:pipeline --name my-first-managed-deployment
(You'll still need a work pool/worker; see next step.)
4) Ensure a worker is running for the target work pool
Copy code
uv run prefect work-pool create process-pool --type process
uv run prefect worker start -p process-pool
5) Call run_deployment with the exact name
Your code is fine:
Copy code
from prefect import flow
from prefect.deployments import run_deployment

@flow
def run_deployment_from_flow():
    run_deployment(
        name="pipeline/my-first-managed-deployment",
        parameters={"lat": 1, "lon": 2},
    )
Tip: If you prefer, you can pass the deployment UUID instead of the "flow-name/deployment-name" string.
If this still errors, please share:
- Output of uv run prefect deployment ls
- Output of uv run prefect config view --show-sources
- How you created the deployment (UI, Python, or CLI)
Docs:
- Store flow code for deployments
- Deployment/versioning guide
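You can also confirm what the client sees programmatically. A small sketch using the same read_deployment_by_name call that appears in your traceback (the deployment name is the one you used):
Copy code
import asyncio
from prefect.client.orchestration import get_client

async def check_deployment(name: str = "pipeline/my-first-managed-deployment"):
    async with get_client() as client:
        # Raises ObjectNotFound (as in your traceback) if the deployment is missing
        deployment = await client.read_deployment_by_name(name)
        print(deployment.id, deployment.name)

if __name__ == "__main__":
    asyncio.run(check_deployment())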
g
@Marvin For the project I want to build, I need to download data from two separate APIs. This way I can do parallelism between the two APIs. Does that mean each one of them is a flow, so two separate flows? Do I need a flow to envelop these two flows? I also need to download some other large files that have no dependencies. Should they be separate tasks or a flow?
m
✅ thought for 46.2 seconds
Great question. Short version: start with one parent flow and make each download an independent task; use a task runner to run them in parallel. Make separate flows only when you need separate schedules, ownership, or infrastructure.
Recommended structure
- Two APIs in parallel (IO-bound): use tasks + ThreadPoolTaskRunner
- Large file downloads with no deps: also tasks; submit many in parallel (map over URLs)
- Make them separate flows only if you want to:
  - schedule them independently
  - run them on different work pools/infrastructure
  - reuse them across projects
  - get their own run UI/metrics
Example: single flow with parallel tasks (best for local dev)
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
import httpx

@task(retries=2, retry_delay_seconds=5, timeout_seconds=60)
def fetch_api1():
    return httpx.get("<https://api.example.com/a>").json()

@task(retries=2, retry_delay_seconds=5, timeout_seconds=60)
def fetch_api2():
    return httpx.get("<https://api.example.com/b>").json()

@task(retries=2, retry_delay_seconds=5)
def download_file(url: str) -> str:
    # stream to disk; don’t return large bytes
    path = f"data/{url.split('/')[-1]}"
    with httpx.stream("GET", url) as r:
        r.raise_for_status()
        with open(path, "wb") as f:
            for chunk in r.iter_bytes():
                f.write(chunk)
    return path  # return the path, not file contents

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def etl(urls: list[str]):
    a1_f = fetch_api1.submit()
    a2_f = fetch_api2.submit()
    file_fs = [download_file.submit(u) for u in urls]

    a1 = a1_f.result()
    a2 = a2_f.result()
    paths = [f.result() for f in file_fs]
    return a1, a2, paths

if __name__ == "__main__":
    etl(["<https://example.com/large1.bin>", "<https://example.com/large2.bin>"])
When to use separate flows (and optionally run them in parallel)
- If API A and API B should be deployed/scheduled separately or run on different workers:
Copy code
from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrator():
    # kick off two independent deployed flows in parallel
    fr1 = run_deployment(name="api-a-flow/prod")
    fr2 = run_deployment(name="api-b-flow/prod")
    # optionally wait / poll if you need their results
This launches distinct flow runs that can execute concurrently across workers/machines.
Additional tips
- IO-bound work (HTTP, S3/GCS downloads): ThreadPoolTaskRunner is ideal.
- CPU-bound work (parsing, compression): ProcessPoolTaskRunner is better.
- Avoid returning large payloads; write to storage and return a reference/path.
- Add retries/timeouts to network tasks.
- Persisting results: only persist small/structured outputs; for large files, persist the file path or a URI.
References:
- Flows and tasks: Docs
- Run flows in local processes: Guide
- Results best practices: Results
g
@Marvin Ok, I got it. It is only one flow, with three main tasks. Since these three main tasks hit separate APIs I can call them in parallel, is that right? And inside the subtasks where I download data, can I use async concurrency?
m
✅ thought for 23.3 seconds
Yes — that's a good plan.
- One flow that orchestrates three independent "main" tasks
- Run those tasks in parallel (IO-bound HTTP) with a task runner
- Inside each task, you can use async to fan out multiple downloads
Pattern A: parallel tasks at the flow level (simple and effective for IO)
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
import httpx

@task(retries=2, retry_delay_seconds=5, timeout_seconds=60)
def api_a():
    return httpx.get("<https://api.example.com/a>").json()

@task(retries=2, retry_delay_seconds=5, timeout_seconds=60)
def api_b():
    return httpx.get("<https://api.example.com/b>").json()

@task(retries=2, retry_delay_seconds=5)
def big_file(url: str) -> str:
    path = f"data/{url.split('/')[-1]}"
    with httpx.stream("GET", url) as r:
        r.raise_for_status()
        with open(path, "wb") as f:
            for chunk in r.iter_bytes():
                f.write(chunk)
    return path

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def pipeline(urls: list[str]):
    a_f = api_a.submit()
    b_f = api_b.submit()
    file_fs = [big_file.submit(u) for u in urls]

    a = a_f.result()
    b = b_f.result()
    paths = [f.result() for f in file_fs]
    return a, b, paths
Pattern B: async fan‑out inside a task (useful when each “main task” needs many concurrent calls)
Copy code
import asyncio, httpx
from prefect import task

@task
async def download_many(urls: list[str]) -> list[str]:
    limits = httpx.Limits(max_connections=10)
    timeout = httpx.Timeout(60)
    sem = asyncio.Semaphore(5)

    async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
        async def fetch(u):
            async with sem:
                path = f"data/{u.split('/')[-1]}"
                async with client.stream("GET", u) as r:
                    r.raise_for_status()
                    with open(path, "wb") as f:
                        async for chunk in r.aiter_bytes():
                            f.write(chunk)
                return path

        return await asyncio.gather(*[fetch(u) for u in urls])
Guidance:
- It's fine to combine both (parallel tasks + internal async), but be mindful of total concurrency so you don't overwhelm the APIs or your machine (you can also cap per-API concurrency with task tags; see the sketch below).
- Use retries/timeouts on network tasks.
- For CPU-bound work (parsing/compression), prefer ProcessPoolTaskRunner.
- Return paths/handles for large files instead of the raw bytes.
If you share a quick outline of your three APIs and the download counts, I can suggest concrete max_workers, timeouts, and concurrency caps.
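If you do need a hard per-API cap, one option is Prefect's tag-based task concurrency limits. A sketch (tag names and limits are examples):
Copy code
from prefect import flow, task

@task(tags=["tse-api"])          # every run of this task carries the "tse-api" tag
def fetch_tse(endpoint: str):
    ...

@task(tags=["congresso-api"])
def fetch_congresso(endpoint: str):
    ...

# Cap concurrent task runs per tag on the server, for example:
#   prefect concurrency-limit create tse-api 3
#   prefect concurrency-limit create congresso-api 2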
g
@Marvin I want to build a new project where I download data from APIs. In it I want one flow that has two main tasks. I want these tasks to run in parallel: they hit different APIs, so running them together won't hit either API's request limit. In one of these tasks, which we'll call TSE, I download data from 3 different endpoints. They can run in parallel too because each one is a single file to download, so the limit won't be reached. In the other main task, which we'll call Congresso, first I download a file that's necessary for its subflows. It has two subtasks that receive data from this first download. The subtasks can run in parallel because they're from different APIs, so they won't reach a request limit. In subtask A we download from three different endpoints in parallel, because it is a small number of requests and won't hit a limit. Subtask B, on the other hand, needs to run sequentially. Give me a simple example of how to build this flow following best practices. Don't stack everything in one single file; separate things into folders and files that make sense.
m
✅ thought for 80.6 seconds
Yes — one flow orchestrating these independent downloads is perfect. Run the "main" tasks in parallel, then fan out within each main task as needed. Below is a small, clean project layout that follows Prefect 3 best practices: flows orchestrate, tasks do work, and downloads stream to disk and return paths.
Project layout
- flows/
  - pipeline.py (top-level flow)
- tasks/
  - tse.py (TSE endpoints)
  - congresso/
    - base.py (manifest prerequisite)
    - subtask_a.py (parallel endpoints)
    - subtask_b.py (sequential endpoints)
- utils/
  - io.py (shared download helper)
utils/io.py
Copy code
from pathlib import Path
import httpx

def ensure_dir(path: str | Path) -> Path:
    p = Path(path)
    p.mkdir(parents=True, exist_ok=True)
    return p

def download_stream(url: str, dest_path: str | Path, timeout: float = 60.0) -> str:
    dest_path = Path(dest_path)
    ensure_dir(dest_path.parent)
    with httpx.stream("GET", url, timeout=timeout) as r:
        r.raise_for_status()
        with open(dest_path, "wb") as f:
            for chunk in r.iter_bytes():
                f.write(chunk)
    return str(dest_path)
tasks/tse.py
Copy code
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import download_stream

# Replace these with your real endpoints
TSE_ENDPOINTS = {
    "tse_a": "<https://example.com/tse/a.csv>",
    "tse_b": "<https://example.com/tse/b.csv>",
    "tse_c": "<https://example.com/tse/c.csv>",
}

@task(retries=2, retry_delay_seconds=5, timeout_seconds=120, log_prints=True)
def download_tse_endpoint(name: str, url: str, out_dir: str = "data/tse") -> str:
    logger = get_run_logger()
    dest = Path(out_dir) / f"{name}.csv"
    logger.info(f"Downloading TSE endpoint '{name}' -> {dest}")
    return download_stream(url, dest)
tasks/congresso/base.py
Copy code
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import download_stream

# Replace with your real manifest URL
CONGRESSO_MANIFEST_URL = "https://example.com/congresso/manifest.json"

@task(retries=2, retry_delay_seconds=5, timeout_seconds=120, log_prints=True)
def download_manifest(out_dir: str = "data/congresso") -> str:
    logger = get_run_logger()
    dest = Path(out_dir) / "manifest.json"
    logger.info(f"Downloading Congresso manifest -> {dest}")
    return download_stream(CONGRESSO_MANIFEST_URL, dest)
tasks/congresso/subtask_a.py
Copy code
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import download_stream

# Replace these with your real endpoints
CONGRESSO_A_ENDPOINTS = {
    "a_1": "<https://example.com/congresso/a_1.json>",
    "a_2": "<https://example.com/congresso/a_2.json>",
    "a_3": "<https://example.com/congresso/a_3.json>",
}

@task(retries=2, retry_delay_seconds=5, timeout_seconds=120, log_prints=True)
def download_congresso_a_endpoint(name: str, url: str, manifest_path: str, out_dir: str = "data/congresso/a") -> str:
    # Use manifest_path if you need to read metadata for requests
    logger = get_run_logger()
    dest = Path(out_dir) / f"{name}.json"
    logger.info(f"[A] Downloading {name} using manifest {manifest_path} -> {dest}")
    return download_stream(url, dest)
tasks/congresso/subtask_b.py
Copy code
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import download_stream

# Replace with your real B endpoints that must run sequentially
CONGRESSO_B_ENDPOINTS = [
    "https://example.com/congresso/b_1.parquet",
    "https://example.com/congresso/b_2.parquet",
]

@task(retries=2, retry_delay_seconds=5, timeout_seconds=600, log_prints=True)
def download_congresso_b_sequential(manifest_path: str, out_dir: str = "data/congresso/b") -> list[str]:
    logger = get_run_logger()
    results = []
    for i, url in enumerate(CONGRESSO_B_ENDPOINTS, start=1):
        dest = Path(out_dir) / f"b_{i}.parquet"
        logger.info(f"[B] Sequential download {i}/{len(CONGRESSO_B_ENDPOINTS)} using manifest {manifest_path} -> {dest}")
        results.append(download_stream(url, dest))
    return results
flows/pipeline.py
Copy code
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner

from tasks.tse import TSE_ENDPOINTS, download_tse_endpoint
from tasks.congresso.base import download_manifest
from tasks.congresso.subtask_a import CONGRESSO_A_ENDPOINTS, download_congresso_a_endpoint
from tasks.congresso.subtask_b import download_congresso_b_sequential

@flow(task_runner=ThreadPoolTaskRunner(max_workers=16), log_prints=True)
def pipeline():
    logger = get_run_logger()
    logger.info("Starting pipeline")

    # TSE: three endpoints in parallel
    tse_fs = [download_tse_endpoint.submit(name, url) for name, url in TSE_ENDPOINTS.items()]

    # Congresso: first download prerequisite (manifest)
    manifest_path = download_manifest()

    # Then A and B in parallel:
    # - A: three endpoints in parallel
    a_fs = [download_congresso_a_endpoint.submit(name, url, manifest_path) for name, url in CONGRESSO_A_ENDPOINTS.items()]
    # - B: must run sequentially, so a single task that loops internally
    b_f = download_congresso_b_sequential.submit(manifest_path)

    # Gather results
    tse_paths = [f.result() for f in tse_fs]
    a_paths = [f.result() for f in a_fs]
    b_paths = b_f.result()

    logger.info(f"TSE files: {tse_paths}")
    logger.info(f"Congresso A files: {a_paths}")
    logger.info(f"Congresso B files: {b_paths}")

    return {"tse": tse_paths, "congresso_a": a_paths, "congresso_b": b_paths}

if __name__ == "__main__":
    pipeline()
Why this matches your requirements
- One flow orchestrates everything.
- TSE endpoints run in parallel.
- Congresso downloads the manifest first, then runs Subtask A (parallel) and Subtask B (sequential) at the same time.
- All downloads stream to disk; tasks return file paths, not large payloads.
- ThreadPoolTaskRunner is ideal for IO-bound HTTP work.

Tuning and tips
- Control parallelism by adjusting `max_workers` in ThreadPoolTaskRunner.
- Add rate limiting if you need it later (Prefect concurrency limits or a simple `asyncio.Semaphore` inside a task if you switch to async); a tag-based sketch follows this list.
- Put secrets (API keys) in Secret blocks and read them in tasks — avoid Variables for sensitive data.
- For production, consider persisting only small results and store big files in object storage; return URIs instead of local paths.
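As a sketch of the rate-limiting tip above: Prefect can cap concurrent task runs by tag. The tag name tse-api and the limit of 5 are illustrative here, and the limit itself is created once with the CLI (prefect concurrency-limit create tse-api 5, per the Prefect 3 docs):
Copy code
from pathlib import Path
from prefect import task

from utils.io import download_stream

# Runs of any task carrying the "tse-api" tag are throttled once a concurrency
# limit exists for that tag (created with the CLI command mentioned above)
@task(tags=["tse-api"], retries=2, retry_delay_seconds=5, timeout_seconds=600)
def fetch_tse_limited(url: str, out_dir: str = "data/tse") -> str:
    dest = Path(out_dir) / url.rstrip("/").split("/")[-1]
    return download_stream(url, dest)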

Run it
- Start your local server (optional if you just want local logs/observability):
Copy code
uv run prefect server start
- Execute the flow:
Copy code
uv run python flows/pipeline.py
- Or serve/schedule locally:
Copy code
uv run python -c "from flows.pipeline import pipeline; pipeline.serve(name='local-pipeline')"
g
@Marvin So we have a stream download. This is useful for the TSE endpoints because theyre large CSV files. In Congresso i also have large CSVs but i also have tasks where i have to download a single json file or a great number of small json files. So im going to need other functions besides download_stream, right?
m
✅ thought for 50.4 seconds
Yes — keep download_stream for big CSVs, and add a couple of small JSON helpers so you don’t buffer large payloads unnecessarily or return huge objects from tasks. Here’s a minimal set of utilities I recommend:
- download_stream(url, dest_path): for large CSVs or binaries
- fetch_json(url): for a single small JSON you want in memory
- save_json(url, dest_path): for a single JSON saved to disk
- fetch_json_many_async(urls, out_dir, concurrency): for many small JSONs in parallel with an async client
- write_ndjson(records, dest_path): if you prefer aggregating many small JSONs into one NDJSON file

utils/io.py
Copy code
from __future__ import annotations

import asyncio
import json
from pathlib import Path
import hashlib
import httpx

def ensure_dir(path: str | Path) -> Path:
    p = Path(path)
    p.mkdir(parents=True, exist_ok=True)
    return p

def download_stream(url: str, dest_path: str | Path, timeout: float = 60.0) -> str:
    dest_path = Path(dest_path)
    ensure_dir(dest_path.parent)
    with httpx.stream("GET", url, timeout=timeout) as r:
        r.raise_for_status()
        with open(dest_path, "wb") as f:
            for chunk in r.iter_bytes():
                f.write(chunk)
    return str(dest_path)

def fetch_json(url: str, timeout: float = 30.0) -> dict:
    with httpx.Client(timeout=timeout) as client:
        r = client.get(url)
        r.raise_for_status()
        return r.json()

def save_json(url: str, dest_path: str | Path, timeout: float = 30.0) -> str:
    dest_path = Path(dest_path)
    ensure_dir(dest_path.parent)
    with httpx.Client(timeout=timeout) as client:
        r = client.get(url)
        r.raise_for_status()
        data = r.json()
    with open(dest_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False)
    return str(dest_path)

def write_ndjson(records: list[dict], dest_path: str | Path) -> str:
    dest_path = Path(dest_path)
    ensure_dir(dest_path.parent)
    with open(dest_path, "w", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")
    return str(dest_path)

async def fetch_json_many_async(
    urls: list[str],
    out_dir: str | Path | None = None,
    concurrency: int = 10,
    timeout: float = 30.0,
) -> list[str] | list[dict]:
    """
    - If out_dir is provided, saves each JSON to a file and returns list of file paths.
    - Otherwise returns list of parsed dicts in memory.
    """
    sem = asyncio.Semaphore(concurrency)
    limits = httpx.Limits(max_connections=max(concurrency, 10))
    timeout_cfg = httpx.Timeout(timeout)
    if out_dir:
        ensure_dir(out_dir)

    async def one(u: str):
        async with sem:
            async with httpx.AsyncClient(limits=limits, timeout=timeout_cfg) as client:
                r = await client.get(u)
                r.raise_for_status()
                data = r.json()
            if out_dir:
                # deterministic filename by URL hash
                name = hashlib.sha1(u.encode()).hexdigest() + ".json"
                path = Path(out_dir) / name
                with open(path, "w", encoding="utf-8") as f:
                    json.dump(data, f, ensure_ascii=False)
                return str(path)
            else:
                return data

    return await asyncio.gather(*[one(u) for u in urls])
Examples of using these in Prefect tasks
- TSE large CSVs: keep streaming to disk (your current use of download_stream is perfect).
- Congresso single small JSON: use save_json if you want a file, or fetch_json if you need the content in-memory downstream.
- Congresso many small JSONs: use an async task that calls fetch_json_many_async with a concurrency cap.

tasks/congresso/subtask_a.py (parallel small JSONs)
Copy code
from prefect import task
from utils.io import fetch_json_many_async

@task(retries=2, retry_delay_seconds=5)
async def download_many_congresso_a(urls: list[str], out_dir: str = "data/congresso/a") -> list[str]:
    # writes each JSON to disk; returns list of file paths
    return await fetch_json_many_async(urls, out_dir=out_dir, concurrency=10, timeout=30.0)
tasks/congresso/base.py (single JSON manifest)
Copy code
from prefect import task
from utils.io import save_json

@task(retries=2, retry_delay_seconds=5)
def download_manifest(url: str, out_dir: str = "data/congresso") -> str:
    return save_json(url, dest_path=f"{out_dir}/manifest.json", timeout=60.0)
Why split like this
- You avoid holding big payloads in memory; tasks return paths.
- Async client + concurrency for “many small JSONs” is efficient and controllable.
- You can tune concurrency easily to respect rate limits.

General guidance
- Keep using ThreadPoolTaskRunner at the flow level for IO-bound tasks; async tasks will still work fine inside.
- Add retries/timeouts on tasks (as shown) and keep per-request timeouts reasonable.
- If you later need to aggregate many JSONs, prefer writing NDJSON files with write_ndjson (a small sketch follows at the end of this message) or store them in object storage and return URIs.

If you share some endpoint counts and typical sizes, I can suggest good defaults for concurrency and timeouts for each group.
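A small sketch of that NDJSON aggregation option, combining the helpers already defined in utils/io.py (the task and file names are illustrative):
Copy code
from prefect import task

from utils.io import fetch_json_many_async, write_ndjson

@task(retries=2, retry_delay_seconds=5)
async def download_congresso_a_as_ndjson(urls: list[str], dest_path: str = "data/congresso/a.ndjson") -> str:
    # With out_dir=None the helper returns the parsed dicts in memory;
    # aggregate them into a single NDJSON file and return its path
    records = await fetch_json_many_async(urls, out_dir=None, concurrency=10, timeout=30.0)
    return write_ndjson(records, dest_path)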
g
@Marvin I want to put all of those configurations about timeouts and workers in an easily configurable place. I also want to put some basic variables like the base URLs. Should I use Prefect Variables or should I put them in a TOML config file?
m
✅ thought for 91.9 seconds
Great question. Short answer: use a small, versioned config file for your defaults, then allow overrides via env vars and (optionally) Prefect Variables for quick tweaks. Use Secret blocks for any sensitive values.

Why this approach
- Versioned config (toml/yaml/py) keeps your defaults with the code and survives DB resets.
- Env vars let you override per machine/container without code changes.
- Prefect Variables are handy for non-sensitive, runtime overrides via UI/CLI — but they live in the Prefect DB, so you’ll lose them if you nuke/reset the DB.
- Secrets should never go in Variables; use Secret blocks.

Minimal pattern
Project layout:
- config/
  - settings.toml
  - loader.py
- flows/
  - pipeline.py

config/settings.toml
Copy code
[tse]
base_url = "<https://api.tse.example>"
stream_timeout = 120

[congresso]
base_url = "<https://api.congresso.example>"
json_timeout = 30
json_concurrency = 10

[runner]
max_workers = 16
config/loader.py
Copy code
import os, json
from pathlib import Path
from dataclasses import dataclass
from typing import Any
from prefect.variables import Variable

try:
    import tomllib  # Python 3.11+
except ModuleNotFoundError:
    import tomli as tomllib  # pip install tomli for 3.10-

@dataclass
class Settings:
    tse_base_url: str
    tse_stream_timeout: float
    congresso_base_url: str
    congresso_json_timeout: float
    congresso_json_concurrency: int
    runner_max_workers: int

def _read_toml(path: str | Path) -> dict[str, Any]:
    p = Path(path)
    if not p.exists():
        return {}
    with open(p, "rb") as f:
        return tomllib.load(f)

def _get_var(name: str) -> str | None:
    # Prefect Variables override if set (non-sensitive only)
    try:
        v = Variable.get(name)
        return None if v in ("", None) else v
    except Exception:
        return None

def load_settings(config_path: str | Path = "config/settings.toml") -> Settings:
    cfg = _read_toml(config_path)

    def pick(var_name: str, env_name: str, default: Any) -> Any:
        # priority: Prefect Variable -> ENV -> TOML -> default
        v = _get_var(var_name)
        if v is not None:
            return v
        if (e := os.getenv(env_name)) is not None:
            return e
        return default

    return Settings(
        tse_base_url=str(pick("TSE_BASE_URL", "MYAPP_TSE_BASE_URL", cfg.get("tse", {}).get("base_url", ""))),
        tse_stream_timeout=float(pick("TSE_STREAM_TIMEOUT", "MYAPP_TSE_STREAM_TIMEOUT", cfg.get("tse", {}).get("stream_timeout", 120))),
        congresso_base_url=str(pick("CONGRESSO_BASE_URL", "MYAPP_CONGRESSO_BASE_URL", cfg.get("congresso", {}).get("base_url", ""))),
        congresso_json_timeout=float(pick("CONGRESSO_JSON_TIMEOUT", "MYAPP_CONGRESSO_JSON_TIMEOUT", cfg.get("congresso", {}).get("json_timeout", 30))),
        congresso_json_concurrency=int(pick("CONGRESSO_JSON_CONCURRENCY", "MYAPP_CONGRESSO_JSON_CONCURRENCY", cfg.get("congresso", {}).get("json_concurrency", 10))),
        runner_max_workers=int(pick("RUNNER_MAX_WORKERS", "MYAPP_RUNNER_MAX_WORKERS", cfg.get("runner", {}).get("max_workers", 16))),
    )
flows/pipeline.py (using the settings)
Copy code
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings

@flow
def pipeline():
    s = load_settings()
    # You can pass these into tasks or construct runners with them
    print("Using:", s)

if __name__ == "__main__":
    s = load_settings()
    pipeline.with_options(task_runner=ThreadPoolTaskRunner(max_workers=s.runner_max_workers))()
Using Prefect Variables for quick overrides (non-sensitive)
Copy code
uv run prefect variable set TSE_BASE_URL https://api.tse.example
uv run prefect variable set RUNNER_MAX_WORKERS 32
Unset to fall back to env/TOML/defaults:
Copy code
uv run prefect variable unset RUNNER_MAX_WORKERS -y
Where to put secrets
- Use Secret blocks:
Copy code
from prefect.blocks.system import Secret

Secret(value="your-token").save("tse-token")

# later
token = Secret.load("tse-token").get()
When to favor each option
- Use TOML (or a Python settings module) for defaults you want in VCS and to survive DB resets.
- Use env vars to change behavior per machine/container/CI runner.
- Use Prefect Variables for non-sensitive toggles you want to edit quickly in the UI/CLI without code changes.
- Use Secret blocks for credentials/tokens.

Docs:
- Variables: Variables
- Settings and profiles: Settings and profiles
- Blocks: Blocks

If you share which knobs you want to control (timeouts, concurrency, base URLs, etc.), I can tailor the loader and naming to your exact needs.
g
@Marvin About this part:

def _get_var(name: str) -> str | None:
    # If Prefect Variables are set, they override
    try:
        v = Variable.get(name)
        return None if v in ("", None) else v
    except Exception:
        return None

it gives me an error:

"Type "StrictVariableValue | Coroutine[Any, Any, StrictVariableValue]" is not assignable to return type "str | None"
Type "StrictVariableValue | Coroutine[Any, Any, StrictVariableValue]" is not assignable to type "str | None"
Type "StrictFloat" is not assignable to type "str | None"
"float" is not assignable to "str"
"float" is not assignable to "None""

If I set the return type to Any it resolves. Why, and how can I resolve that? Also explain to me: it says it can return a coroutine. Does it return a coroutine only if I define this function as async?
m
✅ thought for 75.7 seconds
Great question — this is a typing nuance with Prefect's "sync-compatible" APIs and the type of Variables.

Why you see that error
- Variable.get(...) is sync-compatible: its static type is "Sync[T]", which expands to "T | Coroutine[Any, Any, T]".
- For Prefect Variables, T is StrictVariableValue (a union of JSON-like primitives: str | int | float | bool | None).
- Your function is annotated to return str | None, but the type checker sees "StrictVariableValue | Coroutine[...]" coming back — not assignable to str | None. It also flags that the value might be a float/bool, not a string.

Does it actually return a coroutine?
- Only in async contexts. If you call Variable.get inside an async function, you should await it.
- In sync code, it returns the value directly. The coroutine bit is in the type to support both contexts.

Two clean ways to resolve

Option A — Return the correct Prefect type and convert later
Copy code
from typing import Optional, cast
from prefect.variables import Variable
from prefect.types import StrictVariableValue

def get_var(name: str) -> Optional[StrictVariableValue]:
    # Cast removes the Sync[...] union for type-checkers
    v = cast(StrictVariableValue, Variable.get(name, default=None))
    return v
Then when you need a string/float/int, convert explicitly:
Copy code
def get_var_str(name: str) -> str | None:
    v = get_var(name)
    return None if v is None else str(v)

def get_var_int(name: str, default: int) -> int:
    v = get_var(name)
    if v is None:
        return default
    if isinstance(v, bool):
        return int(v)
    if isinstance(v, (int, float, str)):
        return int(v)
    raise TypeError(f"Unsupported type for int: {type(v)}")
Option B — Support async usage
If you're calling this inside async code, make the helper async and await:
Copy code
from typing import Optional
from prefect.variables import Variable
from prefect.types import StrictVariableValue

async def aget_var(name: str) -> Optional[StrictVariableValue]:
    return await Variable.get(name, default=None)

async def aget_var_str(name: str) -> str | None:
    v = await aget_var(name)
    return None if v is None else str(v)
Why casting helps
- Prefect annotates many functions with a Sync[T] alias (T | Coroutine[..., T]) to be accurate in both sync/async contexts.
- In purely sync helpers, using cast(StrictVariableValue, Variable.get(...)) tells the type checker "treat this like the sync branch," removing the coroutine from the union.

Also note
- Prefect Variables can hold numbers and booleans, not just strings. If you want all config as strings, coerce with `str(v)`; if you want typed config, convert thoughtfully (like the int/float helpers above).
- If you truly want your helper to return only strings, reflect that in both the conversion and the annotation.

Docs:
- Variables: https://docs-3.prefect.io/v3/concepts/variables

If you share which exact fields you want as str/int/float, I can sketch small typed getters you can drop into your loader.
g
@Marvin My loader.py is this:

import os, json, tomllib
from pathlib import Path
from dataclasses import dataclass
from typing import Any, Optional, cast
from prefect.variables import Variable
from prefect.types import StrictVariableValue

@dataclass
class Settings:
    # TSE
    tse_base_url: str
    tse_stream_timeout: float

    # CONGRESSO
    congresso_base_url: str
    congresso_json_timeout: float
    congresso_json_concurrency: int

    runner_max_workers: int

def _read_toml(path: str | Path) -> dict[str, Any]:
    p = Path(path)
    if not p.exists():
        return {}
    with open(p, "rb") as f:
        return tomllib.load(f)

def _get_var(name: str) -> Optional[StrictVariableValue]:
    # If Prefect Variables are set, they override
    try:
        v = cast(StrictVariableValue, Variable.get(name))
        return None if v in ("", None) else v
    except Exception:
        return None

def load_settings(config_path: str | Path = "config/settings.toml") -> Settings:
    cfg = _read_toml(config_path)

    def pick(var_name: str, env_name: str, default: Any) -> Any:
        # Priority: Prefect Variable -> ENV -> TOML -> default
        v = _get_var(var_name)
        if v is not None:
            return v
        if (e := os.getenv(env_name)) is not None:
            return e
        return default

    return Settings(
        # GENERAL
        runner_max_workers=int(pick("RUNNER_MAX_WORKERS", "MYAPP_RUNNER_MAX_WORKERS", cfg.get("runner", {}).get("max_workers", 16))),
        # TSE
        tse_base_url=str(pick("TSE_BASE_URL", "MYAPP_TSE_BASE_URL", cfg.get("tse", {}).get("base_url", ""))),
        tse_stream_timeout=float(pick("TSE_STREAM_TIMEOUT", "MYAPP_TSE_STREAM_TIMEOUT", cfg.get("tse", {}).get("stream_timeout", 120))),
        # CONGRESSO
        congresso_base_url=str(pick("CONGRESSO_BASE_URL", "MYAPP_CONGRESSO_BASE_URL", cfg.get("congresso", {}).get("base_url", ""))),
        congresso_json_timeout=int(pick("CONGRESSO_JSON_TIMEOUT", "MYAPP_CONGRESSO_JSON_TIMEOUT", cfg.get("congresso", {}).get("json_timeout", 30))),
        congresso_json_concurrency=int(pick("CONGRESSO_JSON_CONCURRENCY", "MYAPP_CONGRESSO_JSON_CONCURRENCY", cfg.get("congresso", {}).get("json_concurrency", 10))),
    )

Do I still need to convert explicitly to other types?
m
✅ thought for 27.1 seconds
Short answer: yes, keep doing explicit conversions when you build Settings. Here's why, and a couple of tweaks you can make.

Why explicit conversion is needed
- Env vars are always strings.
- Prefect Variables can be str/int/float/bool/None (StrictVariableValue).
- TOML gives you typed values, but you're mixing three sources.

Explicitly coercing to the target type at the Settings boundary guarantees consistent types regardless of the source.

Two fixes I recommend

1) Use Variable.get(..., default=None) to avoid try/except and keep strict typing
Copy code
from typing import Optional, cast
from prefect.variables import Variable
from prefect.types import StrictVariableValue

def _get_var(name: str) -> Optional[StrictVariableValue]:
    v = cast(StrictVariableValue, Variable.get(name, default=None))
    return None if v in ("", None) else v
- Note: Variable.get is "sync-compatible", so the cast is just to silence the type checker about the potential coroutine type in async contexts.
- If you call this from async code, create an async variant and await it.

2) Make sure your Settings casts match your dataclass annotations
You currently have congresso_json_timeout annotated as float, but you cast with int(...). Either:
- change the cast to float(...), OR
- change the dataclass field to int

Example with consistent types
Copy code
return Settings(
    runner_max_workers=int(pick("RUNNER_MAX_WORKERS","MYAPP_RUNNER_MAX_WORKERS", cfg.get("runner", {}).get("max_workers", 16))),

    tse_base_url=str(pick("TSE_BASE_URL", "MYAPP_TSE_BASE_URL", cfg.get("tse", {}).get("base_url", ""))),
    tse_stream_timeout=float(pick("TSE_STREAM_TIMEOUT", "MYAPP_TSE_STREAM_TIMEOUT", cfg.get("tse", {}).get("stream_timeout", 120))),

    congresso_base_url=str(pick("CONGRESSO_BASE_URL","MYAPP_CONGRESSO_BASE_URL", cfg.get("congresso", {}).get("base_url", ""))),
    congresso_json_timeout=float(pick("CONGRESSO_JSON_TIMEOUT","MYAPP_CONGRESSO_JSON_TIMEOUT", cfg.get("congresso", {}).get("json_timeout", 30))),
    congresso_json_concurrency=int(pick("CONGRESSO_JSON_CONCURRENCY","MYAPP_CONGRESSO_JSON_CONCURRENCY", cfg.get("congresso", {}).get("json_concurrency", 10))),
)
Optional: typed pick helpers to centralize coercion and error messages
Copy code
def pick_str(var, env, default):
    v = _get_var(var)
    if v is not None:
        return str(v)
    e = os.getenv(env)
    return e if e is not None else str(default)

def pick_int(var, env, default):
    v = _get_var(var)
    if v is not None:
        if isinstance(v, bool):  # avoid True->1 surprises
            raise TypeError(f"{var} should be int, got bool")
        return int(v)
    e = os.getenv(env)
    if e is not None: return int(e)
    return int(default)

def pick_float(var, env, default):
    v = _get_var(var)
    if v is not None:
        if isinstance(v, bool):
            raise TypeError(f"{var} should be float, got bool")
        return float(v)
    e = os.getenv(env)
    if e is not None: return float(e)
    return float(default)
About "it says it can return a coroutine"
- Prefect marks some APIs as sync-compatible: their type is T | Coroutine[..., T].
- In sync code, they return T directly; in async code, you must await them.
- Your helper is sync, so using cast(...) as shown is the right way to tell the type checker you're using the sync branch.

Bottom line
- Keep explicit conversions when building Settings; it's the safest way to normalize mixed sources.
- Fix the float/int mismatch for congresso_json_timeout.
- Use Variable.get(..., default=None) to avoid try/except and keep the code tidy.

Docs for Variables: Variables
g
@Marvin ok i want only the TOML file as a config file
m
✅ thought for 33.3 seconds
Got it — you want a single TOML file to be the source of truth. Here's a simple, typed setup that uses only TOML (no env vars, no Prefect Variables), with sensible defaults and clear errors if required values are missing.

Project layout
- config/
  - settings.toml
  - loader.py

config/settings.toml
Copy code
[tse]
base_url = "<https://api.tse.example>"
stream_timeout = 120

[congresso]
base_url = "<https://api.congresso.example>"
json_timeout = 30
json_concurrency = 10

[runner]
max_workers = 16
config/loader.py
Copy code
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any

try:
    import tomllib  # Python 3.11+
except ModuleNotFoundError:
    import tomli as tomllib  # pip install tomli for 3.10

@dataclass
class Settings:
    # Required fields (no defaults) must come before fields with defaults
    tse_base_url: str
    congresso_base_url: str

    # TSE
    tse_stream_timeout: float = 120.0

    # CONGRESSO
    congresso_json_timeout: float = 30.0
    congresso_json_concurrency: int = 10

    # Runner
    runner_max_workers: int = 16

def _read_toml(path: str | Path) -> dict[str, Any]:
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"Config file not found: {p.resolve()}")
    with open(p, "rb") as f:
        return tomllib.load(f)

def load_settings(config_path: str | Path = "config/settings.toml") -> Settings:
    cfg = _read_toml(config_path)

    # Required keys (fail fast with clear errors)
    try:
        tse_cfg = cfg["tse"]
        congresso_cfg = cfg["congresso"]
    except KeyError as e:
        raise ValueError(f"Missing section in TOML: {e}. Expected [tse], [congresso].") from e

    try:
        tse_base_url = str(tse_cfg["base_url"])
        congresso_base_url = str(congresso_cfg["base_url"])
    except KeyError as e:
        raise ValueError(f"Missing required key: {e}. Ensure base_url is set.") from e

    # Optional keys with defaults from the dataclass
    tse_stream_timeout = float(tse_cfg.get("stream_timeout", Settings.tse_stream_timeout))
    congresso_json_timeout = float(congresso_cfg.get("json_timeout", Settings.congresso_json_timeout))
    congresso_json_concurrency = int(congresso_cfg.get("json_concurrency", Settings.congresso_json_concurrency))
    runner_max_workers = int(cfg.get("runner", {}).get("max_workers", Settings.runner_max_workers))

    return Settings(
        tse_base_url=tse_base_url,
        tse_stream_timeout=tse_stream_timeout,
        congresso_base_url=congresso_base_url,
        congresso_json_timeout=congresso_json_timeout,
        congresso_json_concurrency=congresso_json_concurrency,
        runner_max_workers=runner_max_workers,
    )
Using it in your flow
Copy code
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings

@flow
def pipeline():
    s = load_settings()
    # pass settings to tasks as needed
    return {"runner": s.runner_max_workers}

if __name__ == "__main__":
    s = load_settings()
    pipeline.with_options(task_runner=ThreadPoolTaskRunner(max_workers=s.runner_max_workers))()
Notes and best practices
- Keep secrets out of TOML; use Prefect Secret blocks for tokens/keys and read them in code when needed.
- If you want environment-specific configs later, you can add files like config/settings.dev.toml and pass the path to load_settings(...) without changing any other code.
- TOML preserves types, but still coerce to float/int at load time to be explicit and future-proof.

If you want stricter validation (e.g., URL format checks), I can show a pydantic-based loader too (a sketch follows below).
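Following up on that pydantic offer, here is a minimal sketch of what a stricter loader could look like. It assumes pydantic v2 is installed and mirrors the section/field names from the TOML above; the model and function names are illustrative:
Copy code
from pathlib import Path
import tomllib

from pydantic import BaseModel, Field, HttpUrl

class TseConfig(BaseModel):
    base_url: HttpUrl                      # validates URL format
    stream_timeout: float = 120.0

class CongressoConfig(BaseModel):
    base_url: HttpUrl
    json_timeout: float = 30.0
    json_concurrency: int = Field(10, ge=1)

class RunnerConfig(BaseModel):
    max_workers: int = Field(16, ge=1)

class AppConfig(BaseModel):
    tse: TseConfig
    congresso: CongressoConfig
    runner: RunnerConfig = Field(default_factory=RunnerConfig)

def load_config(path: str | Path = "config/settings.toml") -> AppConfig:
    with open(path, "rb") as f:
        # pydantic coerces types and raises a ValidationError listing every bad field
        return AppConfig.model_validate(tomllib.load(f))
A ValidationError here reports all missing or invalid fields at once, which is handy when editing the TOML by hand.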
g
@Marvin This code:

from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings

@flow(task_runner=ThreadPoolTaskRunner(max_workers=16), log_prints=True)
def pipeline():
    logger = get_run_logger()
    logger.info("Iniciando pipeline")

if __name__ == "__main__":
    s = load_settings()
    pipeline.with_options(
        task_runner=ThreadPoolTaskRunner(max_workers=s.max_workers)
    )()

gives me this error: cannot access attribute "with_options" for class "FunctionType" / Attribute "with_options" is unknown
m
✅ thought for 41.4 seconds
You've got two separate issues:

1) The logger line has Slack formatting in your Python
Your paste contains Slack's link markup around logger.info (it shows up as <http://logger.info|logger.info>), which is not valid Python. Replace it with the plain call:
Copy code
logger.info("Iniciando pipeline")
2) with_options "unknown" is a typing/usage mismatch
- In Prefect 3, @flow returns a Flow object and with_options(...) is valid. Some linters/type-checkers still think it's a plain function. At runtime it works if pipeline is actually decorated with @flow.
- Also, in your Settings dataclass the field is likely runner_max_workers (not max_workers). Using s.max_workers would raise an attribute error.

Two clean patterns

Pattern A — Compute runner from TOML, then with_options at call time
Copy code
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings

@flow(log_prints=True)  # no runner here; we’ll set it at call time
def pipeline():
    logger = get_run_logger()
    logger.info("Iniciando pipeline")

if __name__ == "__main__":
    s = load_settings()
    runner = ThreadPoolTaskRunner(max_workers=s.runner_max_workers)
    pipeline.with_options(task_runner=runner)()
If your type checker still complains that with_options doesn't exist, it's safe to add a cast or ignore:
Copy code
from typing import cast
from prefect.flows import Flow

cast(Flow, pipeline).with_options(task_runner=runner)()
# or: pipeline.with_options(task_runner=runner)()  # type: ignore[attr-defined]
Pattern B — Use TOML at import time and set the runner in the decorator
Copy code
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings

s = load_settings()
runner = ThreadPoolTaskRunner(max_workers=s.runner_max_workers)

@flow(task_runner=runner, log_prints=True)
def pipeline():
    logger = get_run_logger()
    logger.info("Iniciando pipeline")

if __name__ == "__main__":
    pipeline()
This is simplest if you're fine loading config at import time.

Quick checklist
- Make sure runner_max_workers in your TOML/Settings matches what you use in code.
- Keep the logger line as plain Python.
- with_options is the right way to override options per call in Prefect 3.

Docs: Flows and tasks
g
@Marvin

from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings

s = load_settings()
runner = ThreadPoolTaskRunner(max_workers=s.max_workers)

@flow(task_runner=runner, log_prints=True)
def pipeline():
    logger = get_run_logger()
    logger.info("Iniciando pipeline")

if __name__ == "__main__":
    pipeline()

I still get an error on @flow(task_runner=runner, log_prints=True): No overloads for "__call__" match the provided arguments - Pylance (reportCallIssue)
m
✅ thought for 43.4 seconds
Two things going on:

1) Remove Slack formatting from your Python
You still have Slack's link markup around logger.info in the code. Change that line back to plain Python:
Copy code
logger.info("Iniciando pipeline")
2) Pylance "No overloads for __call__" is a type-checker false positive
In Prefect 3, passing a TaskRunner instance in the decorator is valid, but some editors' type stubs don't recognize it. The code will run. You can either suppress the warning or use a pattern that Pylance is happier with.

Clean working examples

A) Keep runner in the decorator (runtime works; may need a type-ignore)
Copy code
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings

s = load_settings()
runner = ThreadPoolTaskRunner(max_workers=s.runner_max_workers)  # ensure field name matches Settings

@flow(task_runner=runner, log_prints=True)  # mypy/pylance may complain, but this runs
def pipeline():
    logger = get_run_logger()
    logger.info("Iniciando pipeline")

if __name__ == "__main__":
    pipeline()
If your editor still complains, add:
Copy code
@flow(task_runner=runner, log_prints=True)  # type: ignore[call-arg]
B) Move the runner to with_options at call time (usually quiets the linter)
Copy code
from typing import cast
from prefect import flow, get_run_logger
from prefect.flows import Flow
from prefect.task_runners import ThreadPoolTaskRunner
from config.loader import load_settings

@flow(log_prints=True)
def pipeline():
    logger = get_run_logger()
    logger.info("Iniciando pipeline")

if __name__ == "__main__":
    s = load_settings()
    runner = ThreadPoolTaskRunner(max_workers=s.runner_max_workers)
    cast(Flow, pipeline).with_options(task_runner=runner)()
Common gotchas to check
- Make sure your Settings field is runner_max_workers (not max_workers) if that's how you defined it.
- Ensure you're on Prefect 3.x:
Copy code
uv run prefect version
- Don't include Slack formatting in code blocks you paste into your editor.

Docs:
- Flows and tasks: Flows and tasks
g
@Marvin OK, let's forget about the config file and go back to the file and folder architecture and how I can structure my project. I'm going to tell you the current tasks and dependencies that my project has, and you tell me the best practices to implement them (which folders, how to declare functions) for the desired outcome. I have a flow. In the flow I want to download the TSE endpoints; there are 30 endpoints that can be downloaded in parallel because they're not going to reach the request limit. Most of them are large zips, but I also have a number of small zip ones. The function you gave me is fine. The other task is Congresso, where I start by downloading Legislatura, a single JSON file that returns data directly to two other tasks: Deputados and Frentes, which are a single JSON file each. How should I structure the flow and these tasks following best practices?
m
✅ thought for 64.4 seconds
Great plan. Here's a clean, "Prefect 3 best practices" project layout for your case:
- One orchestrating flow
- Three main areas of work:
  - TSE: ~30 ZIP endpoints in parallel (stream to disk)
  - Congresso: first get Legislatura (small JSON), then run Deputados and Frentes in parallel; both depend on Legislatura

Recommended layout
- flows/
  - pipeline.py
- tasks/
  - tse.py
  - congresso/
    - legislatura.py
    - deputados.py
    - frentes.py
- utils/
  - io.py

utils/io.py
- Shared helpers for streaming large files and fetching/saving JSON
Copy code
from __future__ import annotations
from pathlib import Path
import json
import httpx

def ensure_dir(path: str | Path) -> Path:
    p = Path(path)
    p.mkdir(parents=True, exist_ok=True)
    return p

def filename_from_url(url: str) -> str:
    # naive: last path segment
    return url.rstrip("/").split("/")[-1] or "download.bin"

def download_stream(url: str, dest_path: str | Path, timeout: float = 120.0) -> str:
    dest_path = Path(dest_path)
    ensure_dir(dest_path.parent)
    with httpx.stream("GET", url, timeout=timeout) as r:
        r.raise_for_status()
        with open(dest_path, "wb") as f:
            for chunk in r.iter_bytes():
                f.write(chunk)
    return str(dest_path)

def fetch_json(url: str, timeout: float = 30.0) -> dict:
    with httpx.Client(timeout=timeout) as client:
        r = client.get(url)
        r.raise_for_status()
        return r.json()

def save_json(url: str, dest_path: str | Path, timeout: float = 60.0) -> str:
    data = fetch_json(url, timeout=timeout)
    dest_path = Path(dest_path)
    ensure_dir(dest_path.parent)
    with open(dest_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False)
    return str(dest_path)
tasks/tse.py - One task per endpoint; keep the list of endpoints here
Copy code
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import download_stream, filename_from_url

# Fill this with your 30 URLs
TSE_ENDPOINTS: list[str] = [
    # "https://.../file1.zip",
    # ...
]

@task(retries=2, retry_delay_seconds=5, timeout_seconds=600, log_prints=True)
def download_tse_zip(url: str, out_dir: str = "data/tse") -> str:
    logger = get_run_logger()
    name = filename_from_url(url)
    dest = Path(out_dir) / name
    logger.info(f"TSE: downloading {url} -> {dest}")
    return download_stream(url, dest)
tasks/congresso/legislatura.py - Small JSON that feeds Deputados/Frentes
Copy code
from prefect import task, get_run_logger
from utils.io import fetch_json

# Replace with real endpoint
LEGISLATURA_URL = "https://api.congresso.example/legislatura"

@task(retries=2, retry_delay_seconds=5, timeout_seconds=60, log_prints=True)
def fetch_legislatura() -> dict:
    logger = get_run_logger()
    logger.info("Congresso: fetching Legislatura")
    return fetch_json(LEGISLATURA_URL, timeout=60.0)
tasks/congresso/deputados.py - Depends on Legislatura; save to disk or return JSON. Below: save to disk.
Copy code
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import save_json

# Template; build the actual URL from legislatura fields
def deputados_url(legislatura: dict) -> str:
    # e.g., f"<https://api.congresso.example/deputados?legislatura={legislatura['id']}>"
    return "<https://api.congresso.example/deputados>"

@task(retries=2, retry_delay_seconds=5, timeout_seconds=120, log_prints=True)
def fetch_deputados(legislatura: dict, out_dir: str = "data/congresso") -> str:
    logger = get_run_logger()
    url = deputados_url(legislatura)
    dest = Path(out_dir) / "deputados.json"
    logger.info(f"Congresso: fetching Deputados from {url} -> {dest}")
    return save_json(url, dest, timeout=60.0)
tasks/congresso/frentes.py
- Also depends on Legislatura; run in parallel with Deputados
Copy code
from pathlib import Path
from prefect import task, get_run_logger
from utils.io import save_json

# Template; build the actual URL from legislatura fields
def frentes_url(legislatura: dict) -> str:
    # e.g., f"https://api.congresso.example/frentes?legislatura={legislatura['id']}"
    return "https://api.congresso.example/frentes"

@task(retries=2, retry_delay_seconds=5, timeout_seconds=120, log_prints=True)
def fetch_frentes(legislatura: dict, out_dir: str = "data/congresso") -> str:
    logger = get_run_logger()
    url = frentes_url(legislatura)
    dest = Path(out_dir) / "frentes.json"
    logger.info(f"Congresso: fetching Frentes from {url} -> {dest}")
    return save_json(url, dest, timeout=60.0)
flows/pipeline.py
- Orchestrate everything; TSE downloads run in parallel; Legislatura first, then Deputados and Frentes in parallel
Copy code
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner

from tasks.tse import TSE_ENDPOINTS, download_tse_zip
from tasks.congresso.legislatura import fetch_legislatura
from tasks.congresso.deputados import fetch_deputados
from tasks.congresso.frentes import fetch_frentes

@flow(task_runner=ThreadPoolTaskRunner(max_workers=16), log_prints=True)
def pipeline():
    logger = get_run_logger()
    logger.info("Starting pipeline")

    # TSE: fan out ~30 endpoints in parallel
    tse_fs = [download_tse_zip.submit(url) for url in TSE_ENDPOINTS]

    # Congresso: fetch Legislatura first (dependency)
    legislatura = fetch_legislatura()

    # Then Deputados and Frentes in parallel (both depend on Legislatura)
    deputados_f = fetch_deputados.submit(legislatura)
    frentes_f = fetch_frentes.submit(legislatura)

    # Gather results
    tse_paths = [f.result() for f in tse_fs]
    deputados_path = deputados_f.result()
    frentes_path = frentes_f.result()

    logger.info(f"TSE files: {len(tse_paths)}")
    logger.info(f"Deputados -> {deputados_path}")
    logger.info(f"Frentes -> {frentes_path}")

    return {
        "tse": tse_paths,
        "congresso_deputados": deputados_path,
        "congresso_frentes": frentes_path,
    }

if __name__ == "__main__":
    pipeline()
Notes and best practices
- Keep big files streamed to disk; return file paths from tasks.
- Keep small JSON in-memory only when you need it downstream; otherwise save to disk (as above).
- Add retries/timeouts to all network tasks (already shown).
- Control concurrency with ThreadPoolTaskRunner(max_workers=...). Even with "no request limit", a sane cap (e.g., 10–20) prevents local resource exhaustion.
- Keep URLs and per-API assembly logic near each task module (as shown with deputados_url/frentes_url) so dependency wiring stays obvious.
- If any endpoints need auth, load credentials from Secret blocks inside the task.

If you drop in your real URLs/parameterization (especially how Legislatura shapes Deputados/Frentes URLs), I can fill those helper functions precisely.
g
@Marvin OK, now I'm executing successfully. But I'm getting errors on some endpoints. I would like to re-execute only those tasks to troubleshoot. What is the pattern? Do I need to create a deployment to run only some tasks?
m
✅ thought for 276.6 seconds
Great to hear you're executing successfully! Short answer: you don't need a deployment to re-run only some work. In Prefect 3.x there isn't a built-in "re-run just this failed task" button after a flow run completes. The common pattern is to start a new flow run that targets only the endpoints you care about, and design your flow to skip or cache already-successful work.

Recommended patterns:

1) Parameterize which endpoints to run
- Add a parameter (e.g., only) and use native Python filtering so a new run can execute just a subset.
- Combine with task retries for transient failures.
Copy code
from datetime import timedelta
from prefect import flow, task
from prefect.cache_policies import INPUTS

@task(
    retries=2,
    retry_delay_seconds=10,
    cache_policy=INPUTS,
    cache_expiration=timedelta(days=1),
)
def call_endpoint(name: str):
    # call your endpoint here
    ...

@flow
def sync_all(endpoints: list[str], only: list[str] | None = None):
    targets = [e for e in endpoints if not only or e in only]
    # Either map or loop — both are fine
    call_endpoint.map(targets)
- First run:
Copy code
sync_all(endpoints=ALL_ENDPOINTS)
- Re-run only failing ones:
Copy code
sync_all(endpoints=ALL_ENDPOINTS, only=["/users", "/orders"])
2) Use result caching to avoid re-running successful work
- With the INPUTS cache policy above, when you re-run the flow with the same inputs, Prefect loads cached results for tasks that already succeeded and re-executes only the ones that need work.
- If you want to force re-execution of a task even if there's a cache, set refresh_cache=True on the task (in the decorator) temporarily, or adjust your cache key policy (see the sketch after the subflow example below).
- Docs: Result caching

3) Subflows per unit of work
- Wrap each endpoint in its own subflow, then call just the ones you want in a subsequent run.
Copy code
from prefect import flow

@flow
def sync_one(endpoint: str):
    call_endpoint(endpoint)

@flow
def sync_all(endpoints: list[str], only: list[str] | None = None):
    for e in endpoints:
        if not only or e in only:
            sync_one(e)
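Picking up the refresh_cache note from point 2, a minimal sketch (the task name mirrors pattern 1; this is illustrative, not a required change):
Copy code
from datetime import timedelta
from prefect import task
from prefect.cache_policies import INPUTS

# refresh_cache=True makes this task recompute and overwrite its cached result
# on the next run, even if a valid cache entry already exists
@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1), refresh_cache=True)
def call_endpoint(name: str):
    ...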
4) Retries for transient errors
- Configure retries on tasks so transient issues are automatically retried during the same run.
- Docs: Task retries

Do I need a deployment?
- No. You can just call the flow function with the only parameter for ad-hoc re-runs.
- Deployments are optional but handy if you want to trigger these filtered re-runs from the UI/CLI on demand or on a schedule. If you already have a deployment, you can pass parameters via CLI:
Copy code
prefect deployment run my-flow/my-deployment -p only='["/users","/orders"]' --watch
- Deployments overview: Deployments

UI note
- There's no one-click "retry this single task" after a flow run completes. The idiom is to start a new run that targets the subset of work you want, using parameters and caching to avoid redoing everything.

If you can share how you're structuring the endpoints today (e.g., a list you map over vs. individual tasks), I can tailor the pattern and code to your setup.
g
@Marvin About the names of tasks: I'm getting generic names like "download_tse_endpoint-252". How can I customize the task name shown in the UI?
m
✅ thought for 242.5 seconds
You can customize task names shown in the UI with the @task decorator:
- name= sets the static task name (the node label)
- task_run_name= sets the dynamic run name for each invocation (including mapped runs)

Examples

1) Static task name + dynamic run name from parameters
Copy code
from prefect import task, flow

@task(
    name="Download TSE endpoint",
    task_run_name="Download {endpoint}"   # uses the task parameter names
)
def download_tse_endpoint(endpoint: str):
    ...

@flow
def sync(endpoints: list[str]):
    # Each mapped run is named "Download <endpoint>"
    download_tse_endpoint.map(endpoints)
2) Set/override per call with with_options
Copy code
download_tse_endpoint.with_options(
    task_run_name="Download {endpoint}"
).map(endpoints)
3) Use a callable for complex naming
Copy code
def run_name(params: dict) -> str:
    ep = params.get("endpoint", "unknown")
    return f"Download {ep}"

@task(task_run_name=run_name)
def download_tse_endpoint(endpoint: str):
    ...
Notes
- The placeholders in task_run_name refer to the task's parameter names (top-level fields).
- Prefect may append a short suffix for uniqueness (e.g., -252). You can't remove it, but putting your informative part first makes names clear in the UI.
- You can also set flow run names similarly: @flow(flow_run_name="Sync {date}") (see the sketch below).

References (implementation details)
- resolve_custom_task_run_name: source
- Task engine setting custom names: source
- Task decorator definition: source

If you share your current task signature (parameter names), I can suggest an exact task_run_name string for your case.
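And the flow-level analogue mentioned in the notes, as a small sketch (the flow and parameter names are illustrative):
Copy code
from datetime import date
from prefect import flow

# Flow run names can be templated from flow parameters, just like task_run_name
@flow(flow_run_name="tse-sync-{run_date}")
def tse_sync(run_date: date):
    ...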
g
@Marvin About renaming the task: in my code I have the endpoints, and I want to use the keys that name the endpoints in the task name. Something like fetch_tse_candidatos_2018:

from pathlib import Path
from prefect import task, get_run_logger
from utils.io import download_stream
from utils.br_data import BR_STATES, ELECTIONS_YEARS

# Build the social media endpoints per state
REDES_SOCIAIS_ENDPOINTS = {
    f"redes_sociais_{year}_{state}": f"https://cdn.tse.jus.br/estatistica/sead/odsele/consulta_cand/rede_social_candidato_{year}_{state}.zip"
    for state in BR_STATES
    for year in ELECTIONS_YEARS
}

TSE_ENDPOINTS = {
    "candidatos_2018": "https://cdn.tse.jus.br/estatistica/sead/odsele/consulta_cand/consulta_cand_2018.zip",
    "candidatos_2022": "https://cdn.tse.jus.br/estatistica/sead/odsele/consulta_cand/consulta_cand_2022.zip",
    "prestaca_contas_2018": "https://cdn.tse.jus.br/estatistica/sead/odsele/prestacao_contas/prestacao_de_contas_eleitorais_candidatos_2018.zip",
    "prestacao_contas_2022": "https://cdn.tse.jus.br/estatistica/sead/odsele/prestacao_contas/prestacao_de_contas_eleitorais_candidatos_2022.zip",
    "resultado_eleicao_2018": "https://cdn.tse.jus.br/estatistica/sead/odsele/votacao_candidato_munzona/votacao_candidato_munzona_2018.zip",
    "resultado_eleicao_2022": "https://cdn.tse.jus.br/estatistica/sead/odsele/votacao_candidato_munzona/votacao_candidato_munzona_2022.zip"
}

TSE_ENDPOINTS = TSE_ENDPOINTS | REDES_SOCIAIS_ENDPOINTS

@task(retries=5, retry_delay_seconds=5, timeout_seconds=120, log_prints=True)
def fetch_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
    logger = get_run_logger()
    dest = Path(out_dir) / f"{name}.csv"
    logger.info(f"Fazendo download do endpoint TSE '{name}' -> {dest}")
    return download_stream(url, dest)
m
✅ thought for 36.5 seconds
Totally doable. Use task_run_name to format the run name from your name parameter so each run shows the endpoint key in the UI.

Option A: set it on the task decorator
Copy code
from pathlib import Path
from prefect import task, get_run_logger

@task(
    name="Fetch TSE endpoint",                 # static node label
    task_run_name="fetch_tse_{name}",          # dynamic run name per call
    retries=5, retry_delay_seconds=5, timeout_seconds=120, log_prints=True
)
def fetch_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
    logger = get_run_logger()
    dest = Path(out_dir) / f"{name}.csv"
    logger.info(f"Fazendo download do endpoint TSE '{name}' -> {dest}")
    return download_stream(url, dest)
Then map over your dict so name and url align:
Copy code
from prefect import flow, unmapped

@flow
def sync_all():
    names, urls = zip(*TSE_ENDPOINTS.items())   # preserves pairing
    fetch_tse.map(name=names, url=urls, out_dir=unmapped("data/tse"))
Option B: set it at call-site with with_options (no change to decorator)
Copy code
from prefect import unmapped

names, urls = zip(*TSE_ENDPOINTS.items())
fetch_tse.with_options(task_run_name="fetch_tse_{name}").map(
    name=names, url=urls, out_dir=unmapped("data/tse")
)
Notes
- The placeholders in task_run_name must match your task's parameter names (name, url, etc.). Top-level keys only.
- Prefect may append a short suffix for uniqueness (e.g., -252), which is expected.
- You can also use a callable for complex formatting:
Copy code
def rn(params: dict) -> str:
    return f"fetch_tse_{params['name']}"
fetch_tse = fetch_tse.with_options(task_run_name=rn)
Relevant internals if you're curious:
- Task decorator signature (supports task_run_name): source
- Name resolution: resolve_custom_task_run_name
g
@Marvin This is in my main.py:

from flows.pipeline import pipeline

def main():
    pipeline()

if __name__ == "__main__":
    main()

This is the pipeline.py:

from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from datetime import date, datetime

from tasks.tse import TSE_ENDPOINTS, extract_tse
from tasks.camara.legislatura import extract_legislatura
from tasks.camara.deputados import extract_deputados
from tasks.camara.frentes import extract_frentes
from config.loader import load_config

APP_SETTINGS = load_config()

# IMPORT TSE, CONGRESSO, SENADO TASKS, ETC...

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS),  # type: ignore
    log_prints=True
)
def pipeline(date_today: date = datetime.now().date()):
    logger = get_run_logger()
    logger.info("Iniciando pipeline")

    # TSE: ~30 endpoints in parallel
    tse_fs = [extract_tse.submit(name, url) for name, url in TSE_ENDPOINTS.items()]

    # CONGRESSO
    legislatura = extract_legislatura()
    deputados_f = extract_deputados.submit(legislatura)
    frentes_f = extract_frentes.submit(legislatura)

    # Gathering the results
    tse_paths = [f.result() for f in tse_fs]
    deputados_path = deputados_f.result()
    frentes_path = frentes_f.result()

    logger.info(f"Arquivos TSE: {len(tse_paths)}")
    logger.info(f"Deputados -> {deputados_path}")
    logger.info(f"Frentes -> {frentes_path}")

    return {
        "tse": tse_paths,
        "congresso_deputados": deputados_path,
        "congresso_frentes": frentes_path
    }

If I want to create a deployment, should I create it in main?
m
✅ thought for 53.9 seconds
Short answer: don’t create the deployment in main. In Prefect 3.x you create a deployment for the flow function itself (your
pipeline
in
flows/pipeline.py
) and reference that entrypoint with the CLI. Suggested setup - Keep
main.py
for local dev runs only (your
if __name__ == "__main__": pipeline()
is fine). The deployment won’t use `main.py`; it imports your flow directly from
flows/pipeline.py
. Create a deployment from project root
Copy code
# From the root of your repo (where ./flows/pipeline.py exists)
prefect deploy ./flows/pipeline.py:pipeline -n pipeline-dev
Start a worker to pick up runs
Copy code
prefect worker start -p <your-work-pool>
Trigger the deployment
Copy code
# Run now and watch logs
prefect deployment run pipeline/pipeline-dev --watch

# Pass parameters if needed (example):
prefect deployment run pipeline/pipeline-dev -p date_today='"2025-10-21"'
Notes - The
ENTRYPOINT
format is
path/to/file.py:flow_function_name
(so:
./flows/pipeline.py:pipeline
). - You can create multiple deployments for the same flow (e.g., dev/prod, different work pools, schedules). - If you ever want to define a deployment in code (e.g., from a remote repo), use the Prefect 3.x pattern:
Copy code
from prefect import flow

flow.from_source(
    "<https://github.com/owner/repo>",
    entrypoint="flows/pipeline.py:pipeline",
).deploy(name="pipeline-dev", work_pool_name="<pool-name>")
Docs: Deployments in Prefect 3.x If you share your work pool name and how you plan to schedule it, I can give you an exact
prefect deploy
command tailored to your setup.
g
@Marvin given my pipeline.py, how it should be a simple deployment?
m
✅ thought for 81.4 seconds
Here’s a simple, minimal deployment for your
pipeline
flow in Prefect 3.x. Assumptions: - Your flow is defined at
./flows/pipeline.py
as
@flow def pipeline(...)
. - You want to run it on a local Process worker (default/simple option). Step 1) Create a work pool (once)
Copy code
prefect work-pool create "process-pool" --type process
Step 2) Create the deployment from your project root - The entrypoint format is
path/to/file.py:flow_function_name
Copy code
prefect deploy ./flows/pipeline.py:pipeline -n pipeline-dev
The CLI will prompt you for a work pool if not provided; choose
process-pool
. Step 3) Start a worker to pick up runs
Copy code
prefect worker start -p process-pool
Step 4) Run the deployment (and watch logs)
Copy code
prefect deployment run pipeline/pipeline-dev --watch
Pass parameters (e.g., your
date_today
flow parameter) if needed:
Copy code
prefect deployment run pipeline/pipeline-dev -p date_today='"2025-10-22"' --watch
Notes: - You don’t deploy `main.py`; deployments target the flow entrypoint directly (
./flows/pipeline.py:pipeline
). Keep
main.py
for local dev runs if you like. - Your
ThreadPoolTaskRunner
in the flow is fine; it will be honored when the flow runs from a deployment. - You can add a schedule later with
--cron
,
--interval
, etc., or via the UI. Docs: - Deployments: Prefect 3.x deployments - Work pools and workers: Work pools and workers
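For example, attaching a daily schedule when you (re)create the deployment might look like this — a sketch; the cron expression is illustrative:
Copy code
# Re-run the deploy command with a schedule attached (every day at 07:00)
prefect deploy ./flows/pipeline.py:pipeline -n pipeline-dev --cron "0 7 * * *"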
g
@Marvin Should I do this in pipeline.py? if __name__ == "__main__": pipeline.serve( # type: ignore name="deploy-1" ) I'm running with the uv run .\src\main.py command but the deployment isn't working.
m
✅ thought for 237.4 seconds
Short answer: don’t put serve() in pipeline.py if you’re running main.py. The main block in pipeline.py won’t execute when you start main.py. Use one of these: Option A: Serve (embedded runner, no worker needed) - Put serve in main.py so it actually runs:
Copy code
from flows.pipeline import pipeline

if __name__ == "__main__":
    # Creates a deployment and starts a local runner that polls for runs
    pipeline.serve(
        name="deploy-1",
        # optional: schedule it
        # interval=3600,  # run every hour
        # or cron="0 7 * * *",
    )
Then start it:
Copy code
uv run src/main.py
Notes: - serve() blocks and keeps running. It registers the deployment and polls for scheduled/manual runs. - To run it immediately without a schedule, trigger it from another terminal or the UI:
Copy code
prefect deployment run pipeline/deploy-1 --watch
- No worker/work pool required for serve; the process running serve executes the runs. - By default, the deployment pauses on shutdown; if you want it to stay active after the process ends, set
pause_on_shutdown=False
. Option B: Classic deployment + worker - Create a work pool and deployment, then start a worker:
Copy code
prefect work-pool create process-pool --type process
prefect deploy ./flows/pipeline.py:pipeline -n deploy-1
prefect worker start -p process-pool
prefect deployment run pipeline/deploy-1 --watch
Common gotchas - Calling
pipeline()
in main.py executes the flow directly and bypasses deployments. Use
pipeline.serve(...)
(Option A) or create a deployment via CLI (Option B). - If you put
if __name__ == "__main__": pipeline.serve(...)
in pipeline.py, you must run that file directly for it to execute; running main.py won’t trigger pipeline.py’s main block. - If serve is running but “nothing happens,” you likely haven’t scheduled or triggered a run yet. Use a schedule in serve() or
prefect deployment run ...
. Docs - Serve flows (embedded runner): Run flows in local processes - Create deployments: Create deployments If you tell me which option you prefer (serve vs worker), I’ll tailor the exact commands and file changes for your setup.
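Referring back to the pause_on_shutdown note in Option A, a minimal sketch of keeping the deployment active after the serve process exits (parameter name as exposed by serve in Prefect 3.x):
Copy code
from flows.pipeline import pipeline

if __name__ == "__main__":
    # Keep the deployment (and its schedule) active even after this process stops
    pipeline.serve(name="deploy-1", pause_on_shutdown=False)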
g
@Marvin I want to define the value of cache_policy in my TOML config file: from pathlib import Path from prefect import task, get_run_logger from prefect.cache_policies import INPUTS from utils.io import download_stream from utils.br_data import BR_STATES, ELECTIONS_YEARS from config import loader APP_SETTINGS = loader.load_config() # MONTAR ENDPOINT REDES SOCIAIS POR ESTADO REDES_SOCIAIS_ENDPOINTS = { f"redes_sociais_{year}_{state}": f"{APP_SETTINGS.TSE.BASE_URL}consulta_cand/rede_social_candidato_{year}_{state}.zip" for state in BR_STATES for year in ELECTIONS_YEARS } TSE_ENDPOINTS = { "candidatos_2018": f"{APP_SETTINGS.TSE.BASE_URL}consulta_cand/consulta_cand_2018.zip", "candidatos_2022": f"{APP_SETTINGS.TSE.BASE_URL}consulta_cand/consulta_cand_2022.zip", "prestaca_contas_2018": f"{APP_SETTINGS.TSE.BASE_URL}prestacao_contas/prestacao_de_contas_eleitorais_candidatos_2018.zip", "prestacao_contas_2022": f"{APP_SETTINGS.TSE.BASE_URL}prestacao_contas/prestacao_de_contas_eleitorais_candidatos_2022.zip", "resultado_eleicao_2018": f"{APP_SETTINGS.TSE.BASE_URL}votacao_candidato_munzona/votacao_candidato_munzona_2018.zip", "resultado_eleicao_2022": f"{APP_SETTINGS.TSE.BASE_URL}votacao_candidato_munzona/votacao_candidato_munzona_2022.zip" } TSE_ENDPOINTS = TSE_ENDPOINTS | REDES_SOCIAIS_ENDPOINTS @task( retries=APP_SETTINGS.TSE.RETRIES, retry_delay_seconds=APP_SETTINGS.TSE.RETRY_DELAY, timeout_seconds=APP_SETTINGS.TSE.TIMEOUT, log_prints=True, cache_policy=INPUTS ) def extract_tse(name: str, url: str, out_dir: str = "data/tse") -> str: logger = get_run_logger() dest = Path(out_dir) / f"{name}.csv" logger.info(f"Fazendo download do endpoint TSE '{name}' -> {dest}") return download_stream(url, dest) But how can i do it if the value of inptus comes from from prefect.cache_policies import INPUTS?
m
✅ thought for 47.8 seconds
Great question. Since TOML can’t import Python symbols, the usual pattern is: - Store a simple string in your TOML (e.g., "INPUTS", "NONE", "DEFAULT", etc.) - In your config loader, map that string to the actual Prefect object (e.g., the
INPUTS
constant or a
CachePolicy()
instance) - Use the mapped object in your
@task(cache_policy=...)
Example TOML (e.g., config.toml)
Copy code
[tse]
retries = 5
retry_delay = 5
timeout = 120
cache_policy = "INPUTS"          # or "NONE", "DEFAULT", "FLOW_PARAMETERS", "TASK_SOURCE", "RUN_ID"
cache_expiration_seconds = 86400 # optional, if you also set cache_expiration
Config loader mapping
Copy code
from datetime import timedelta
from prefect.cache_policies import (
    CachePolicy,
    INPUTS,
    NONE,          # aka NO_CACHE in some contexts
    DEFAULT,
    TASK_SOURCE,
    FLOW_PARAMETERS,
    RUN_ID,
)

_CACHE_POLICY_MAP = {
    "INPUTS": INPUTS,
    "NONE": NONE,
    "NO_CACHE": NONE,
    "DEFAULT": DEFAULT,
    "TASK_SOURCE": TASK_SOURCE,
    "FLOW_PARAMETERS": FLOW_PARAMETERS,
    "RUN_ID": RUN_ID,
}

def coerce_cache_policy(policy_name: str | None):
    if policy_name is None:
        return None  # means "NotSet" if you don’t pass cache_policy
    if isinstance(policy_name, str):
        key = policy_name.strip().upper()
        if key in _CACHE_POLICY_MAP:
            return _CACHE_POLICY_MAP[key]
        raise ValueError(f"Unknown cache_policy '{policy_name}'. "
                         f"Expected one of: {', '.join(_CACHE_POLICY_MAP)}")
    # You could also allow pre-built CachePolicy objects here if needed
    raise TypeError(f"cache_policy must be a string or None, got {type(policy_name)}")
Use in your settings/model
Copy code
from pydantic import BaseModel  # assuming your settings models are pydantic-based

class TSEConfig(BaseModel):
    RETRIES: int
    RETRY_DELAY: int
    TIMEOUT: int
    CACHE_POLICY: str | None = None
    CACHE_EXPIRATION_SECONDS: int | None = None

    @property
    def cache_policy_obj(self):
        return coerce_cache_policy(self.CACHE_POLICY)

    @property
    def cache_expiration_td(self):
        return timedelta(seconds=self.CACHE_EXPIRATION_SECONDS) if self.CACHE_EXPIRATION_SECONDS else None
Task usage
Copy code
APP_SETTINGS = loader.load_config()

@task(
    retries=APP_SETTINGS.TSE.RETRIES,
    retry_delay_seconds=APP_SETTINGS.TSE.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.TSE.TIMEOUT,
    log_prints=True,
    cache_policy=APP_SETTINGS.TSE.cache_policy_obj,
    cache_expiration=APP_SETTINGS.TSE.cache_expiration_td,
    task_run_name="fetch_tse_{name}",
)
def extract_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
    logger = get_run_logger()
    dest = Path(out_dir) / f"{name}.csv"
    logger.info(f"Fazendo download do endpoint TSE '{name}' -> {dest}")
    return download_stream(url, dest)
Notes -
task(cache_policy=...)
expects a
CachePolicy
or one of the provided constants (e.g.,
INPUTS
). Mapping strings from TOML to those objects is the safest approach. - If you need a fully custom
CachePolicy
(e.g., specific
key_storage
or
isolation_level
), put the parameters in TOML and build a
CachePolicy(...)
in your loader when
cache_policy = "CUSTOM"
. - Caching docs (for more detail): Result caching in Prefect 3.x If you share your loader’s structure, I can fold the mapping logic directly into it.
g
@Marvin i want to cache the results of downloaded large size files, more than 1gb. Is this ok?
m
✅ thought for 47.4 seconds
Yes—with a couple of important caveats it’s a good fit. What “caching” means here - Prefect caches the task’s return value, not the file contents. If your task returns a small string (e.g., the destination path or S3 URI), caching is efficient even when the actual file is >1GB. Recommendations for large files - Return a small value: return the file path/URI (e.g.,
"data/tse/candidatos_2018.csv"
or
"<s3://bucket/key.csv>"
), not the bytes. - Keep
cache_policy=INPUTS
so identical inputs skip re-downloads. Consider adding
cache_expiration
to bound staleness. - Ensure results are persisted so the cache can be reused across runs. Either: - set
persist_result=True
on the task, or - enable globally with
PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
. - Make the file location durable: - If you run everything on one machine (serve or a single worker), a local path is fine. - If you run on multiple machines/workers, store the file in shared storage (e.g., S3/GCS) and return that URI so downstream steps can access it anywhere. Practical pattern
Copy code
from datetime import timedelta
from pathlib import Path
from prefect import task, get_run_logger
from prefect.cache_policies import INPUTS

@task(
    retries=APP_SETTINGS.TSE.RETRIES,
    retry_delay_seconds=APP_SETTINGS.TSE.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.TSE.TIMEOUT,
    log_prints=True,
    persist_result=True,                  # make cache reusable across runs
    cache_policy=INPUTS,                  # cache by inputs (name, url, out_dir)
    cache_expiration=timedelta(days=30),  # optional TTL for freshness
    task_run_name="fetch_tse_{name}",
)
def extract_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
    logger = get_run_logger()
    dest = Path(out_dir) / f"{name}.csv"

    # Local idempotency guard: if file already exists, skip network download
    if dest.exists() and dest.stat().st_size > 0:
        logger.info(f"Arquivo jå existe, reutilizando: {dest}")
        return str(dest)

    logger.info(f"Fazendo download do endpoint TSE '{name}' -> {dest}")
    return download_stream(url, dest)
Optional: smarter cache busting - If the remote file can change at the same URL, you can include a version signal (e.g., ETag or Last-Modified) in the cache key via
cache_key_fn
. That way a changed upstream invalidates the cache. Trade-offs to be aware of - A cached success does not verify the file still exists on disk. If the file gets deleted, the cached run will still “succeed.” The existence check above mitigates this. - If you use multiple workers, a local path returned from one machine won’t exist on another. Prefer object storage for the actual file and return the object URI. Docs - Result caching - Result persistence and storage If you share whether you’re running on a single machine or multiple workers (and whether you have S3/GCS), I can tailor the storage/result settings exactly.
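If you do want that ETag-based cache busting, here is a minimal sketch (assuming the CDN returns an ETag header; etag_cache_key is an illustrative helper, not a Prefect built-in — the cache_key_fn callable receives the task run context and the call's parameters and returns a string key):
Copy code
import hashlib

import httpx
from prefect import task
from prefect.context import TaskRunContext


def etag_cache_key(context: TaskRunContext, parameters: dict) -> str:
    # Illustrative helper: fold the remote ETag (if the server sends one) into the cache key
    url = parameters["url"]
    try:
        etag = httpx.head(url, timeout=10.0).headers.get("etag", "")
    except httpx.HTTPError:
        etag = ""  # fall back to keying on name/url only
    raw = f"{parameters['name']}|{url}|{etag}"
    return hashlib.sha256(raw.encode()).hexdigest()


@task(cache_key_fn=etag_cache_key, persist_result=True, task_run_name="extract_tse_{name}")
def extract_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
    ...
When the upstream file changes, the ETag changes, the key changes, and the next run re-downloads instead of reusing the cache.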
g
@Marvin See my code: from pathlib import Path from prefect import task, get_run_logger from datetime import timedelta from utils.io import download_stream from utils.br_data import BR_STATES, ELECTIONS_YEARS from config.loader import load_config, CACHE_POLICY_MAP APP_SETTINGS = load_config() # MONTAR ENDPOINT REDES SOCIAIS POR ESTADO REDES_SOCIAIS_ENDPOINTS = { f"redes_sociais_{year}_{state}": f"{APP_SETTINGS.TSE.BASE_URL}consulta_cand/rede_social_candidato_{year}_{state}.zip" for state in BR_STATES for year in ELECTIONS_YEARS } TSE_ENDPOINTS = { "candidatos_2018": f"{APP_SETTINGS.TSE.BASE_URL}consulta_cand/consulta_cand_2018.zip", "candidatos_2022": f"{APP_SETTINGS.TSE.BASE_URL}consulta_cand/consulta_cand_2022.zip", "prestaca_contas_2018": f"{APP_SETTINGS.TSE.BASE_URL}prestacao_contas/prestacao_de_contas_eleitorais_candidatos_2018.zip", "prestacao_contas_2022": f"{APP_SETTINGS.TSE.BASE_URL}prestacao_contas/prestacao_de_contas_eleitorais_candidatos_2022.zip", "resultado_eleicao_2018": f"{APP_SETTINGS.TSE.BASE_URL}votacao_candidato_munzona/votacao_candidato_munzona_2018.zip", "resultado_eleicao_2022": f"{APP_SETTINGS.TSE.BASE_URL}votacao_candidato_munzona/votacao_candidato_munzona_2022.zip" } TSE_ENDPOINTS = TSE_ENDPOINTS | REDES_SOCIAIS_ENDPOINTS @task( retries=APP_SETTINGS.TSE.RETRIES, retry_delay_seconds=APP_SETTINGS.TSE.RETRY_DELAY, timeout_seconds=APP_SETTINGS.TSE.TIMEOUT, log_prints=True, cache_policy=CACHE_POLICY_MAP[APP_SETTINGS.TSE.CACHE_POLICY], cache_expiration=timedelta(days=APP_SETTINGS.TSE.CACHE_EXPIRATION) ) def extract_tse(name: str, url: str, out_dir: str = "data/tse") -> str: logger = get_run_logger() dest = Path(out_dir) / f"{name}.csv" logger.info(f"Fazendo download do endpoint TSE '{name}' -> {dest}") return download_stream(url, dest) *What i want is have a custom name for the task so i can distinguish each task easily on the UI. For example, currently i have something like "*extract_tse-34b" but i want the name of the key string in the list used to download the files, like "*candidatos_2018" so it would be "extract_tse_candidatos_2018-34b"*
m
✅ thought for 29.6 seconds
Yes — you can set the task run name from the
name
parameter so each run shows the key in the UI. Add
task_run_name="extract_tse_{name}"
to your task decorator:
Copy code
from pathlib import Path
from prefect import task, get_run_logger
from datetime import timedelta

from utils.io import download_stream
from config.loader import load_config, CACHE_POLICY_MAP

APP_SETTINGS = load_config()

@task(
    retries=APP_SETTINGS.TSE.RETRIES,
    retry_delay_seconds=APP_SETTINGS.TSE.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.TSE.TIMEOUT,
    log_prints=True,
    cache_policy=CACHE_POLICY_MAP[APP_SETTINGS.TSE.CACHE_POLICY],
    cache_expiration=timedelta(days=APP_SETTINGS.TSE.CACHE_EXPIRATION),
    task_run_name="extract_tse_{name}",   # <- dynamic run name per invocation
)
def extract_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
    logger = get_run_logger()
    dest = Path(out_dir) / f"{name}.csv"
    logger.info(f"Fazendo download do endpoint TSE '{name}' -> {dest}")
    return download_stream(url, dest)
Then, when you call/submit the task like:
Copy code
# inside your flow
for key, url in TSE_ENDPOINTS.items():
    extract_tse.submit(name=key, url=url)
each task run will appear as
extract_tse_<key>-<suffix>
, e.g.,
extract_tse_candidatos_2018-34b
. Notes - The placeholders in
task_run_name
must match the task’s parameter names (
name
,
url
, ...). Here
{name}
is the dict key you’re passing. - Prefect appends a short suffix (e.g.,
-34b
) for uniqueness — that’s expected. - If you prefer to set this at call time (without changing the decorator), you can do:
Copy code
extract_tse.with_options(task_run_name="extract_tse_{name}").submit(name=key, url=url)
For complex formatting/truncation, you can use a callable:
Copy code
def rn(params: dict) -> str:
    raw = params["name"]
    short = raw[:60]
    return f"extract_tse_{short}"

extract_tse = extract_tse.with_options(task_run_name=rn)
g
@Marvin Ok it worked! Now i want a more complex logic to cache. I have some endpoints where i download the data based on the year. So i would download the CSV from 2023, 2024 and 2025. Since 2023 and 2024 are already past i would need it only the CSV from the current year. How could i cache only the CSVs from the past years?
m
✅ thought for 68.0 seconds
Yes — you can make caching conditional on the year. There are two straightforward patterns in Prefect 3.x: Pattern 1: Choose options per call with with_options - Cache past years (long TTL) - Disable caching for the current year
Copy code
from datetime import date, timedelta
import re

CURRENT_YEAR = date.today().year

def extract_year_from_name(name: str) -> int | None:
    # e.g., "candidatos_2018", "redes_sociais_2024_SP"
    m = re.search(r"\b(20\d{2})\b", name)
    return int(m.group(1)) if m else None

def cache_opts_for_year(year: int | None):
    if year is None:
        # no year in name -> default behavior
        return {}
    if year < CURRENT_YEAR:
        # cache past years
        return {
            "cache_policy": CACHE_POLICY_MAP["INPUTS"],
            "cache_expiration": timedelta(days=365*5),
            "persist_result": True,
        }
    else:
        # do not cache current year
        return {
            "cache_policy": CACHE_POLICY_MAP["NO_CACHE"],
            # OR: "refresh_cache": True,
        }

# inside your flow
for name, url in TSE_ENDPOINTS.items():
    year = extract_year_from_name(name)
    opts = cache_opts_for_year(year)
    extract_tse.with_options(**opts).submit(name=name, url=url)
Pattern 2: Pass year as an explicit parameter If you already know the year, add it to the task signature to avoid parsing from the name:
Copy code
@task(
    # defaults here are fine — we’ll override per call
    cache_policy=CACHE_POLICY_MAP[APP_SETTINGS.TSE.CACHE_POLICY],
    cache_expiration=timedelta(days=APP_SETTINGS.TSE.CACHE_EXPIRATION),
    task_run_name="extract_tse_{name}",
)
def extract_tse(name: str, url: str, year: int | None = None, out_dir: str = "data/tse") -> str:
    ...
Then:
Copy code
for name, url in TSE_ENDPOINTS.items():
    year = extract_year_from_name(name)
    opts = cache_opts_for_year(year)
    extract_tse.with_options(**opts).submit(name=name, url=url, year=year)
Notes - NO_CACHE vs refresh_cache: - NO_CACHE cleanly disables caching for that call. - refresh_cache=True forces bypass even if a cache exists; useful if you want to “always re-download” for current year. - Keep returning a small value (the file path/URI), not the file bytes. - Add a local existence guard to skip re-downloads regardless of Prefect cache: - if the file already exists and is non-empty, just return the path. - If you run on multiple machines, store files in shared/object storage and return the URI. Config suggestion (TOML) - You can keep your global defaults and still override per-call:
Copy code
[tse]
cache_policy = "INPUTS"              # default for past years
cache_expiration = 3650              # 10 years
no_cache_policy = "NO_CACHE"         # used for current year
Then in loader:
Copy code
def cache_opts_for_year(year):
    if year is None or year < CURRENT_YEAR:
        return {
            "cache_policy": CACHE_POLICY_MAP[APP_SETTINGS.TSE.CACHE_POLICY],
            "cache_expiration": timedelta(days=APP_SETTINGS.TSE.CACHE_EXPIRATION),
            "persist_result": True,
        }
    else:
        return {
            "cache_policy": CACHE_POLICY_MAP[APP_SETTINGS.TSE.NO_CACHE_POLICY],
        }
Docs: result caching in Prefect 3.x - Result caching If you share your naming scheme for all endpoints, I can provide an exact
extract_year_from_name
tailored to your keys.
g
@Marvin If a Deployment is running and i change my appsettings config cache i need to restart de deployment. [TSE] BASE_URL = "https://cdn.tse.jus.br/estatistica/sead/odsele/" RETRIES = 5 RETRY_DELAY = 5 # Segundos TIMEOUT = 120 # Segundos CACHE_POLICY = "NO_CACHE" CACHE_EXPIRATION = 90 # Dias Is there a pattern to deal with this?
m
✅ thought for 275.2 seconds
Short answer: it depends on how you run the deployment and where you read config. - If you’re using flow.serve: that process is long‑lived. Since your code does
APP_SETTINGS = load_config()
at import time, changes in TOML won’t be picked up until you restart the serve process. Pattern: load config at run time (inside the flow) instead of at import, or restart serve after changing config. - If you’re using deploy + worker: each flow run starts in a fresh process, so if your code reads the TOML at run time, new runs will pick up the change without redeploy/restart. If you bake config into a Docker image, you’ll need to rebuild or mount the config. Best-practice patterns to avoid restarts/redeploys 1) Load config at runtime, not at import - Move
APP_SETTINGS = load_config()
into your flow function (or a small helper that’s called at the start of each run). This ensures each run reads the latest config. - Avoid baking config into decorators. In your code,
cache_policy=CACHE_POLICY_MAP[APP_SETTINGS.TSE.CACHE_POLICY]
is evaluated at import time and “freezes” that value. Prefer to set task options per call with
with_options(...)
. 2) Make cache policy dynamic per run - Remove cache-related arguments from the decorator and set them at submit time, using the config you load at the start of the flow.
Copy code
from datetime import timedelta
from prefect import task, flow
from prefect.cache_policies import INPUTS, NO_CACHE

@task(
    retries=5,
    retry_delay_seconds=5,
    timeout_seconds=120,
    log_prints=True,
    task_run_name="extract_tse_{name}",
)
def extract_tse(name: str, url: str, out_dir: str="data/tse") -> str:
    ...

@flow
def pipeline(...):
    settings = load_config()  # read TOML per run
    cache_policy = CACHE_POLICY_MAP[settings.TSE.CACHE_POLICY]  # e.g. "NO_CACHE" -> NO_CACHE
    cache_exp = timedelta(days=settings.TSE.CACHE_EXPIRATION)

    for name, url in TSE_ENDPOINTS.items():
        extract_tse.with_options(
            cache_policy=cache_policy,
            cache_expiration=cache_exp,
            persist_result=True,   # if you want cross-run reuse
        ).submit(name=name, url=url)
- Now you can change the TOML and the next run will use the new values, even with a running worker. For flow.serve, this also works as long as you read config in the flow, not at import. 3) Use flow parameters to override per run - Expose caching knobs as flow parameters and set them when you trigger a run. No redeploy needed for per‑run overrides:
Copy code
@flow
def pipeline(cache_policy: str = "NO_CACHE", cache_expiration_days: int = 90):
    cache_policy_obj = CACHE_POLICY_MAP[cache_policy]
    cache_exp = timedelta(days=cache_expiration_days)
    ...
Trigger with:
Copy code
prefect deployment run pipeline/deploy-1 -p cache_policy='"INPUTS"' -p cache_expiration_days=365 --watch
4) Use Prefect Variables or job variables for runtime config - Prefect Variables: set in UI or via API, then read at runtime. Change anytime; no redeploy.
Copy code
from prefect.variables import Variable
policy_name = Variable.get("TSE_CACHE_POLICY", default="NO_CACHE")
exp_days = int(Variable.get("TSE_CACHE_EXPIRATION_DAYS", default="90"))
Docs: Prefect Variables - Job variables: pass env vars per run that your loader reads:
Copy code
prefect deployment run pipeline/deploy-1 \
  -jv TSE_CACHE_POLICY='"INPUTS"' \
  -jv TSE_CACHE_EXPIRATION_DAYS=365 \
  --watch
Docs: Customize job variables When you DO need a restart or redeploy - serve(): restart the process if config is loaded at import time or if you changed code. If you move config reads inside the flow, you don’t need to restart for TOML changes.
- If you changed task decorator arguments (e.g., you edited
@task(cache_policy=...)
in code), that’s a code change. For worker-based deployments, new runs pick it up automatically if you’re running from source. For Docker/K8s images or pinned-source deployments, rebuild/redeploy. - If your deployment pins parameters or a specific image/revision, changing those defaults requires a new deployment version. In short: refactor to read config at run time and set task options with
with_options(...)
or flow parameters. Then you won’t need to restart the deployment to apply caching changes.
g
@Marvin RunnerSettings Settings for controlling runner behavior
process_limit
Maximum number of processes a runner will execute in parallel. Type: `integer` Default: `5` TOML dotted key path: `runner.process_limit` Supported environment variables: `PREFECT_RUNNER_PROCESS_LIMIT` Setting this on my prefect.toml will make my task runner in my flow: @flow( task_runner=ThreadPoolTaskRunner(max_workers=16), # type: ignore log_prints=True ) follow whats defined in the toml?
m
✅ thought for 40.2 seconds
Short answer: No.
runner.process_limit
does not change your flow’s
ThreadPoolTaskRunner(max_workers=16)
. What each knob controls -
runner.process_limit
(setting or
PREFECT_RUNNER_PROCESS_LIMIT
): caps how many flow runs the embedded Runner will execute in parallel when you use
flow.serve(...)
. It’s about concurrent flow runs, not tasks within a flow. Docs: process_limit - `ThreadPoolTaskRunner(max_workers=...)`: controls task-level concurrency inside a single flow run. This is set by your code and is not overridden by
runner.process_limit
. Implications - If you’re using
flow.serve
, you can have: - up to
runner.process_limit
concurrent flow runs in that one serve process - and inside each flow run, up to
max_workers
concurrent tasks (your 16) - If you’re using worker-based deployments (prefect deploy + worker),
runner.process_limit
does not apply. Workers typically execute one flow run per process; scale by running more workers or using work pool concurrency limits. If you want to make
max_workers
configurable via TOML, read it in your own config and pass it to the task runner:
Copy code
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

APP_SETTINGS = load_config()

@flow(
    task_runner=ThreadPoolTaskRunner(
        max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS  # from your TOML
    ),
    log_prints=True,
)
def pipeline(...):
    ...
Useful links - Runner (serve) behavior: Run flows in local processes - process_limit setting: Runner process_limit
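For reference, setting the serve-side limit in prefect.toml looks like this (the value is illustrative); your ThreadPoolTaskRunner max_workers stays in code or in your own app config:
Copy code
[runner]
process_limit = 10  # max concurrent flow runs for flow.serve; does not change max_workers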
g
@Marvin How to clear the cache from my results?
m
✅ thought for 299.1 seconds
There isn’t a single “clear cache” button in Prefect 3.x, but you have several reliable ways to invalidate or bypass cached results. Use the one that fits your scenario: Quick ways to force fresh runs - Per-task, one-off: - Set
refresh_cache=True
on the task (decorator or via
with_options
) to bypass and overwrite any existing cache for that task. - Example:
Copy code
extract_tse.with_options(refresh_cache=True).submit(name=..., url=...)
- Globally for a run: - Set env var
PREFECT_TASKS_REFRESH_CACHE=true
for the process running your flow. All tasks will refresh their cache. Disable caching - Per-task call: - Use
cache_policy=NO_CACHE
via
with_options(...)
for a specific run:
Copy code
from prefect.cache_policies import NO_CACHE
  extract_tse.with_options(cache_policy=NO_CACHE).submit(...)
- Globally: - Set
PREFECT_TASKS_DEFAULT_NO_CACHE=true
or
PREFECT_TASKS_DISABLE_CACHING=true
in the environment to disable caching across tasks. Expire or invalidate by configuration - cache_expiration: Reduce TTL so caches expire sooner:
Copy code
extract_tse.with_options(cache_expiration=timedelta(days=1)).submit(...)
- Change inputs or code: With the DEFAULT/INPUTS policy, changing task inputs or the task’s source code will produce a new cache key automatically. Delete stored cache/results - Local default result store is typically at
~/.prefect/storage/
(can be changed via
PREFECT_LOCAL_STORAGE_PATH
). Deleting entries here removes stored results and cache records. This is the blunt instrument—safe but removes everything. - If you configured a custom
CachePolicy(key_storage=...)
or remote result storage (S3/GCS), clear the corresponding keys/objects there to invalidate caches. - There’s no built-in CLI like
prefect cache clear
. If needed, you can script deletion. Operational tips - If you’re using flow.serve, remember it’s a long-running process. Environment variable changes apply when you restart that process. - For worker-based deployments, each run is a fresh process. Env vars and code changes take effect on the next run. - Prefer targeted invalidation (refresh_cache or NO_CACHE) over deleting the entire store unless you truly want to wipe all caches. Docs for reference - Caching concepts: Caching - Advanced caching: Advanced caching - Results and storage: Results If you tell me whether you’re running locally with serve or via workers, and where your results are stored (local vs S3/GCS), I can give you exact commands/steps to clear them.
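If you do want the blunt local wipe described above, a sketch (assuming the default local storage path and that no flows are currently running):
Copy code
# Danger: removes ALL persisted results and cache records for this profile
rm -rf ~/.prefect/storage/*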
g
@Marvin I got run_logger on my tasks: from pathlib import Path from prefect import task, get_run_logger from utils.io import fetch_json, save_json from config.loader import load_config APP_SETTINGS = load_config() def frentes_url(legislatura: dict) -> str: id_legislatura = legislatura["dados"][0]["id"] return f"{APP_SETTINGS.CAMARA.REST_BASE_URL}/frentes?idLegislatura={id_legislatura}" @task( retries=APP_SETTINGS.CAMARA.RETRIES, retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY, timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT ) def extract_frentes(legislatura: dict, out_dir: str = "data/camara") -> str: logger = get_run_logger() url = frentes_url(legislatura) dest = Path(out_dir) / "frentes.json" logger.info(f"Congresso: buscando Frentes de {url} -> {dest}") json = fetch_json(url) return save_json(json, dest) Do i need it if i set [logging] level = "INFO" log_prints = "True" on my prefect.toml?
m
✅ thought for 175.0 seconds
Short answer: you don’t need
get_run_logger()
if you’re happy using
print()
and you’ve enabled print-capture correctly. With log_prints on,
print()
output is sent to Prefect logs. Key points: - Your current code uses
logger.info(...)
. That does require
get_run_logger()
. The
log_prints
setting does not create a logger; it only captures `print()`/stdout/stderr. - If you want to drop
get_run_logger()
, replace the log line with
print(...)
and keep
log_prints
enabled. Fix your prefect.toml (TOML booleans are unquoted and lowercase):
Copy code
[logging]
level = "INFO"
log_prints = true
If you leave it as
log_prints = "True"
, it won’t enable print capture. When to prefer each: - Use
get_run_logger()
when you want structured, Prefect-aware logs (correct run/task context, log levels, works even if
log_prints
is off). - Use
print()
if you just want simple messages and have
log_prints = true
. Per-task override is also available:
Copy code
@task(log_prints=True)
def my_task():
    print("this will be captured even if global log_prints is false")
More details: - Add logging to flows/tasks: Add logging - Advanced logging (levels, extra loggers for third‑party libraries): Advanced logging customization - Settings reference (including
logging.log_prints
): Settings reference
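For example, your extract_frentes from above rewritten to rely on print capture — a sketch; it assumes your existing frentes_url, fetch_json, save_json and APP_SETTINGS are imported as in your module:
Copy code
from pathlib import Path
from prefect import task

@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
    log_prints=True,  # per-task override; redundant if logging.log_prints = true globally
)
def extract_frentes(legislatura: dict, out_dir: str = "data/camara") -> str:
    url = frentes_url(legislatura)
    dest = Path(out_dir) / "frentes.json"
    print(f"Congresso: buscando Frentes de {url} -> {dest}")  # captured as an INFO log
    data = fetch_json(url)
    return save_json(data, dest)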
g
@Marvin from pathlib import Path from prefect import task, get_run_logger from typing import cast from utils.io import fetch_json_many_async, save_ndjson from config.loader import load_config APP_SETTINGS = load_config() def frentes_membros_urls(frentes_ids: list[str]) -> list[str]: return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids] @task( retries=APP_SETTINGS.CAMARA.RETRIES, retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY, timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT ) def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str: logger = get_run_logger() urls = frentes_membros_urls(frentes_ids) logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes") jsons = fetch_json_many_async( urls, concurrency=APP_SETTINGS.CAMARA.CONCURRENCY, timeout=APP_SETTINGS.CAMARA.TIMEOUT, follow_pagination=True ) dest = Path(out_dir) return save_ndjson(cast(list[dict], jsons), dest) fetch_json_many_async IS ASYNC. Do i need to set async await in the extract_frentes_membros or do i need to set something when calling the function in the pipeline?
m
✅ thought for 395.6 seconds
Great question. Since
fetch_json_many_async
is async, you must either await it or run it via a sync bridge. You have two good options: Option A — Make the task async and await inside
Copy code
from pathlib import Path
from typing import cast
from prefect import task, get_run_logger

from utils.io import fetch_json_many_async, save_ndjson
from config.loader import load_config

APP_SETTINGS = load_config()

def frentes_membros_urls(frentes_ids: list[str]) -> list[str]:
    return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids]

@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
async def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
    logger = get_run_logger()
    urls = frentes_membros_urls(frentes_ids)
    logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes")

    jsons = await fetch_json_many_async(
        urls,
        concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
        timeout=APP_SETTINGS.CAMARA.TIMEOUT,
        follow_pagination=True,
    )
    dest = Path(out_dir)
    return save_ndjson(cast(list[dict], jsons), dest)
How to call it: - In an async flow, you can either: - Run concurrently:
fut = extract_frentes_membros.submit(...); result = fut.result()
- Or directly await:
result = await extract_frentes_membros(...)
Option B — Keep the task sync and bridge the coroutine
Copy code
from pathlib import Path
from typing import cast
from prefect import task, get_run_logger
from prefect.utilities.asyncutils import run_coro_as_sync

from utils.io import fetch_json_many_async, save_ndjson
from config.loader import load_config

APP_SETTINGS = load_config()

def frentes_membros_urls(frentes_ids: list[str]) -> list[str]:
    return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids]

@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
    logger = get_run_logger()
    urls = frentes_membros_urls(frentes_ids)
    logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes")

    jsons = run_coro_as_sync(
        fetch_json_many_async(
            urls,
            concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
            timeout=APP_SETTINGS.CAMARA.TIMEOUT,
            follow_pagination=True,
        )
    )
    dest = Path(out_dir)
    return save_ndjson(cast(list[dict], jsons), dest)
Notes: - Don’t call the async function without awaiting/bridging; it will just return a coroutine and not execute. - You do not need to set anything special when calling the task from the flow. Use
.submit()
(returns a PrefectFuture; call
.result()
to get the value) or call the task directly depending on whether your flow is async and whether you want concurrency. References: - Tasks and async patterns: Tasks concept - Running work concurrently with submit/result: Run work concurrently - Async utilities (
run_coro_as_sync
): Async utilities API
g
@Marvin this is how im doing in the pipeline. Show me following whats being done: from prefect import flow, get_run_logger, serve from prefect.task_runners import ThreadPoolTaskRunner from datetime import date, datetime from tasks.tse import TSE_ENDPOINTS, extract_tse from tasks.camara.legislatura import extract_legislatura from tasks.camara.deputados import extract_deputados from tasks.camara.frentes import extract_frentes from config.loader import load_config APP_SETTINGS = load_config() # IMPORTAR TASKS TSE, CONGRESSO, SENADO ETC... @flow( task_runner=ThreadPoolTaskRunner(max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS), # type: ignore log_prints=True ) def pipeline( date: date = datetime.now().date(), refresh_cache: bool = False ): logger = get_run_logger() logger.info("Iniciando pipeline") # TSE: ~30 endpoints em paralelo tse_fs = [ extract_tse .with_options(refresh_cache=refresh_cache) .submit(name, url) for name, url in TSE_ENDPOINTS.items() ] # CONGRESSO legislatura = extract_legislatura(date) deputados_f = extract_deputados.submit(legislatura) frentes_f = extract_frentes.submit(legislatura) # Reunindo os resultados tse_paths = [f.result() for f in tse_fs] deputados_path = deputados_f.result() frentes_path = frentes_f.result() logger.info(f"Arquivos TSE: {len(tse_paths)}") logger.info(f"Deputados -> {deputados_path}") logger.info(f"Frentes -> {frentes_path}") return { "tse": tse_paths, "congresso_deputados": deputados_path, "congresso_frentes": frentes_path } if name == "__main__": pipeline.serve( # type: ignore name="deploy-1" )
m
✅ thought for 237.0 seconds
Got it — you’re running a synchronous flow with a ThreadPoolTaskRunner and using .submit()/result() for concurrency. To keep that pattern, define your async-based task as a synchronous Prefect task that bridges the coroutine. That way your flow stays sync and your call sites don’t change. Here’s the task following your style:
Copy code
from pathlib import Path
from typing import cast
from prefect import task, get_run_logger
from prefect.utilities.asyncutils import run_coro_as_sync

from utils.io import fetch_json_many_async, save_ndjson
from config.loader import load_config

APP_SETTINGS = load_config()

def frentes_membros_urls(frentes_ids: list[str]) -> list[str]:
    return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids]

@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
    logger = get_run_logger()
    urls = frentes_membros_urls(frentes_ids)
    logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes")

    # Bridge the async coroutine into this sync task
    jsons = run_coro_as_sync(
        fetch_json_many_async(
            urls,
            concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
            timeout=APP_SETTINGS.CAMARA.TIMEOUT,
            follow_pagination=True,
        )
    )
    dest = Path(out_dir)
    return save_ndjson(cast(list[dict], jsons), dest)
And here’s how you’d call it from your existing pipeline (unchanged call pattern):
Copy code
# ... your imports and flow definition

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS),  # type: ignore
    log_prints=True,
)
def pipeline(
    date: date = datetime.now().date(),
    refresh_cache: bool = False,
):
    logger = get_run_logger()
    logger.info("Iniciando pipeline")

    # TSE in parallel
    tse_fs = [
        # NOTE: refresh_cache is a task OPTION, so set it with with_options(); .submit() takes the task function's own parameters
        extract_tse.with_options(refresh_cache=refresh_cache).submit(name, url)
        for name, url in TSE_ENDPOINTS.items()
    ]

    # CONGRESSO
    legislatura = extract_legislatura(date)
    deputados_f = extract_deputados.submit(legislatura)
    frentes_f = extract_frentes.submit(legislatura)

    # If you have frentes_ids available, submit the async-backed task as usual
    # frentes_ids = ...  # derive this from your data
    # frentes_membros_f = extract_frentes_membros.submit(frentes_ids)

    # Gather results
    tse_paths = [f.result() for f in tse_fs]
    deputados_path = deputados_f.result()
    frentes_path = frentes_f.result()
    # frentes_membros_path = frentes_membros_f.result()

    logger.info(f"Arquivos TSE: {len(tse_paths)}")
    logger.info(f"Deputados -> {deputados_path}")
    logger.info(f"Frentes -> {frentes_path}")
    # logger.info(f"Frentes Membros -> {frentes_membros_path}")

    return {
        "tse": tse_paths,
        "congresso_deputados": deputados_path,
        "congresso_frentes": frentes_path,
        # "congresso_frentes_membros": frentes_membros_path,
    }
Why this works: - The task remains synchronous (so your sync flow + .submit/.result pattern is unchanged). - The async I/O is handled inside the task by
run_coro_as_sync
, which safely runs the coroutine. Two small tips: - Pass only the task function's own parameters to `.submit(...)`; options such as `refresh_cache`, name, retries, tags, or timeout belong in `with_options()` before submitting. - If you decide to convert the flow to async later, you can also make the task async and
await
internally; then call it with
.submit()
or
await task(...)
from the async flow. Docs: - Running work concurrently: Run work concurrently
- Async utilities (run_coro_as_sync): Async utilities API - Tasks concept: Tasks concept
g
@Marvin legislatura = extract_legislatura(date) deputados_f = extract_deputados.submit(legislatura) frentes_f = extract_frentes.submit(legislatura) frentes_membros_f = extract_frentes_membros(frentes_f) # Reunindo os resultados tse_paths = [f.result() for f in tse_fs] deputados_path = deputados_f.result() frentes_path = frentes_f.result() This way i have to repeat a lot of code. Is there a cleaner way?
m
✅ thought for 82.3 seconds
Yes — use resolve_futures_to_results to resolve many futures at once and keep your code DRY. It preserves the input structure (lists, dicts, tuples), so you can build a single object and resolve it in one call. Example applied to your flow:
Copy code
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import resolve_futures_to_results

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS),  # type: ignore
    log_prints=True,
)
def pipeline(date: date = datetime.now().date(), refresh_cache: bool = False):
    logger = get_run_logger()
    logger.info("Iniciando pipeline")

    # TSE in parallel
    tse_fs = [
        extract_tse.with_options(refresh_cache=refresh_cache).submit(name, url)
        for name, url in TSE_ENDPOINTS.items()
    ]

    # CONGRESSO
    legislatura = extract_legislatura(date)
    deputados_f = extract_deputados.submit(legislatura)
    frentes_f = extract_frentes.submit(legislatura)

    # If your task accepts the resolved frentes output, you can pass the future;
    # Prefect will resolve it as an upstream dependency
    frentes_membros_f = extract_frentes_membros.submit(frentes_f)

    # Resolve everything at once, preserving structure
    results = resolve_futures_to_results({
        "tse": tse_fs,
        "congresso_deputados": deputados_f,
        "congresso_frentes": frentes_f,
        "congresso_frentes_membros": frentes_membros_f,
    })

    logger.info(f"Arquivos TSE: {len(results['tse'])}")
    logger.info(f"Deputados -> {results['congresso_deputados']}")
    logger.info(f"Frentes -> {results['congresso_frentes']}")
    logger.info(f"Frentes Membros -> {results['congresso_frentes_membros']}")

    return results
Notes: - Prefer
.submit(...)
for tasks you want to run concurrently. Passing a future as an argument is fine — Prefect wires the dependency and resolves it before the downstream task runs. -
resolve_futures_to_results
works with nested structures, so you can add more futures without growing boilerplate. Refs: - Futures helpers: Run work concurrently - API: prefect.futures
g
@Marvin frentes_membros_f = extract_frentes_membros.submit(frentes_f) Argument of type "PrefectFuture[list[str]]" cannot be assigned to parameter "frentes_ids" of type "list[str]" in function "submit" "PrefectFuture[list[str]]" is not assignable to "list[str]"Pylance*reportArgumentType* (variable) frentes_f: PrefectFuture[list[str]]
m
✅ thought for 44.2 seconds
You’re doing the right thing by passing the upstream future; Prefect will resolve it at runtime. The error is just the static type checker (PyLance) not knowing that Prefect accepts futures as inputs. Three clean fixes: 1) Relax the task’s type annotation to accept a PrefectFuture
Copy code
from pathlib import Path
from typing import cast
from prefect import task, get_run_logger
from prefect.futures import PrefectFuture

@task(...)
def extract_frentes_membros(
    frentes_ids: list[str] | PrefectFuture[list[str]],
    out_dir: str | Path = "data/camara",
) -> str:
    ...
Then this is type-checker happy:
Copy code
frentes_membros_f = extract_frentes_membros.submit(frentes_f)
2) Cast at the call site (minimal change)
Copy code
from typing import Any, cast

frentes_membros_f = extract_frentes_membros.submit(cast(Any, frentes_f))
3) Ignore the warning inline (PyLance/Pyright)
Copy code
frentes_membros_f = extract_frentes_membros.submit(frentes_f)  # pyright: ignore[reportArgumentType]
And to reduce the repeated
.result()
calls, resolve everything at once with structure-preserving helpers:
Copy code
from prefect.futures import resolve_futures_to_results

results = resolve_futures_to_results({
    "tse": tse_fs,
    "congresso_deputados": deputados_f,
    "congresso_frentes": frentes_f,
    "congresso_frentes_membros": frentes_membros_f,
})
References: - Run work concurrently - prefect.futures API - Tasks concept
g
@Marvin Explain to me again: from prefect import flow, get_run_logger, serve from prefect.task_runners import ThreadPoolTaskRunner from datetime import date, datetime from tasks.tse import TSE_ENDPOINTS, extract_tse from tasks.camara.legislatura import extract_legislatura from tasks.camara.deputados import extract_deputados from tasks.camara.frentes import extract_frentes from tasks.camara.frentes_membros import extract_frentes_membros from config.loader import load_config APP_SETTINGS = load_config() # IMPORTAR TASKS TSE, CONGRESSO, SENADO ETC... @flow( task_runner=ThreadPoolTaskRunner(max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS), # type: ignore log_prints=True ) def pipeline( date: date = datetime.now().date(), refresh_cache: bool = False ): logger = get_run_logger() logger.info("Iniciando pipeline") # TSE: ~30 endpoints em paralelo tse_fs = [ extract_tse .with_options(refresh_cache=refresh_cache) .submit(name, url) for name, url in TSE_ENDPOINTS.items() ] # CONGRESSO legislatura = extract_legislatura(date) deputados_f = extract_deputados.submit(legislatura) frentes_f = extract_frentes.submit(legislatura) frentes_membros_f = extract_frentes_membros.submit(frentes_f) results = { "tse": tse_fs, "congresso_deputados": deputados_f, "congresso_frentes": frentes_f, "congresso_frentes_membros": frentes_membros_f, } if name == "__main__": pipeline.serve( # type: ignore name="deploy-1" ) Error on frentes_membros_f = extract_frentes_membros.submit(frentes_f): Argument of type "PrefectFuture[list[str]]" cannot be assigned to parameter "frentes_ids" of type "list[str]" in function "submit" "PrefectFuture[list[str]]" is not assignable to "list[str]"Pylance*reportArgumentType* (variable) frentes_f: Any from pathlib import Path from prefect import task, get_run_logger from prefect.futures import PrefectFuture from typing import cast from utils.io import fetch_json_many_async, save_ndjson from config.loader import load_config APP_SETTINGS = load_config() def frentes_membros_urls(frentes_ids: list[str]) -> list[str]: return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids] @task( retries=APP_SETTINGS.CAMARA.RETRIES, retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY, timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT ) def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str: logger = get_run_logger() urls = frentes_membros_urls(frentes_ids) logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes") jsons = fetch_json_many_async( urls, concurrency=APP_SETTINGS.CAMARA.CONCURRENCY, timeout=APP_SETTINGS.CAMARA.TIMEOUT, follow_pagination=True ) dest = Path(out_dir) return save_ndjson(cast(list[dict], jsons), dest)
m
✅ thought for 37.0 seconds
Thanks for the full snippet — here’s what’s going on and how to fix it cleanly. What the PyLance error means - Prefect lets you pass an upstream
PrefectFuture
as an argument to another task. At runtime, Prefect resolves the future before running the downstream task — so your call
extract_frentes_membros.submit(frentes_f)
is functionally correct. - The error you see is just static type checking (PyLance) complaining because your task parameter is annotated as
list[str]
while you’re passing a
PrefectFuture[list[str]]
. Three ways to quiet the type checker Pick one — the behavior at runtime is the same. 1) Allow the task to accept the future type in its signature
Copy code
from prefect.futures import PrefectFuture

@task(...)
def extract_frentes_membros(
    frentes_ids: list[str] | PrefectFuture[list[str]],
    out_dir: str | Path = "data/camara",
) -> str:
    ...
2) Cast at the call site (keeps task signature “pure”)
Copy code
from typing import Any, cast

frentes_membros_f = extract_frentes_membros.submit(cast(Any, frentes_f))
3) Ignore the warning inline
Copy code
frentes_membros_f = extract_frentes_membros.submit(frentes_f)  # pyright: ignore[reportArgumentType]
Important fix: your async call inside the task
fetch_json_many_async
is async; in a synchronous task you must bridge it. Use `run_coro_as_sync`:
Copy code
from pathlib import Path
from typing import cast
from prefect import task, get_run_logger
from prefect.utilities.asyncutils import run_coro_as_sync

from utils.io import fetch_json_many_async, save_ndjson
from config.loader import load_config

APP_SETTINGS = load_config()

def frentes_membros_urls(frentes_ids: list[str]) -> list[str]:
    return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids]

@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
    logger = get_run_logger()
    urls = frentes_membros_urls(frentes_ids)
    logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes")

    jsons = run_coro_as_sync(
        fetch_json_many_async(
            urls,
            concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
            timeout=APP_SETTINGS.CAMARA.TIMEOUT,
            follow_pagination=True,
        )
    )
    dest = Path(out_dir)
    return save_ndjson(cast(list[dict], jsons), dest)
Clean up your flow call sites - `refresh_cache` is a task option, so set it with `with_options(refresh_cache=...)` before calling `.submit(name, url)`; pass only the task function's own parameters to `.submit(...)`. - To avoid repetitive
.result()
calls, resolve everything at once and keep your code DRY.
Copy code
from prefect import flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import resolve_futures_to_results

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS),  # type: ignore
    log_prints=True,
)
def pipeline(date: date = datetime.now().date(), refresh_cache: bool = False):
    logger = get_run_logger()
    logger.info("Iniciando pipeline")

    # TSE in parallel
    tse_fs = [
        extract_tse.with_options(refresh_cache=refresh_cache).submit(name, url)
        for name, url in TSE_ENDPOINTS.items()
    ]

    # CONGRESSO
    legislatura = extract_legislatura(date)
    deputados_f = extract_deputados.submit(legislatura)
    frentes_f = extract_frentes.submit(legislatura)

    # Passing a future to another task is fine; Prefect resolves it
    frentes_membros_f = extract_frentes_membros.submit(frentes_f)

    # Resolve everything at once (preserves structure)
    results = resolve_futures_to_results({
        "tse": tse_fs,
        "congresso_deputados": deputados_f,
        "congresso_frentes": frentes_f,
        "congresso_frentes_membros": frentes_membros_f,
    })

    return results
References - Run work concurrently and resolve futures: Run work concurrently - Async utils (
run_coro_as_sync
): Async utilities API - Tasks concept: Tasks concept
g
@Marvin 1 - What if i prefer to set async await in the extract_frentes_membros? Im more familiar with this method. How should be? 2 - when i set: def frentes_membros_urls(frentes_ids: list[str] | PrefectFuture[list[str]]) -> list[str]: return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids] it says: "PrefectFuture[list[str]]" is not iterable "__iter__" method not definedPylance*reportGeneralTypeIssues* (parameter) frentes_ids: list[str] | PrefectFuture[list[str]] 3 - if i do: tse_fs = [ extract_tse .submit(name, url, refresh_cache=refresh_cache) for name, url in TSE_ENDPOINTS.items() ] it gives an error: No overloads for "submit" match the provided arguments Argument types: (str, str, bool)PylancereportCallIssue
m
✅ thought for 112.6 seconds
Great questions — tackling each: 1) If you prefer async/await in extract_frentes_membros Make the task async and await your coroutine inside. You can keep your flow synchronous and still call the async task with .submit() for concurrency, or make the flow async and use await directly — both work. Async task
Copy code
from pathlib import Path
from typing import cast
from prefect import task, get_run_logger

from utils.io import fetch_json_many_async, save_ndjson
from config.loader import load_config

APP_SETTINGS = load_config()

def frentes_membros_urls(frentes_ids: list[str]) -> list[str]:
    return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids]

@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
async def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
    logger = get_run_logger()
    urls = frentes_membros_urls(frentes_ids)
    logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes")

    jsons = await fetch_json_many_async(
        urls,
        concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
        timeout=APP_SETTINGS.CAMARA.TIMEOUT,
        follow_pagination=True,
    )
    dest = Path(out_dir)
    return save_ndjson(cast(list[dict], jsons), dest)
How to call it:
- Keep your current sync flow and use concurrency: frentes_membros_f = extract_frentes_membros.submit(frentes_f) (pass the upstream future; Prefect wires dependencies and resolves it at runtime). Later, get the value with .result() or use resolve_futures_to_results(...).
- Or convert the flow to async and await directly: frentes_ids = await extract_frentes(legislatura), then result = await extract_frentes_membros(frentes_ids).
- Or still use .submit() for concurrency and collect later with .result().
Docs:
- Add logging: Add logging
- Concurrency patterns: Run work concurrently

2) "PrefectFuture[list[str]] is not iterable" in frentes_membros_urls
Don’t add PrefectFuture to the helper’s signature. Prefect resolves upstream futures before calling your task function, so inside the task you will receive a concrete list[str]. Keep your helper typed strictly:
Copy code
def frentes_membros_urls(frentes_ids: list[str]) -> list[str]:
    return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids]
If PyLance complains at the flow call site when you pass a future to the task, use one of these to quiet the type checker: - Cast at call site:
Copy code
from typing import Any, cast

frentes_membros_f = extract_frentes_membros.submit(cast(Any, frentes_f))
- Or ignore inline:
Copy code
frentes_membros_f = extract_frentes_membros.submit(frentes_f)  # pyright: ignore[reportArgumentType]
3) "No overloads for submit match" when calling extract_tse.submit(name, url, refresh_cache=refresh_cache) This is a PyLance typing limitation with Prefect’s Task/submit wrappers, not a runtime problem. Two options: - Cast the task to Any at the call site (simple and common with static checkers):
Copy code
from typing import Any, cast

tse_fs = [
    cast(Any, extract_tse).submit(name, url, refresh_cache=refresh_cache)
    for name, url in TSE_ENDPOINTS.items()
]
- Or ensure your task’s function signature matches exactly what you’re passing (names and types). If it does, the cast above is still the pragmatic fix for PyLance.
Bonus: resolving futures without repetitive .result()
Copy code
from prefect.futures import resolve_futures_to_results

results = resolve_futures_to_results({
    "tse": tse_fs,
    "congresso_deputados": deputados_f,
    "congresso_frentes": frentes_f,
    "congresso_frentes_membros": frentes_membros_f,
})
References:
- Concurrency and futures helpers: Run work concurrently
- Async utilities: Async utilities API
- Tasks concept: Tasks concept
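If it helps to see the structure-preserving behavior in isolation, here is a minimal, self-contained sketch (the toy add_one task and resolve_demo flow are illustrative, not part of your pipeline):
Copy code
from prefect import flow, task
from prefect.futures import resolve_futures_to_results

@task
def add_one(x: int) -> int:
    return x + 1

@flow
def resolve_demo() -> dict:
    futures = {
        "single": add_one.submit(1),
        "many": [add_one.submit(i) for i in range(3)],
    }
    # Futures are swapped for their results; the dict/list shape is preserved
    return resolve_futures_to_results(futures)

if __name__ == "__main__":
    print(resolve_demo())  # {'single': 2, 'many': [1, 2, 3]}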
g
@Marvin async def fetch_json_many_async( urls: list[str], out_dir: str | Path | None = None, concurrency: int = 10, timeout: float = 30.0, follow_pagination: bool = True ) -> list[str] | list[dict]: """ - Se out_dir for fornecido, salva cada JSON em um arquivo e retorna a lista de caminhos - Caso contrårio, retorna a lista de dicionårios em memória """ sem = asyncio.Semaphore(concurrency) limits = httpx.Limits(max_connections=max(concurrency, 10)) timeout_cfg = httpx.Timeout(timeout) ensure_dir(out_dir) if out_dir else None processed_urls = set() # Evita processar a mesma URL duas vezes results = [] async def one(u: str): if u in processed_urls: return [] processed_urls.add(u) async with sem: async with httpx.AsyncClient(limits=limits, timeout=timeout_cfg) as client: r = await client.get(u) r.raise_for_status() data = r.json() # Salvar ou retornar o resultado atual if out_dir: # Nome do arquivo determinado pelo Hash da URL name = hashlib.sha1(u.encode()).hexdigest() + ".json" path = Path(out_dir) / name with open(path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False) current_result = str(path) else: current_result = data # Verifica se deve serguir a paginação additional_results = [] if follow_pagination and "links" in data: links = {link["rel"]: link["href"] for link in data["links"]} if "self" in links and "last" in links: if links["self"] != links["last"] and "next" in links: next_url = links["next"] additional_results = await one(next_url) # Retorna resultado atual + resultados adicionais da paginação if isinstance(additional_results, list): return [current_result] + additional_results else: return [current_result] tasks = [one(u) for u in urls] *nested_results = await asyncio.gather(tasks) # Achata a lista de resultados for item in nested_results: if isinstance(item, list): results.extend(item) else: results.append(item) return results in this function (thats runs on tasks) i want to print a log for each url thats downloading
m
✅ thought for 133.4 seconds
Two easy ways to log one line per URL while this function runs inside a Prefect task:
Option A — Use Prefect’s run logger (structured logs in the task run)
- Pass a logger into the function (recommended), or lazily get it with get_run_logger() when running under a task. Fall back to print if called outside Prefect.
Copy code
from typing import Any
from prefect import get_run_logger
from prefect.exceptions import MissingContextError

def _get_prefect_logger_or_none() -> Any | None:
    try:
        return get_run_logger()
    except MissingContextError:
        return None

async def fetch_json_many_async(
    urls: list[str],
    out_dir: str | Path | None = None,
    concurrency: int = 10,
    timeout: float = 30.0,
    follow_pagination: bool = True,
    logger: Any | None = None,   # <- NEW
) -> list[str] | list[dict]:
    """
    - Se out_dir for fornecido, salva cada JSON em um arquivo e retorna a lista de caminhos
    - Caso contrĂĄrio, retorna a lista de dicionĂĄrios em memĂłria
    """
    logger = logger or _get_prefect_logger_or_none()

    def log(msg: str):
        if logger:
            logger.info(msg)
        else:
            print(msg)

    sem = asyncio.Semaphore(concurrency)
    limits = httpx.Limits(max_connections=max(concurrency, 10))
    timeout_cfg = httpx.Timeout(timeout)
    ensure_dir(out_dir) if out_dir else None

    processed_urls = set()
    results: list[str] | list[dict] = []

    async def one(u: str):
        if u in processed_urls:
            log(f"Skipping duplicate URL: {u}")
            return []
        processed_urls.add(u)

        log(f"Downloading {u}")
        async with sem:
            async with httpx.AsyncClient(limits=limits, timeout=timeout_cfg) as client:
                r = await client.get(u)
                r.raise_for_status()
                data = r.json()

        if out_dir:
            name = hashlib.sha1(u.encode()).hexdigest() + ".json"
            path = Path(out_dir) / name
            with open(path, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False)
            current_result = str(path)
            log(f"Downloaded {u} -> {current_result}")
        else:
            current_result = data
            log(f"Downloaded {u}")

        additional_results = []
        if follow_pagination and isinstance(data, dict) and "links" in data:
            links = {link["rel"]: link["href"] for link in data.get("links", []) if isinstance(link, dict)}
            if links.get("self") and links.get("last") and links["self"] != links["last"] and "next" in links:
                next_url = links["next"]
                log(f"Following pagination: {u} -> next {next_url}")
                additional_results = await one(next_url)

        if isinstance(additional_results, list):
            return [current_result] + additional_results
        else:
            return [current_result]

    tasks = [one(u) for u in urls]
    nested_results = await asyncio.gather(*tasks)

    for item in nested_results:
        if isinstance(item, list):
            results.extend(item)
        else:
            results.append(item)

    log(f"Finished: {len(results)} items")
    return results
Usage from your task:
Copy code
from prefect.utilities.asyncutils import run_coro_as_sync  # assumed import path for run_coro_as_sync

@task(log_prints=False, ...)  # log_prints not needed when using logger
def my_task(...):
    logger = get_run_logger()
    jsons = run_coro_as_sync(fetch_json_many_async(urls, out_dir=None, logger=logger))
Option B — Just use print and capture via log_prints
- If you prefer prints, turn on print capture:
- Globally in prefect.toml:
Copy code
[logging]
    level = "INFO"
    log_prints = true
- Or per task: @task(log_prints=True)
- Then add print lines in the same spots:
Copy code
print(f"Downloading {u}")
print(f"Downloaded {u} -> {current_result}")
print(f"Following pagination: {u} -> next {next_url}")
print(f"Finished: {len(results)} items")
Notes:
- Using get_run_logger provides Prefect-aware logs with levels and run/task context.
- If this function might also run outside Prefect, the try/except around get_run_logger prevents errors and falls back to print.
- Because downloads run concurrently, logs will interleave; including the URL in each line makes them easy to follow.
Docs:
- Add logging: Add logging
- Advanced logging customization: Advanced logging customization
g
@Marvin what is an link artifact?
m
✅ thought for 182.5 seconds
A link artifact in Prefect 3.x is a small, persisted UI record that displays a clickable hyperlink associated with a flow or task run. It’s great for pointing from a run to external resources like dashboards, logs, or reports.
Where it shows up
- Without a key: in the run’s Artifacts tab (for that specific flow/task run).
- With a key: also on the global Artifacts page in the Prefect UI, where subsequent creations with the same key appear as versions.
How to create one
- Use prefect.artifacts.create_link_artifact. It stores the link as markdown so it renders as a clickable link in the UI. The function returns the artifact’s UUID.
Example
Copy code
from prefect import flow
from prefect.artifacts import create_link_artifact

@flow
def demo():
    create_link_artifact(
        link="<https://dash.mycompany.com/reports/123>",
        link_text="Open KPI dashboard",    # optional; defaults to the URL if omitted
        key="kpi-dashboard-link",          # optional; lowercase letters, numbers, dashes
        description="Daily KPI dashboard for 2025-10-25",
    )

demo()
How it differs from other artifact types
- LinkArtifact: renders a single clickable link.
- MarkdownArtifact: arbitrary markdown content.
- TableArtifact: tabular data that renders as a table.
- ImageArtifact: displays an image from a public URL.
- ProgressArtifact: shows progress (0–100) and is the only type you update in place.
Docs and API
- Concepts: Artifacts concept
- How-to guide: Create and use artifacts
- API reference: prefect.artifacts API
If you share where you want to link out to (e.g., S3 objects, BI dashboards, Snowflake worksheets), I can tailor an example.
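For a side-by-side feel of the different types, a minimal sketch (assuming the create_* helpers in prefect.artifacts; the URLs and values are placeholders):
Copy code
from prefect import flow
from prefect.artifacts import (
    create_image_artifact,
    create_link_artifact,
    create_markdown_artifact,
    create_progress_artifact,
    create_table_artifact,
)

@flow
def artifact_types_demo():
    # Placeholder URLs/values, purely for illustration
    create_link_artifact(link="https://example.com/report", link_text="Open report")
    create_markdown_artifact(markdown="## Summary\nAll checks passed.")
    create_table_artifact(table=[{"name": "job-a", "status": "ok"}])
    create_image_artifact(image_url="https://example.com/plot.png")
    create_progress_artifact(progress=42.0, description="Almost halfway")

if __name__ == "__main__":
    artifact_types_demo()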
g
@Marvin from pathlib import Path from prefect import task, get_run_logger from prefect.artifacts import create_progress_artifact from typing import cast from utils.io import fetch_json_many_async, save_ndjson from config.loader import load_config APP_SETTINGS = load_config() def frentes_membros_urls(frentes_ids: list[str]) -> list[str]: return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids] @task( retries=APP_SETTINGS.CAMARA.RETRIES, retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY, timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT ) async def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str: logger = get_run_logger() progress_artifact = create_progress_artifact( progress=0.0, description="Download de membros de frentes da Cùmara" ) urls = frentes_membros_urls(frentes_ids) logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes") jsons = await fetch_json_many_async( urls=urls, concurrency=APP_SETTINGS.CAMARA.CONCURRENCY, timeout=APP_SETTINGS.CAMARA.TIMEOUT, follow_pagination=True, progress_artifact_id=progress_artifact ) dest = Path(out_dir) / "frentes_membros.ndjson" return save_ndjson(cast(list[dict], jsons), dest) # Armazena em memória ou grava em disco uma lista de JSONs async def fetch_json_many_async( urls: list[str], out_dir: str | Path | None = None, concurrency: int = 10, timeout: float = 30.0, follow_pagination: bool = True, logger: Any | None = None, progress_artifact_id: Any | None = None, ) -> list[str] | list[dict]: """ - Se out_dir for fornecido, salva cada JSON em um arquivo e retorna a lista de caminhos - Caso contrårio, retorna a lista de dicionårios em memória """ logger = logger or _get_prefect_logger_or_none() def log(msg: str): if logger: logger.info(msg) else: print(msg) sem = asyncio.Semaphore(concurrency) limits = httpx.Limits(max_connections=max(concurrency, 10)) timeout_cfg = httpx.Timeout(timeout) ensure_dir(out_dir) if out_dir else None processed_urls = set() # Evita processar a mesma URL duas vezes results = [] downloaded_urls = 0 update_lock = asyncio.Lock() # Evita race conditions ao atualizar o progresso de forma assíncrona async def one(u: str): nonlocal downloaded_urls if u in processed_urls: return [] processed_urls.add(u) async with sem: async with httpx.AsyncClient(limits=limits, timeout=timeout_cfg) as client: log(f"Fazendo download da URL: {u}") r = await client.get(u) r.raise_for_status() data = r.json() # Salvar ou retornar o resultado atual if out_dir: # Nome do arquivo determinado pelo Hash da URL name = hashlib.sha1(u.encode()).hexdigest() + ".json" path = Path(out_dir) / name with open(path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False) current_result = str(path) else: current_result = data if progress_artifact_id and len(urls) > 0: async with update_lock: downloaded_urls += 1 update_progress_artifact( artifact_id=progress_artifact_id, *progress=(downloaded_urls / len(urls)) * 100* ) # Verifica se deve serguir a paginação additional_results = [] if follow_pagination and "links" in data: links = {link["rel"]: link["href"] for link in data["links"]} if "self" in links and "last" in links: if links["self"] != links["last"] and "next" in links: next_url = links["next"] additional_results = await one(next_url) # Retorna resultado atual + resultados adicionais da paginação if isinstance(additional_results, list): return [current_result] + additional_results else: return [current_result] tasks = [one(u) for u in urls] *nested_results = await 
asyncio.gather(tasks) # Achata a lista de resultados for item in nested_results: if isinstance(item, list): results.extend(item) else: results.append(item) return results 1 - Why theres no progress indicator in the UI?
m
Your message was too long, here's your message at the allowed limit.
g
@Marvin jsons = await fetch_json_many_async( urls=urls, concurrency=APP_SETTINGS.CAMARA.CONCURRENCY, timeout=APP_SETTINGS.CAMARA.TIMEOUT, follow_pagination=True, progress_artifact_id=progress_artifact ) async def fetch_json_many_async( urls: list[str], out_dir: str | Path | None = None, concurrency: int = 10, timeout: float = 30.0, follow_pagination: bool = True, logger: Any | None = None, progress_artifact_id: Any | None = None, ) -> list[str] | list[dict]: """ - Se out_dir for fornecido, salva cada JSON em um arquivo e retorna a lista de caminhos - Caso contrårio, retorna a lista de dicionårios em memória """ logger = logger or _get_prefect_logger_or_none() def log(msg: str): if logger: logger.info(msg) else: print(msg) sem = asyncio.Semaphore(concurrency) limits = httpx.Limits(max_connections=max(concurrency, 10)) timeout_cfg = httpx.Timeout(timeout) ensure_dir(out_dir) if out_dir else None processed_urls = set() # Evita processar a mesma URL duas vezes results = [] downloaded_urls = 0 update_lock = asyncio.Lock() # Evita race conditions ao atualizar o progresso de forma assíncrona async def one(u: str): nonlocal downloaded_urls if u in processed_urls: return [] processed_urls.add(u) async with sem: async with httpx.AsyncClient(limits=limits, timeout=timeout_cfg) as client: log(f"Fazendo download da URL: {u}") r = await client.get(u) r.raise_for_status() data = r.json() # Salvar ou retornar o resultado atual if out_dir: # Nome do arquivo determinado pelo Hash da URL name = hashlib.sha1(u.encode()).hexdigest() + ".json" path = Path(out_dir) / name with open(path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False) current_result = str(path) else: current_result = data if progress_artifact_id and len(urls) > 0: async with update_lock: downloaded_urls += 1 update_progress_artifact( artifact_id=progress_artifact_id, *progress=(downloaded_urls / len(urls)) * 100* ) # Verifica se deve serguir a paginação additional_results = [] if follow_pagination and "links" in data: links = {link["rel"]: link["href"] for link in data["links"]} if "self" in links and "last" in links: if links["self"] != links["last"] and "next" in links: next_url = links["next"] additional_results = await one(next_url) # Retorna resultado atual + resultados adicionais da paginação if isinstance(additional_results, list): return [current_result] + additional_results else: return [current_result] tasks = [one(u) for u in urls] *nested_results = await asyncio.gather(tasks) # Achata a lista de resultados for item in nested_results: if isinstance(item, list): results.extend(item) else: results.append(item) return results Why theres no progress indicator in the UI?
m
✅ thought for 114.0 seconds
Short answer: a progress bar only shows up in the UI if you first create a Progress artifact, update that exact artifact’s ID during a flow/task run (or give it a key if outside a run), and look in the right place in the UI.
Common reasons it doesn’t appear
- No artifact was created: progress_artifact_id is None or not a UUID returned by create_progress_artifact/acreate_progress_artifact, so your if progress_artifact_id block never runs.
- Created outside a Prefect run with no key: artifacts without a key only show on the associated flow/task run’s Artifacts tab. If there’s no run context, it won’t be visible anywhere. Add a key to see it on the global Artifacts page.
- Checking the wrong UI page:
  - Without a key: see Flow Run -> Artifacts tab
  - With a key: see the global Artifacts page
- Wrong type or version mismatch: make sure you’re using a Progress artifact in Prefect 3.x and updating it with the correct function.
Known-good pattern
Copy code
from prefect import flow
from prefect.artifacts import acreate_progress_artifact, aupdate_progress_artifact

@flow
async def main(urls):
    # 1) Create the progress artifact and capture its UUID
    progress_id = await acreate_progress_artifact(
        progress=0,
        description="Starting downloads
",
        # Optional: add a key to also see it on the global Artifacts page
        key="camara-download-progress"
    )

    # 2) Pass the UUID to your async helper
    results = await fetch_json_many_async(
        urls=urls,
        concurrency=10,
        timeout=30.0,
        follow_pagination=True,
        progress_artifact_id=progress_id,
    )

    # 3) Final update to 100%
    await aupdate_progress_artifact(
        artifact_id=progress_id,
        progress=100,
        description="Downloads complete"
    )
Notes for your function
- Your if progress_artifact_id and len(urls) > 0: guard means no updates happen if the ID is falsy. Confirm you’re passing the UUID returned by create/acreate.
- In an async function, prefer aupdate_progress_artifact to avoid blocking, though using the sync version won’t prevent it from showing up.
- If this runs outside a flow/task, set a key on creation; otherwise it won’t be visible in the UI.
Docs
- Concepts: Artifacts concept
- How-to: Create and use artifacts
- API: prefect.artifacts API
If you can share how you set progress_artifact_id (the creation snippet) and where you’re looking in the UI, I can pinpoint the exact issue.
g
@Marvin from pathlib import Path from prefect import task, get_run_logger from prefect.artifacts import create_progress_artifact from typing import cast from utils.io import fetch_json_many_async, save_ndjson from config.loader import load_config APP_SETTINGS = load_config() def frentes_membros_urls(frentes_ids: list[str]) -> list[str]: return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids] @task( retries=APP_SETTINGS.CAMARA.RETRIES, retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY, timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT ) async def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str: logger = get_run_logger() progress_artifact = create_progress_artifact( progress=0.0, description="Download de membros de frentes da CĂąmara" ) urls = frentes_membros_urls(frentes_ids) logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes") jsons = await fetch_json_many_async( urls=urls, concurrency=APP_SETTINGS.CAMARA.CONCURRENCY, timeout=APP_SETTINGS.CAMARA.TIMEOUT, follow_pagination=True, progress_artifact_id=progress_artifact ) dest = Path(out_dir) / "frentes_membros.ndjson" return save_ndjson(cast(list[dict], jsons), dest) This is the code that im calling the artifact. Im clicking on runs and on the bar of the fetch_frentes_membros i expand it but theres no indicator
m
✅ thought for 128.8 seconds
Got it — thanks for sharing the code. Two key points that usually explain “no progress indicator” in the UI:
1) Where to look in the UI
- Prefect does not render progress artifacts inline on the task row/timeline. Expanding the task row will not show a progress bar.
- Instead:
  - Open the task run details for your extract_frentes_membros task, then go to its Artifacts tab.
  - Or give the artifact a key when creating it, then check the global Artifacts page.
2) Make sure you’re creating and updating the same Progress artifact
- Your pattern is right: create a Progress artifact at the start of the task, pass its UUID into your helper, and call update on that UUID.
- Since your task and helper are async, I recommend using the async API to avoid blocking and ensure the UI gets timely updates.
Suggested changes
Copy code
from prefect import task, get_run_logger
from prefect.artifacts import acreate_progress_artifact, aupdate_progress_artifact

@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
async def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
    logger = get_run_logger()

    # Create the progress artifact (optionally add a key to also see it on the global Artifacts page)
    progress_artifact = await acreate_progress_artifact(
        progress=0.0,
        description="Download de membros de frentes da CĂąmara",
        # key="frentes-membros-progress"  # optional
    )
    logger.info(f"Progress artifact id: {progress_artifact}")

    urls = frentes_membros_urls(frentes_ids)
    logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes")

    jsons = await fetch_json_many_async(
        urls=urls,
        concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
        timeout=APP_SETTINGS.CAMARA.TIMEOUT,
        follow_pagination=True,
        progress_artifact_id=progress_artifact,
    )

    # Final update to 100% is a nice touch
    await aupdate_progress_artifact(
        artifact_id=progress_artifact,
        progress=100.0,
        description="Downloads concluĂ­dos"
    )

    dest = Path(out_dir) / "frentes_membros.ndjson"
    return save_ndjson(cast(list[dict], jsons), dest)
And in your helper, use the async updater:
Copy code
from prefect.artifacts import aupdate_progress_artifact

# ...
if progress_artifact_id and len(urls) > 0:
    async with update_lock:
        downloaded_urls += 1
        await aupdate_progress_artifact(
            artifact_id=progress_artifact_id,
            progress=(downloaded_urls / len(urls)) * 100
        )
Quick checklist
- Confirm you’re opening the task run’s details page and checking the Artifacts tab.
- Log the artifact UUID right after creation to ensure it’s not None.
- Try adding a key to the artifact on creation; then check the global Artifacts page to confirm it exists.
- Ensure your update_progress_artifact calls are actually executed (e.g., urls length > 0, no early returns).
Docs
- Concepts: Artifacts concept
- How-to: Create and use artifacts
- API: prefect.artifacts API
If you can share a screenshot of where you’re looking in the UI (task run details vs. expanded row) and your Prefect version, I can confirm precisely what you should see.
g
@Marvin I added the key and async update but seeing the logs of the taks i got: Task run failed with exception: PrefectHTTPStatusError("Client error '404 Not Found' for url 'http://127.0.0.1:4200/api/artifacts/%3Ccoroutine%20object%20acreate_progress_artifact%20at%200x00000227FFBBA440%3E'\nResponse: {'detail': 'Not Found'}\nFor more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/404") - Retry 2/30 will start 5 second(s) from now
m
✅ thought for 17.6 seconds
That 404 tells us the artifact_id you’re sending is a coroutine object, not a UUID. See the URL fragment: <coroutine object acreate_progress_artifact at ...> — that happens when acreate_progress_artifact(...) wasn’t awaited.
Fix
- Await the async creation and updates, and pass the UUID to your helper.
Correct pattern
Copy code
from uuid import UUID
from prefect.artifacts import acreate_progress_artifact, aupdate_progress_artifact

@task(...)
async def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str:
    logger = get_run_logger()

    progress_id = await acreate_progress_artifact(
        progress=0.0,
        description="Download de membros de frentes da CĂąmara",
        key="frentes-membros-progress"
    )
    logger.info(f"Progress artifact id: {progress_id} (type={type(progress_id)})")
    assert isinstance(progress_id, UUID)

    urls = frentes_membros_urls(frentes_ids)
    logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes")

    jsons = await fetch_json_many_async(
        urls=urls,
        concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
        timeout=APP_SETTINGS.CAMARA.TIMEOUT,
        follow_pagination=True,
        progress_artifact_id=progress_id,  # <- pass the UUID
    )

    await aupdate_progress_artifact(
        artifact_id=progress_id,
        progress=100.0,
        description="Downloads concluĂ­dos"
    )

    dest = Path(out_dir) / "frentes_membros.ndjson"
    return save_ndjson(cast(list[dict], jsons), dest)
And inside your helper:
Copy code
from prefect.artifacts import aupdate_progress_artifact

# ...
if progress_artifact_id and len(urls) > 0:
    async with update_lock:
        downloaded_urls += 1
        await aupdate_progress_artifact(
            artifact_id=progress_artifact_id,
            progress=(downloaded_urls / len(urls)) * 100
        )
Also, make sure you’re looking at:
- Task run details -> Artifacts tab (without a key), or
- Global Artifacts page (since you added a key).
Docs for reference:
- API: prefect.artifacts API
- How-to: Create and use artifacts
If you still hit 404 after this, paste the exact lines where you create the artifact and log its value/type. That’ll confirm whether it’s a UUID or not.
g
@Marvin I did everything you told me to do, creating an async artifact updater but in the ui the progress circle is stuck visually but when i access the artifact it shows the correct data
m
✅ thought for 199.7 seconds
Thanks for the detail — this symptom matches a UI refresh issue: the progress artifact data is updating correctly in the API/DB, but the circle you’re looking at isn’t re-rendering as updates come in.
What’s likely happening
- The UI polls the API periodically. Some views don’t live-update the progress circle until the next poll or until you navigate/refresh the page, even though the artifact itself shows the latest value when opened.
- If you’re updating very frequently (e.g., per-URL with high concurrency), the UI can appear “stuck” between polls. The artifact detail view will show the correct value because it fetches fresh data when you open it.
Quick checks and workarounds
- Try a hard refresh on the page (Shift+Reload) or navigate away and back; see if the circle advances.
- Throttle updates: only call aupdate_progress_artifact on meaningful increments (e.g., every N items or every 1–2s) so the UI has time to poll and show visible changes.
- Confirm you’re looking in the task run’s Artifacts tab or the global Artifacts page (since you added a key).
- Make sure you’re using the async updater and awaiting it:
  - progress_id = await acreate_progress_artifact(...)
  - await aupdate_progress_artifact(artifact_id=progress_id, progress=...)
Minimal repro you can run to observe UI behavior
Copy code
from prefect import flow
from prefect.artifacts import acreate_progress_artifact, aupdate_progress_artifact
import asyncio

@flow
async def progress_ui_test():
    aid = await acreate_progress_artifact(progress=0, key="ui-progress-test", description="UI progress test")
    for i in range(0, 101, 10):
        await aupdate_progress_artifact(artifact_id=aid, progress=i, description=f"{i}%")
        await asyncio.sleep(2)  # allow the UI to poll between updates

asyncio.run(progress_ui_test())
- Open the global Artifacts page and watch the artifact with key ui-progress-test. If the circle doesn’t move unless you refresh, it confirms the polling/refresh behavior.
If it still looks stuck after trying the above
- Share these details and I can dig deeper or file an issue:
  - Prefect version (prefect version)
  - Prefect Cloud or self-hosted (and UI URL/port — I see 127.0.0.1:4200)
  - Which exact page/view shows the stuck circle (global Artifacts list, task run Artifacts tab, run summary, etc.)
  - Browser and version
  - Whether throttling updates (e.g., every 1–2s) changes the behavior
References
- Artifacts how-to: Create and use artifacts
- API reference: prefect.artifacts API
g
@Marvin Im self hosting Im in the run details where it shows the bars of the tasks. i have added : if progress_artifact_id and len(urls) > 0: async with update_lock: downloaded_urls += 1 # SĂł atualiza a cada 5 urls baixadas, para atualizar o progress bar da UI if downloaded_urls % 5 == 0 or downloaded_urls == len(urls): await aupdate_progress_artifact( artifact_id=progress_artifact_id, *progress=(downloaded_urls / len(urls)) * 100* ) So it would update only after 5 urls downloaded (in this example there are 305 to download). But the progress circle still not updating automatically, without having to refresh. Also i got another question:
m
✅ thought for 39.9 seconds
Thanks for the extra context. Since you’re self-hosting and watching the run details page (the view with the task bars), what you’re seeing is expected today: that view does not live-render progress artifact updates. The artifact data is correct (as you observed when opening the artifact), but the circle in that run details view doesn’t update until the next UI poll or a manual refresh.
What you can do right now
- Verify where it does show: open the task run details and go to its Artifacts tab, or the global Artifacts page (since you added a key). Those views fetch fresh data when opened and on their own polling cadence.
- Throttle by time, not just item count: even with “every 5 URLs”, high concurrency can still fire updates faster than the UI polls. Consider updating at most every ~1–2 seconds (a fuller sketch of this appears at the end of this answer).
Copy code
import time

last_sent = 0.0

# inside your async update block
now = time.monotonic()
if (now - last_sent) >= 2.0 or downloaded_urls == len(urls):
    last_sent = now
    await aupdate_progress_artifact(
        artifact_id=progress_artifact_id,
        progress=(downloaded_urls / len(urls)) * 100
    )
- Hard refresh the page (Shift + Reload) or navigate away/back if you need to see changes immediately.
- Confirm you’re on the latest 3.x; improvements land regularly. Run prefect version.
If you want to double-check the backend is updating (it is), you can hit the API directly:
Copy code
# by artifact id
curl http://127.0.0.1:4200/api/artifacts/<ARTIFACT_ID>

# by key (list latest first)
curl "<http://127.0.0.1:4200/api/artifacts?key=frentes-membros-progress&sort=desc>"
API refs: Read artifacts, Update artifact.
If the circle still never advances without a manual refresh on the run details page, that’s likely a UI limitation/bug in that specific view. If you can share:
- Prefect version (prefect version)
- UI build (self-host URL/port, e.g., 127.0.0.1:4200)
- Which exact page is “stuck” (run details -> task bars vs Artifacts tab vs global Artifacts)
- Browser/version
I can open an issue with the UI team on your behalf. Also — what’s your other question? Happy to help.
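Going back to the throttling bullet above, here is a fuller, self-contained sketch of a time-based limiter you could drop into your helper (the ThrottledProgress name and the 2-second interval are illustrative, not a Prefect API):
Copy code
import asyncio
import time
from uuid import UUID

from prefect.artifacts import aupdate_progress_artifact

class ThrottledProgress:
    """Illustrative helper: counts completed items and rate-limits artifact updates."""

    def __init__(self, artifact_id: UUID, total: int, min_interval: float = 2.0):
        self.artifact_id = artifact_id
        self.total = total
        self.min_interval = min_interval
        self._done = 0
        self._last_sent = 0.0
        self._lock = asyncio.Lock()

    async def increment(self) -> None:
        async with self._lock:
            self._done += 1
            now = time.monotonic()
            # Always send the final update; otherwise respect the minimum interval
            if self._done == self.total or (now - self._last_sent) >= self.min_interval:
                self._last_sent = now
                await aupdate_progress_artifact(
                    artifact_id=self.artifact_id,
                    progress=(self._done / self.total) * 100,
                )

# Usage sketch inside fetch_json_many_async: build it once per call with
# ThrottledProgress(progress_artifact_id, total=len(urls)) and await .increment()
# after each successful download instead of updating the artifact directly.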
g
@Marvin The other question is: frentes_membros recieves the ids of frentes to mount the urls and download them async. but in the flow run page, where we have the live graph of the run, when it finishes downloading frentes i need to use the mouse to click the arrow to expand and see the taks of frentes_membros. Why? I want it to automatically show in the graph. Heres my pipeline where im calling the tasks: from prefect import flow, get_run_logger from prefect.task_runners import ThreadPoolTaskRunner from prefect.futures import resolve_futures_to_results from datetime import date, datetime from typing import Any, cast from tasks.tse import TSE_ENDPOINTS, extract_tse from tasks.camara.legislatura import extract_legislatura from tasks.camara.deputados import extract_deputados from tasks.camara.frentes import extract_frentes from tasks.camara.frentes_membros import extract_frentes_membros from config.loader import load_config APP_SETTINGS = load_config() # IMPORTAR TASKS TSE, CONGRESSO, SENADO ETC... @flow( task_runner=ThreadPoolTaskRunner(max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS), # type: ignore log_prints=True ) async def pipeline( date: date = datetime.now().date(), refresh_cache: bool = False ): logger = get_run_logger() logger.info("Iniciando pipeline") # TSE: ~30 endpoints em paralelo tse_fs = [ cast(Any, extract_tse) .with_options(refresh_cache=refresh_cache) .submit(name, url) for name, url in TSE_ENDPOINTS.items() ] # CONGRESSO legislatura = extract_legislatura(date) deputados_f = extract_deputados.submit(legislatura) frentes_f = extract_frentes.submit(legislatura) frentes_membros_f = extract_frentes_membros.submit(cast(Any, frentes_f)) return resolve_futures_to_results({ "tse": tse_fs, "congresso_deputados": deputados_f, "congresso_frentes": frentes_f, "congresso_frentes_membros": frentes_membros_f, }) if name == "__main__": pipeline.serve( # type: ignore name="deploy-1" )
m
✅ thought for 35.0 seconds
Thanks — that helps. On your second question: What you’re seeing is the current UI behavior. On the flow run page:
- Timeline tab (the one with the horizontal bars) groups tasks into collapsible rows for readability and performance. It does not auto-expand when a downstream task (like frentes_membros) starts; you have to click the arrow to reveal the child tasks.
- Graph tab shows the DAG dependencies live, but it also won’t auto-expand collapsed groups by itself.
There isn’t a setting today to auto-expand those groups as tasks start.
Workarounds:
- Watch the Graph tab to see when frentes_membros becomes scheduled/running, then click it to navigate to the task run details.
- In the Timeline, expand the relevant group once; it will stay expanded while you’re on that page. If you navigate away, you’ll need to expand again.
- If you want clearer visibility, consider making frentes_membros a subflow (purely for UX hierarchy); you’d still click into it, but the separation can make it easier to spot in the Graph (see the sketch below).
If you’d like, I can file a feature request to:
- Auto-expand a task group when its first child starts
- Or add an “Expand all/remember expanded state” toggle on the Timeline/Graph views
Let me know your Prefect version and I’ll open it with the UI team.
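If you want to try the subflow idea, a minimal sketch (the frentes_membros_branch name is illustrative; it assumes your existing extract_frentes and extract_frentes_membros from tasks.camara):
Copy code
from prefect import flow

from tasks.camara.frentes import extract_frentes
from tasks.camara.frentes_membros import extract_frentes_membros

@flow(name="frentes-membros-branch")
async def frentes_membros_branch(legislatura) -> str:
    # Running this branch as a child flow gives it its own node in the parent's
    # run graph, which can make the frentes -> frentes_membros work easier to spot.
    frentes_ids = extract_frentes(legislatura)
    return await extract_frentes_membros(frentes_ids)

# In your async pipeline flow, call it with: await frentes_membros_branch(legislatura)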
g
@Marvin
Copy code
from pathlib import Path
from typing import cast
from uuid import UUID
from prefect import task, get_run_logger
from prefect.artifacts import create_progress_artifact
from datetime import timedelta

from utils.io import download_stream
from utils.br_data import BR_STATES, ELECTIONS_YEARS
from config.loader import load_config, CACHE_POLICY_MAP

APP_SETTINGS = load_config()

# MONTAR ENDPOINT REDES SOCIAIS POR ESTADO
REDES_SOCIAIS_ENDPOINTS = {
    f"redes_sociais_{year}_{state}": f"{APP_SETTINGS.TSE.BASE_URL}consulta_cand/rede_social_candidato_{year}_{state}.zip"
    for state in BR_STATES
    for year in ELECTIONS_YEARS
}

TSE_ENDPOINTS = {
    "candidatos_2018": f"{APP_SETTINGS.TSE.BASE_URL}consulta_cand/consulta_cand_2018.zip",
    "candidatos_2022": f"{APP_SETTINGS.TSE.BASE_URL}consulta_cand/consulta_cand_2022.zip",
    "prestaca_contas_2018": f"{APP_SETTINGS.TSE.BASE_URL}prestacao_contas/prestacao_de_contas_eleitorais_candidatos_2018.zip",
    "prestacao_contas_2022": f"{APP_SETTINGS.TSE.BASE_URL}prestacao_contas/prestacao_de_contas_eleitorais_candidatos_2022.zip",
    "resultado_eleicao_2018": f"{APP_SETTINGS.TSE.BASE_URL}votacao_candidato_munzona/votacao_candidato_munzona_2018.zip",
    "resultado_eleicao_2022": f"{APP_SETTINGS.TSE.BASE_URL}votacao_candidato_munzona/votacao_candidato_munzona_2022.zip"
}

TSE_ENDPOINTS = TSE_ENDPOINTS | REDES_SOCIAIS_ENDPOINTS

@task(
    task_run_name="extract_tse_{name}",
    retries=APP_SETTINGS.TSE.RETRIES,
    retry_delay_seconds=APP_SETTINGS.TSE.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.TSE.TIMEOUT,
    cache_policy=CACHE_POLICY_MAP[APP_SETTINGS.TSE.CACHE_POLICY],
    cache_expiration=timedelta(days=APP_SETTINGS.TSE.CACHE_EXPIRATION)
)
def extract_tse(name: str, url: str, out_dir: str = "data/tse") -> str:
    logger = get_run_logger()

    progress_id = create_progress_artifact(
        progress=0.0,
        description=f"Download do arquivo {name}, do TSE"
    )

    dest = Path(out_dir) / f"{name}.zip"
    logger.info(f"Fazendo download do endpoint TSE '{url}' -> {dest}")
    dest_path = download_stream(url, dest, unzip=True, progress_artifact_id=cast(UUID, progress_id))
    return dest_path
# TSE: ~30 endpoints in parallel
tse_fs = [
    cast(Any, extract_tse)
    .with_options(refresh_cache=refresh_cache)
    .submit(name, url)
    for name, url in TSE_ENDPOINTS.items()
]
Above I have a function for which I want to create an aggregated artifact covering all of these downloads done in parallel. How can I combine that data into a Prefect artifact, or update an artifact table that has already been created?
m
✅ thought for 135.9 seconds
Great question. Short answer: only Progress artifacts support in-place updates. For an aggregate “table of all TSE downloads,” you’ll create a Table artifact once you have the parallel results, and if you need to refresh it later, create a new version using the same key.
Two good patterns you can choose from:
Option A — Aggregate at the flow level after all tasks complete
- Submit your ~30 extract_tse tasks in parallel.
- Resolve their results in the flow.
- Build a table and create a single Table artifact with a stable key.
Example:
Copy code
from prefect import flow
from prefect.futures import resolve_futures_to_results
from prefect.artifacts import create_table_artifact

@flow
def pipeline(...):
    # submit tasks
    tse_fs = [
        extract_tse.with_options(refresh_cache=refresh_cache).submit(name, url)
        for name, url in TSE_ENDPOINTS.items()
    ]

    # resolve results to paths (or whatever you want in the table)
    results = resolve_futures_to_results(tse_fs)

    # build a table: list[dict] works nicely
    table = [
        {"name": name, "url": url, "path": path}
        for (name, url), path in zip(TSE_ENDPOINTS.items(), results)
    ]

    # create/update the aggregate table artifact (new version each run)
    create_table_artifact(
        table=table,
        key="tse-downloads-summary",
        description="Resumo dos downloads TSE desta execução"
    )

    return results
Option B — Stream partials and republish new versions
- If you want the table to evolve during the run, you can publish a new table artifact with the same key as chunks complete. This does NOT update in place; it creates new versions and the UI will show the latest version.
- Throttle how often you republish (e.g., every N completions or every few seconds) to avoid spamming.
Notes and limitations
- Table, Markdown, Link, and Image artifacts are immutable; re-creating with the same key creates a new version.
- Only Progress artifacts can be updated in place (update_progress_artifact or aupdate_progress_artifact).
- To retrieve the latest version later, you can query by key (e.g., Artifact.get(key="tse-downloads-summary")) if you need it downstream.
If you want a row-per-file download with live-ish updates, I recommend Option A for simplicity. If you really want incremental visibility, do a periodic republish:
Copy code
from prefect.artifacts import create_table_artifact
from time import monotonic

last_pub = 0.0
def maybe_publish(partial_rows):
    global last_pub
    now = monotonic()
    if now - last_pub >= 5.0 or len(partial_rows) == TOTAL:
        create_table_artifact(
            table=partial_rows,
            key="tse-downloads-summary",
            description="Resumo parcial dos downloads TSE"
        )
        last_pub = now
If you share what columns you want in the summary (e.g., name, url, destination path, bytes, duration, success/error), I can draft the exact table building code.
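And if a later step needs to read the summary back, a minimal sketch using the Artifact.get lookup mentioned above (it assumes the key has been created at least once; for table artifacts, data holds the serialized rows):
Copy code
from prefect.artifacts import Artifact

# Returns the latest version stored under this key, or None if it doesn't exist
latest = Artifact.get("tse-downloads-summary")
if latest is not None:
    print(latest.type)  # e.g. "table"
    print(latest.data)  # the rows published by the most recent run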
g
@Marvin def frentes_membros_urls(frentes_ids: list[str]) -> list[str]: return [f"{APP_SETTINGS.CAMARA.REST_BASE_URL}frentes/{id}/membros" for id in frentes_ids] @task( retries=APP_SETTINGS.CAMARA.RETRIES, retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY, timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT ) async def extract_frentes_membros(frentes_ids: list[str], out_dir: str | Path = "data/camara") -> str: logger = get_run_logger() progress_id = await acreate_progress_artifact( progress=0.0, description="Download de membros de frentes da CĂąmara" ) assert isinstance(progress_id, UUID) urls = frentes_membros_urls(frentes_ids) logger.info(f"Congresso: buscando Membros de {len(urls)} Frentes") jsons = await fetch_json_many_async( urls=urls, concurrency=APP_SETTINGS.CAMARA.CONCURRENCY, timeout=APP_SETTINGS.CAMARA.TIMEOUT, follow_pagination=True, progress_artifact_id=progress_id ) await aupdate_progress_artifact( artifact_id=progress_id, progress=100.0, description="Downloads concluĂ­dos" ) artifact_data = [] for json in jsons: json = cast(dict, json) link_self = next(l["href"] for l in json.get("links", []) if l.get("rel") == "self") id_frente = link_self.split("/")[-2] membros = json.get("data", []) # type: ignore artifact_data.append({ "Id Frente": id_frente, "Quantidade Membros": len(membros) }) create_table_artifact( key="frentes_membros", table=artifact_data, description="Total de membros encontrados nas frentes." ) dest = Path(out_dir) / "frentes_membros.ndjson" return save_ndjson(cast(list[dict], jsons), dest) The artifact wasnt created. it said: RuntimeWarning: coroutine 'acreate_table_artifact' was never awaited create_table_artifact( RuntimeWarning: Enable tracemalloc to get the object allocation traceback It is obligatory to use acreate_table_artifact in this case? Why?
m
✅ thought for 32.6 seconds
You’re in an async task, so Prefect’s sync-compatible functions will return a coroutine when called from that context. That’s why you saw “coroutine 'acreate_table_artifact' was never awaited” at the line where you called create_table_artifact(...).
Answer to “Is it obligatory to use acreate_table_artifact here? Why?”
- Not strictly obligatory to import the async name, but you must await the call in an async context.
- Prefect decorates many functions as “sync-compatible”. In a sync context you just call create_table_artifact(...). In an async context, the same call returns an awaitable (the underlying acreate_table_artifact), so you must await it. If you don’t, you get that RuntimeWarning and nothing is created.
Two equivalent fixes
- Explicit async variant:
Copy code
from prefect.artifacts import acreate_table_artifact

# ...
artifact_id = await acreate_table_artifact(
    key="frentes_membros",
    table=artifact_data,
    description="Total de membros encontrados nas frentes."
)
- Or keep the sync name but await it in async code:
Copy code
from prefect.artifacts import create_table_artifact

# ...
artifact_id = await create_table_artifact(
    key="frentes_membros",
    table=artifact_data,
    description="Total de membros encontrados nas frentes."
)
Tip
- Table artifacts are immutable; calling (a)create_table_artifact again with the same key creates a new version, not an in-place update.
- Keep using the async variants consistently inside async tasks: you already do this for progress artifacts, so mirroring that for table artifacts is a good practice.
Docs
- How-to: Create and use artifacts
- API: prefect.artifacts API
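To see that versioning behavior end to end, a small sketch (the key uses dashes to satisfy the key format; the table values are placeholders):
Copy code
from prefect import flow
from prefect.artifacts import create_table_artifact

@flow
def table_versions_demo():
    # Both calls use the same key, so the second one becomes version 2 of the
    # same artifact on the global Artifacts page; nothing is updated in place.
    create_table_artifact(key="frentes-membros-resumo", table=[{"Id Frente": "1", "Quantidade Membros": 10}])
    create_table_artifact(key="frentes-membros-resumo", table=[{"Id Frente": "1", "Quantidade Membros": 12}])

if __name__ == "__main__":
    table_versions_demo()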
g
@Marvin from prefect import flow, get_run_logger from prefect.task_runners import ThreadPoolTaskRunner from prefect.futures import resolve_futures_to_results from datetime import date, datetime from typing import Any, cast from tasks.tse import TSE_ENDPOINTS, extract_tse from tasks.camara.legislatura import extract_legislatura from tasks.camara.deputados import extract_deputados from tasks.camara.frentes import extract_frentes from tasks.camara.frentes_membros import extract_frentes_membros from tasks.camara.assiduidade import extract_assiduidade_deputados from config.loader import load_config APP_SETTINGS = load_config() # IMPORTAR TASKS TSE, CONGRESSO, SENADO ETC... @flow( task_runner=ThreadPoolTaskRunner(max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS), # type: ignore log_prints=True ) async def pipeline( date: date = datetime.now().date(), refresh_cache: bool = False ): logger = get_run_logger() logger.info("Iniciando pipeline") # TSE: ~30 endpoints em paralelo tse_fs = [ cast(Any, extract_tse) .with_options(refresh_cache=refresh_cache) .submit(name, url) for name, url in TSE_ENDPOINTS.items() ] # CONGRESSO legislatura = extract_legislatura(date) deputados_f = extract_deputados.submit(legislatura) assiduidade_f = extract_assiduidade_deputados.submit(deputados_f, legislatura) frentes_f = extract_frentes.submit(legislatura) frentes_membros_f = extract_frentes_membros.submit(cast(Any, frentes_f)) return resolve_futures_to_results({ "tse": tse_fs, "congresso_deputados": deputados_f, "congresso_assiduidade": assiduidade_f, "congresso_frentes": frentes_f, "congresso_frentes_membros": frentes_membros_f, }) if name == "__main__": pipeline.serve( # type: ignore name="deploy-1" ) Im having a problem in: assiduidade_f = extract_assiduidade_deputados.submit(deputados_f, legislatura) because deputados_f is a PrefectFuture of a list of ids but i need to pass it to assiduidade. Also another question. Assiduidade should run in parallel to all other tasks right after deputados finishes. How this can be accomplished?
m
✅ thought for 52.5 seconds
You’ve got two related things here: passing a future into another task, and fanning out work after that future resolves.
1) Passing a PrefectFuture to a task
- It’s valid to pass a PrefectFuture as an argument to another task. Prefect will resolve it to the underlying value right before the downstream task runs.
- So this is fine if your downstream task consumes the whole list in a single task:
Copy code
deputados_f = extract_deputados.submit(legislatura)
assiduidade_f = extract_assiduidade_deputados.submit(deputados_f, legislatura)
This creates a dependency: assiduidade waits for deputados to finish, then starts. It will run in parallel with all other branches that don’t depend on deputados.
2) If you want per-id parallelism for assiduidade
Tasks cannot schedule other tasks (only flows can). To fan out per-id work, use a subflow or resolve the list in the flow, then submit per-id tasks.
Option A: Subflow that fans out
- Make a small subflow that takes the list of IDs and submits per-id tasks. Then call that subflow with the future; Prefect will start the subflow as soon as deputados finishes.
Copy code
from prefect import flow, task
from prefect.futures import resolve_futures_to_results

@task
def extract_assiduidade_for_one(dep_id: str, legislatura: int) -> dict:
    # ... compute one deputado’s assiduidade ...
    return {"id": dep_id, "ok": True}

@flow
def extract_assiduidade_deputados(ids: list[str], legislatura: int) -> list[dict]:
    fs = [extract_assiduidade_for_one.submit(i, legislatura) for i in ids]
    return resolve_futures_to_results(fs)

# In your main flow:
deputados_f = extract_deputados.submit(legislatura)
assiduidade_f = extract_assiduidade_deputados.submit(deputados_f, legislatura)
- This runs “right after deputados finishes” and in parallel with other tasks already running.
Option B: Await the future in the flow, then submit per-id tasks
- Since your flow is async, you can await just that future to get the list and immediately fan out. Other tasks submitted earlier keep running in parallel.
Copy code
deputados_ids = await deputados_f.result()  # or just await deputados_f if you prefer
assiduidade_fs = [extract_assiduidade_for_one.submit(i, legislatura) for i in deputados_ids]
Which should you choose?
- If you want assiduidade as a single aggregated task, keep your original pattern with .submit(deputados_f, legislatura).
- If you want per-id parallelism, use the subflow pattern (Option A) or await-and-fan-out (Option B). Don’t call .submit inside a task; it won’t schedule child tasks.
Notes
- Your ThreadPoolTaskRunner will run these tasks concurrently as they are submitted.
- Avoid calling .result() on futures too early in the flow unless you intend to gate subsequent scheduling on that completion. In your case, awaiting just deputados_f to fan out its dependents is perfectly fine; everything else already submitted keeps running.
If you share the signature of extract_assiduidade_deputados (what it expects and returns), I can tailor the exact code for either single-task or per-id parallel patterns.
g
@Marvin ok got it about passing future values. from pathlib import Path from prefect import task, get_run_logger from prefect.artifacts import ( acreate_progress_artifact, aupdate_progress_artifact, acreate_table_artifact ) from prefect.futures import PrefectFuture from typing import cast import re from selectolax.parser import HTMLParser from utils.io import fetch_html_many_async, save_ndjson from config.loader import load_config APP_SETTINGS = load_config() def assiduidade_urls(deputados_ids: list[str], leg_years: list[int]) -> list[str]: return [ f"{APP_SETTINGS.CAMARA.PORTAL_BASE_URL}deputados/{id}/presenca-plenario/{year}" for id in deputados_ids for year in leg_years ] @task( retries=APP_SETTINGS.CAMARA.RETRIES, retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY, timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT ) async def extract_assiduidade_deputados( deputados_ids: list[str] | PrefectFuture[list[int]], legislatura: dict, out_dir: str | Path = "data/camara/assiduidade" ) -> str: logger = get_run_logger() leg_years = legislatura.get("dados", []).get("anosPassados") progress_id = await acreate_progress_artifact( progress=0.0, description="Progresso do download da assiduiadde de deputados" ) urls = assiduidade_urls(deputados_ids, leg_years) logger.info(f"CĂąmara: buscando assiduidade de {len(deputados_ids)} deputados dos anos {leg_years}") htmls = await fetch_html_many_async( urls=urls, concurrency=APP_SETTINGS.CAMARA.CONCURRENCY, timeout=APP_SETTINGS.CAMARA.TIMEOUT, progress_artifact_id=progress_id ) await aupdate_progress_artifact( artifact_id=progress_id, progress=100.0, description="Downloads concluĂ­dos" ) href_pattern = re.compile(r'https://www\.camara\.leg\.br/deputados/\d+') *id_ano_pattern= r'/deputados/(?P<id>\d+)\?.ano=(?P<ano>\d+)' json_results = [] for html in htmls: tree = HTMLParser(cast(str, html)) all_links = tree.css("a") for link in all_links: href = link.attributes.get("href", "") if isinstance(href, str): if href_pattern.match(href): match = re.search(id_ano_pattern, href) if match: deputado_id = int(match.group('id')) ano = int(match.group('ano')) json_results.append({ "deputado_id": deputado_id, "ano": ano, "html": html }) else: logger.warning(f"NĂŁo foram encontrados dados suficientes na pĂĄgina HTML") else: logger.warning(f"O href {href} nĂŁo Ă© string") dest = Path(out_dir) / "assiduidade.ndjson" dest_path = save_ndjson(json_results, dest) return dest_path This is the function but pylance is agnry about the types: Argument of type "list[str] | PrefectFuture[list[int]]" cannot be assigned to parameter "deputados_ids" of type "list[str]" in function "assiduidade_urls" Type "list[str] | PrefectFuture[list[int]]" is not assignable to type "list[str]" "PrefectFuture[list[int]]" is not assignable to "list[str]"Pylance*reportArgumentType* (parameter) deputados_ids: list[str] | PrefectFuture[list[int]]
m
✅ thought for 46.9 seconds
You’re seeing that Pylance error because the task parameter is annotated as a union with PrefectFuture, then you pass it to a function that expects a concrete list[str]. But at runtime, Prefect will always resolve futures before calling your task. So your task should be typed to receive concrete values (not futures). What to change - Do not type your task parameter as PrefectFuture[
]. Type it as a concrete sequence/list. - If your IDs might be ints in some places, normalize them to str before building URLs. - Access legislatura safely and default to an empty list if missing. Cleaned-up version
Copy code
from pathlib import Path
from typing import Sequence, cast
from prefect import task, get_run_logger
from prefect.artifacts import (
    acreate_progress_artifact,
    aupdate_progress_artifact,
)
from selectolax.parser import HTMLParser
import re

from utils.io import fetch_html_many_async, save_ndjson
from config.loader import load_config

APP_SETTINGS = load_config()

def assiduidade_urls(deputados_ids: Sequence[str], leg_years: Sequence[int]) -> list[str]:
    return [
        f"{APP_SETTINGS.CAMARA.PORTAL_BASE_URL}deputados/{dep_id}/presenca-plenario/{year}"
        for dep_id in deputados_ids
        for year in leg_years
    ]

@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT,
)
async def extract_assiduidade_deputados(
    deputados_ids: list[str],
    legislatura: dict,
    out_dir: str | Path = "data/camara/assiduidade",
) -> str:
    logger = get_run_logger()

    # Get years safely: the CĂąmara API returns "dados" as a list of dicts
    dados = legislatura.get("dados") or [{}]
    leg_years = dados[0].get("anosPassados", []) if isinstance(dados, list) else []
    if not isinstance(leg_years, list):
        leg_years = []

    # Normalize IDs to strings
    dep_ids = [str(i) for i in deputados_ids]

    progress_id = await acreate_progress_artifact(
        progress=0.0,
        description="Progresso do download da assiduidade de deputados",
    )
    logger.info(f"Progress artifact id: {progress_id}")

    urls = assiduidade_urls(dep_ids, cast(list[int], leg_years))
    logger.info(f"CĂąmara: buscando assiduidade de {len(dep_ids)} deputados dos anos {leg_years}")

    htmls = await fetch_html_many_async(
        urls=urls,
        concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
        timeout=APP_SETTINGS.CAMARA.TIMEOUT,
        progress_artifact_id=progress_id,
    )

    await aupdate_progress_artifact(
        artifact_id=progress_id,
        progress=100.0,
        description="Downloads concluĂ­dos",
    )

    href_pattern = re.compile(r"https://www\.camara\.leg\.br/deputados/\d+")
    id_ano_pattern = r"/deputados/(?P<id>\d+)\?.*ano=(?P<ano>\d+)"

    json_results = []
    for html in htmls:
        tree = HTMLParser(cast(str, html))
        for link in tree.css("a"):
            href = link.attributes.get("href", "") or ""
            if not isinstance(href, str):
                logger.warning(f"O href {href} nĂŁo Ă© string")
                continue
            if href_pattern.match(href):
                match = re.search(id_ano_pattern, href)
                if match:
                    deputado_id = int(match.group("id"))
                    ano = int(match.group("ano"))
                    json_results.append({"deputado_id": deputado_id, "ano": ano, "html": html})
                else:
                    logger.warning("NĂŁo foram encontrados dados suficientes na pĂĄgina HTML")

    dest = Path(out_dir) / "assiduidade.ndjson"
    dest_path = save_ndjson(json_results, dest)
    return dest_path
At the call site - Keep your pipeline code passing the future directly; Prefect will resolve it. If Pylance complains at the submit line, silence it with a cast just at the call site (only for the linter; runtime is fine):
Copy code
assiduidade_f = extract_assiduidade_deputados.submit(cast(list[str], deputados_f), legislatura)
# or, more broadly:
assiduidade_f = extract_assiduidade_deputados.submit(cast(Any, deputados_f), legislatura)
Even better: make sure your
extract_deputados
task is annotated to return
list[str]
, so the future is
PrefectFuture[list[str]]
. Then use a small helper subflow to fan out per-id if needed (we discussed patterns earlier). Why not accept PrefectFuture in the task signature? - Tasks should receive materialized values. Prefect handles resolving futures automatically before invoking your task. Including PrefectFuture in the type hints confuses static type checkers and your own code, as you saw when you tried to use the union in places that expect concrete types. If you want, paste the signature and return type of
extract_deputados
and I’ll align the types end-to-end to make Pylance happy at both the definition and submit sites.
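As a rough end-to-end sketch of that typing (stub bodies only; extract_assiduidade_deputados here is a stand-in for your real task, and the payload shape is assumed):
Copy code
from typing import Any, cast
from prefect import flow, task

@task
def extract_deputados(legislatura: dict) -> list[str]:
    # stub: normalize ids to str so the future is PrefectFuture[list[str]]
    return [str(d["id"]) for d in (legislatura.get("dados") or [])]

@task
def extract_assiduidade_deputados(deputados_ids: list[str], legislatura: dict) -> str:
    # stub: at runtime this receives a concrete list, never a future
    return f"received {len(deputados_ids)} ids"

@flow
def pipeline(legislatura: dict):
    deputados_f = extract_deputados.submit(legislatura)
    # the cast only appeases Pylance; Prefect resolves the future before the task runs
    return extract_assiduidade_deputados.submit(cast(Any, deputados_f), legislatura)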
g
@Marvin File "C:\Projetos\magnetron\.venv\Lib\site-packages\prefect\utilities\timeout.py", line 32, in timeout_async raise timeout_exc_type(f"Scope timed out after {seconds} second(s).") prefect.task_engine.TaskRunTimeoutError: Scope timed out after 120.0 second(s). i got this error on exectuting assuduidade what it could be? from pathlib import Path from prefect import task, get_run_logger from prefect.artifacts import ( acreate_progress_artifact, aupdate_progress_artifact, acreate_table_artifact ) from typing import cast import re from selectolax.parser import HTMLParser from utils.io import fetch_html_many_async, save_ndjson from config.loader import load_config APP_SETTINGS = load_config() def assiduidade_urls(deputados_ids: list[str], leg_years: list[int]) -> list[str]: return [ f"{APP_SETTINGS.CAMARA.PORTAL_BASE_URL}deputados/{id}/presenca-plenario/{year}" for id in deputados_ids for year in leg_years ] @task( retries=APP_SETTINGS.CAMARA.RETRIES, retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY, timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT ) async def extract_assiduidade_deputados( deputados_ids: list[str], legislatura: dict, out_dir: str | Path = "data/camara/assiduidade" ) -> str: logger = get_run_logger() leg_years = legislatura.get("dados", [])[0].get("anosPassados") progress_id = await acreate_progress_artifact( progress=0.0, description="Progresso do download da assiduiadde de deputados" ) urls = assiduidade_urls(deputados_ids, leg_years) logger.info(f"CĂąmara: buscando assiduidade de {len(deputados_ids)} deputados dos anos {leg_years}") htmls = await fetch_html_many_async( urls=urls, concurrency=APP_SETTINGS.CAMARA.CONCURRENCY, timeout=APP_SETTINGS.CAMARA.TIMEOUT, progress_artifact_id=progress_id ) await aupdate_progress_artifact( artifact_id=progress_id, progress=100.0, description="Downloads concluĂ­dos" ) href_pattern = re.compile(r'https://www\.camara\.leg\.br/deputados/\d+') *id_ano_pattern= r'/deputados/(?P<id>\d+)\?.ano=(?P<ano>\d+)' json_results = [] for html in htmls: tree = HTMLParser(cast(str, html)) all_links = tree.css("a") for link in all_links: href = link.attributes.get("href", "") if isinstance(href, str): if href_pattern.match(href): match = re.search(id_ano_pattern, href) if match: deputado_id = int(match.group('id')) ano = int(match.group('ano')) json_results.append({ "deputado_id": deputado_id, "ano": ano, "html": html }) else: logger.warning(f"NĂŁo foram encontrados dados suficientes na pĂĄgina HTML") else: logger.warning(f"O href {href} nĂŁo Ă© string") dest = Path(out_dir) / "assiduidade.ndjson" dest_path = save_ndjson(json_results, dest) return dest_path
m
✅ thought for 267.5 seconds
That error means your task hit its Prefect task timeout, not an HTTP timeout. The value comes from your @task decorator: timeout_seconds=APP_SETTINGS.CAMARA.TIMEOUT. If that setting is 120, Prefect will cancel the task after 120s of wall-clock runtime and raise TaskRunTimeoutError. Why this is happening - You’re doing a lot of work: building many URLs and fetching them concurrently, parsing HTML, and writing output. Even if each HTTP request has its own per-request timeout, the total task runtime can easily exceed 120s. - You’re reusing the same TIMEOUT value for both HTTP requests and the overall task. Those are very different time scales. How to fix - Increase the task timeout or disable it, and keep a separate (usually smaller) HTTP timeout. Options: 1) Raise the task timeout in the decorator
Copy code
@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=APP_SETTINGS.CAMARA.TASK_TIMEOUT_SECONDS,  # e.g., 900
)

and use a different setting for HTTP per-request timeout:
Copy code
htmls = await fetch_html_many_async(
    urls=urls,
    concurrency=APP_SETTINGS.CAMARA.CONCURRENCY,
    timeout=APP_SETTINGS.CAMARA.HTTP_TIMEOUT_SECONDS,  # e.g., 20–60
    progress_artifact_id=progress_id,
)
2) Override per submission
Copy code
assiduidade_f = extract_assiduidade_deputados.with_options(
    timeout_seconds=900  # or None to disable
).submit(deputados_f, legislatura)
3) Split the work into smaller tasks - Chunk the URLs and submit multiple tasks (e.g., 200 URLs per task). Each task finishes faster and stays below the timeout. - Or use a subflow to fan out and collect results, as discussed earlier. Helpful sanity checks - Log the number of URLs and your concurrency: - total_urls = len(deputados_ids) * len(leg_years) - With concurrency C and average per-request time T seconds, the lower bound is roughly ceil(total_urls / C) * T. Make sure your task timeout comfortably exceeds that. - Verify your years extraction is correct; the current line assumes a list at dados:
Copy code
# safer: "dados" is a list of dicts, so guard against it being missing or empty
dados = legislatura.get("dados") or [{}]
leg_years = dados[0].get("anosPassados") or []
# ensure ints
leg_years = [int(y) for y in leg_years]
Unguarded [0] indexing will throw if dados is missing or empty. Summary - This is a Prefect task timeout. Increase timeout_seconds (or set it to None) for the task, and keep a separate, smaller HTTP timeout. - Consider chunking the work or fanning out if total runtime is large.
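To make the runtime estimate above concrete, here is a small back-of-the-envelope helper; the example numbers are made up, so plug in your own URL count, concurrency, and measured request time:
Copy code
import math

def fetch_time_lower_bound(n_urls: int, concurrency: int, avg_request_seconds: float) -> float:
    # rough wall-clock lower bound for the fetch stage alone (ignores parsing and file I/O)
    return math.ceil(n_urls / concurrency) * avg_request_seconds

# e.g. 513 deputies x 4 years = 2052 URLs, 20 concurrent requests, ~2 s per request
print(fetch_time_lower_bound(2052, 20, 2.0))  # 206.0 s -- already past a 120 s task timeout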
g
@Marvin from pathlib import Path from prefect import task, get_run_logger from prefect.artifacts import ( acreate_progress_artifact, aupdate_progress_artifact, acreate_table_artifact ) from typing import cast import re from selectolax.parser import HTMLParser from utils.io import fetch_html_many_async, save_ndjson from config.loader import load_config APP_SETTINGS = load_config() def assiduidade_urls(deputados_ids: list[str], leg_years: list[int]) -> list[str]: return [ f"{APP_SETTINGS.CAMARA.PORTAL_BASE_URL}deputados/{id}/presenca-plenario/{year}" for id in deputados_ids for year in leg_years ] @task( retries=APP_SETTINGS.CAMARA.RETRIES, retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY, timeout_seconds=1800 ) async def extract_assiduidade_deputados( deputados_ids: list[str], legislatura: dict, out_dir: str | Path = "data/camara" ) -> str: logger = get_run_logger() leg_years = legislatura.get("dados", [])[0].get("anosPassados") progress_id = await acreate_progress_artifact( progress=0.0, description="Progresso do download da assiduiadde de deputados" ) urls = assiduidade_urls(deputados_ids, leg_years) logger.info(f"CĂąmara: buscando assiduidade de {len(deputados_ids)} deputados dos anos {leg_years}") htmls = await fetch_html_many_async( urls=urls, concurrency=APP_SETTINGS.CAMARA.CONCURRENCY, timeout=APP_SETTINGS.CAMARA.TIMEOUT, progress_artifact_id=progress_id ) await aupdate_progress_artifact( artifact_id=progress_id, progress=100.0, description="Downloads concluĂ­dos" ) href_pattern = re.compile(r'https://www\.camara\.leg\.br/deputados/\d+') *id_ano_pattern= r'/deputados/(?P<id>\d+)\?.ano=(?P<ano>\d+)' # Montando os resultados JSON e o artefato artifact_data = [] json_results = [] for html in htmls: tree = HTMLParser(cast(str, html)) all_links = tree.css("a") for link in all_links: href = link.attributes.get("href", "") if isinstance(href, str): if href_pattern.match(href): match = re.search(id_ano_pattern, href) if match: deputado_id = int(match.group('id')) year = int(match.group('ano')) json_results.append({ "deputado_id": deputado_id, "ano": year, "html": html }) tables = tree.css('table.table.table-bordered') name = tree.css_first("h1.titulo-internal") name_text = name.text(strip=True) if name else None artifact_row = { "id": deputado_id, "nome": name_text, "ano": year } if tables: artifact_row["possui_dados"] = "Sim" else: artifact_row["possui_dados"] = "NĂŁo" artifact_data.append(artifact_row) else: logger.warning(f"NĂŁo foram encontrados dados suficientes na pĂĄgina HTML") else: logger.warning(f"O href {href} nĂŁo Ă© string") await acreate_table_artifact( key="assiduidade", table=artifact_data, description="Assiduidade de deputados" ) dest = Path(out_dir) / "assiduidade.ndjson" dest_path = save_ndjson(json_results, dest) return dest_path I have this task. It downloads data based on year. I want to cache the results from past years because they dont change, i want only always fetch new data of the current year.
m
✅ thought for 147.4 seconds
Great use case. You can cache per-year so past years are reused while the current year is always refreshed using a custom cache key function. In Prefect 3.x you can do this either inline on the task via cache_key_fn, or by defining a reusable CachePolicy. Key idea - Return a stable key for past years (so they reuse cached results). - Return None for the current year (so Prefect skips cache for that run). - Optionally set cache_expiration for how long to keep cached entries. Option A: Use cache_key_fn directly on the task
Copy code
from datetime import date, timedelta
from prefect import task
from prefect.utilities.hashing import hash_objects

def assiduidade_cache_key(context, params) -> str | None:
    # Expect params to include legislatura (dict) and deputados_ids (list[str])
    today_year = date.today().year
    dados = (params.get("legislatura") or {}).get("dados") or [{}]
    leg_years = dados[0].get("anosPassados") or []
    dep_ids = params.get("deputados_ids") or []

    # If any requested year is current year, skip caching this run entirely
    # (You could be more granular if you split per-year work into separate tasks)
    if any(int(y) == today_year for y in leg_years):
        return None

    # Otherwise, cache by the combination of dep_ids and leg_years
    # Normalize to strings and a stable ordering
    key = hash_objects(sorted(map(str, dep_ids)), sorted(int(y) for y in leg_years))
    return f"assiduidade:{key}"

@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=1800,
    cache_key_fn=assiduidade_cache_key,
    cache_expiration=timedelta(days=30),  # adjust to taste
)
async def extract_assiduidade_deputados(...):
    ...
Option B: Define a CachePolicy (reusable)
Copy code
from datetime import date, timedelta
from prefect.cache_policies import CachePolicy
from prefect.utilities.hashing import hash_objects

def assiduidade_cache_key(context, params) -> str | None:
    today_year = date.today().year
    dados = (params.get("legislatura") or {}).get("dados") or [{}]
    leg_years = dados[0].get("anosPassados") or []
    dep_ids = params.get("deputados_ids") or []
    if any(int(y) == today_year for y in leg_years):
        return None
    key = hash_objects(sorted(map(str, dep_ids)), sorted(int(y) for y in leg_years))
    return f"assiduidade:{key}"

ASSIDUIDADE_CACHE = CachePolicy.from_cache_key_fn(
    assiduidade_cache_key,
    # optional: key_storage=..., isolation_level=..., lock_manager=...
)

@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=1800,
    cache_policy=ASSIDUIDADE_CACHE,
    cache_expiration=timedelta(days=30),
)
async def extract_assiduidade_deputados(...):
    ...
Granular per-year caching (optional but cleaner) - Since you fetch multiple years in one task, skipping cache for “any current year present” disables cache entirely for that run. - If you want past years to be cached and only the current year to be fetched, split the work per-year: - Have the task accept a single year - Submit one task per year - Use cache_key_fn that keys by year; return None when year == current_year. Example:
Copy code
@task(cache_key_fn=lambda _, p: None if p["year"] == date.today().year else f"assiduidade-{p['year']}", cache_expiration=timedelta(days=180))
async def extract_assiduidade_year(deputados_ids: list[str], year: int) -> str:
    # do the same logic but for one year
    ...

# In your flow after you have dep_ids and leg_years:
fs = [extract_assiduidade_year.submit(dep_ids, y) for y in leg_years]
This gives you the best of both worlds: current-year runs fresh; past years reuse cache and don’t re-download. A few extra correctness tweaks for your current code - Safer access to years:
Copy code
dados = legislatura.get("dados") or [{}]
leg_years = dados[0].get("anosPassados") or []
leg_years = [int(y) for y in leg_years]
Your
[0]
indexing will crash if
dados
isn’t a list.
- Use the async table artifact creator (you already are) and await it. - Keep HTTP timeout separate from task timeout, as discussed earlier. Docs - Caching concepts: Caching concepts - Cache policies API: prefect.cache_policies - Hash helpers: prefect.utilities.hashing If you’d like, I can refactor your assiduidade task into the per-year pattern and wire it into your pipeline so only the current year runs fresh.
g
@Marvin from pathlib import Path from prefect import task, get_run_logger from prefect.artifacts import ( acreate_progress_artifact, aupdate_progress_artifact, acreate_table_artifact ) from typing import cast import re from selectolax.parser import HTMLParser from datetime import date, timedelta from utils.io import fetch_html_many_async, save_ndjson from config.loader import load_config APP_SETTINGS = load_config() def assiduidade_urls(deputados_ids: list[str], year: int) -> list[str]: return [ f"{APP_SETTINGS.CAMARA.PORTAL_BASE_URL}deputados/{id}/presenca-plenario/{year}" for id in deputados_ids ] @task( retries=APP_SETTINGS.CAMARA.RETRIES, retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY, timeout_seconds=1800, cache_key_fn=lambda _, year: None if year == date.today().year else f"assiduidade:{year}", cache_expiration=timedelta(days=180) ) async def extract_assiduidade_deputados( deputados_ids: list[str], legislatura_year: int, out_dir: str | Path = "data/camara" ) -> str: logger = get_run_logger() progress_id = await acreate_progress_artifact( progress=0.0, description="Progresso do download da assiduiadde de deputados" ) urls = assiduidade_urls(deputados_ids, legislatura_year) logger.info(f"CĂąmara: buscando assiduidade de {len(deputados_ids)} deputados do ano {legislatura_year}") htmls = await fetch_html_many_async( urls=urls, concurrency=APP_SETTINGS.CAMARA.CONCURRENCY, timeout=APP_SETTINGS.CAMARA.TIMEOUT, progress_artifact_id=progress_id ) await aupdate_progress_artifact( artifact_id=progress_id, progress=100.0, description="Downloads concluĂ­dos" ) href_pattern = re.compile(r'https://www\.camara\.leg\.br/deputados/\d+') *id_ano_pattern= r'/deputados/(?P<id>\d+)\?.ano=(?P<ano>\d+)' # Montando os resultados JSON e o artefato artifact_data = [] json_results = [] for html in htmls: tree = HTMLParser(cast(str, html)) all_links = tree.css("a") for link in all_links: href = link.attributes.get("href", "") if isinstance(href, str): if href_pattern.match(href): match = re.search(id_ano_pattern, href) if match: deputado_id = int(match.group('id')) year = int(match.group('ano')) json_results.append({ "deputado_id": deputado_id, "ano": year, "html": html }) tables = tree.css('table.table.table-bordered') name = tree.css_first("h1.titulo-internal") name_text = name.text(strip=True) if name else None artifact_row = { "id": deputado_id, "nome": name_text, "ano": year } if tables: artifact_row["possui_dados"] = "Sim" else: artifact_row["possui_dados"] = "NĂŁo" artifact_data.append(artifact_row) else: logger.warning(f"NĂŁo foram encontrados dados suficientes na pĂĄgina HTML") else: logger.warning(f"O href {href} nĂŁo Ă© string") await acreate_table_artifact( key="assiduidade", table=artifact_data, description="Assiduidade de deputados" ) dest = Path(out_dir) / "assiduidade.ndjson" dest_path = save_ndjson(json_results, dest) return dest_path And in my pipeline.py: # CONGRESSO legislatura = extract_legislatura(date) deputados_f = extract_deputados.submit(legislatura) anos_passados = legislatura.get("dados", [])[0].get("anosPassados", []) assiduidade_f = [ extract_assiduidade_deputados.submit( cast(Any, deputados_f), ano ) for ano in anos_passados ] The process gives me an error: OSError: [WinError 123] A sintaxe do nome do arquivo, do nome do diretĂłrio ou do rĂłtulo do volume estĂĄ incorreta: "C:\\Users\\Gustavo Trabalho\\.prefect\\storage\\assiduidade:{'deputados_ids': [215043, 139... 
An error was encountered while committing transaction "assiduidade:{'deputados_ids': [215043, 139285, 157130, 73768, 227370, 210989,...
m
✅ thought for 35.3 seconds
You hit a Windows path error because your cache_key_fn is wrong. In Prefect 3.x, cache_key_fn receives (context, params_dict). In your code you wrote:
Copy code
cache_key_fn=lambda _, year: None if year == date.today().year else f"assiduidade:{year}"
Here, “year” is actually the entire params dict, so Prefect tries to use a cache key like:
Copy code
"assiduidade:{'deputados_ids': [...], 'legislatura_year': 2022, 'out_dir': '...'}"
That string contains braces, colons, commas, etc. Prefect stores cache data under ~/.prefect/storage/<cache_key>, so on Windows this becomes an invalid directory name and you get WinError 123. Fix: read the year from the params dict and return a filesystem-safe key. Also, prefer hashing inputs if you want the cache to reflect the full input set. Example (robust and Windows-safe)
Copy code
from datetime import date, timedelta
from prefect.utilities.hashing import hash_objects

def assiduidade_cache_key(_, params) -> str | None:
    # pull year from params dict
    year = int(params["legislatura_year"])
    # skip caching for the current year
    if year == date.today().year:
        return None

    # Include deputies list in the key so cache changes if inputs change
    dep_ids = [str(i) for i in params.get("deputados_ids", [])]
    deps_hash = hash_objects(sorted(dep_ids)) or "noids"

    # Return a short, filesystem-safe key (avoid :, {}, [], spaces, etc.)
    return f"assiduidade-{year}-{deps_hash}"

@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=1800,
    cache_key_fn=assiduidade_cache_key,
    cache_expiration=timedelta(days=180),
)
async def extract_assiduidade_deputados(
    deputados_ids: list[str],
    legislatura_year: int,
    out_dir: str | Path = "data/camara",
) -> str:
    ...
If you truly want to cache only by year (regardless of which IDs you pass), you can simplify:
Copy code
@task(
    ...,
    cache_key_fn=lambda _, p: None if int(p["legislatura_year"]) == date.today().year
                              else f"assiduidade-{int(p['legislatura_year'])}",
    cache_expiration=timedelta(days=180)
)
Note the lambda’s second arg is p (the params dict), and the key avoids colons/braces. Why this is necessary - Prefect stores cache entries under a directory named by your key. Keys must be filesystem-safe, especially on Windows. - The cache_key_fn must accept (context, params_dict). Returning None skips caching for that run. Related docs - Caching concepts: Caching concepts - Cache policies API: prefect.cache_policies - Hash helpers: prefect.utilities.hashing If you want, I can also add a small validation that logs the computed cache key on task start, so you can quickly confirm it looks clean and behaves as expected.
g
@Marvin What about saving the ndjson? The three tasks are being executed in parallel. Could it cause problems? Here's the function used to save the data: # Salva uma lista de JSONs em um Ășnico NDJson def save_ndjson(records: list[dict], dest_path: str | Path) -> str: dest_path = Path(dest_path) ensure_dir(dest_path.parent) with open(dest_path, "w", encoding="utf-8") as f: for rec in records: f.write(json.dumps(rec, ensure_ascii=False) + "\n") return str(dest_path)
m
✅ thought for 29.4 seconds
Short answer: yes — if multiple tasks write to the same NDJSON path concurrently, you can get races, truncation, or last-writer-wins. On Windows, it’s especially brittle. Make each task write to a unique file, then merge in a downstream step. Also, if each per‑year task is creating a table artifact with the same key, you’ll end up with multiple versions; move artifact creation to an aggregate step or key by year. Safe pattern - In each per-year task: - Write to a unique path (e.g., assiduidade_{year}.ndjson) - Use an atomic write (write to tmp then os.replace) - After all tasks complete: - Merge the per-year NDJSONs into a single file in a downstream task or in the flow - Create the single aggregate artifact there Atomic writer (per task)
Copy code
import os
from pathlib import Path
import json

def save_ndjson(records: list[dict], dest_path: str | Path) -> str:
    dest_path = Path(dest_path)
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = dest_path.with_suffix(dest_path.suffix + ".tmp")

    with open(tmp_path, "w", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")

    # Atomic replace on the same filesystem
    os.replace(tmp_path, dest_path)
    return str(dest_path)
Use unique filenames per task
Copy code
# inside extract_assiduidade_deputados(...)
dest = Path(out_dir) / f"assiduidade_{legislatura_year}.ndjson"
dest_path = save_ndjson(json_results, dest)
return dest_path
Merge step (downstream)
Copy code
import os, shutil
from pathlib import Path

def merge_ndjson(inputs: list[str | Path], dest: str | Path) -> str:
    dest = Path(dest)
    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_suffix(dest.suffix + ".tmp")

    with open(tmp, "w", encoding="utf-8") as out:
        for p in inputs:
            p = Path(p)
            if not p.exists():
                continue
            with open(p, "r", encoding="utf-8") as f:
                shutil.copyfileobj(f, out)

    os.replace(tmp, dest)
    return str(dest)
In your flow
Copy code
# submit one task per year (what you’re already doing)
assiduidade_fs = [
    extract_assiduidade_deputados.submit(cast(Any, deputados_f), ano)
    for ano in anos_passados
]

# later, merge all NDJSONs deterministically
from prefect.futures import resolve_futures_to_results
paths = resolve_futures_to_results(assiduidade_fs)
final_path = merge_ndjson(paths, Path("data/camara") / "assiduidade.ndjson")

# create a single aggregate artifact here if desired
from prefect.artifacts import create_table_artifact
# build an aggregated table across years, then:
# create_table_artifact(key="assiduidade", table=agg_rows, description="...")
If you really must write to a single file from multiple tasks - Open in append mode with a file lock. This is still risky across processes and can interleave lines if not locked properly. A lock file (e.g., using the filelock package) can coordinate, but the aggregate step is cleaner:
Copy code
from filelock import FileLock

def append_ndjson(records: list[dict], dest_path: str | Path):
    dest_path = Path(dest_path)
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    lock = FileLock(str(dest_path) + ".lock")
    with lock:
        with open(dest_path, "a", encoding="utf-8") as f:
            for rec in records:
                f.write(json.dumps(rec, ensure_ascii=False) + "\n")
About artifacts - You’re calling acreate_table_artifact(key="assiduidade") inside each per-year task. That will publish multiple versions with the same key; the latest one wins in the UI. - Options: - Key per year (e.g., assiduidade-2022, assiduidade-2023), or - Move table creation to the merge/aggregate step and publish a single combined table once. This will keep your writes safe and your artifacts consistent while tasks run in parallel. Want me to refactor your pipeline snippet to include the merge step and a single aggregate artifact?
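For reference, a rough sketch of that aggregate step could look like the task below; publish_assiduidade_artifact is a hypothetical name, and it assumes the merged NDJSON keeps the deputado_id and ano fields from the records built above:
Copy code
import json
from pathlib import Path
from prefect import task
from prefect.artifacts import create_table_artifact

@task
def publish_assiduidade_artifact(merged_path: str | Path) -> None:
    rows = []
    with open(merged_path, "r", encoding="utf-8") as f:
        for line in f:
            rec = json.loads(line)
            # keep the table small: drop the raw HTML, keep the identifiers
            rows.append({"deputado_id": rec.get("deputado_id"), "ano": rec.get("ano")})
    create_table_artifact(
        key="assiduidade",
        table=rows,
        description="Assiduidade de deputados (todos os anos)",
    )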
g
@Marvin import os from pathlib import Path import json def save_ndjson(records: list[dict], dest_path: str | Path) -> str: dest_path = Path(dest_path) dest_path.parent.mkdir(parents=True, exist_ok=True) tmp_path = dest_path.with_suffix(dest_path.suffix + ".tmp") with open(tmp_path, "w", encoding="utf-8") as f: for rec in records: f.write(json.dumps(rec, ensure_ascii=False) + "\n") # Atomic replace on the same filesystem os.replace(tmp_path, dest_path) return str(dest_path) This is the modified function. Explain to me why it has a tmp_path and an atomic replace. Won't it affect other places that use save_ndjson?
m
✅ thought for 20.9 seconds
Great question. The tmp file + atomic replace pattern is about safety and consistency when writing files. Why write to tmp first, then replace? - Prevents partial files: If the process crashes or is interrupted mid-write, the destination file is never left half-written. Either the old file remains or the fully-written new file appears. - Readers never see in-progress writes: Other code that opens the destination path will either get the previous complete file or, after the replace, the new complete file. No torn/truncated content. - Atomic swap: On the same filesystem/volume, os.replace performs an atomic rename that replaces the target. This gives “all-or-nothing” behavior at the destination path. Will it affect other places that use save_ndjson? - Function signature is unchanged — it’s a drop-in replacement. - The only observable behavior change is timing: previously the file existed during the write; now it “appears” only when fully written. If anything was tailing/reading while you wrote line-by-line, it will now only see the file after completion. For most pipelines, that’s an improvement. - It does not fix concurrent writers to the same destination. If two tasks write the same dest concurrently, you still have last-writer-wins. The pattern prevents partial/truncated files but does not serialize concurrent writes. One refinement to avoid tmp-file collisions If there’s any chance two writers could target the same dest concurrently, use a unique tmp filename so they don’t both write to the same .tmp file before the replace:
Copy code
import os, json
from pathlib import Path
from uuid import uuid4

def save_ndjson(records: list[dict], dest_path: str | Path) -> str:
    dest_path = Path(dest_path)
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    # unique tmp file in the same directory to keep replace atomic
    tmp_path = dest_path.with_suffix(dest_path.suffix + f".{os.getpid()}.{uuid4().hex}.tmp")

    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            for rec in records:
                f.write(json.dumps(rec, ensure_ascii=False) + "\n")
        os.replace(tmp_path, dest_path)  # atomic swap on same filesystem
    finally:
        # best-effort cleanup if something went wrong before replace
        if tmp_path.exists():
            try:
                tmp_path.unlink()
            except OSError:
                pass

    return str(dest_path)
Notes - Keep the tmp file on the same filesystem/drive as dest; atomic replace semantics depend on that. - On Windows/NTFS, os.replace is atomic, but replacing a file that another process has open in exclusive mode can raise an error. Generally fine for standard usage. - For truly concurrent append-by-many-writers, use an append function with a file lock (e.g., filelock). The aggregate-then-merge pattern you’re using is safer and simpler. So, no adverse impact for typical usage — it just makes writes safer. If you have any readers that depended on reading lines while the file was being written, they’ll now need to wait until the write completes.
g
@Marvin from prefect import flow, get_run_logger from prefect.task_runners import ThreadPoolTaskRunner from prefect.futures import resolve_futures_to_results from datetime import date, datetime from typing import Any, cast from pathlib import Path from tasks.tse import TSE_ENDPOINTS, extract_tse from tasks.camara.legislatura import extract_legislatura from tasks.camara.deputados import extract_deputados from tasks.camara.frentes import extract_frentes from tasks.camara.frentes_membros import extract_frentes_membros from tasks.camara.assiduidade import extract_assiduidade_deputados from utils.io import merge_ndjson from config.loader import load_config APP_SETTINGS = load_config() # IMPORTAR TASKS TSE, CONGRESSO, SENADO ETC... @flow( task_runner=ThreadPoolTaskRunner(max_workers=APP_SETTINGS.FLOW.MAX_RUNNERS), # type: ignore log_prints=True ) async def pipeline( date: date = datetime.now().date(), refresh_cache: bool = False ): logger = get_run_logger() logger.info("Iniciando pipeline") # TSE: ~30 endpoints em paralelo tse_fs = [ cast(Any, extract_tse) .with_options(refresh_cache=refresh_cache) .submit(name, url) for name, url in TSE_ENDPOINTS.items() ] # CONGRESSO legislatura = extract_legislatura(date) deputados_f = extract_deputados.submit(legislatura) anos_passados = legislatura.get("dados", [])[0].get("anosPassados", []) ### ASSIDUIDADE assiduidade_fs = [ extract_assiduidade_deputados.submit(cast(Any, deputados_f), ano) for ano in anos_passados ] paths = resolve_futures_to_results(assiduidade_fs) final_path = merge_ndjson(paths, Path("data/camara") / "assiduidade.ndjson") ###### frentes_f = extract_frentes.submit(legislatura) frentes_membros_f = extract_frentes_membros.submit(cast(Any, frentes_f)) return resolve_futures_to_results({ "tse": tse_fs, "congresso_deputados": deputados_f, "congresso_assiduidade": assiduidade_fs, "congresso_frentes": frentes_f, "congresso_frentes_membros": frentes_membros_f, }) if name == "__main__": pipeline.serve( # type: ignore name="deploy-1" ) I have applied the solution. This is how my flow is configured. Analyze it
m
✅ thought for 37.3 seconds
Overall, you’re very close. A few important fixes and optimizations: 1) Fix the cache_key_fn signature - cache_key_fn receives (context, params_dict). In your task you used lambda _, year: 
 which treats the entire params dict as year and produced invalid cache keys earlier. - Use a params dict and return a filesystem-safe key. Example
Copy code
from datetime import date, timedelta

def assiduidade_cache_key(_, p) -> str | None:
    year = int(p["legislatura_year"])
    if year == date.today().year:
        return None  # skip cache for current year
    return f"assiduidade-{year}"  # short, filesystem-safe key

@task(
    retries=APP_SETTINGS.CAMARA.RETRIES,
    retry_delay_seconds=APP_SETTINGS.CAMARA.RETRY_DELAY,
    timeout_seconds=1800,
    cache_key_fn=assiduidade_cache_key,
    cache_expiration=timedelta(days=180),
)
async def extract_assiduidade_deputados(...):
    ...
2) Write a unique NDJSON per year inside the task - Right now each per-year task writes to the same "assiduidade.ndjson", so the last writer wins. Return a per-year file and merge later (you already have merge_ndjson downstream). Inside the task
Copy code
dest = Path(out_dir) / f"assiduidade_{legislatura_year}.ndjson"
dest_path = save_ndjson(json_results, dest)
return dest_path
3) Don’t block the flow before submitting other branches - You call resolve_futures_to_results(assiduidade_fs) before submitting frentes/frentes_membros. That pauses scheduling of those tasks until assiduidade is fully done. - Submit frentes/frentes_membros first, then resolve assiduidade when you actually need the paths (or make merge a task). Example reordering
Copy code
# submit assiduidade tasks
assiduidade_fs = [
    extract_assiduidade_deputados.submit(cast(Any, deputados_f), ano)
    for ano in anos_passados
]

# submit independent branch BEFORE waiting
frentes_f = extract_frentes.submit(legislatura)
frentes_membros_f = extract_frentes_membros.submit(cast(Any, frentes_f))

# now resolve and merge
paths = resolve_futures_to_results(assiduidade_fs)
final_path = merge_ndjson(paths, Path("data/camara") / "assiduidade.ndjson")
Optional: make merge_ndjson a @task so it shows up in the UI and runs on the task runner:
Copy code
@task
def merge_ndjson_task(paths: list[str], dest: str | Path) -> str:
    return merge_ndjson(paths, dest)

merged_f = merge_ndjson_task.submit(assiduidade_fs, Path("data/camara") / "assiduidade.ndjson")
4) Artifacts - If you publish a table artifact per-year inside the task, use a per-year key to avoid overwriting versions immediately:
Copy code
await acreate_table_artifact(
    key=f"assiduidade-{legislatura_year}",
    table=artifact_data,
    description=f"Assiduidade de deputados ({legislatura_year})",
)
- If you want one combined table, move table creation to the merge/aggregate step instead. 5) Safer anos_passados extraction - Your pipeline uses legislatura.get("dados", [])[0], which raises if dados is missing or empty. Guard it:
Copy code
dados = legislatura.get("dados") or [{}]
anos_passados = dados[0].get("anosPassados", [])
6) Optional: force-refresh current-year when needed - You can bypass cache per-run by setting refresh_cache at submit:
Copy code
extract_assiduidade_deputados.with_options(refresh_cache=refresh_cache).submit(...)
Quick checklist - Unique per-year filenames in the task: yes - Cache key uses params dict and is filesystem-safe: yes - Submit independent branches before resolving futures: yes - Table artifacts either per-year key or created once in aggregate: choose one - Safer anos_passados access: yes References - Caching: Caching concepts - Artifacts: Artifacts how-to
g
@Marvin I did it all but I got the error: FileNotFoundError: [WinError 2] O sistema nĂŁo pode encontrar o arquivo especificado: 'data\\camara\\assiduidade_2025.ndjson.tmp' -> 'data\\camara\\assiduidade_2025.ndjson'
m
✅ thought for 43.4 seconds
Thanks for the details. That Windows FileNotFoundError on os.replace usually means the tmp file didn’t exist at the moment of the replace. The most common cause in parallel pipelines is a tmp filename collision: two writers targeting the same destination path use the same tmp name (e.g., .ndjson.tmp). One finishes first and moves the tmp into place, removing the tmp path; the other then tries to replace a tmp that’s no longer there. Why this can happen in your case - You’re running multiple tasks in parallel. If more than one task ends up writing to the same final file (e.g., same year) at the same time (due to duplicate years, retries, or concurrent flows), they’ll share the same tmp filename when using .with_suffix(".tmp"). - Even with per-year outputs, duplicates or concurrent runs can cause two writers for the same year. Fix 1 — Use a unique tmp filename per writer This prevents tmp collisions while keeping the replace atomic.
Copy code
import os, json
from pathlib import Path
from uuid import uuid4

def save_ndjson(records: list[dict], dest_path: str | Path) -> str:
    dest_path = Path(dest_path)
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    # unique tmp in same directory (same filesystem) to keep replace atomic
    tmp_path = dest_path.with_name(dest_path.name + f".{os.getpid()}.{uuid4().hex}.tmp")

    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            for rec in records:
                f.write(json.dumps(rec, ensure_ascii=False) + "\n")
        os.replace(tmp_path, dest_path)  # atomic swap on same volume
    finally:
        # best-effort cleanup if something went wrong before replace
        if tmp_path.exists():
            try:
                tmp_path.unlink()
            except OSError:
                pass

    return str(dest_path)
Fix 2 — Ensure you don’t spawn duplicate writers - Deduplicate years before submitting tasks: -
anos_passados = sorted(set(anos_passados))
- If you run multiple flow runs concurrently (or task retries can overlap), make the output path unique per run to avoid cross-run conflicts, e.g., include flow run id or a timestamp: -
assiduidade_{year}_{flow_run_id}.ndjson
- Then merge downstream into a canonical file. Fix 3 — Keep one writer per final file - You’re already writing per-year and merging later — that’s good. - Double-check that each per-year task writes to
assiduidade_{year}.ndjson
(not all to the same filename). - If you ever need multiple writers to append to a single file, use a file lock and append mode instead, but the aggregate-then-merge pattern is simpler and safer. Optional diagnostics - Log the tmp_path and dest before the replace; if the error repeats, check for duplicate submissions for the same year in logs. - After the write block, add
assert Path(tmp_path).exists()
to catch early if the tmp is missing. This change is drop-in: it won’t affect other callers except that files will “appear” only when fully written (safer) and tmp files are unique per writer.
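Following up on the per-run filename idea under Fix 2, here is a minimal sketch using the runtime flow run id; per_run_output_path is a hypothetical helper, not part of your codebase:
Copy code
from pathlib import Path
from prefect.runtime import flow_run

def per_run_output_path(out_dir: str | Path, year: int) -> Path:
    # flow_run.id is empty outside a flow run; fall back to a constant for local testing
    run_id = flow_run.id or "local"
    return Path(out_dir) / f"assiduidade_{year}_{run_id}.ndjson"

# inside the per-year task:
# dest_path = save_ndjson(json_results, per_run_output_path(out_dir, legislatura_year))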
g
@Marvin Since I've applied your changes to save_ndjson I'm getting errors where it uses: def save_ndjson(records: list[dict], dest_path: str | Path) -> str: dest_path = Path(dest_path) dest_path.parent.mkdir(parents=True, exist_ok=True) tmp_path = dest_path.with_suffix(dest_path.suffix + ".tmp") with open(dest_path, "w", encoding="utf-8") as f: for rec in records: f.write(json.dumps(rec, ensure_ascii=False) + "\n") os.replace(tmp_path, dest_path) return str(dest_path) FileNotFoundError: [WinError 2] O sistema nĂŁo pode encontrar o arquivo especificado: 'data\\camara\\frentes.ndjson.tmp' -> 'data\\camara\\frentes.ndjson'
m
✅ thought for 9.9 seconds
You’re seeing that error because your function creates a tmp_path but writes to dest_path, then tries to replace tmp_path → dest_path. Since tmp_path was never created, os.replace raises FileNotFoundError. Fix: write to tmp_path, then replace it into dest_path. Minimal atomic version
Copy code
import os, json
from pathlib import Path

def save_ndjson(records: list[dict], dest_path: str | Path) -> str:
    dest_path = Path(dest_path)
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = dest_path.with_suffix(dest_path.suffix + ".tmp")

    with open(tmp_path, "w", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")

    os.replace(tmp_path, dest_path)  # atomic on same filesystem
    return str(dest_path)
More robust (avoid tmp collisions when multiple writers target the same dest)
Copy code
import os, json
from pathlib import Path
from uuid import uuid4

def save_ndjson(records: list[dict], dest_path: str | Path) -> str:
    dest_path = Path(dest_path)
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = dest_path.with_name(dest_path.name + f".{os.getpid()}.{uuid4().hex}.tmp")

    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            for rec in records:
                f.write(json.dumps(rec, ensure_ascii=False) + "\n")
        os.replace(tmp_path, dest_path)
    finally:
        if tmp_path.exists():
            try:
                tmp_path.unlink()
            except OSError:
                pass
    return str(dest_path)
Notes - Keep tmp in the same directory as dest so os.replace stays atomic. - This pattern doesn’t serialize concurrent writers; it just prevents partial files. If multiple tasks write the same dest concurrently, last-writer-wins. Use unique filenames per task (e.g., per-year) and merge later, as you’re doing. - The frentes.ndjson path in your error suggests another task may still be writing to a single shared filename; ensure each parallel task writes a unique file or you coordinate writes.