Nate Atkins
05/16/2020, 8:02 PM
Existing:
with Flow("Existing") as flow:
    file_b = file_builder("b.data")
    file_b.trigger = upstream_dependency(source_fpaths=["a.data"])
Proposed:
with Flow("Proposed") as flow:
    file_b = file_builder("b.data", trigger=upstream_dependency(source_fpaths=["a.data"]))
Example Trigger:
@curry
def upstream_dependency(
    upstream_states: Dict["core.Edge", "state.State"],
    source_fpaths: List[Path] = None,
) -> bool:
    if not all(s.is_successful() for s in _get_all_states_as_set(upstream_states)):
        raise signals.TRIGGERFAIL(
            'Trigger was "all_successful" but some of the upstream tasks failed.'
        )
    # All upstream tasks succeeded (skipped counts as successful).
    # Check to see if any file dependencies require the task to run.
    run_required = file_dependency_skipper(source_fpaths)
    if run_required:
        return True
    else:
        raise signals.SKIP("All dependencies are up to date.")
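For anyone following along, the kind of check file_dependency_skipper performs could be sketched as below. This is only a guess at its shape, not the helper used above: the name is_rebuild_needed, the explicit target argument, and the modification-time comparison are all assumptions made for illustration.

```python
from pathlib import Path
from typing import Iterable


def is_rebuild_needed(target: Path, sources: Iterable[Path]) -> bool:
    """Hypothetical make-style check: True if the target must be rebuilt.

    The target needs rebuilding when it does not exist, or when any
    source file has a newer modification time than the target.
    A missing source file raises FileNotFoundError.
    """
    target = Path(target)
    if not target.exists():
        return True
    target_mtime = target.stat().st_mtime
    return any(Path(src).stat().st_mtime > target_mtime for src in sources)
```

A trigger like the one above would return True when this reports that a rebuild is needed and raise SKIP otherwise.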
I hacked this together with the following change in prefect.core.task:
def bind(
    self,
    *args: Any,
    mapped: bool = False,
    upstream_tasks: Iterable[Any] = None,
    trigger: Callable[[Dict["core.Edge", "state.State"]], bool] = None,
    flow: "Flow" = None,
    **kwargs: Any
) -> "Task":
    self.trigger = trigger
If I build the task by inheritance instead of the @task decorator, I can pass the trigger. I'm not sure we want to dump all the task-construction parameters into bind, as its parameter list is pretty short now; upstream_tasks is the most similar one already in the signature of bind.
Jeremiah
05/16/2020, 9:02 PM
There is a keyword argument, task_args, that I think will let you do what you want. Here’s a quick example:
from prefect import task, Flow
@task
def f(x):
    return x + 1

with Flow('x') as flow:
    result = f(1, task_args=dict(trigger='not a real trigger'))

assert result.trigger == 'not a real trigger'
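To make the mechanism concrete, here is a toy model of what task_args appears to do: calling a task produces a copy, and the entries in task_args are applied to that copy as attribute overrides. ToyTask and everything in it are made up for illustration and are not Prefect's Task class; the real implementation may differ in its details.

```python
import copy
from typing import Any, Callable, Dict, Optional


class ToyTask:
    """Toy stand-in for a task (an assumption, not Prefect's Task)."""

    def __init__(self, fn: Callable[..., Any], trigger: Any = None):
        self.fn = fn
        self.trigger = trigger
        self.bound_args: tuple = ()

    def copy(self, **task_args: Any) -> "ToyTask":
        # Shallow-copy the task, then override only known attributes.
        new = copy.copy(self)
        for name, value in task_args.items():
            if not hasattr(new, name):
                raise AttributeError(f"unknown task attribute: {name}")
            setattr(new, name, value)
        return new

    def __call__(
        self, *args: Any, task_args: Optional[Dict[str, Any]] = None
    ) -> "ToyTask":
        # Calling the task does not run fn; it returns a configured copy
        # that remembers the arguments it was called with.
        new = self.copy(**(task_args or {}))
        new.bound_args = args
        return new


f = ToyTask(lambda x: x + 1)
result = f(1, task_args=dict(trigger="not a real trigger"))
```

In this model the original f keeps its own trigger, and only the copy returned by the call carries the override.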
Nate Atkins
05/16/2020, 10:29 PM
I ran into an issue with task_args and target. When the target and result are passed with task_args and the copy of the task is made, the location property isn't set on the result, so it falls back to the default_location when it writes out the file.
I can work around this by passing them through @task.
Jeremiah
05/17/2020, 5:13 PM