Michael Ludwig
07/31/2020, 7:21 AM

bucket_prefix = create_bucket_prefix_rfy()
prediction_single_files = list_files(s3_folder=prediction)
return load_rfy.map(
    input_data_location=prediction_single_files,
    bucket_prefix=unmapped(bucket_prefix),
)
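
For context, a minimal self-contained sketch of how a snippet like this could be wired into a Prefect flow. The task bodies here (create_bucket_prefix_rfy, list_files, load_rfy) are hypothetical stubs; only the map/unmapped wiring mirrors the snippet above:

from prefect import Flow, task, unmapped

@task
def create_bucket_prefix_rfy() -> str:
    # Hypothetical stub: build the destination bucket prefix.
    return "s3://some-bucket/rfy"

@task
def list_files(s3_folder: str) -> list:
    # Hypothetical stub: list the individual prediction files in the folder.
    return [f"{s3_folder}/part-{i}.parquet" for i in range(3)]

@task
def load_rfy(input_data_location: str, bucket_prefix: str) -> None:
    # Hypothetical stub: load one prediction file under the bucket prefix.
    print(f"loading {input_data_location} -> {bucket_prefix}")

with Flow("load-rfy") as flow:
    bucket_prefix = create_bucket_prefix_rfy()
    prediction_single_files = list_files(s3_folder="s3://some-bucket/predictions")
    # map() runs load_rfy once per file; unmapped() keeps bucket_prefix
    # constant across all mapped runs instead of iterating over it.
    load_rfy.map(
        input_data_location=prediction_single_files,
        bucket_prefix=unmapped(bucket_prefix),
    )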
from typing import Dict, cast

from prefect.core.edge import Edge
from prefect.engine.signals import SKIP
from prefect.engine.state import State

def _trigger(self, upstream_states: Dict[Edge, State]) -> bool:
    partition = self.check_partition()
    self.log_info(f"partition exists: {partition.exists}")
    # Run when the upstream states demand it, or when the partition
    # hasn't been produced yet.
    if (
        trigger_run(
            list(upstream_states.values()),
            self._ignore_upstream_success_states_for_triggering,
        )
        or not partition.exists
    ):
        self.log_info(f"Triggering {self._name}")
        return True
    # Partition already exists: reuse its latest revision and skip this run.
    self._revision = cast(str, partition.revision)
    skip_message = (
        f"Partition already exists. Using most recent revision: {self._revision}"
    )
    self.log_info(skip_message)
    self.log_info(f"Skipping {self._name}")
    raise SKIP(result=self.get_output(), message=skip_message)
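
A minimal sketch of how such a method could be attached as a task's trigger, assuming it lives on a custom Task subclass (the class name and constructor below are hypothetical; Prefect's Task does accept a trigger callable that receives the upstream states):

from prefect import Task

class PartitionAwareTask(Task):
    # Hypothetical subclass: register the method above as this task's
    # trigger so Prefect calls it with the upstream states before running.
    def __init__(self, name: str, **kwargs):
        super().__init__(name=name, trigger=self._trigger, **kwargs)
        self._name = name
        self._ignore_upstream_success_states_for_triggering = False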
from typing import Sequence

from prefect.engine import state
from prefect.engine.signals import TRIGGERFAIL

def trigger_run(
    upstream_states: Sequence[state.State],
    ignore_upstream_success_states_for_triggering: bool,
) -> bool:
    n_upstream_states = len(upstream_states)
    n_success = sum(1 for task_state in upstream_states if task_state.is_successful())
    # If any upstream task ended in a state other than success or skip, fail
    # the trigger. (Skipped is a subclass of Success, so it passes this check.)
    if n_success < n_upstream_states:
        raise TRIGGERFAIL(
            'Trigger was "all_successful" but some of the upstream tasks failed.'
        )
    # Don't run this task based on upstream success states alone.
    if ignore_upstream_success_states_for_triggering:
        return False
    n_skipped = sum(1 for task_state in upstream_states if task_state.is_skipped())
    # Run only if fewer upstream states were skipped than the total number of states.
    return n_skipped < n_upstream_states
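
A quick illustration of the resulting behavior, using Prefect state objects (skipped upstreams count as successful, so they pass the first check):

from prefect.engine import state

# One upstream skipped, one succeeded: there is new work, so run.
assert trigger_run(
    [state.Success(), state.Skipped()],
    ignore_upstream_success_states_for_triggering=False,
)

# Every upstream skipped: nothing changed upstream, so don't run.
assert not trigger_run(
    [state.Skipped(), state.Skipped()],
    ignore_upstream_success_states_for_triggering=False,
)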