alvin goh
12/03/2019, 6:50 AMfrom prefect import Flow, Parameter, task
from prefect.tasks.core.constants import Constant
from prefect.engine.signals import FAIL
@task
def trythis(x, fail=False):
if fail:
raise FAIL()
return [k+1 for k in x]
@task
def tryprint(x):
print(x)
with Flow("math") as f:
x = Parameter("x")
y = trythis(x, fail=True)
y1 = trythis(y, fail=False)
z = tryprint.map(y1)
flow_state = f.run(x=[1,2,3,4,5,6,7])
print(flow_state.result[z].result)
Output:
[2019-12-03 06:50:35,756] INFO - prefect.FlowRunner | Beginning Flow run for 'math'
[2019-12-03 06:50:35,758] INFO - prefect.FlowRunner | Starting flow run.
[2019-12-03 06:50:35,765] INFO - prefect.TaskRunner | Task 'x': Starting task run...
[2019-12-03 06:50:35,769] INFO - prefect.TaskRunner | Task 'x': finished task run for task with final state: 'Success'
[2019-12-03 06:50:35,775] INFO - prefect.TaskRunner | Task 'trythis': Starting task run...
[2019-12-03 06:50:35,780] INFO - prefect.TaskRunner | Task 'trythis': finished task run for task with final state: 'Failed'
[2019-12-03 06:50:35,786] INFO - prefect.TaskRunner | Task 'trythis': Starting task run...
[2019-12-03 06:50:35,789] INFO - prefect.TaskRunner | Task 'trythis': finished task run for task with final state: 'TriggerFailed'
[2019-12-03 06:50:35,795] INFO - prefect.TaskRunner | Task 'tryprint': Starting task run...
[2019-12-03 06:50:35,796] ERROR - prefect.TaskRunner | Task 'tryprint': unexpected error while running task: TypeError("'TRIGGERFAIL' object does not support indexing",)
Traceback (most recent call last):
File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 259, in run
executor=executor,
File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 719, in run_mapped_task
upstream_state.result[i],
TypeError: 'TRIGGERFAIL' object does not support indexing
[2019-12-03 06:50:35,800] INFO - prefect.TaskRunner | Task 'tryprint': finished task run for task with final state: 'Failed'
[2019-12-03 06:50:35,802] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
'TRIGGERFAIL' object does not support indexing
Seems like the mapped task should also raise triggerfailed if the upstream tasks failed.. maybe it's not so simple as it may just be a subset of upstream tasks which failed... what is the design philosophy on this?josh
12/03/2019, 2:50 PMFAIL
which becomes the result of your y
task. The you take that result and pass it into y1
with fail=False so it will reach the [k+1 for k in x]
with x
being the FAIL
returned from y
. This is causing the y1
task to raise the TypeError when it tries to iterate over something that is not iterable.
Take a look into using triggers to assist with cases like this! https://docs.prefect.io/api/unreleased/triggers.html
e.g. this flow will only run the aggregate task if all upstreams are successful
from prefect import task, Flow
from prefect.triggers import all_successful
@task
def divide(x):
return 1 / x
@task(trigger=all_successful)
def aggregate(results):
print(results)
with Flow("divide-fail") as flow:
results = divide.map([0, 1, 2])
aggregate(results)
flow.run()
[2019-12-03 14:58:33,956] INFO - prefect.TaskRunner | Task 'tryprint': Starting task run...
[2019-12-03 14:58:33,956] DEBUG - prefect.TaskRunner | Task 'tryprint': TRIGGERFAIL signal raised during execution.
[2019-12-03 14:58:33,956] DEBUG - prefect.TaskRunner | Task 'tryprint': Handling state change from Pending to TriggerFailed
[2019-12-03 14:58:33,958] INFO - prefect.TaskRunner | Task 'tryprint': finished task run for task with final state: 'TriggerFailed'
[2019-12-03 14:58:33,958] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2019-12-03 14:58:33,959] DEBUG - prefect.FlowRunner | Flow 'math': Handling state change from Running to Failed
Trigger was "all_successful" but some of the upstream tasks failed.
Where the downstream should also raise a triggerfailed due to the upstreams when it comes to mapping.alvin goh
12/03/2019, 4:29 PMy1
with fail=False so it will reach the [k+1 for k in x]
with x
being the FAIL
returned from y
. <----- the y1 should have a TRIGGERFAIL result, after which the TRIGGERFAIL attempts to be iterated over for the mapping operation in the z task, thus the 'does not support indexing error'
Maybe a change can be made to not attempt to iterate over results and map tasks when the upstream result is a signal?josh
12/04/2019, 2:41 PM