Michael Ludwig
07/31/2020, 7:21 AMbucket_prefix = create_bucket_prefix_rfy()
# NOTE(review): this snippet is the interior of a flow-building function whose
# `def` line is not part of the paste (hence the bare `return` below).
# Presumably `list_files` returns the individual prediction files found under the
# `prediction` S3 folder — confirm against its definition.
prediction_single_files = list_files(s3_folder=prediction)
# `.map` fans `load_rfy` out over `prediction_single_files`, one call per element;
# wrapping `bucket_prefix` in `unmapped(...)` passes that same single value to
# every mapped call instead of iterating over it.
return load_rfy.map(
    input_data_location=prediction_single_files,
    bucket_prefix=unmapped(bucket_prefix),
)Michael Ludwig
# 07/31/2020, 8:59 AM
# `_trigger` is a method (it takes `self`); the enclosing `class` header is not
# part of this paste, so it is shown at module level here.
def _trigger(self, upstream_states: Dict[Edge, State]) -> bool:
    """Decide whether this task runs, or skip it because its partition exists.

    The task runs when ``trigger_run`` requests it based on the upstream states,
    or when ``self.check_partition()`` reports that the partition does not exist.
    Otherwise the partition's revision is stored on ``self._revision`` and the
    task is skipped with ``self.get_output()`` as its result.

    Args:
        upstream_states: States of the upstream tasks keyed by edge; only the
            states themselves (the dict values) are inspected.

    Returns:
        ``True`` when the task should run. It never returns ``False``; the
        "don't run" outcome is signalled by raising ``SKIP``.

    Raises:
        TRIGGERFAIL: Propagated from ``trigger_run`` when an upstream task was
            not successful.
        SKIP: When the partition already exists and the upstream states do not
            request a run.
    """
    partition = self.check_partition()
    self.log_info(f"partition exists: {partition.exists}")
    # trigger_run is evaluated first, so an upstream failure raises TRIGGERFAIL
    # even when the partition is missing.
    if (
        trigger_run(
            list(upstream_states.values()),
            self._ignore_upstream_success_states_for_triggering,
        )
        or not partition.exists
    ):
        self.log_info(f"Triggering {self._name}")
        return True

    # The partition exists and upstream states don't force a run:
    # reuse its most recent revision and skip.
    self._revision = cast(str, partition.revision)
    skip_message = (
        f"Partition already exists. Using most recent revision: {self._revision}"
    )
    self.log_info(skip_message)
    self.log_info(f"Skipping {self._name}")
    raise SKIP(result=self.get_output(), message=skip_message)


def trigger_run(
    upstream_states: Sequence[state.State],
    ignore_upstream_success_states_for_triggering: bool,
) -> bool:
    """Decide, from upstream states alone, whether a task should run.

    Behaves like an "all_successful" trigger: every upstream state must report
    ``is_successful()``, otherwise ``TRIGGERFAIL`` is raised. Skipped upstream
    tasks are expected to pass that check (in Prefect, ``Skipped`` is a subclass
    of ``Success``).

    Args:
        upstream_states: States of all upstream tasks.
        ignore_upstream_success_states_for_triggering: If ``True``, never
            request a run based on upstream states; the success check above
            still applies.

    Returns:
        ``True`` if at least one upstream state is not skipped. ``False`` if
        every upstream state was skipped, if there are no upstream states, or
        if the ignore flag is set.

    Raises:
        TRIGGERFAIL: If any upstream state is not successful.
    """
    # If any upstream task ended in a state other than success (or skip), don't run.
    if not all(task_state.is_successful() for task_state in upstream_states):
        raise TRIGGERFAIL(
            'Trigger was "all_successful" but some of the upstream tasks failed.'
        )
    # Don't run this task based on upstream state only.
    if ignore_upstream_success_states_for_triggering:
        return False
    # Run unless every upstream task was skipped (fewer skipped than total).
    # An empty sequence counts as "all skipped" and therefore yields False.
    return not all(task_state.is_skipped() for task_state in upstream_states)
# nicholas
Dylan
Marvin
08/05/2020, 4:23 PM