Cab Maddux
03/31/2020, 8:49 PM
Zachary Hughes
03/31/2020, 9:34 PM
Cab Maddux
03/31/2020, 9:38 PM
Zachary Hughes
03/31/2020, 9:52 PM
Cab Maddux
03/31/2020, 9:52 PM
Zachary Hughes
03/31/2020, 9:58 PM
[...] run method if you've set self.result_handler (which it sounds like you have)
• you could also add a state handler to your task and access the result info you're looking for by accessing new_state._result
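A minimal sketch of the state-handler suggestion above. It assumes the usual Prefect state handler signature (task, old_state, new_state), returning the state. StandInState, record_result_info, and seen_results are hypothetical names used only so the snippet runs without Prefect installed. What new_state._result actually contains depends on your Prefect version and result handler.

```python
from typing import Any, List


class StandInState:
    """Hypothetical stand-in for a Prefect state; only mimics the _result attribute."""

    def __init__(self, result: Any = None) -> None:
        self._result = result


# Collects whatever result info the handler sees (illustrative only).
seen_results: List[Any] = []


def record_result_info(task: Any, old_state: Any, new_state: Any) -> Any:
    """State handler sketch: read new_state._result and pass the state through."""
    result_info = getattr(new_state, "_result", None)
    if result_info is not None:
        seen_results.append(result_info)
    # A state handler hands back the state the task should end up in.
    return new_state
```

In a real flow the function would be passed to the task via state_handlers=[record_result_info].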
Cab Maddux
04/01/2020, 11:09 AM
Zach
04/01/2020, 3:20 PM
Zachary Hughes
04/01/2020, 3:22 PM
Zach
04/01/2020, 3:30 PM
Zachary Hughes
04/01/2020, 3:34 PM
Zach
04/01/2020, 3:45 PM
@task(
    checkpoint=True,
    result_handler=GCSResultHandler(
        bucket="my-bucket", credentials_secret=GCS_CREDS_SECRET_NAME
    ),
    state_handlers=[my_state_handler_function],
    cache_for=datetime.timedelta(days=DEFAULT_TASK_CACHE_DURATION),
    cache_validator=cache_validators.all_inputs,
    cache_key=CACHE_KEYS.get("run_function_foo"),
)
def run_function_foo(arg1: str, arg2: str) -> str:
When I take out those last three caching arguments to the task decorator, the state handler only runs once.
Zachary Hughes
04/01/2020, 3:59 PM
[...] Cached. So in that case, you have another state transition: Success -> Cached. But if you're just looking to trigger this logic on Success, you should be able to check the new_state argument in your state handler and branch based on that.
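A rough sketch of that branching, under stated assumptions. The state classes below are hypothetical stand-ins so the snippet runs without Prefect. With Prefect you would import the real ones from prefect.engine.state. It also assumes Cached is a subclass of Success, which is why Cached is excluded explicitly. run_on_success_only and triggered are illustrative names.

```python
from typing import Any, List


# Stand-in state classes (hypothetical, not Prefect's own).
class State:
    pass


class Running(State):
    pass


class Success(State):
    pass


class Cached(Success):  # assumption: Cached is a kind of Success
    pass


# Records each time the "on Success" logic actually fires.
triggered: List[str] = []


def run_on_success_only(task: Any, old_state: State, new_state: State) -> State:
    """Fire the side effect on Success, but skip the later Success -> Cached transition."""
    if isinstance(new_state, Success) and not isinstance(new_state, Cached):
        triggered.append(type(new_state).__name__)
    return new_state


# Simulate the two transitions a task with caching enabled goes through.
run_on_success_only(None, Running(), Success())
run_on_success_only(None, Success(), Cached())
print(triggered)  # ['Success']
```

The handler is still called for both transitions. Only the branch inside it runs once.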