Hi, quick question, why `map` function fail? and ...
# prefect-community
r
Hi, quick question: why does the `map` function fail, yet everything works when `map` is replaced with `bind`?
from prefect import task, Flow, unmapped


@task
def first():
    return range(10)


@task
def second():
    return True


@task
def third(numbers):
    print(numbers)


with Flow(name='test') as flow:
    numbers = first()
    second_result = second()

    # third.bind(numbers=numbers, upstream_tasks=[second_result])
    third.map(numbers=unmapped(numbers), upstream_tasks=[second_result])

flow.run()
k
This is confusing. What do you want to map over in `third`? Maybe you just need:
third(numbers, upstream_tasks=[second_result])
Is your third meant to print the list or each individual item?
r
Even with `unmapped` removed, the flow finishes in a Failed state.
If I remove `upstream_tasks` from the `third` task, the flow succeeds, but the dependency between the tasks is lost.
I don't understand why `map` causes errors. Is it possible to enable more logging?
k
Why are you mapping but using `unmapped` with `numbers`? There is something wrong there, but I can’t tell what you are trying to do. Do you want `third` to run 10 times or 1 time?
r
@Kevin Kho in this example it doesn't matter what the tasks do; the point is why `map` with `upstream_tasks` generates an error.
from prefect import task, Flow


@task
def second():
    return True


@task
def third(numbers):
    print(numbers)


with Flow(name='test') as flow:
    second_result = second()
    # third.bind([1, 2, 3], upstream_tasks=[second_result])  # works and prints the list of ints
    third.map([1, 2, 3], upstream_tasks=[second_result])  # doesn't work, and I don't know why
    # third.map([1, 2, 3])  # works and prints a single number in each mapped task

flow.run()
k
For this one can you try:
third.map([1, 2, 3], upstream_tasks=[unmapped(second_result)])
r
OK, that works, but it is weird: the second task is called normally and returns only one value, `True`. Why must `second_result` be wrapped in `unmapped()` when it is used in `upstream_tasks`?
k
Because mapping will try to connect each element of the list to the corresponding upstream element so that it can execute in a depth-first way. If `second()` is also mapped (which is assumed), then `third-1` can start as soon as `second-1` finishes, without needing to wait for `second-2` and `second-3`. Check this
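To make the pairing described above concrete, here is a toy model in plain Python. It is only a sketch and not Prefect's implementation: `Unmapped` and `expand_map` are hypothetical names invented for this illustration. It shows why a single value such as `True` cannot be mapped over, while wrapping it shares the value with every child run.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Unmapped:
    """Hypothetical stand-in for Prefect's `unmapped` wrapper."""
    value: Any


def expand_map(**upstreams: Any) -> List[Dict[str, Any]]:
    """Return the inputs of each mapped child run (toy model)."""
    mapped = {k: v for k, v in upstreams.items() if not isinstance(v, Unmapped)}
    if not mapped:
        raise ValueError("nothing to map over")
    for name, value in mapped.items():
        # A mapped upstream must be indexable so that child i can take element i.
        if not hasattr(value, "__getitem__"):
            raise TypeError(f"{name} is not mappable: {value!r}")
    n_children = min(len(v) for v in mapped.values())
    return [
        {
            name: value.value if isinstance(value, Unmapped) else value[i]
            for name, value in upstreams.items()
        }
        for i in range(n_children)
    ]


# Without the wrapper, the single value True is treated as something to index.
try:
    expand_map(numbers=[1, 2, 3], second_result=True)
    failed = False
except TypeError:
    failed = True

# With the wrapper, True is shared by all three child runs.
children = expand_map(numbers=[1, 2, 3], second_result=Unmapped(True))
print(failed, children)
```

In this model, each child run receives its own element of `numbers` plus the same `second_result`, which mirrors the effect of `upstream_tasks=[unmapped(second_result)]` in the thread.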
r
Very similar problem:) thanks, I understand it better now. One more question 🙂
from datetime import timedelta

from prefect import task, Flow
from prefect.triggers import any_failed


@task(max_retries=3, retry_delay=timedelta(seconds=3))
def first():
    # 1/0
    return list(range(10))


@task(trigger=any_failed)
def second():
    print('ERROR')


with Flow(name='test') as flow:
    numbers = first()
    second_result = second(upstream_tasks=[numbers])

flow.set_reference_tasks(tasks=[numbers])
flow.run()
I want to run the `second` task only if all retries of the `first` task have failed. `second` should not be executed when `first` succeeds (on the first, second, or third attempt, it doesn't matter), and the flow should succeed even though `second` is not executed (because `first` succeeded). In this example, the `second` task generates a TriggerError when it is not executed. Maybe a new trigger type, `run_only_any_failed`, that does not generate an error and finishes with a success status? Or do you know a better solution? I know about the `on_failure` parameter of the Task class, but that callable is executed on every retry of the `first` task.
k
The easier part here is the flow success/fail. There is a feature called reference tasks that lets you choose which tasks determine the flow state
Ah wait I just saw the end you did have it
I think I know what you want one second, let me make an example
from prefect import Flow, task
import prefect 
from datetime import timedelta

def mystatehandler(task, old_state, new_state):
    if (task.max_retries < prefect.context.task_run_count and 
        new_state.is_failed()):
        prefect.context.logger.info(type(new_state.result))
    return new_state

@task(max_retries=2, retry_delay=timedelta(seconds=2), state_handlers=[mystatehandler])
def abc(x):
    if x == 2:
        raise ValueError()
    return 1

with Flow("ecs test") as flow:
    abc.map([1,2,3])

flow.run()
Try this code. The state handler logs the type of the error when the task fails after exhausting its retries
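The run-count check in the handler can be illustrated without Prefect. The following is a simplified sketch in plain Python, not Prefect's retry machinery: `run_with_retries` and `on_final_failure` are hypothetical names, and the handler here receives the attempt number directly rather than reading it from a context object. It shows a handler that is called after every attempt but only acts once the retries are used up.

```python
from typing import Any, Callable, List, Optional, Tuple

final_failures: List[Tuple[int, str]] = []
handler_calls: List[int] = []


def on_final_failure(run_count: int, max_retries: int,
                     error: Optional[Exception]) -> None:
    """Called after every attempt; records only the last, failed attempt."""
    handler_calls.append(run_count)
    if error is not None and max_retries < run_count:
        final_failures.append((run_count, type(error).__name__))


def run_with_retries(fn: Callable[[], Any], max_retries: int,
                     handler: Callable[[int, int, Optional[Exception]], None]) -> Any:
    """Toy retry loop: one initial attempt plus up to `max_retries` retries."""
    run_count = 0
    while True:
        run_count += 1
        try:
            result = fn()
        except Exception as exc:
            handler(run_count, max_retries, exc)
            if run_count > max_retries:
                return None  # retries exhausted
        else:
            handler(run_count, max_retries, None)
            return result


def always_fails() -> int:
    raise ValueError("boom")


attempts = {"n": 0}


def fails_once() -> int:
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise ValueError("transient")
    return 1


failed_result = run_with_retries(always_fails, 2, on_final_failure)
flaky_result = run_with_retries(fails_once, 2, on_final_failure)
print(final_failures, flaky_result)
```

Only the task that fails on all three attempts produces an entry in `final_failures`; the task that recovers on its second attempt triggers the handler but never passes the check.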
r
Nice, It is definitely better solution 🙂
prefect.context.logger.info(prefect.context.parameters)
Why is it empty if I change `map([1,2,3])` to `.bind(2)`?
k
Uhh I would say just don’t use `.bind()`. It’s really not user-facing, it happens under the hood.
Just call the task directly:
with Flow(..) as flow:
    abc()
r
It is still empty:
def mystatehandler(task, old_state, new_state):
    if (task.max_retries < prefect.context.task_run_count and
            new_state.is_failed()):
        prefect.context.logger.info(type(new_state.result))
    prefect.context.logger.info(prefect.context.parameters)
    return new_state

....

with Flow("ecs test") as flow:
    abc(x=2)
k
`prefect.context.parameters` only holds flow Parameters, the ones defined like this:
with Flow("ecs test") as flow:
    x = Parameter("x", 2)
It does not hold the inputs passed into the task
r
Oh, thanks for your help 🙂