Thread
#prefect-community
    m

    Michael Ludwig

    2 years ago
    This is how we apply map:
    # Fragment from inside a flow-building function; its `def` line (and the
    # origin of `prediction`) is not shown in this thread.
    bucket_prefix = create_bucket_prefix_rfy()
    # Presumably returns one entry per file under the `prediction` S3 folder;
    # each entry becomes the input of one mapped load_rfy run.
    prediction_single_files = list_files(s3_folder=prediction)
    # Map load_rfy over the listed files. bucket_prefix is wrapped in unmapped()
    # so it is (presumably) handed unchanged to every mapped run instead of
    # being iterated over.
    return load_rfy.map(
        input_data_location=prediction_single_files,
        bucket_prefix=unmapped(bucket_prefix),
    )
    We have a custom trigger; maybe something in it breaks when working with “Mapped” states:
    def _trigger(self, upstream_states: Dict[Edge, State]) -> bool:
        """Return True when this task should run; otherwise raise SKIP.

        The task runs if ``trigger_run`` approves the upstream states, or if the
        partition returned by ``self.check_partition()`` does not exist yet.
        ``trigger_run`` is consulted first, so any exception it raises propagates
        before partition existence is considered.

        When the task does not run, the partition's revision is stored on
        ``self._revision`` and SKIP is raised with ``self.get_output()`` as its
        result.
        """
        partition = self.check_partition()
        self.log_info(f"partition exists: {partition.exists}")

        states = list(upstream_states.values())
        should_run = (
            trigger_run(states, self._ignore_upstream_success_states_for_triggering)
            or not partition.exists
        )

        if not should_run:
            # Partition already exists: remember its revision and skip this run.
            self._revision = cast(str, partition.revision)
            message = (
                f"Partition already exists. Using most recent revision: {self._revision}"
            )
            self.log_info(message)
            self.log_info(f"Skipping {self._name}")
            raise SKIP(result=self.get_output(), message=message)

        self.log_info(f"Triggering {self._name}")
        return True
    def trigger_run(
        upstream_states: Sequence[state.State],
        ignore_upstream_success_states_for_triggering: bool,
    ) -> bool:
        """Decide, from upstream states alone, whether a task should run.

        Raises TRIGGERFAIL if any upstream state's ``is_successful()`` is False.
        Otherwise returns False when
        ``ignore_upstream_success_states_for_triggering`` is set, and True only
        when at least one upstream state is not skipped. An empty sequence
        therefore yields False.
        """
        # Every upstream state must report success; otherwise fail the trigger
        # (rather than merely not running).
        if not all(task_state.is_successful() for task_state in upstream_states):
            raise TRIGGERFAIL(
                'Trigger was "all_successful" but some of the upstream tasks failed.'
            )

        # Caller opted out of upstream-driven triggering.
        if ignore_upstream_success_states_for_triggering:
            return False

        # Run unless every upstream state was skipped.
        return not all(task_state.is_skipped() for task_state in upstream_states)
    nicholas

    nicholas

    2 years ago
    Hi @Michael Ludwig - this looks like you may have found a bug; would you please open an issue with your reproduction here and some minimal input data so we can reproduce it?
    Dylan

    Dylan

    2 years ago
    @Marvin open “Mapping SKIPed Task Runs Bug”
    Marvin

    Marvin

    2 years ago