This is how we apply map: ```bucket_prefix = creat...
# prefect-community
This is how we apply map:
# Build the inputs for the mapped task: a bucket prefix and a listing of files
# under the `prediction` folder.
# NOTE(review): the mapped call this snippet is meant to illustrate is not
# shown here; presumably `prediction_single_files` is what gets mapped over.
bucket_prefix = create_bucket_prefix_rfy()
prediction_single_files = list_files(s3_folder=prediction)
We have a custom trigger, but maybe something in it is broken when working with “Mapped” states:
def _trigger(self, upstream_states: Dict[Edge, State]) -> bool:
        # Custom trigger: returns True to run the task, or raises SKIP so the
        # task is skipped and its existing output is used as the result.
        # Look up the partition and log whether it already exists.
        partition = self.check_partition()
        self.log_info(f"partition exists: {partition.exists}")
        # NOTE(review): the first operand of this condition and its closing
        # `):` appear to be missing from the paste; only the
        # `or not partition.exists` half is visible. When the condition holds,
        # the task is triggered.
        if (
            or not partition.exists
            self.log_info(f"Triggering {self._name}")
            return True

        # Otherwise remember the revision reported by the partition check.
        self._revision = cast(str, partition.revision)

        # NOTE(review): the closing parenthesis of this expression is missing
        # from the paste.
        skip_message = (
            f"Partition already exists. Using most recent revision: {self._revision}"
        self.log_info(f"Skipping {self._name}")
        # Skip instead of running, passing the existing output as the result.
        raise SKIP(result=self.get_output(), message=skip_message)
def trigger_run(
    upstream_states: Sequence[state.State],
    ignore_upstream_success_states_for_triggering: bool,
) -> bool:
    """Decide from the upstream task states whether this task should run.

    Args:
        upstream_states: States of the upstream tasks. Each must provide
            ``is_successful()`` and ``is_skipped()``.
        ignore_upstream_success_states_for_triggering: When true, upstream
            states alone never trigger a run (the function returns ``False``
            once the all-successful check has passed).

    Returns:
        ``True`` if fewer upstream states are skipped than there are upstream
        states, i.e. at least one upstream state is not skipped. ``False``
        otherwise, including when ``upstream_states`` is empty.

    Raises:
        TRIGGERFAIL: If any upstream state does not report
            ``is_successful()``.
    """
    n_upstream_states = len(upstream_states)
    n_success = sum(1 for task_state in upstream_states if task_state.is_successful())

    # If any task before had a different state than success or skip, don't run.
    if n_success < n_upstream_states:
        raise TRIGGERFAIL(
            'Trigger was "all_successful" but some of the upstream tasks failed.'
        )

    # Don't run this task based on upstream state only
    if ignore_upstream_success_states_for_triggering:
        return False

    n_skipped = sum(1 for task_state in upstream_states if task_state.is_skipped())

    # If the number of skipped states is less than the total number of states, run.
    return n_skipped < n_upstream_states
Hi @Michael Ludwig - this looks like you may have found a bug; would you please open an issue with your reproduction here and some minimum input data so we can reproduce?
@Marvin open “Mapping SKIPed Task Runs Bug”