Robert Kowalski
07/07/2022, 1:06 PMmapmapbindfrom prefect import task, Flow, unmapped
@task
def first():
    """Produce the integers 0 through 9 as a ``range`` object."""
    upper_bound = 10
    return range(upper_bound)
@task
def second():
    """Return ``True`` unconditionally."""
    outcome = True
    return outcome
@task
def third(numbers):
    """Print ``numbers`` to stdout exactly as received; returns nothing."""
    received = numbers
    print(received)
# Build the flow: `third` receives the complete output of `first` in a single
# run, and is scheduled only after `second` has finished.
with Flow(name='test') as flow:
    numbers = first()
    second_result = second()
    # Call the task directly instead of going through `.map`.  `.map` treats
    # every argument that is not wrapped in `unmapped` -- including the
    # entries of `upstream_tasks` -- as something to iterate over, and
    # `second` returns a bare `True`, which cannot be mapped over.
    third(numbers=numbers, upstream_tasks=[second_result])
flow.run()Kevin Kho
thirdthird(numbers, upstream_tasks=[second_result])Kevin Kho
Robert Kowalski
07/07/2022, 5:49 PMRobert Kowalski
07/07/2022, 5:50 PMRobert Kowalski
07/07/2022, 5:51 PMmapKevin Kho
numbersthirdRobert Kowalski
07/07/2022, 6:41 PMmapfrom prefect import task, Flow
@task
def second():
    """Return ``True`` unconditionally."""
    outcome = True
    return outcome
@task
def third(numbers):
    """Print ``numbers`` to stdout exactly as received; returns nothing."""
    received = numbers
    print(received)
from prefect import unmapped  # marks an argument that `.map` must not iterate

# Map `third` over a literal list, with every mapped run waiting on `second`.
with Flow(name='test') as flow:
    second_result = second()
    # `.map` iterates over every argument that is not wrapped in `unmapped`,
    # and that includes the entries of `upstream_tasks`.  `second` returns a
    # single `True`, so leaving it unwrapped makes the mapping fail; wrapped,
    # it is a plain "run after" dependency shared by every mapped child.
    #
    # Variants that already behaved as intended:
    #   third.bind([1, 2, 3], upstream_tasks=[second_result])  -> prints the whole list
    #   third.map([1, 2, 3])                                   -> prints one number per mapped task
    third.map([1, 2, 3], upstream_tasks=[unmapped(second_result)])
flow.run()Kevin Kho
third.map([1, 2, 3], upstream_tasks=[unmapped(second_result)])Robert Kowalski
07/07/2022, 6:46 PMTruesecond_resultKevin Kho
second()third-1second-1second-2second-3Robert Kowalski
07/07/2022, 7:16 PMfrom datetime import timedelta
from prefect import task, Flow
from prefect.triggers import any_failed
@task(max_retries=3, retry_delay=timedelta(seconds=3))
def first():
    """Return the integers 0 through 9 as a list.

    The decorator configures up to 3 retries with a 3-second delay.
    """
    # 1/0  # uncomment to force a ZeroDivisionError and exercise the retries
    return [*range(10)]
@task(trigger=any_failed)
def second():
    """Print ``ERROR``; the task is configured with the ``any_failed`` trigger."""
    message = 'ERROR'
    print(message)
# Wire `second` downstream of `first`; `second` takes no data from `first`,
# only the dependency edge.
with Flow(name='test') as flow:
    numbers = first()
    second_result = second(upstream_tasks=[numbers])
# Make `first` the flow's only reference task.
# NOTE(review): presumably so the flow's final state follows `first` rather
# than the `any_failed`-triggered `second` -- confirm against the Prefect docs.
flow.set_reference_tasks(tasks=[numbers])
flow.run()secondfirstsecondfirstsecondfirstrun_only_any_failedon_failurefirstKevin Kho
Kevin Kho
Kevin Kho
Kevin Kho
from prefect import Flow, task
import prefect 
from datetime import timedelta
def mystatehandler(task, old_state, new_state):
    """Log the type of a failed task's result once its run count exceeds its retries.

    Args:
        task: The task whose state is changing; only ``max_retries`` is read.
        old_state: The state being left (unused).
        new_state: The state being entered.

    Returns:
        ``new_state``, unchanged, so the transition proceeds as-is.
    """
    # Check the state first: it is the cheapest test, and it means the Prefect
    # context is only consulted for transitions into a failed state.
    past_retries = (
        new_state.is_failed()
        and task.max_retries < prefect.context.task_run_count
    )
    if past_retries:
        # NOTE(review): for a failed state the result is presumably the raised
        # exception, so this logs the exception class -- confirm.
        prefect.context.logger.info(type(new_state.result))
    return new_state
@task(max_retries=2, retry_delay=timedelta(seconds=2), state_handlers=[mystatehandler])
def abc(x):
    """Return 1 for any input except ``x == 2``, which raises ``ValueError``."""
    if x != 2:
        return 1
    raise ValueError()
# Map `abc` over three inputs; the element 2 makes its mapped run raise
# ValueError, which is what exercises the task's retries and state handler.
with Flow("ecs test") as flow:
    abc.map([1,2,3])
flow.run()Kevin Kho
Robert Kowalski
07/07/2022, 7:57 PMprefect.context.logger.info(prefect.context.parameters)Robert Kowalski
07/07/2022, 8:00 PMKevin Kho
.bind()Kevin Kho
with Flow(..) as flow:
    abc()Robert Kowalski
07/07/2022, 8:27 PMdef mystatehandler(task, old_state, new_state):
    if (task.max_retries < prefect.context.task_run_count and
            new_state.is_failed()):
        prefect.context.logger.info(type(new_state.result))
    prefect.context.logger.info(prefect.context.parameters)
    return new_state
....
with Flow("ecs test") as flow:
    abc(x=2)Kevin Kho
with Flow("ecs test") as flow:
    x = Parameter("x", 2)Robert Kowalski
07/07/2022, 8:44 PM