Robert Kowalski

    2 months ago
    Hi, quick question: why does map fail here, while it works when map is replaced with bind?
    from prefect import task, Flow, unmapped
    
    
    @task
    def first():
        return range(10)
    
    
    @task
    def second():
        return True
    
    
    @task
    def third(numbers):
        print(numbers)
    
    
    with Flow(name='test') as flow:
        numbers = first()
        second_result = second()
    
        # third.bind(numbers=numbers, upstream_tasks=[second_result])
        third.map(numbers=unmapped(numbers), upstream_tasks=[second_result])
    
    flow.run()
    Kevin Kho

    2 months ago
    This is confusing. What do you want to map over in third? Maybe you just need:
    third(numbers, upstream_tasks=[second_result])
    Is your third meant to print the list or each individual item?
    Robert Kowalski

    2 months ago
    Even with unmapped removed, the flow finishes in a Failed state.
    If I remove upstream_tasks from the third task, the flow succeeds, but the dependency between the tasks is lost.
    I don't understand why map causes errors. Is it possible to enable more logging?
    Kevin Kho

    2 months ago
    Why are you mapping but using unmapped with numbers? There is something wrong there, but I can't tell what you are trying to do. Do you want third to run 10 times or 1 time?
    Robert Kowalski

    2 months ago
    @Kevin Kho in this example it doesn't matter what the tasks do. The point is: why does map with upstream_tasks generate an error?
    from prefect import task, Flow
    
    
    @task
    def second():
        return True
    
    
    @task
    def third(numbers):
        print(numbers)
    
    
    with Flow(name='test') as flow:
        second_result = second()
        # third.bind([1, 2, 3], upstream_tasks=[second_result])  # works and prints the list of ints
        third.map([1, 2, 3], upstream_tasks=[second_result])  # does not work, and I don't know why
        # third.map([1, 2, 3])  # works and prints a single number in each mapped task
    
    flow.run()
    Kevin Kho

    2 months ago
    For this one can you try:
    third.map([1, 2, 3], upstream_tasks=[unmapped(second_result)])
    Robert Kowalski

    2 months ago
    OK, that works, but it is weird: the second task is bound once and returns only one value, True. Why must second_result be wrapped in unmapped() when it is used in upstream_tasks?
    Kevin Kho

    2 months ago
    Because mapping tries to connect each element of the list to the matching upstream element so that it can execute in a depth-first way. If second() is also mapped (which map assumes unless you wrap it in unmapped), then third-1 can start as soon as second-1 finishes, without needing to wait for second-2 and second-3. Check this
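The pairing rule described above can be pictured with a small toy model in plain Python. This is only a sketch for intuition, not Prefect's implementation: `Unmapped` and `toy_map` are hypothetical stand-ins invented for this example. Every argument is iterated element by element unless it is wrapped, which is why a single value such as True cannot be used as a mapped upstream.

```python
from itertools import repeat


class Unmapped:
    """Hypothetical stand-in for prefect.unmapped: marks a value as 'do not iterate'."""

    def __init__(self, value):
        self.value = value


def toy_map(fn, *args):
    """Toy model of mapping: pair up the i-th element of every argument.

    Arguments wrapped in Unmapped are repeated for every child call instead.
    Illustration only, not Prefect's implementation.
    """
    if all(isinstance(arg, Unmapped) for arg in args):
        raise ValueError("nothing to map over")
    columns = []
    for arg in args:
        if isinstance(arg, Unmapped):
            columns.append(repeat(arg.value))  # same value for each child
        else:
            columns.append(iter(arg))  # raises TypeError if arg is not iterable
    return [fn(*row) for row in zip(*columns)]


def third(number, upstream):
    return (number, upstream)


second_result = True  # a single value, like the return value of second()

# Wrapped: every child call sees the same upstream value.
ok = toy_map(third, [1, 2, 3], Unmapped(second_result))

# Not wrapped: the model tries to iterate over True and fails.
try:
    toy_map(third, [1, 2, 3], second_result)
    failed = False
except TypeError:
    failed = True
```

In this model the wrapped call produces one child per list element, while the unwrapped call fails because there is nothing to pair the list elements with.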
    Robert Kowalski

    2 months ago
    Very similar problem 😃 Thanks, I understand it better now. One more question 🙂
    from datetime import timedelta
    
    from prefect import task, Flow
    from prefect.triggers import any_failed
    
    
    @task(max_retries=3, retry_delay=timedelta(seconds=3))
    def first():
        # 1/0
        return list(range(10))
    
    
    @task(trigger=any_failed)
    def second():
        print('ERROR')
    
    
    with Flow(name='test') as flow:
        numbers = first()
        second_result = second(upstream_tasks=[numbers])
    
    flow.set_reference_tasks(tasks=[numbers])
    flow.run()
    I want to run the second task only if all retries of the first task have failed. second should not run when first succeeds (whether on the first, second, or third attempt), and the flow should still be successful when second is not executed because first succeeded. In this example, second raises a TriggerError when it is not executed. Maybe a new trigger type, run_only_any_failed, that does not generate an error and finishes with a success status? Or do you know a better solution? I know about the on_failure parameter of the Task class, but that callable is executed on every retry of first.
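The situation described here can be sketched as a toy model in plain Python. It assumes, as a simplification, that task states are plain strings and that the flow succeeds exactly when every reference task succeeded; the helper names `any_failed_toy` and `flow_state` are hypothetical and not part of Prefect.

```python
def any_failed_toy(upstream_states):
    """Toy version of an any_failed trigger: run only if some upstream task failed."""
    return any(state == "Failed" for state in upstream_states)


def flow_state(task_states, reference_tasks):
    """Toy rule: the flow succeeds when every reference task succeeded."""
    ok = all(task_states[name] == "Success" for name in reference_tasks)
    return "Success" if ok else "Failed"


# first succeeded, so the trigger of second is not satisfied.
states = {"first": "Success"}
states["second"] = "Success" if any_failed_toy([states["first"]]) else "TriggerFailed"

# Without reference tasks the terminal task (second) decides the flow state;
# with first as the reference task, the unmet trigger no longer matters.
default_result = flow_state(states, reference_tasks=["second"])
with_reference = flow_state(states, reference_tasks=["first"])
```

In this model second still ends in a trigger failure, but the flow is reported as successful once first is the reference task.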
    Kevin Kho

    2 months ago
    The easier part here is the flow success/fail. There is this thing called reference tasks that let you determine which one determines the flow state
    Ah wait I just saw the end you did have it
    I think I know what you want one second, let me make an example
    from prefect import Flow, task
    import prefect 
    from datetime import timedelta
    
    def mystatehandler(task, old_state, new_state):
        if (task.max_retries < prefect.context.task_run_count and 
            new_state.is_failed()):
            prefect.context.logger.info(type(new_state.result))
        return new_state
    
    @task(max_retries=2, retry_delay=timedelta(seconds=2), state_handlers=[mystatehandler])
    def abc(x):
        if x == 2:
            raise ValueError()
        return 1
    
    with Flow("ecs test") as flow:
        abc.map([1,2,3])
    
    flow.run()
    Try this code. The state handler logs the error type once the task has failed and its retries are used up.
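To see why the handler compares max_retries with the run count, here is a toy retry loop in plain Python. It is a sketch under assumptions, not Prefect internals: `run_with_retries` and its `handler` signature are invented for illustration, and `run_count` plays the role of prefect.context.task_run_count.

```python
def run_with_retries(fn, max_retries, handler):
    """Toy retry loop: call handler after every attempt, like a state handler.

    Illustration only; names and structure are assumptions.
    """
    run_count = 0
    while True:
        run_count += 1  # stands in for prefect.context.task_run_count
        try:
            result, failed = fn(), False
        except Exception as exc:
            result, failed = exc, True
        handler(max_retries, run_count, failed, result)
        if not failed or run_count > max_retries:
            return result


final_failures = []


def handler(max_retries, run_count, failed, result):
    # Same test as in mystatehandler: only act once the retries are used up.
    if max_retries < run_count and failed:
        final_failures.append((run_count, type(result).__name__))


def always_fails():
    raise ValueError("boom")


calls = []


def fails_once():
    calls.append(1)
    if len(calls) == 1:
        raise ValueError("first attempt fails")
    return 1


run_with_retries(always_fails, max_retries=2, handler=handler)
run_with_retries(fails_once, max_retries=2, handler=handler)
```

The handler is called after every attempt, but it records a failure only for the third attempt of always_fails. The task that recovers on its second attempt never triggers it.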
    Robert Kowalski

    2 months ago
    Nice, that is definitely a better solution 🙂
    prefect.context.logger.info(prefect.context.parameters)
    Why is this empty when I change map([1,2,3]) to .bind(2)?
    Kevin Kho

    2 months ago
    Uhh, I would say just don't use .bind(). It's really not user-facing; it happens under the hood. Just call the task directly:
    with Flow("ecs test") as flow:
        abc(2)  # call the task directly instead of abc.bind(2)
    Robert Kowalski

    2 months ago
    It is still empty:
    def mystatehandler(task, old_state, new_state):
        if (task.max_retries < prefect.context.task_run_count and
                new_state.is_failed()):
            prefect.context.logger.info(type(new_state.result))
        prefect.context.logger.info(prefect.context.parameters)
        return new_state
    
    ....
    
    with Flow("ecs test") as flow:
        abc(x=2)
    Kevin Kho

    2 months ago
    prefect.context.parameters only holds flow Parameters, the ones defined like this:
    with Flow("ecs test") as flow:
        x = Parameter("x", 2)
    Not the inputs into the task
    Robert Kowalski

    2 months ago
    Oh, thanks for your help 🙂