Robert Kowalski
07/07/2022, 1:06 PM
Why does the map function fail, while it works when map is replaced with bind?
from prefect import task, Flow, unmapped

@task
def first():
    return range(10)

@task
def second():
    return True

@task
def third(numbers):
    print(numbers)

with Flow(name='test') as flow:
    numbers = first()
    second_result = second()
    # third.bind(numbers=numbers, upstream_tasks=[second_result])
    third.map(numbers=unmapped(numbers), upstream_tasks=[second_result])

flow.run()
Kevin Kho
third? Maybe you just need:
third(numbers, upstream_tasks=[second_result])
Kevin Kho
Robert Kowalski
07/07/2022, 5:49 PM
Robert Kowalski
07/07/2022, 5:50 PM
Robert Kowalski
07/07/2022, 5:51 PM
map causes errors. Is it possible to enable more logs?
Kevin Kho
numbers? There is something wrong there, but I can’t tell what you are trying to do. Do you want third to run 10 times or 1 time?
Robert Kowalski
07/07/2022, 6:41 PM
map with upstream_tasks generates an error:
from prefect import task, Flow

@task
def second():
    return True

@task
def third(numbers):
    print(numbers)

with Flow(name='test') as flow:
    second_result = second()
    # third.bind([1, 2, 3], upstream_tasks=[second_result])  # works and prints the list of ints
    third.map([1, 2, 3], upstream_tasks=[second_result])  # doesn't work and I don't know why
    # third.map([1, 2, 3], )  # works and prints a single number in each mapped task

flow.run()
Kevin Kho
third.map([1, 2, 3], upstream_tasks=[unmapped(second_result)])
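A toy model in plain Python may help show why the wrapper is needed. This is only a sketch of the semantics discussed in this thread, not Prefect's implementation: Unmapped and toy_map are hypothetical names made up for the illustration. The assumption is that every argument of a mapped call is consumed element by element unless it is explicitly marked as unmapped, so a bare True cannot be mapped over, while a wrapped True is handed whole to each child call.

```python
class Unmapped:
    """Hypothetical marker: pass the wrapped value whole to every child call."""

    def __init__(self, value):
        self.value = value


def toy_map(fn, *args):
    """Toy model of a mapped call (an illustration, not Prefect's code).

    Arguments wrapped in Unmapped are broadcast to every child call;
    all other arguments are indexed element by element.
    """
    # len() raises TypeError for a non-iterable value such as True
    lengths = [len(a) for a in args if not isinstance(a, Unmapped)]
    if not lengths:
        raise ValueError("nothing to map over")
    results = []
    for i in range(min(lengths)):
        call_args = [a.value if isinstance(a, Unmapped) else a[i] for a in args]
        results.append(fn(*call_args))
    return results


def third(number, upstream):
    return (number, upstream)


# Wrapped upstream value: one child call per number, each sees the same True
print(toy_map(third, [1, 2, 3], Unmapped(True)))

# Bare upstream value: the toy model tries to map over True and fails
try:
    toy_map(third, [1, 2, 3], True)
except TypeError as exc:
    print("cannot map over True:", exc)
```

In this sketch the first call yields one result per number, and the second fails because True has no elements to pair with 1, 2 and 3.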
Robert Kowalski
07/07/2022, 6:46 PM
True. Why must second_result be wrapped by the unmapped() function when it is used as a parameter in upstream_tasks?
Kevin Kho
If second() is also mapped (which is assumed), then third-1 can start as soon as second-1 finishes, without needing to wait for second-2 and second-3. Check this
Robert Kowalski
07/07/2022, 7:16 PM
from datetime import timedelta
from prefect import task, Flow
from prefect.triggers import any_failed

@task(max_retries=3, retry_delay=timedelta(seconds=3))
def first():
    # 1/0
    return list(range(10))

@task(trigger=any_failed)
def second():
    print('ERROR')

with Flow(name='test') as flow:
    numbers = first()
    second_result = second(upstream_tasks=[numbers])

flow.set_reference_tasks(tasks=[numbers])
flow.run()
I want to run the second task only if all retries of the first task have failed. The second task should not be executed when first succeeds (on the first, second or third attempt, it doesn't matter), and the flow should succeed even when second is not executed (because first succeeded).
In this example the second task generates a TriggerError when it is not executed.
Maybe a new trigger type, run_only_any_failed, that does not generate an error and finishes with a success status? Or do you know a better solution?
I know about the on_failure parameter in the Task class, but the callable is executed on every retry of the first task.
Kevin Kho
from prefect import Flow, task
import prefect
from datetime import timedelta

def mystatehandler(task, old_state, new_state):
    if (task.max_retries < prefect.context.task_run_count and
            new_state.is_failed()):
        prefect.context.logger.info(type(new_state.result))
    return new_state

@task(max_retries=2, retry_delay=timedelta(seconds=2), state_handlers=[mystatehandler])
def abc(x):
    if x == 2:
        raise ValueError()
    return 1

with Flow("ecs test") as flow:
    abc.map([1,2,3])

flow.run()
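The condition in the handler above can be traced with a toy loop in plain Python. This is a sketch, not Prefect's engine: it assumes the run count starts at 1 and that the handler sees a failed state on every failed attempt, which is why the comparison against max_retries is there. The function simulate and its arguments are hypothetical names used only for this illustration.

```python
def simulate(max_retries, fails_on):
    """Return the run counts at which the 'final failure' branch would fire.

    max_retries: number of retries allowed after the first attempt.
    fails_on: set of 1-based run counts on which the task raises.
    """
    fired = []
    run_count = 1  # assumption: the first attempt has run count 1
    while True:
        failed = run_count in fails_on
        # Same shape as the handler's check: retries exhausted and state failed
        if max_retries < run_count and failed:
            fired.append(run_count)
        # Stop on success, or when no retries are left
        if not failed or run_count > max_retries:
            break
        run_count += 1
    return fired


# Task fails on every attempt: the branch fires once, on the last attempt
print(simulate(2, {1, 2, 3}))
# Task fails once, then succeeds on retry: the branch never fires
print(simulate(2, {1}))
```

Under these assumptions the branch runs only once all retries are used up, which matches the "only if all retries failed" behaviour asked for earlier in the thread.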
Kevin Kho
Robert Kowalski
07/07/2022, 7:57 PM
prefect.context.logger.info(prefect.context.parameters)
Why is it empty?
Robert Kowalski
07/07/2022, 8:00 PM
Kevin Kho
.bind(). It’s really not user-facing; it happens under the hood
Kevin Kho
with Flow(..) as flow:
    abc()
Robert Kowalski
07/07/2022, 8:27 PM
def mystatehandler(task, old_state, new_state):
    if (task.max_retries < prefect.context.task_run_count and
            new_state.is_failed()):
        prefect.context.logger.info(type(new_state.result))
        prefect.context.logger.info(prefect.context.parameters)
    return new_state
....
with Flow("ecs test") as flow:
    abc(x=2)
Kevin Kho
with Flow("ecs test") as flow:
    x = Parameter("x", 2)
Not the inputs into the task
Robert Kowalski
07/07/2022, 8:44 PM