Python 3.10 despite the tuple typing annotation th...
# prefect-server
s
On Python 3.10, despite the tuple typing annotation, there is still a "TypeError: Task is not iterable" exception.
@task(timeout=timedelta(seconds=30))
def fetch_symbols(...) -> tuple[MyCustomType, ...]:
  ...
k
This is Orion right? Or Prefect 1.0?
The annotation is not read for Prefect 1 so you need to specify the number of output values
s
poetry show prefect
name         : prefect
version      : 1.1.0
description  : The Prefect Core automation and scheduling engine.
I find it weird that Prefect server does not have a command to show its version
I cannot specify the precise number, as it depends on the previous task, which returns a homogeneous tuple with an unknown number of items.
k
What are you trying to do downstream? Are you using a for loop or accessing the tuple by index?
s
Yes, looping in batches to map another task
k
You can’t loop inside the Flow because that list is not defined while you are constructing the DAG. It only exists at flow run time. So in order to construct the DAG, you need to use mapping, which is similar to looping. For loops are executed at build time, which is why they fail. Mapping is executed at run time.
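To make the build-time versus run-time distinction concrete, here is a toy model in plain Python. It is only a sketch: PlaceholderTask, fetch_symbols and the sample symbols are hypothetical stand-ins, not Prefect classes or the code from this thread. The point is that while the graph is being built, the variable holds a placeholder with no data in it, so a native for loop has nothing to iterate over.

```python
class PlaceholderTask:
    """Toy stand-in for a task result at flow-build time (not a Prefect class)."""

    def __init__(self, fn):
        self.fn = fn

    def __iter__(self):
        # While the graph is being built there is no data yet, only a placeholder.
        raise TypeError("Task is not iterable")

    def run(self):
        # The real values only exist once the task has actually executed.
        return self.fn()


def fetch_symbols():
    # Hypothetical data; the real task returns a tuple of unknown length.
    return ("AAA", "BBB", "CCC")


symbols = PlaceholderTask(fetch_symbols)

# Build time: a native for loop tries to iterate the placeholder and fails.
try:
    for symbol in symbols:
        pass
    build_time_error = None
except TypeError as exc:
    build_time_error = str(exc)

# Run time: the task has executed, so per-item work can fan out over the result.
run_time_result = [symbol.lower() for symbol in symbols.run()]
```

Mapping plays the role of the last line: it defers the per-item work until the upstream result exists.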
s
So no logic can be implemented in the flow without extra hoops or wrapping it in other tasks?
k
You can in some extreme cases, but normally no. I would say it’s preferred to put the logic inside tasks, but you can’t do stuff like using the native Python if, while, or for inside the Flow block to loop over task results
s
Can I submit tasks dynamically?
k
Definitely not in Prefect 1.0, because that violates the DAG concept. The only dynamism available is mapping and task looping, but in Prefect 2.0 we removed the DAG requirement, so you can do dynamic task submission
s
I mean after I get a list of items, I would like to submit them in parallel for other tasks
k
Yeah that would be the use case of mapping
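For the batching part of the question, one possible approach is to do the splitting inside a task, so that the downstream task can be mapped over the batches instead of looped over in the Flow block. Below is a minimal sketch of such a splitting function in plain Python. The name make_batches and the batch_size parameter are assumptions for illustration; nothing in the thread specifies them.

```python
from typing import Sequence, TypeVar

T = TypeVar("T")


def make_batches(items: Sequence[T], batch_size: int) -> list[tuple[T, ...]]:
    """Split a sequence of unknown length into consecutive batches.

    Hypothetical helper: in a Prefect 1 flow this would typically be wrapped
    with @task, and its result passed to another task's .map(...).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [
        tuple(items[start:start + batch_size])
        for start in range(0, len(items), batch_size)
    ]


# Five hypothetical items in batches of two; the last batch is shorter.
batches = make_batches(("a", "b", "c", "d", "e"), 2)
```

Because the splitting happens inside a task, the number of batches does not have to be known when the DAG is built.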
s
I am using local run, do I need a special configuration to see actual parallelism?
k
Set flow.executor = LocalDaskExecutor(). You can see this
The default executor is just single-threaded and synchronous, so you need LocalDaskExecutor for parallelism. For 2.0, the default executor is a ConcurrentTaskRunner