
David Ojeda

04/21/2020, 12:20 PM
Hi, I am digging into a bug, and while building a minimal example I found something strange. Am I using `upstream_tasks` on a map correctly? In the following example, "flow2" fails (this is how I usually declare upstream tasks on a map), but "flow1" works:
from prefect import Flow, Task


class Generate(Task):
    def run(self):
        return list(range(10))


class GetConfig(Task):
    def run(self):
        return {'offset': 1000}


class Process(Task):
    def run(self, *, data):
        result = data + 1000
        self.logger.info('Process of %d = %d', data, result)
        return result


generate = Generate()
config = GetConfig()
process = Process()

with Flow('flow1') as flow1:
    dataset = generate()
    config_result = config()
    clean = process.map(data=dataset)
    clean.set_upstream(config_result)


print('Flow1 edges:', flow1.edges)
flow1.run()


with Flow('flow2') as flow2:
    dataset = generate()
    config_result = config()
    clean = process.map(data=dataset, upstream_tasks=[config_result])


print('Flow2 edges:', flow2.edges)
flow2.run()
The traceback is:
[2020-04-21 12:16:45,325] INFO - prefect.TaskRunner | Task 'Process': Starting task run...
[2020-04-21 12:16:45,325] ERROR - prefect.TaskRunner | Task 'Process': unexpected error while running task: KeyError(0)
Traceback (most recent call last):
  File "/Users/david/.virtualenvs/iguazu-venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 256, in run
    state = self.run_mapped_task(
  File "/Users/david/.virtualenvs/iguazu-venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 721, in run_mapped_task
    upstream_state.result[i],
KeyError: 0

emre

04/21/2020, 12:47 PM
If you set `mapped=True` in flow1's `set_upstream`, Prefect will attempt to map over the result of `GetConfig`, which will result in the same error as flow2. In the imperative API, `set_upstream()` defaults to `mapped=False`, which in this case is what you wanted. In the functional API, `process.map()` by default tries to map over the results of its input tasks. Because, you know, it is called map. To disable this, wrap the tasks that you don't want to be mapped in `unmapped()`.
from prefect import Flow, Task, unmapped
with Flow('flow2') as flow2:
    dataset = generate()
    config_result = config()
    clean = process.map(data=dataset, upstream_tasks=[unmapped(config_result)])
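To make the failure mode concrete, here is a toy model in plain Python, not Prefect's actual implementation. It assumes only what the traceback shows: for each mapped child `i`, the runner indexes every upstream result with `[i]`. The names `Unmapped` and `toy_map` are hypothetical stand-ins, and the config is passed as a keyword argument purely to keep the sketch short.

```python
class Unmapped:
    """Hypothetical stand-in for a marker that says 'do not index this value'."""

    def __init__(self, value):
        self.value = value


def toy_map(fn, n_children, **upstream):
    """Call fn once per child index.

    Every upstream value is indexed with [i] unless it is wrapped in Unmapped,
    in which case the whole value is passed to each child unchanged.
    """
    results = []
    for i in range(n_children):
        kwargs = {
            name: value.value if isinstance(value, Unmapped) else value[i]
            for name, value in upstream.items()
        }
        results.append(fn(**kwargs))
    return results


def process(data, config):
    return data + config['offset']


dataset = list(range(10))
config = {'offset': 1000}

# Without the wrapper, the dict is indexed with 0 and raises KeyError(0),
# the same kind of failure as in the flow2 traceback.
try:
    toy_map(process, len(dataset), data=dataset, config=config)
    error = None
except KeyError as exc:
    error = exc

# With the wrapper, every child receives the whole config dict.
mapped_ok = toy_map(process, len(dataset), data=dataset, config=Unmapped(config))
```

This would also explain why an unwrapped upstream can appear to work: if its result happens to be a list or tuple of sufficient length, indexing with `[i]` succeeds silently.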

David Ojeda

04/21/2020, 12:51 PM
Right on the nail… thanks a lot. Was this changed recently? I have many flows that use the approach without `unmapped`, and this is the first time I have encountered this problem.

Jeremiah

04/21/2020, 1:17 PM
I don't think this behavior has changed, but perhaps the tasks you were passing to `upstream_tasks` represented iterables, so the mapping logic worked? As @emre said, Prefect assumes all arguments to `.map()` are "mappable" (iterable) unless wrapped in `unmapped`; all arguments to `set_upstream` are assumed to be unmapped unless `mapped=True`.

Zachary Hughes

04/21/2020, 1:19 PM
Just finished following up with our Core folks-- sounds like we haven't changed this behavior recently. But Emre and Jeremiah are bang-on with their answers.

David Ojeda

04/21/2020, 3:16 PM
OK, thanks for the info. I will update my flows to account for this. I still have one bug lurking around, and it may or may not be related to this.

Zachary Hughes

04/21/2020, 3:30 PM
Sounds good. Happy to help with anything you have remaining-- just give us a shout!