Alex Rud

01/12/2021, 6:26 PM
Hi all... Is there a way to implement a dynamic manual_only trigger? I'm trying to do the following: read a file into a dataframe -> run Great Expectations validation on the dataframe -> if validation passes, continue with the next task; if not, allow the next task to run only when manually triggered
something like this
df = extract_data_frame(
    input_file_path,
    sftp_hostname,
    sftp_password,
    sftp_file_path,
)
try:
    vt = validation_task(
        batch_kwargs={"dataset": df, "datasource": "appts"},
        expectation_suite_name="STLUKES_DEV_6K_APPTS.warning",
        context_root_dir=ge_ctx_root,
    )
    nodes = extract_nodes(df, upstream_tasks=[vt])
except FAIL:
    nodes = extract_nodes(df, upstream_tasks=[vt], trigger=manual_only)

Jenny

01/12/2021, 7:55 PM
Hi @Alex Rud, you might want to consider adding a control flow task (maybe case for your validation task?) and an extra task to handle the manual-only trigger, which can be upstream of your extract_nodes task if validation fails. See the pause task at the end of the example here: https://medium.com/the-prefect-blog/needs-approval-184f2512a3cf and the docs on control flow tasks here: https://docs.prefect.io/api/latest/tasks/control_flow.html#functions

Brett Naul

01/13/2021, 2:03 PM
hi @Alex Rud, I think I had the same q the other day and @Dylan suggested this: https://github.com/PrefectHQ/prefect/issues/3937

Alex Rud

01/13/2021, 2:27 PM
Thanks @Brett Naul, I think that's more high-level/run meta info… What I'm looking for is more like alternate paths based on previous task states… Though I'd seen @Jenny's article before, her response gave me an idea for a solution…
In my flow I have the following:
df = extract_data_frame(
    input_file_path,
    sftp_hostname,
    sftp_password,
    sftp_file_path,
)
vt = validation_task(
    batch_kwargs={"dataset": df, "datasource": "appts"},
    expectation_suite_name="STLUKES_DEV_6K_APPTS.warning",
    context_root_dir=ge_ctx_root,
    upstream_tasks=[df],
)
vf = validation_fail(upstream_tasks=[vt])
ua = user_approve(upstream_tasks=[vf])
nodes = extract_nodes(df, upstream_tasks=[vt, ua])
In my tasks I have the following:
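from prefect import task
from prefect.engine.state import Skipped
from prefect.triggers import any_failed, any_successful, manual_only

def autocomplete(task, old_state, new_state):
    # Convert a failed state (e.g. the any_failed trigger not being met) into Skipped
    return Skipped() if new_state.is_failed() else new_state

# Meant to run only when validation fails; otherwise the handler marks it Skipped
@task(trigger=any_failed, state_handlers=[autocomplete])
def validation_fail():
    pass

# Waits until it is manually resumed
@task(trigger=manual_only)
def user_approve():
    pass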

@task(trigger=any_successful, skip_on_upstream_skip=False)
def extract_nodes(df: DataFrame) -> List[ExternalAppointmentStruct]:
The only slight flaw here is the need for an intermediary step, validation_fail. It would have been nice to have a trigger that's a combination of manual_only and any_failed, i.e. user_approve would trigger on manual_only_if_any_failed or [manual_only, any_failed].
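To make the wished-for behavior concrete, here is a rough, library-free sketch of the decision such a combined trigger would make. It does not use Prefect at all: State, Decision, the approved flag, and manual_only_if_any_failed are hypothetical names for illustration only, not part of Prefect's API.

```python
from enum import Enum
from typing import Iterable


class State(Enum):
    """Simplified stand-in for an upstream task's final state (hypothetical)."""
    SUCCESS = "success"
    FAILED = "failed"


class Decision(Enum):
    """What the gated task should do next (hypothetical)."""
    RUN = "run"
    PAUSE = "pause"  # wait for a manual approval


def manual_only_if_any_failed(upstream: Iterable[State], approved: bool = False) -> Decision:
    """Pause for manual approval only when at least one upstream task failed.

    If every upstream task succeeded, or a user has already approved,
    the task is allowed to run.
    """
    any_failed = any(state is State.FAILED for state in upstream)
    if any_failed and not approved:
        return Decision.PAUSE
    return Decision.RUN


# Validation passed: continue without asking anyone.
passed = manual_only_if_any_failed([State.SUCCESS])
# Validation failed: hold until someone approves.
held = manual_only_if_any_failed([State.FAILED])
# Validation failed but a user approved: continue.
resumed = manual_only_if_any_failed([State.FAILED], approved=True)
```

With a trigger like this, the intermediary validation_fail step would not be needed, since the pause-or-continue choice is made in one place.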