Are there any good examples of using signals.SKIP()...
# prefect-community
d
Are there any good examples of using `signals.SKIP()` to short-circuit a whole flow run? We're trying to wrap our heads around it. Example:
Copy code
def test_short_circuitted_flows(prefect_flow):
    """First attempt at checking that a SKIP raised upstream skips the whole flow.

    ``extract`` raises ``signals.SKIP``; ``transform`` and ``load`` are declared
    with ``skip_on_upstream_skip=True``. As reported in the thread, the last two
    ``isinstance`` asserts of this version fail, because they inspect ``.result``
    of the downstream task states rather than the states themselves.

    ``prefect_flow`` is presumably a pytest fixture yielding a flow object; it
    is used here as a context manager and then run. TODO confirm against the
    fixture definition.
    """
    @task
    def extract() -> int:
        # Never returns a value: raises a SKIP signal instead.
        signal = signals.SKIP()
        # Extra attributes attached to the signal instance; nothing in this
        # test reads them back.
        signal.flag = True
        signal.value = "I was skipped yo!"
        raise signal

    @task(skip_on_upstream_skip=True)
    def transform(x: int) -> int:
        return int(x / 2 + 2)

    @task(skip_on_upstream_skip=True)
    def load(x: int) -> str:
        return str(x)

    # Wire the three tasks into a linear extract -> transform -> load chain.
    with prefect_flow as flow:
        e = extract()
        t = transform(e)
        l = load(t)

    assert is_serializable(flow)

    state = flow.run()

    # The overall run and every individual task state report success.
    assert state.is_successful()

    assert state.result[e].is_successful()
    assert state.result[t].is_successful()
    assert state.result[l].is_successful()

    # Per the thread, only the check for `e` passes here; the `t` and `l`
    # checks fail.
    assert isinstance(state.result[e].result, signals.SKIP)
    # The traceback in the thread shows state.result[t] is a Skipped state
    # whose `.result` is None, so this assert fails.
    assert isinstance(state.result[t].result, signals.SKIP)
    # Reported in the thread as failing as well (presumably for the same reason).
    assert isinstance(state.result[l].result, signals.SKIP)
The second and third of the final `isinstance` asserts fail (the last two, for `t` and `l`):
Copy code
assert isinstance(state.result[t].result, signals.SKIP)
E   assert False
E    +  where False = isinstance(None, <class 'prefect.engine.signals.SKIP'>)
E    +    where None = <Skipped: "Upstream task was skipped; if this was not the intended behavior, consider changing `skip_on_upstream_skip=False` for this task.">.result
E    +    and   <class 'prefect.engine.signals.SKIP'> = signals.SKIP
I'm just trying to verify that the `t` and `l` tasks were skipped.
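Ah, `state.result[t]` is a state object (`Skipped`), not a signal object, so the check should be made against the state itself rather than against its `.result`.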
final working example. thanks for listening 😉
Copy code
def test_short_circuitted_flows(prefect_flow):
    """Verify that a SKIP raised by the first task short-circuits the flow.

    ``extract`` raises ``signals.SKIP``, and both ``transform`` and ``load``
    opt in to ``skip_on_upstream_skip``. The flow run and every task state
    must report success, and every task's final state must be a ``Skipped``
    instance.
    """

    @task
    def extract() -> int:
        # Raise a SKIP signal instead of producing a value.
        skip_signal = signals.SKIP()
        skip_signal.flag = True
        skip_signal.value = "I was skipped yo!"
        raise skip_signal

    @task(skip_on_upstream_skip=True)
    def transform(x: int) -> int:
        return int(x / 2 + 2)

    @task(skip_on_upstream_skip=True)
    def load(x: int) -> str:
        return str(x)

    # Build the linear extract -> transform -> load chain.
    with prefect_flow as flow:
        extracted = extract()
        transformed = transform(extracted)
        loaded = load(transformed)

    pipeline = (extracted, transformed, loaded)

    assert is_serializable(flow)

    run_state = flow.run()
    assert run_state.is_successful()

    # Every task state must report success...
    for node in pipeline:
        assert run_state.result[node].is_successful()

    # ...and each one must be a Skipped state (the state itself, not its
    # `.result`).
    for node in pipeline:
        assert isinstance(run_state.result[node], Skipped)