Nate Atkins 05/16/2020, 8:02 PM
with Flow("Existing") as flow:
    file_b = file_builder("b.data")
    file_b.trigger = upstream_dependency(src_fpaths=["a.data"])
with Flow("Proposed") as flow:
    file_b = file_builder("b.data", trigger=upstream_dependency(src_fpaths="a.data"))
I hacked this together with the following change. In
@curry
def upstream_dependency(
    upstream_states: Callable[[Dict["core.Edge", "state.State"]], bool],
    source_fpaths: List[Path] = None,
) -> bool:
    if not all(s.is_successful() for s in _get_all_states_as_set(upstream_states)):
        raise signals.TRIGGERFAIL(
            'Trigger was "all_successful" but some of the upstream tasks failed.'
        )
    # Everything skipped. Check to see if any file dependencies require task to run.
    run_required = file_dependency_skipper(source_fpaths)
    if run_required:
        return True
    else:
        raise signals.SKIP("All dependencies are up to date.")
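For anyone reading along, here is a rough sketch of the kind of make-style staleness check a helper like file_dependency_skipper might perform. The real helper isn't shown in this thread, so everything below is an assumption: the name run_required, the explicit target argument, and the mtime comparison are all hypothetical and use only the standard library.

```python
import os
import tempfile
from pathlib import Path
from typing import Iterable, Tuple


def run_required(target: Path, sources: Iterable[Path]) -> bool:
    """Hypothetical make-style check: True when `target` is missing or stale."""
    target = Path(target)
    if not target.exists():
        return True  # nothing built yet, so the task has to run
    target_mtime = target.stat().st_mtime
    # Stale if any source was modified after the target was last written.
    return any(Path(src).stat().st_mtime > target_mtime for src in sources)


def demo() -> Tuple[bool, bool, bool]:
    """Walk through the three cases: missing target, fresh target, stale target."""
    with tempfile.TemporaryDirectory() as tmp:
        a, b = Path(tmp, "a.data"), Path(tmp, "b.data")
        a.write_text("source")
        missing = run_required(b, [a])   # b.data does not exist yet
        b.write_text("built")
        os.utime(a, (1_000, 1_000))      # pretend a.data is old
        os.utime(b, (2_000, 2_000))      # b.data was written later
        fresh = run_required(b, [a])
        os.utime(a, (3_000, 3_000))      # a.data touched after the build
        stale = run_required(b, [a])
    return missing, fresh, stale
```

A trigger along the lines of the one above could return True when a check like this reports that a run is required, and raise a skip signal otherwise.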
If I build the task by inheritance instead of the @task decorator, I can pass the trigger. I'm not sure if we want to dump all the parameters into the parameters for constructing a task as the list is pretty short now.
def bind(
    self,
    *args: Any,
    mapped: bool = False,
    upstream_tasks: Iterable[Any] = None,
    trigger: Callable[[Dict["core.Edge", "state.State"]], bool] = None,
    flow: "Flow" = None,
    **kwargs: Any
) -> "Task":
    self.trigger = trigger
being the most similar one that is already in the signature of bind.
Jeremiah 05/16/2020, 9:02 PM
that I think will let you do what you want. Here’s a quick example:
from prefect import task, Flow

@task
def f(x):
    return x + 1

with Flow('x') as flow:
    result = f(1, task_args=dict(trigger='not a real trigger'))

assert result.trigger == 'not a real trigger'
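To make the copy-with-overrides idea concrete without needing Prefect installed, here is a toy model of the pattern. It is only a sketch under assumptions: ToyTask is a hypothetical stand-in, not Prefect's Task class, and the real library may handle the copy differently.

```python
import copy
from typing import Any, Callable, Optional


class ToyTask:
    """Hypothetical stand-in for a task object; not Prefect's Task class."""

    def __init__(self, fn: Callable[..., Any], trigger: Optional[Any] = None):
        self.fn = fn
        self.trigger = trigger

    def copy(self, **task_args: Any) -> "ToyTask":
        # Shallow-copy the task, then override only attributes that already exist.
        new = copy.copy(self)
        for name, value in task_args.items():
            if not hasattr(new, name):
                raise AttributeError(
                    f"{name!r} is not an attribute of {type(new).__name__}"
                )
            setattr(new, name, value)
        return new

    def __call__(self, *args: Any, task_args: Optional[dict] = None) -> "ToyTask":
        # Positional args would be bound as inputs in a real flow; ignored here.
        # Each call yields a fresh copy, so overrides never leak into the original.
        return self.copy(**(task_args or {}))


base = ToyTask(lambda x: x + 1)
result = base(1, task_args=dict(trigger="not a real trigger"))
```

In this toy version the override lands on the copy only, so base.trigger stays None. Anything that is not already an attribute of the task gets rejected rather than silently attached, which is one way a property can end up unset on the copy.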
Nate Atkins 05/16/2020, 10:29 PM
. When the
are passed with
and the copy of the task is made the
property isn't set on the
and it falls back to the
when it writes out the file. I can work around this by passing through
Jeremiah 05/17/2020, 5:13 PM