Dolor Oculus
11/20/2020, 11:59 PMdef test_short_circuitted_flows(prefect_flow):
@task
def extract() -> int:
signal = signals.SKIP()
signal.flag = True
signal.value = "I was skipped yo!"
raise signal
@task(skip_on_upstream_skip=True)
def transform(x: int) -> int:
return int(x / 2 + 2)
@task(skip_on_upstream_skip=True)
def load(x: int) -> str:
return str(x)
with prefect_flow as flow:
e = extract()
t = transform(e)
l = load(t)
assert is_serializable(flow)
state = flow.run()
assert state.is_successful()
assert state.result[e].is_successful()
assert state.result[t].is_successful()
assert state.result[l].is_successful()
assert isinstance(state.result[e].result, signals.SKIP)
assert isinstance(state.result[t].result, signals.SKIP)
assert isinstance(state.result[l].result, signals.SKIP)
assert isinstance(state.result[t].result, signals.SKIP)
E assert False
E + where False = isinstance(None, <class 'prefect.engine.signals.SKIP'>)
E + where None = <Skipped: "Upstream task was skipped; if this was not the intended behavior, consider changing `skip_on_upstream_skip=False` for this task.">.result
E + and <class 'prefect.engine.signals.SKIP'> = signals.SKIP
def test_short_circuitted_flows(prefect_flow):
@task
def extract() -> int:
signal = signals.SKIP()
signal.flag = True
signal.value = "I was skipped yo!"
raise signal
@task(skip_on_upstream_skip=True)
def transform(x: int) -> int:
return int(x / 2 + 2)
@task(skip_on_upstream_skip=True)
def load(x: int) -> str:
return str(x)
with prefect_flow as flow:
e = extract()
t = transform(e)
l = load(t)
assert is_serializable(flow)
state = flow.run()
assert state.is_successful()
assert state.result[e].is_successful()
assert state.result[t].is_successful()
assert state.result[l].is_successful()
assert isinstance(state.result[e], Skipped)
assert isinstance(state.result[t], Skipped)
assert isinstance(state.result[l], Skipped)