Robert Kowalski
07/07/2022, 1:06 PM
Why does the map function fail, while it works when map is replaced with bind?
from prefect import task, Flow, unmapped
@task
def first():
    return range(10)
@task
def second():
    return True
@task
def third(numbers):
    print(numbers)
with Flow(name='test') as flow:
    numbers = first()
    second_result = second()
    # third.bind(numbers=numbers, upstream_tasks=[second_result])
    third.map(numbers=unmapped(numbers), upstream_tasks=[second_result])
flow.run()
Kevin Kho
07/07/2022, 4:38 PM
... third? Maybe you just need:
third(numbers, upstream_tasks=[second_result])
Robert Kowalski
07/07/2022, 5:49 PM
map causes errors. Is it possible to enable more logs?
Kevin Kho
07/07/2022, 6:33 PM
... numbers? There is something wrong there, but I can't tell what you are trying to do. Do you want third to run 10 times or 1 time?
Robert Kowalski
07/07/2022, 6:41 PM
map with upstream_tasks generates an error:
from prefect import task, Flow
@task
def second():
    return True
@task
def third(numbers):
    print(numbers)
with Flow(name='test') as flow:
    second_result = second()
    # third.bind([1, 2, 3], upstream_tasks=[second_result])  # works and prints the list of ints
    third.map([1, 2, 3], upstream_tasks=[second_result])  # doesn't work and I don't know why
    # third.map([1, 2, 3], )  # works and prints a single number in each mapped task
flow.run()
Kevin Kho
07/07/2022, 6:42 PM
third.map([1, 2, 3], upstream_tasks=[unmapped(second_result)])
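A plain-Python toy can illustrate why the wrapper matters. This is only a sketch and not Prefect's implementation: Unmapped and toy_map are hypothetical names, and the only assumption carried over from the thread is that every argument passed to a mapped call is iterated over unless it is explicitly marked as unmapped.

```python
from typing import Any, Callable, Dict, List


class Unmapped:
    """Hypothetical wrapper that marks a value as 'pass whole, do not iterate'."""

    def __init__(self, value: Any) -> None:
        self.value = value


def toy_map(fn: Callable[..., Any], **kwargs: Any) -> List[Any]:
    """Call fn once per element of the mapped arguments.

    Arguments wrapped in Unmapped are passed unchanged to every call;
    every other argument is iterated over, so it must be iterable.
    """
    mapped: Dict[str, List[Any]] = {}
    constant: Dict[str, Any] = {}
    for name, value in kwargs.items():
        if isinstance(value, Unmapped):
            constant[name] = value.value
        else:
            mapped[name] = list(value)  # raises TypeError for a bare True
    if not mapped:
        raise ValueError("nothing to map over")
    n_children = min(len(v) for v in mapped.values())
    return [
        fn(**{k: v[i] for k, v in mapped.items()}, **constant)
        for i in range(n_children)
    ]


def third(numbers: int, upstream: bool) -> str:
    return f"{numbers} (upstream={upstream})"


second_result = True

# Mapped over [1, 2, 3]; the upstream value is broadcast to each child call.
ok = toy_map(third, numbers=[1, 2, 3], upstream=Unmapped(second_result))

# Without the wrapper, the toy mapper tries to iterate over True and fails.
try:
    toy_map(third, numbers=[1, 2, 3], upstream=second_result)
    failed = False
except TypeError:
    failed = True
```

In this toy, the unwrapped call fails because a boolean cannot be iterated, which mirrors the situation in the example where second returns True and is passed in upstream_tasks without unmapped().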
Robert Kowalski
07/07/2022, 6:46 PM
... True. Why must second_result be wrapped by the unmapped() function when it is used as a parameter in upstream_tasks?
Kevin Kho
07/07/2022, 6:49 PM
If second() is also mapped (which is assumed), then third-1 can start as soon as second-1 finishes without needing to wait for second-2 and second-3. Check this
Robert Kowalski
07/07/2022, 7:16 PM
from datetime import timedelta
from prefect import task, Flow
from prefect.triggers import any_failed
@task(max_retries=3, retry_delay=timedelta(seconds=3))
def first():
    # 1/0
    return list(range(10))
@task(trigger=any_failed)
def second():
    print('ERROR')
with Flow(name='test') as flow:
    numbers = first()
    second_result = second(upstream_tasks=[numbers])
flow.set_reference_tasks(tasks=[numbers])
flow.run()
I want to run the second task only if all retries of the first task failed. The second task should not be executed when first succeeds (on the first, second, or third attempt, no matter which), and the flow should succeed even though second is not executed (because first succeeded).
In this example, the second task generates a TriggerError when it is not executed.
Maybe a new trigger type, run_only_any_failed, that does not generate an error and finishes with a success status? Or do you know a better solution?
I know about the on_failure parameter of the Task class, but the callable is executed on every retry of the first task.
Kevin Kho
07/07/2022, 7:20 PM
from prefect import Flow, task
import prefect
from datetime import timedelta
def mystatehandler(task, old_state, new_state):
    # Act only when the retries are exhausted and the task has failed
    if (task.max_retries < prefect.context.task_run_count and
            new_state.is_failed()):
        prefect.context.logger.info(type(new_state.result))
    return new_state
@task(max_retries=2, retry_delay=timedelta(seconds=2), state_handlers=[mystatehandler])
def abc(x):
    if x == 2:
        raise ValueError()
    return 1
with Flow("ecs test") as flow:
    abc.map([1,2,3])
flow.run()
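The condition in the handler above can be checked in isolation with a plain-Python sketch. This is not Prefect code: run_with_retries and on_final_failure are hypothetical names, and the loop only assumes that the run count starts at 1 and grows by one per attempt, so that the callback fires once, after the last allowed attempt has failed.

```python
from typing import Callable, Dict, List


def run_with_retries(fn: Callable[[], object], max_retries: int,
                     on_final_failure: Callable[[int], None]) -> bool:
    """Run fn up to max_retries + 1 times.

    on_final_failure is called only if every attempt fails.
    Returns True on success, False once the retries are exhausted.
    """
    run_count = 0
    while True:
        run_count += 1
        try:
            fn()
            return True
        except Exception:
            # Same comparison as in the handler: the retries are exhausted
            # once the run count exceeds max_retries.
            if max_retries < run_count:
                on_final_failure(run_count)
                return False


calls: List[int] = []


def always_fails() -> None:
    raise ValueError("boom")


attempts: Dict[str, int] = {"n": 0}


def fails_twice() -> None:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ValueError("boom")


# Every attempt fails, so the callback runs exactly once, on the third run.
exhausted = run_with_retries(always_fails, max_retries=2, on_final_failure=calls.append)
calls_after_exhausted = list(calls)

# The third attempt succeeds, so the callback is never invoked.
recovered = run_with_retries(fails_twice, max_retries=2, on_final_failure=calls.append)
```

With max_retries=2 the callback is skipped on the first two failures and runs on the third, which is the behaviour asked for: react only when all retries have failed, and do nothing when any attempt succeeds.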
Robert Kowalski
07/07/2022, 7:57 PM
prefect.context.logger.info(prefect.context.parameters)
Why is it empty?
Kevin Kho
07/07/2022, 8:12 PM
... .bind(). It's really not user-facing, it happens under the hood:
with Flow(..) as flow:
    abc()
Robert Kowalski
07/07/2022, 8:27 PM
def mystatehandler(task, old_state, new_state):
    if (task.max_retries < prefect.context.task_run_count and
            new_state.is_failed()):
        prefect.context.logger.info(type(new_state.result))
        prefect.context.logger.info(prefect.context.parameters)
    return new_state
....
with Flow("ecs test") as flow:
    abc(x=2)
Kevin Kho
07/07/2022, 8:29 PM
with Flow("ecs test") as flow:
    x = Parameter("x", 2)
Not the inputs into the task
Robert Kowalski
07/07/2022, 8:44 PM