Slava Shor

03/15/2022, 5:28 PM
On Python 3.10, despite the tuple type annotation, I still get a TypeError: Task is not iterable exception.
@task(timeout=timedelta(seconds=30))
def fetch_symbols(...) -> tuple[MyCustomType, ...]:
  ...

Kevin Kho

03/15/2022, 5:32 PM
This is Orion right? Or Prefect 1.0?
The annotation is not read for Prefect 1 so you need to specify the number of output values

Slava Shor

03/15/2022, 7:47 PM
poetry show prefect
name         : prefect
version      : 1.1.0
description  : The Prefect Core automation and scheduling engine.
I find it weird that Prefect server does not have a command to show its version
I cannot specify the precise number, as it depends on the previous task, which returns a homogeneous tuple with an unknown number of items.

Kevin Kho

03/15/2022, 7:54 PM
What are you trying to do downstream? Are you using a for loop or accessing the tuple by index?

Slava Shor

03/15/2022, 7:58 PM
Yes, looping in batches to map another task

Kevin Kho

03/15/2022, 8:01 PM
You can’t loop inside the Flow because that list is not defined when you are constructing the DAG. It only exists at flow run time. So in order to construct the DAG, you need to use mapping, which is similar to looping. For loops are executed at build time, which is why they fail. Mapping is executed at run time.
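To illustrate the build-time versus run-time distinction, here is a toy sketch in plain Python. The Task class below is a hypothetical stand-in, not Prefect's real class, and fetch_symbols returns made-up values. It only shows that a placeholder has nothing to loop over until the function behind it has actually run.

```python
class Task:
    """Toy stand-in for a task placeholder. NOT Prefect's real Task class."""

    def __init__(self, fn):
        self.fn = fn


def fetch_symbols():
    # Hypothetical data; in the real flow the length is unknown in advance.
    return ("AAA", "BBB", "CCC")


# Build time: we only hold a placeholder, no result exists yet.
placeholder = Task(fetch_symbols)

try:
    for symbol in placeholder:  # the placeholder defines no iteration protocol
        pass
    build_time_error = None
except TypeError as exc:
    build_time_error = str(exc)

# Run time: the function is executed, so a real tuple exists and can be iterated.
runtime_result = placeholder.fn()
mapped = [symbol.lower() for symbol in runtime_result]
```

In this toy model the loop over the placeholder fails with a TypeError, while iterating over the value produced at run time works. Mapping plays the role of the second step.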

Slava Shor

03/15/2022, 8:02 PM
So no logic can be implemented in the flow without jumping through extra hoops or wrapping it in other tasks?

Kevin Kho

03/15/2022, 8:06 PM
You can in some extreme cases, but normally no. I would say it’s preferred to put the logic inside tasks, but you can’t use native Python if, while, or for inside the Flow block to loop over task results.

Slava Shor

03/15/2022, 8:16 PM
Can I submit tasks dynamically?

Kevin Kho

03/15/2022, 8:18 PM
Definitely not in Prefect 1.0, because that violates the DAG concept. The only dynamism available is mapping and task looping, but in Prefect 2.0 we removed the DAG requirement, so you can do dynamic task submission.

Slava Shor

03/15/2022, 8:18 PM
I mean after I get a list of items, I would like to submit them in parallel for other tasks

Kevin Kho

03/15/2022, 8:19 PM
Yeah that would be the use case of mapping

Slava Shor

03/15/2022, 8:20 PM
I am using local run, do I need a special configuration to see actual parallelism?

Kevin Kho

03/15/2022, 8:21 PM
Set flow.executor = LocalDaskExecutor(). You can see this
The default executor is single-threaded and synchronous, so you need LocalDaskExecutor for parallelism. For 2.0, the default task runner is the ConcurrentTaskRunner.
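A minimal sketch of how mapping and the executor could fit together, assuming Prefect 1.x is installed. The task bodies, the symbol values, the flow name, and the run_mapped_flow helper are hypothetical and only for illustration. The imports sit inside the function so that Prefect is only needed when it is actually called.

```python
def run_mapped_flow():
    """Build and run a small Prefect 1.x flow that maps a task over a tuple."""
    # Imported here so Prefect 1.x is only required when the function is called.
    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor

    @task
    def fetch_symbols():
        # Hypothetical data; the real task returns a tuple of unknown length.
        return ("AAA", "BBB", "CCC")

    @task
    def process(symbol):
        # Hypothetical per-item work.
        return symbol.lower()

    with Flow("mapping-sketch") as flow:
        symbols = fetch_symbols()
        # One child task run per element, created at run time.
        processed = process.map(symbols)

    # Let the mapped children run in parallel on the local Dask scheduler.
    flow.executor = LocalDaskExecutor()
    state = flow.run()
    # The mapped task's state holds the list of per-item results.
    return state.result[processed].result
```

Calling run_mapped_flow() in an environment with Prefect 1.x should return the processed items. Note that no for loop appears inside the Flow block: the iteration is expressed with .map instead.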