Simon Ouellette
10/23/2025, 4:13 PM
Ben
10/23/2025, 4:20 PM
[...] for loop. E.g.
for object in objects:
    data = tasks.operation1(data=object)
    tasks.operation2(data=data)
I'm reading the docs, which mention using either .submit() or .map() to run tasks concurrently; however, I'm not sure how to access the returned data and pass it to a secondary task (which can also be run concurrently, if possible).
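A minimal sketch of that fan-out pattern, assuming Prefect 3.x @task/@flow decorators (operation1 and operation2 stand in for the real tasks): a future returned by .submit() can be passed directly to a downstream .submit() call, and Prefect resolves it to its result while preserving the dependency, so both stages run concurrently.

from prefect import flow, task

@task
def operation1(data):
    return data * 2  # placeholder work

@task
def operation2(data):
    print(data)

@flow
def pipeline(objects):
    # Fan out: one concurrent task run per object
    futures = [operation1.submit(data=obj) for obj in objects]
    # Passing a future makes Prefect wait on it and hand the resolved
    # result to the downstream task, which also runs concurrently
    downstream = [operation2.submit(data=f) for f in futures]
    for f in downstream:
        f.wait()

skrawczyk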
10/23/2025, 5:24 PM
Janet Carson
10/24/2025, 5:45 PM
Marcelo de Souza
10/24/2025, 8:07 PM
Mitch Zink
10/24/2025, 10:52 PM
Tushar Mahajan
10/25/2025, 8:15 PM
[...] kubernetes type work pool and trying to pass env variables using the method described here (via prefect.yaml) -> https://github.com/zzstoatzz/prefect-pack/blob/4cfc53ae94c6bd65ff0eefcb1c5ce72b48446179/prefect.yaml#L106
But I always get an env variable not found error. Is there anything I might be missing here?
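For comparison, a hedged sketch of the same env mapping through the Python deploy API in Prefect 3.x (the deployment name, pool name, image, and variable below are hypothetical); if the variable arrives this way but not via prefect.yaml, the yaml nesting under work_pool.job_variables.env is the first thing to check:

from prefect import flow

@flow(log_prints=True)
def my_flow():
    print("hello")

if __name__ == "__main__":
    # job_variables={"env": {...}} mirrors work_pool.job_variables.env
    # in prefect.yaml; all names here are hypothetical.
    my_flow.deploy(
        name="k8s-deployment",
        work_pool_name="my-k8s-pool",
        image="my-registry/my-image:latest",
        job_variables={"env": {"MY_ENV_VAR": "value"}},
    )

Jeff Rose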
10/28/2025, 5:58 PM
Ben
10/30/2025, 9:23 PM
[...] prefect deploy, I get this error:
prefect.exceptions.ScriptError: Script at 'src/providers/Tapology/flows/data_ingestion.py' encountered an exception: TypeError('Result storage configuration must be persisted server-side. Please call `.save()` on your block before passing it in.')
An exception occurred.
The thing is that I have created the storage block directly in the Prefect web UI, and I'm loading it in my code, something like this:
from prefect import flow
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import JSONSerializer
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

# Load the storage block you saved previously
r2_storage = RemoteFileSystem.load("r2-persistent-storage")

# Choose serializer for your payloads
json_ser = JSONSerializer()

@flow(
    log_prints=True,
    task_runner=ThreadPoolTaskRunner(max_workers=3),
    persist_result=True,
    result_storage=r2_storage,
    result_serializer=json_ser,
)
def my_flow_function():
    [...]
Another thing to note is that this code works for already-existing deployments; the error is only thrown when trying to deploy a new deployment.
Any idea what's going on?
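One hedged workaround sketch, assuming Prefect 3.x, where result_storage also accepts a saved block document slug of the form "block-type/block-name": referencing the block by slug means prefect deploy never has to serialize a loaded in-memory instance, which is what the error complains about.

from prefect import flow
from prefect.serializers import JSONSerializer
from prefect.task_runners import ThreadPoolTaskRunner

@flow(
    log_prints=True,
    task_runner=ThreadPoolTaskRunner(max_workers=3),
    persist_result=True,
    # Slug form "<block-type>/<block-name>", resolved server-side
    result_storage="remote-file-system/r2-persistent-storage",
    result_serializer=JSONSerializer(),
)
def my_flow_function():
    ...

Tom Collom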
10/31/2025, 8:32 AM
Miguel Moncada
10/31/2025, 9:23 AM
Unable to authenticate to the event stream. Please ensure the provided api_key or auth_token you are using is valid for this environment. Reason: Actor is not authenticated
Do you know what could be going on?
Pierre L
10/31/2025, 10:57 AM
[...] date.date object. Same error when using a datetime.datetime.
Here is my exact error:
Task run failed with exception: PrefectHTTPStatusError: Client error '409 Conflict' for url 'http://prefect-server.prefectoss.svc.cluster.local:4200/api/deployments/d6150396-f839-4084-8091-401f5a6c53ca/create_flow_run'
Response: {'detail': "Error creating flow run: Validation failed for field 'time_start'. Failure reason: 1704067200.0 is not of type 'string'"}
The bug doesn't occur when using Prefect Cloud. Why?
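A hedged workaround sketch, assuming the run is triggered with run_deployment: send the date as an ISO string, which is exactly the 'string' type the server-side schema check above demands. The field name comes from the error; the deployment path is hypothetical.

from datetime import date
from prefect.deployments import run_deployment

# date(2024, 1, 1) matches the 1704067200.0 timestamp in the error;
# isoformat() yields "2024-01-01", which passes the 'string' type check.
run_deployment(
    name="my-flow/my-deployment",  # hypothetical deployment path
    parameters={"time_start": date(2024, 1, 1).isoformat()},
)

Ahmad Bilal Khalid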
11/03/2025, 12:01 PM
from datetime import date
from workflows.flows.etl import extraction_and_transformation

if __name__ == "__main__":
    extraction_and_transformation.serve(
        name="Extraction Transformation Loading",
        parameters={
            "source_name": "ABC",
            "database_export_date": date(2025, 8, 27),
            "bucket_name": "data-migration",
        },
    )
workflows/flows/etl.py
import prefect
from datetime import date
from prefect import flow

@flow(name="Data Extraction and Transformation")
def extraction_and_transformation(
    source_name: str, database_export_date: date, bucket_name: str
):
    from prefect.runtime import flow_run, deployment, task_run

    print("prefect version", prefect.__version__)
    print("Deployment Parameters", deployment.parameters)
    print("Flow Run Parameters", flow_run.parameters)
    print("Task Run Parameters", task_run.parameters)
    print("Flow Run Parent Deployment ID", flow_run.parent_deployment_id)
prefect version 3.4.20
Deployment Parameters {'bucket_name': 'data-migration', 'source_name': 'ABC', 'database_export_date': '2025-08-27'}
Flow Run Parameters {'source_name': 'ABC', 'database_export_date': datetime.date(2025, 8, 27), 'bucket_name': 'data-migration'}
Task Run Parameters {}
prefect version 3.4.21
Deployment Parameters {}
Flow Run Parameters {'source_name': 'ABC', 'database_export_date': datetime.date(2025, 8, 27), 'bucket_name': 'data-migration'}
Task Run Parameters {}
Mattia Sappa
11/04/2025, 10:22 AM
[...] ERROR | prefect.server.events.services.event_persister - Error trimming events and resources?
José Agretti
11/04/2025, 3:20 PM
raj
11/05/2025, 11:58 AM
raj
11/05/2025, 12:00 PM
[...] RECV_PING) on a closed connection, which is invalid according to the HTTP/2 protocol specification.
At the core, the httpx client used internally by Prefect's Cloud API client attempted to send a PING frame (RECV_PING) on an HTTP/2 connection that had already transitioned to a CLOSED state. This caused the underlying httpcore transport layer to raise a LocalProtocolError due to the mismatch in expected connection state.
🧾 Root Cause Analysis (RCA):
• The system attempted to reuse an HTTP/2 connection that had already been closed, leading to an invalid protocol state.
• This is most likely due to a stale connection being kept in the async connection pool or a bug in the connection lifecycle management by httpcore.
• Prefect's cloud client attempted communication over this dead connection, resulting in failure.
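For reference, a minimal httpx-level sketch of the mitigations this RCA points toward (these are standard httpx options, not Prefect settings; Prefect's own client is configured through its settings, e.g. a PREFECT_API_ENABLE_HTTP2 toggle where your version exposes it):

import httpx

# Hedged mitigation sketch: avoid HTTP/2 entirely, and/or stop pooling
# idle connections so a stale closed connection is never reused.
client = httpx.AsyncClient(
    http2=False,  # fall back to HTTP/1.1
    limits=httpx.Limits(
        max_keepalive_connections=0,  # do not keep idle connections around
        keepalive_expiry=5.0,         # expire any keepalive connection quickly
    ),
)

Øyvind Monsen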
11/05/2025, 3:34 PM
Arthur Ren
11/05/2025, 5:23 PM
Slackbot
11/06/2025, 11:34 AM
Sergio Luceno
11/06/2025, 11:38 AM
Vít Brunner
11/06/2025, 1:34 PM
Arthur Ren
11/07/2025, 2:34 AM
Kiran
11/07/2025, 5:49 AM
Pierre L
11/07/2025, 9:41 AM
Giacomo Chiarella
11/07/2025, 2:26 PM
[...]
my_task = task.submit()
wait for it to finish and
my_task.state
how should I change the last statement to solve the warning? The same question applies to my_task.task_run_id.
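A hedged sketch of the wait-then-read pattern in Prefect 3.x (whether it clears the specific warning depends on the version; work is a placeholder task): block on the future first, then read its attributes.

from prefect import flow, task

@task
def work():
    return 42

@flow
def my_flow():
    future = work.submit()
    future.wait()              # block until the task run reaches a final state
    print(future.state)        # the final State, safe to read after wait()
    print(future.task_run_id)  # the run id carried by the future

Amrit Amar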
11/07/2025, 4:56 PM
[...] prefect-aws library and I'm excited to start using it; however, most of the patterns for running flows seem to revolve around using another EC2 instance.
I was wondering if it's possible to have a Prefect server as an orchestrator (on Fargate/EC2) that runs defined flows and deploys a 'task' on a Lambda (I would have already configured the Lambda; I just need to send a task to it and then retrieve the results to pass on to the next task, which would be another Lambda call). I see that a Lambda invoke can happen 'within' a task, but is it possible to deploy the entire task as a Lambda itself rather than invoking it from the main Prefect instance?
Also, if invoking a Lambda from the main instance, will the workers actually wait the full 15 minutes for a response from the Lambda, or do I need to poll the status of the Lambda job with another task? And is this scalable to one Prefect server instance with 4 defined flows and 10s-100s of concurrent flow runs?
My main use case is an orchestrator that, upon running a flow with defined tasks, sends info to a Lambda and then gets the result from it to pass on to the next task until the end of the flow.
Thank you! (@Marvin)
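On the invoke-and-wait question, one common pattern is a plain task that blocks on the synchronous Invoke call, so no separate polling task is needed. A hedged sketch (function names and payloads are hypothetical):

import json
import boto3
from botocore.config import Config
from prefect import flow, task

# Raise the read timeout so a synchronous invoke can wait out a long
# Lambda run (boto3's default read timeout is far below 15 minutes).
lambda_client = boto3.client("lambda", config=Config(read_timeout=900))

@task
def invoke_lambda(function_name: str, payload: dict) -> dict:
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",  # block until the function returns
        Payload=json.dumps(payload),
    )
    return json.loads(response["Payload"].read())

@flow
def two_lambda_pipeline():
    first = invoke_lambda("step-one-fn", {"key": "value"})  # hypothetical fn
    second = invoke_lambda("step-two-fn", first)            # result passed on
    return second

Quinton Nickum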
11/07/2025, 9:46 PM
Aaron
11/08/2025, 11:56 AM
Riya Sinha
11/08/2025, 7:33 PM
[...]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 1079, in create_flow_run
    return await client.create_flow_run(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_flow_runs/client.py", line 568, in create_flow_run
    response = await self.request("POST", "/flow_runs/", json=flow_run_create_json)
Full trace in thread to avoid a large message; just wanted to ask whether this is an issue others are facing recently.