Alex Rud
01/12/2021, 6:26 PM
manual_only trigger? I'm trying to do the following: read file into dataframe -> run GreatExpectations validation on dataframe -> if validation passes continue with next task, if not allow the next task to run when manually triggered
df = extract_data_frame(
    input_file_path,
    sftp_hostname,
    sftp_password,
    sftp_file_path,
)
try:
    vt = validation_task(
        batch_kwargs={"dataset": df, "datasource": "appts"},
        expectation_suite_name="STLUKES_DEV_6K_APPTS.warning",
        context_root_dir=ge_ctx_root
    )
    nodes = extract_nodes(df, upstream_tasks=[vt])
except FAIL:
    nodes = extract_nodes(df, upstream_tasks=[vt], trigger=manual_only)
Jenny
01/12/2021, 7:55 PM
case for your validation task?) and an extra task to handle the manual_only trigger, which can be upstream of your extract_nodes task if validation fails. (See the pause task at the end of the example here: https://medium.com/the-prefect-blog/needs-approval-184f2512a3cf) and docs on control flow tasks here: https://docs.prefect.io/api/latest/tasks/control_flow.html#functions
Brett Naul
01/13/2021, 2:03 PM
Alex Rud
01/13/2021, 2:27 PM
df = extract_data_frame(
    input_file_path,
    sftp_hostname,
    sftp_password,
    sftp_file_path,
)
vt = validation_task(
    batch_kwargs={"dataset": df, "datasource": "appts"},
    expectation_suite_name="STLUKES_DEV_6K_APPTS.warning",
    context_root_dir=ge_ctx_root,
    upstream_tasks=[df]
)
vf = validation_fail(upstream_tasks=[vt])
ua = user_approve(upstream_tasks=[vf])
nodes = extract_nodes(df, upstream_tasks=[vt, ua])
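# State handler: replace a failed state (e.g. the trigger not firing) with Skipped.
def autocomplete(task, old_state, new_state):
    new_state = Skipped() if new_state.is_failed() else new_state
    return new_state
# Runs only when validation failed; otherwise the handler marks it Skipped.
@task(trigger=any_failed, state_handlers=[autocomplete])
def validation_fail():
    pass
# Pauses until the run is manually approved.
@task(trigger=manual_only)
def user_approve():
    pass
# Intended to run once validation has passed or the user has approved.
@task(trigger=any_successful, skip_on_upstream_skip=False)
def extract_nodes(df: DataFrame) -> List[ExternalAppointmentStruct]:
    ...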
validation_fail
What would have been nice to have is a trigger that’s a combo of manual_only and any_failed. user_approved would trigger on manual_only_if_any_failed or [manual_only, any_failed].
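The combined trigger wished for above could look roughly like the sketch below. It only models the decision logic in plain Python and does not use Prefect's API: `UpstreamState`, `Pause`, and the `resume` flag are hypothetical stand-ins for Prefect's state objects, its pause signal, and whatever marks a run as manually approved. The name `manual_only_if_any_failed` is taken from the message above, not from any existing library.

```python
from dataclasses import dataclass
from typing import Iterable


class Pause(Exception):
    """Hypothetical stand-in for a 'wait for manual approval' signal."""


@dataclass(frozen=True)
class UpstreamState:
    """Hypothetical stand-in for the final state of an upstream task."""
    name: str
    failed: bool = False

    def is_failed(self) -> bool:
        return self.failed


def manual_only_if_any_failed(
    upstream_states: Iterable[UpstreamState], resume: bool = False
) -> bool:
    """Decide whether the downstream task may run.

    - No upstream failure: run right away.
    - An upstream failure and no approval yet: raise Pause.
    - An upstream failure and approval given (resume=True): run.
    """
    if any(state.is_failed() for state in upstream_states) and not resume:
        raise Pause("upstream failure: waiting for manual approval")
    return True


if __name__ == "__main__":
    passed = [UpstreamState("validation_task")]
    failed = [UpstreamState("validation_task", failed=True)]

    print(manual_only_if_any_failed(passed))  # validation passed, no pause
    try:
        manual_only_if_any_failed(failed)
    except Pause as exc:
        print("paused:", exc)  # validation failed, approval needed
    print(manual_only_if_any_failed(failed, resume=True))  # approved
```

With a trigger shaped like this on extract_nodes, the separate validation_fail and user_approve tasks might not be needed, though how it would plug into a real flow is an assumption that would need checking against the Prefect trigger documentation.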