I'd like to be able to pass a trigger to a task wh...
# prefect-community
n
I'd like to be able to pass a trigger to a task when I add the task to the flow. I have a trigger I'm working on that takes additional parameters based on its associated task. I can replace the default trigger directly, but this adds a bit of clutter when constructing the flow. Existing:
Copy code
with Flow("Existing") as flow:
    file_b = file_builder("b.data")
    file_b.trigger = upstream_dependency(src_fpaths=["a.data"])
Proposed:
Copy code
with Flow("Proposed") as flow:
    file_b = file_builder("b.data", trigger=upstream_dependency(src_fpaths="a.data"))
Example Trigger:
Copy code
@curry
def upstream_dependency(
    upstream_states: Callable[[Dict["core.Edge", "state.State"]], bool],
    source_fpaths: List[Path] = None,
) -> bool:
    if not all(s.is_successful() for s in _get_all_states_as_set(upstream_states)):
        raise signals.TRIGGERFAIL(
            'Trigger was "all_successful" but some of the upstream tasks failed.'
        )
    # Everything skipped.   Check to see if any file dependencies require task to run.
    run_required = file_dependency_skipper(source_fpaths)
    if run_required:
        return True
    else:
        raise signals.SKIP("All dependencies are up to date.")
I hacked this together with the following change. In
prefect.core.task
Copy code
def bind(
    self,
    *args: Any,
    mapped: bool = False,
    upstream_tasks: Iterable[Any] = None,
    trigger: Callable[[Dict["core.Edge", "state.State"]], bool] = None,
    flow: "Flow" = None,
    **kwargs: Any
) -> "Task":
self.trigger = trigger
If I build the task by inheritance instead of the @task decorator, I can pass the trigger. I'm not sure if we want to dump all the parameters into the parameters for constructing a task as the list is pretty short now.
upstream_tasks
being the most similar one that is already in the signature of bind.
j
Hey @Nate Atkins - you can pass a special kwarg
task_args
that I think will let you do what you want. Here’s a quick example:
Copy code
from prefect import task, Flow

@task
def f(x):
    return x + 1

with Flow('x') as flow:
    result = f(1, task_args=dict(trigger='not a real trigger'))

assert result.trigger == 'not a real trigger'
I’m realizing we don’t really expose this fact in our docs - I’ll add something!
n
Wonderful!
@Jeremiah just opened https://github.com/PrefectHQ/prefect/issues/2588 at the intersection of
task_args
and
target
. When the
target
and
result
are passed with
task_args
and the copy of the task is made the
location
property isn't set on the
result
and it falls back to the
default_location
when it write out the file. I can work around this by passing through
@task.
j
👌 just saw your issue and was writing a comment