Kyle McChesney
06/21/2022, 10:42 PM
from prefect import task, Flow, case

@task
def required():
    pass

@task
def optional():
    pass

@task
def final(res):
    pass

with Flow('example'):
    res = required()
    with case(some_case, True):  # some_case: placeholder for the branch condition
        optional_res = optional()
    final(res, upstream_tasks=[optional_res])
If optional is skipped, it's okay to run final. If required fails, or optional fails, I don't want to run final.
Kevin Kho
06/21/2022, 10:43 PM
Kyle McChesney
06/21/2022, 10:44 PM
optional to use in another with case(..., False) and then merge.
Kevin Kho
06/21/2022, 10:44 PM
@task(skip_on_upstream_skip=False)
def final():
    pass
...
final(res, upstream_tasks=[optional_res, required])
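The effect of skip_on_upstream_skip can be sketched without Prefect installed. The following is a toy model only, not Prefect's implementation: the State enum and the resolve_downstream function are hypothetical names, and the model assumes the default all_successful trigger, a skip check that happens before the trigger check, and a task body that succeeds whenever it runs.

```python
from enum import Enum


class State(Enum):
    SUCCESS = "Success"
    SKIPPED = "Skipped"
    FAILED = "Failed"
    TRIGGER_FAILED = "TriggerFailed"


def resolve_downstream(upstream_states, skip_on_upstream_skip=True):
    """Toy model: the state a downstream task ends in, given its upstream states."""
    # Step 1 (assumed to run first): skip propagation.
    if skip_on_upstream_skip and State.SKIPPED in upstream_states:
        return State.SKIPPED
    # Step 2: all_successful trigger, where a skip counts as a success.
    acceptable = {State.SUCCESS, State.SKIPPED}
    if all(state in acceptable for state in upstream_states):
        return State.SUCCESS  # the task body runs; assumed to succeed
    return State.TRIGGER_FAILED


# Upstream order: [required, optional]
for states in ([State.SUCCESS, State.SKIPPED],
               [State.SUCCESS, State.FAILED],
               [State.FAILED, State.SUCCESS]):
    print([s.value for s in states],
          "default:", resolve_downstream(states).value,
          "| skip_on_upstream_skip=False:",
          resolve_downstream(states, skip_on_upstream_skip=False).value)
```

Under these assumptions, turning the flag off is what lets final run when optional is skipped, while a failure in either upstream still blocks it.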
Kyle McChesney
06/21/2022, 10:47 PM
all_successful seems to count skipped as successful. So I think it actually just does what I want out of the box.
Kevin Kho
06/21/2022, 10:48 PM
final will be skipped
Kyle McChesney
06/21/2022, 10:52 PM
required doesn’t seem to be needed in upstream tasks
from prefect import task, Flow, Parameter, case

@task
def case_value(param):
    # Normalize the parameter (bool or string) to a real boolean
    return str(param).strip().lower() == 'true'

@task
def required():
    return [1, 2, 3]

@task
def optional():
    return [4, 5, 6]

# Run even when an upstream task was skipped
@task(skip_on_upstream_skip=False)
def final(res):
    print(res)

with Flow('example') as flow:
    case_param = Parameter('case')
    case_val = case_value(case_param)
    res = required()
    with case(case_val, True):
        optional_res = optional()
    # res is a data dependency; optional_res is a state-only dependency
    final(res, upstream_tasks=[optional_res])
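The case_value task exists so that the branch condition is a real boolean whether the parameter arrives as a bool or as a string. A quick standalone check of that expression, with the @task decorator dropped so it runs without Prefect (the sample inputs are made up for illustration):

```python
def case_value(param):
    # Same expression as the case_value task, minus the @task wrapper.
    return str(param).strip().lower() == 'true'


# Hypothetical inputs: booleans, padded or mixed-case strings, and None.
for raw in (True, 'true', ' TRUE ', False, 'false', 'yes', None):
    print(repr(raw), '->', case_value(raw))
```

Anything other than a case-insensitive "true" maps to False, so a typo in the parameter silently takes the skip branch rather than raising.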
prefect run -p test.py --param case=true
Retrieving local flow... Done
Configured local flow run
└── Parameters: {'case': True}
Running flow locally...
└── 16:51:57 | INFO | Beginning Flow run for 'example'
└── 16:51:57 | INFO | Task 'case': Starting task run...
└── 16:51:57 | INFO | Task 'case': Finished task run for task with final state: 'Success'
└── 16:51:57 | INFO | Task 'required': Starting task run...
└── 16:51:57 | INFO | Task 'required': Finished task run for task with final state: 'Success'
└── 16:51:57 | INFO | Task 'case_value': Starting task run...
└── 16:51:57 | INFO | Task 'case_value': Finished task run for task with final state: 'Success'
└── 16:51:57 | INFO | Task 'case(True)': Starting task run...
└── 16:51:57 | INFO | Task 'case(True)': Finished task run for task with final state: 'Success'
└── 16:51:57 | INFO | Task 'optional': Starting task run...
└── 16:51:57 | INFO | Task 'optional': Finished task run for task with final state: 'Success'
└── 16:51:57 | INFO | Task 'final': Starting task run...
[1, 2, 3]
└── 16:51:57 | INFO | Task 'final': Finished task run for task with final state: 'Success'
└── 16:51:57 | INFO | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!
prefect run -p test.py --param case=false
Retrieving local flow... Done
Configured local flow run
└── Parameters: {'case': False}
Running flow locally...
└── 16:51:51 | INFO | Beginning Flow run for 'example'
└── 16:51:51 | INFO | Task 'case': Starting task run...
└── 16:51:52 | INFO | Task 'case': Finished task run for task with final state: 'Success'
└── 16:51:52 | INFO | Task 'required': Starting task run...
└── 16:51:52 | INFO | Task 'required': Finished task run for task with final state: 'Success'
└── 16:51:52 | INFO | Task 'case_value': Starting task run...
└── 16:51:52 | INFO | Task 'case_value': Finished task run for task with final state: 'Success'
└── 16:51:52 | INFO | Task 'case(True)': Starting task run...
└── 16:51:52 | INFO | SKIP signal raised: SKIP('Provided value "False" did not match "True"')
└── 16:51:52 | INFO | Task 'case(True)': Finished task run for task with final state: 'Skipped'
└── 16:51:52 | INFO | Task 'optional': Starting task run...
└── 16:51:52 | INFO | Task 'optional': Finished task run for task with final state: 'Skipped'
└── 16:51:52 | INFO | Task 'final': Starting task run...
[1, 2, 3]
└── 16:51:52 | INFO | Task 'final': Finished task run for task with final state: 'Success'
└── 16:51:52 | INFO | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!
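To compare runs like the two above without reading every line, each task's final state can be pulled out of the log text. This is a minimal stdlib sketch: the regex is inferred from the log lines shown in this thread rather than from any Prefect API, and final_states is a hypothetical helper name.

```python
import re

# Pattern assumed from the log format shown above.
FINAL_STATE = re.compile(
    r"Task '([^']+)': Finished task run for task with final state: '(\w+)'"
)


def final_states(log_text):
    """Map each task name to the final state reported in the log text."""
    return dict(FINAL_STATE.findall(log_text))


# A few lines copied from the case=false run.
sample = """
└── 16:51:52 | INFO | Task 'required': Finished task run for task with final state: 'Success'
└── 16:51:52 | INFO | Task 'case(True)': Finished task run for task with final state: 'Skipped'
└── 16:51:52 | INFO | Task 'optional': Finished task run for task with final state: 'Skipped'
└── 16:51:52 | INFO | Task 'final': Finished task run for task with final state: 'Success'
"""

states = final_states(sample)
print(states)
```

For the case=false run this shows optional ending in Skipped while final still ends in Success, which is the behavior asked for at the start of the thread.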