David Ojeda
04/21/2020, 12:20 PM
from prefect import Flow, Task

class Generate(Task):
    def run(self):
        return list(range(10))

class GetConfig(Task):
    def run(self):
        return {'offset': 1000}

class Process(Task):
    def run(self, *, data):
        result = data + 1000
        self.logger.info('Process of %d = %d', data, result)
        return result

generate = Generate()
config = GetConfig()
process = Process()

# flow1: the upstream dependency is added with set_upstream
with Flow('flow1') as flow1:
    dataset = generate()
    config_result = config()
    clean = process.map(data=dataset)
    clean.set_upstream(config_result)
print('Flow1 edges:', flow1.edges)
flow1.run()

# flow2: the upstream dependency is passed directly to .map()
with Flow('flow2') as flow2:
    dataset = generate()
    config_result = config()
    clean = process.map(data=dataset, upstream_tasks=[config_result])
print('Flow2 edges:', flow2.edges)
flow2.run()
The traceback is:
[2020-04-21 12:16:45,325] INFO - prefect.TaskRunner | Task 'Process': Starting task run...
[2020-04-21 12:16:45,325] ERROR - prefect.TaskRunner | Task 'Process': unexpected error while running task: KeyError(0)
Traceback (most recent call last):
  File "/Users/david/.virtualenvs/iguazu-venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 256, in run
    state = self.run_mapped_task(
  File "/Users/david/.virtualenvs/iguazu-venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 721, in run_mapped_task
    upstream_state.result[i],
KeyError: 0
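The `KeyError: 0` follows from the last traceback frame, which indexes the upstream result by position. Below is a minimal sketch in plain Python, assuming only what the traceback shows (`upstream_state.result[i]`). The helper `index_upstream` is hypothetical and not part of Prefect; it only imitates that lookup.

```python
# Simplified model of the failing lookup; NOT Prefect's actual implementation.
dataset = list(range(10))          # what Generate.run() returns
config_result = {'offset': 1000}   # what GetConfig.run() returns


def index_upstream(result, i):
    """Imitate the `upstream_state.result[i]` lookup from the traceback."""
    return result[i]


# A list supports positional indexing, so the lookup succeeds.
first_item = index_upstream(dataset, 0)

# A dict is looked up by key, and 0 is not one of its keys.
try:
    index_upstream(config_result, 0)
except KeyError as exc:
    error = exc
    print('KeyError:', exc)
```

Under this reading, the error appears as soon as a non-indexable result (here a dict) is treated as something to map over.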
emre
04/21/2020, 12:47 PM
With `mapped=True` in flow1's `set_upstream`, Prefect will attempt to map over the result of GetConfig, which will result in the same error as flow2.
In the imperative API, `set_upstream()` defaults to `mapped=False`. In this case, that was what you wanted.
In the functional API, `process.map()` by default tries to map over the results of its input tasks. Because, you know, it is called map. To disable this, wrap the tasks that you don't want to be mapped with `unmapped()`.
from prefect import Flow, Task, unmapped

with Flow('flow2') as flow2:
    dataset = generate()
    config_result = config()
    clean = process.map(data=dataset, upstream_tasks=[unmapped(config_result)])
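To make the difference concrete, here is a rough plain-Python model of the behaviour described above: mapped arguments are indexed once per child run, while wrapped arguments are passed whole. The names `Unmapped`, `run_mapped`, and the two-argument `process` function are hypothetical illustrations, not Prefect's API or implementation.

```python
class Unmapped:
    """Hypothetical marker: 'pass this value whole, do not iterate over it'."""

    def __init__(self, value):
        self.value = value


def run_mapped(fn, **kwargs):
    """Call fn once per index; index plain arguments, broadcast wrapped ones."""
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    n = min(len(v) for v in mapped.values())
    return [fn(**{k: v[i] for k, v in mapped.items()}, **constant) for i in range(n)]


def process(data, config):
    # Illustrative only: unlike the Process task above, this takes the config as input.
    return data + config['offset']


# Wrapped config: every call receives the whole dict.
results = run_mapped(process, data=list(range(10)), config=Unmapped({'offset': 1000}))

# Unwrapped config: the model tries config[0], just like the traceback.
try:
    run_mapped(process, data=list(range(10)), config={'offset': 1000})
except KeyError as exc:
    failure = exc
```

In this sketch the only thing that changes between the working and failing call is the wrapper, which mirrors the one-word fix to flow2.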
David Ojeda
04/21/2020, 12:51 PM
[...] `unmapped` and this is the first time I have encountered this problem.
Jeremiah
04/21/2020, 1:17 PM
[...] `upstream_tasks` represented iterables, so the mapping logic worked? As @emre said, Prefect assumes all arguments to `.map()` are "mappable" (iterable) unless wrapped in `unmapped`; all arguments to `set_upstream` are assumed to be unmapped unless `mapped=True`.
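One way to picture why an iterable upstream result would not raise: indexing it simply succeeds, and each child run sees a single element. The sketch below is plain Python under that assumption; `upstream_value` is a hypothetical helper, not a Prefect function.

```python
def upstream_value(result, i, mapped):
    """Return what child run i would receive from one upstream result."""
    return result[i] if mapped else result


list_result = ['a', 'b', 'c']      # an upstream result that happens to be indexable
dict_result = {'offset': 1000}     # the GetConfig result from the thread

# mapped=True on an indexable result: no error, each child gets one element.
per_child = [upstream_value(list_result, i, mapped=True) for i in range(3)]

# mapped=False: every child gets the whole value, so a dict is fine.
whole = [upstream_value(dict_result, i, mapped=False) for i in range(3)]
```

So an upstream task that returns a list can be mapped over without any error, which would hide the missing `unmapped` until a non-indexable result such as a dict shows up.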
Zachary Hughes
04/21/2020, 1:19 PM
David Ojeda
04/21/2020, 3:16 PM
Zachary Hughes
04/21/2020, 3:30 PM