Robert Kowalski

2 months ago
Hi, quick question: why does the ``map`` call fail, while it works when ``map`` is replaced with ``bind``?
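``````
from prefect import task, Flow, unmapped

@task
def first():
    return range(10)

@task
def second():
    return True

@task
def third(numbers):
    print(numbers)

with Flow(name='test') as flow:
    numbers = first()
    second_result = second()
    # Assumed from the replies in this thread: third was mapped with
    # unmapped(numbers) and second_result passed as an upstream task.
    third.map(unmapped(numbers), upstream_tasks=[second_result])

flow.run()
``````

Kevin Kho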

2 months ago
This is confusing. What do you want to map over in ``third``? Maybe you just need: ``third(numbers, upstream_tasks=[second_result])``
Is your ``third`` meant to print the whole list or each individual item?

Robert Kowalski

2 months ago
Even with the unmapped removed, the flow finished in a failed state.
If I remove upstream_tasks from the third task, the flow succeeds, but the dependency between the tasks is lost.
I don't understand why ``map`` causes errors. Is it possible to enable more logging?

Kevin Kho

2 months ago
Why are you mapping but using unmapped with ``numbers``? There is something wrong there, but I can’t tell what you are trying to do. Do you want ``third`` to run 10 times or 1 time?

Robert Kowalski

2 months ago
@Kevin Kho in this example it doesn't matter what the tasks should or can do. The point is why ``map`` with upstream_tasks generates an error:
``````
from prefect import task, Flow

@task
def second():
    return True

@task
def third(numbers):
    print(numbers)

with Flow(name='test') as flow:
    second_result = second()
    # third.bind([1, 2, 3], upstream_tasks=[second_result])  # works and prints the list of ints
    third.map([1, 2, 3], upstream_tasks=[second_result])  # does not work, and I don't know why
    # third.map([1, 2, 3])  # works and prints a single number in each mapped task

flow.run()
``````

Kevin Kho

2 months ago
For this one can you try:
``third.map([1, 2, 3], upstream_tasks=[unmapped(second_result)])``

Robert Kowalski

2 months ago
OK, that works, but it is weird: the second task is bound and returns only one value, ``True``. Why must ``second_result`` be wrapped in unmapped() when it is used in upstream_tasks?

Kevin Kho

2 months ago
Because mapping will try to connect each element of the list to the corresponding upstream element so that it can execute in a depth-first way. If ``second()`` is also mapped (which is assumed), then ``third-1`` can start as soon as ``second-1`` finishes, without needing to wait for ``second-2`` and ``second-3``. Check this

Robert Kowalski

2 months ago
Very similar problem😃 thanks, I understand it better now. One more question 🙂
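``````
from datetime import timedelta

from prefect import task, Flow
from prefect.triggers import any_failed

# Decorator arguments are assumed; the retry values are illustrative.
@task(max_retries=2, retry_delay=timedelta(seconds=1))
def first():
    # 1/0
    return list(range(10))

# Assumed: second runs only when an upstream task has failed.
@task(trigger=any_failed)
def second():
    print('ERROR')

with Flow(name='test') as flow:
    numbers = first()
    # Assumed wiring: second depends on first.
    second(upstream_tasks=[numbers])

flow.run()
``````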
I want to run the ``second`` task only if all retries of the ``first`` task fail. ``second`` should not be executed when ``first`` succeeds (whether on the first, second, or third attempt), and the flow should be successful even though ``second`` is not executed (because ``first`` succeeded). In this example the second task produces a TriggerError when it is not executed. Maybe a new trigger type, ``run_only_any_failed``, that does not generate an error and finishes with a success status? Or do you know a better solution? I know about the ``on_failure`` parameter of the Task class, but the callable is executed on every retry of task ``first``.

Kevin Kho

2 months ago
The easier part here is the flow success/fail. There is this thing called reference tasks that lets you choose which tasks determine the flow state.
Ah wait, I just saw at the end that you did have it.
I think I know what you want. One second, let me make an example.
``````
from prefect import Flow, task
import prefect
from datetime import timedelta

def mystatehandler(task, old_state, new_state):
    # When the task fails, log the type of its result (the raised exception).
    if new_state.is_failed():
        prefect.context.logger.info(type(new_state.result))
    return new_state

# Assumed decorator: the handler is attached through state_handlers.
@task(state_handlers=[mystatehandler])
def abc(x):
    if x == 2:
        raise ValueError()
    return 1

with Flow("ecs test") as flow:
    abc.map([1,2,3])

flow.run()
``````
Try this code. It will log the error in the state handler if the task fails.

Robert Kowalski

2 months ago
Nice, that is definitely a better solution 🙂
``prefect.context.logger.info(prefect.context.parameters)``
Why is it empty? That is with map([1,2,3]) changed to .bind(2).

Kevin Kho

2 months ago
Uhh I would say just don’t use ``.bind()``. It’s really not user-facing, it happens under the hood.
Just call the task directly:
``````
with Flow(..) as flow:
    abc()
``````

Robert Kowalski

2 months ago
It is still empty:
``````
def mystatehandler(task, old_state, new_state):
    if new_state.is_failed():
        prefect.context.logger.info(type(new_state.result))
        prefect.context.logger.info(prefect.context.parameters)
    return new_state

....

with Flow("ecs test") as flow:
    abc(x=2)
``````

Kevin Kho

2 months ago
Parameters are the ones defined like this:
``````
with Flow("ecs test") as flow:
    x = Parameter("x", 2)
``````
Not the inputs into the task.

Robert Kowalski

2 months ago
Oh, thanks for your help 🙂
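
To recap the earlier ``unmapped`` question: the explanation in this thread is that ``map`` pairs each mapped child with the matching element of every upstream input, unless that input is wrapped in ``unmapped``. The plain-Python sketch below is only a toy model of that pairing rule, not Prefect's implementation; ``Unmapped`` and ``pair_children`` are hypothetical names made up for illustration. It shows why a single value such as ``True`` cannot be consumed element-wise but can be handed to every child.

```python
class Unmapped:
    """Hypothetical marker: hand the wrapped value to every child unchanged."""

    def __init__(self, value):
        self.value = value


def pair_children(items, upstream):
    """Return one (item, upstream_value) pair per mapped child (toy model)."""
    items = list(items)
    if isinstance(upstream, Unmapped):
        # Broadcast: every child sees the same upstream value.
        upstream_values = [upstream.value] * len(items)
    else:
        # Element-wise: the upstream must itself be iterable.
        # A bare True raises TypeError here.
        upstream_values = list(upstream)
    return list(zip(items, upstream_values))


# Wrapped single value: each child is paired with True.
broadcast = pair_children([1, 2, 3], Unmapped(True))

# Upstream that is itself a list: child i is paired with element i.
elementwise = pair_children([1, 2, 3], ["a", "b", "c"])

# Unwrapped single value: nothing to iterate over, so the pairing fails.
try:
    pair_children([1, 2, 3], True)
    failed = False
except TypeError:
    failed = True

print(broadcast)
print(elementwise)
print(failed)
```

In the thread's terms, ``third.map([1, 2, 3], upstream_tasks=[unmapped(second_result)])`` corresponds to the first call, and the failing ``third.map([1, 2, 3], upstream_tasks=[second_result])`` to the last one.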