Robert Hales
12/06/2021, 10:03 AM
Pending, which I think led to double runs. From what I understand, Pending shouldn't be a terminal state?
Anna Geller
Robert Hales
12/06/2021, 10:27 AM
.run or .register anywhere. This happened in two different task runs (same flow run) last week, but I haven't seen it before or since, so a repro may be a bit tricky.
Robert Hales
12/06/2021, 10:28 AM
Anna Geller
Robert Hales
12/06/2021, 10:30 AM
Anna Geller
mutation {
  enable_flow_version_lock(input: { flow_id: "your-flow-id-here" }) {
    success
  }
}
or in UI:
Robert Hales
12/06/2021, 10:35 AM
Robert Hales
12/06/2021, 10:36 AM
with Flow("Analysis") as flow:
    organisation_id = Parameter("organisation_id")
    # Other tasks snipped...

    # Duplicate Invoice
    buckets = duplicate_invoice_analysis_flow_task(
        organisation_id=organisation_id, upstream_tasks=[number_analysis, reversal_analysis]
    )
    buckets_analysis = duplicate_invoice_bucket_analysis_flow_task.map(
        organisation_id=unmapped(organisation_id),
        audit_mode=unmapped(False),
        bucket_id=buckets,
    )
    duplicate_invoice_public_records_analysis_flow_task(
        organisation_id=organisation_id, audit_mode=False, upstream_tasks=[buckets_analysis]
    )
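In the snippet above, .map() creates one child task run per element of buckets, while arguments wrapped in unmapped(...) are passed unchanged to every child. The plain-Python model of that fan-out below is only an illustration: fan_out and bucket_analysis are hypothetical names, and this is not how Prefect itself executes mapped tasks.

```python
from typing import Any, Callable, Dict, Iterable, List


def fan_out(fn: Callable[..., Any], mapped: Dict[str, Iterable[Any]],
            unmapped: Dict[str, Any]) -> List[Any]:
    """Call fn once per element of the mapped iterables (zipped together),
    passing every unmapped value unchanged to each call."""
    names = list(mapped)
    results = []
    for values in zip(*(mapped[n] for n in names)):
        kwargs = dict(zip(names, values))  # this child's mapped arguments
        kwargs.update(unmapped)            # same unmapped values for every child
        results.append(fn(**kwargs))
    return results


def bucket_analysis(organisation_id, audit_mode, bucket_id):
    # Hypothetical stand-in for duplicate_invoice_bucket_analysis_flow_task.
    return f"{organisation_id}:{bucket_id}:{audit_mode}"


if __name__ == "__main__":
    buckets = ["b1", "b2", "b3"]  # hypothetical output of the upstream task
    children = fan_out(
        bucket_analysis,
        mapped={"bucket_id": buckets},
        unmapped={"organisation_id": "org-1", "audit_mode": False},
    )
    print(len(children))  # 3: one child run per bucket
    print(children)
```

A downstream task that is not mapped, such as print_mapped_output in the full flow later in the thread, receives the whole list of child results.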
Robert Hales
12/06/2021, 10:38 AM
Anna Geller
Robert Hales
12/06/2021, 10:42 AM
Anna Geller
flow.visualize()
Robert Hales
12/06/2021, 10:44 AM
Anna Geller
Anna Geller
Robert Hales
12/06/2021, 10:45 AM
Anna Geller
Robert Hales
12/06/2021, 10:48 AM
Anna Geller
Robert Hales
12/06/2021, 10:52 AM
duplicate_invoice_analysis_flow_task ran twice, as did one of the mapped tasks in duplicate_invoice_bucket_analysis_flow_task.
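The suspicion that Pending led to the double runs can be made concrete with a toy model: if a task's last recorded state is non-terminal, such as Pending, anything that resumes the flow run sees the task as unfinished and submits it again. In Prefect 1.x the corresponding check is, as far as we know, State.is_finished(), which is False for Pending. The RunState enum and helper functions below are hypothetical and only illustrate that rule; they are not Prefect's scheduler.

```python
from enum import Enum


class RunState(Enum):
    # Hypothetical, simplified state set; not Prefect's own State classes.
    PENDING = "Pending"
    RUNNING = "Running"
    SUCCESS = "Success"
    FAILED = "Failed"


# In this model only Success and Failed count as finished (terminal).
TERMINAL = {RunState.SUCCESS, RunState.FAILED}


def needs_run(state: RunState) -> bool:
    """Return True if a scheduler following this model would (re)submit the task."""
    return state not in TERMINAL


def tasks_to_resubmit(states: dict) -> list:
    """Names of tasks a resumed flow run would execute again, in insertion order."""
    return [name for name, state in states.items() if needs_run(state)]


if __name__ == "__main__":
    last_seen = {
        "number_analysis": RunState.SUCCESS,
        "duplicate_invoice_analysis_flow_task": RunState.PENDING,
    }
    print(tasks_to_resubmit(last_seen))
    # prints ['duplicate_invoice_analysis_flow_task']
```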
Robert Hales
12/06/2021, 11:03 AM
Robert Hales
12/06/2021, 11:04 AM
duplicate_invoice_analysis_flow_task finishing in Pending is a mystery to me, however 😄
Anna Geller
mapping_output = print_mapped_output(buckets_analysis)
Here is a full flow:

from prefect import task, Flow, Parameter, unmapped

# Stub tasks: the bodies are placeholders so the flow can be built and visualized.
@task
def number_analysis():
    pass

@task
def reversal_analysis():
    pass

@task
def duplicate_invoice_analysis_flow_task(organisation_id):
    # In the real flow this returns the bucket ids that get mapped over.
    pass

@task
def duplicate_invoice_bucket_analysis_flow_task(organisation_id, audit_mode, bucket_id):
    pass

@task
def duplicate_invoice_public_records_analysis_flow_task(
    organisation_id="organisation_id", audit_mode=False
):
    pass

@task(log_stdout=True)
def print_mapped_output(x):
    # Not mapped: receives the full list of mapped results.
    print(x)

with Flow("Analysis") as flow:
    organisation_id = Parameter("organisation_id", default="xyz")
    # Other tasks snipped...

    # Duplicate Invoice
    buckets = duplicate_invoice_analysis_flow_task(
        organisation_id=organisation_id,
        upstream_tasks=[number_analysis, reversal_analysis],
    )
    buckets_analysis = duplicate_invoice_bucket_analysis_flow_task.map(
        organisation_id=unmapped(organisation_id),
        audit_mode=unmapped(False),
        bucket_id=buckets,
    )
    mapping_output = print_mapped_output(buckets_analysis)
    # Runs only after print_mapped_output has finished.
    duplicate_invoice_public_records_analysis_flow_task(
        organisation_id=organisation_id,
        audit_mode=False,
        upstream_tasks=[mapping_output],
    )

if __name__ == "__main__":
    flow.visualize()
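flow.visualize() draws this dependency graph but requires the graphviz package. As a lightweight cross-check, one could transcribe the edges by hand and let the standard library's graphlib (Python 3.9+) derive a valid run order. The dependency map below is copied by hand from the flow above, treating the organisation_id Parameter as a node too; it is an illustration, not something Prefect generates.

```python
from graphlib import TopologicalSorter

# Hand-transcribed dependency map: task -> set of upstream tasks.
dependencies = {
    "duplicate_invoice_analysis_flow_task": {
        "organisation_id", "number_analysis", "reversal_analysis",
    },
    "duplicate_invoice_bucket_analysis_flow_task": {
        "organisation_id", "duplicate_invoice_analysis_flow_task",
    },
    "print_mapped_output": {"duplicate_invoice_bucket_analysis_flow_task"},
    "duplicate_invoice_public_records_analysis_flow_task": {
        "organisation_id", "print_mapped_output",
    },
}


def execution_order(deps):
    """Return one valid run order; raises graphlib.CycleError on a cycle."""
    return list(TopologicalSorter(deps).static_order())


def runs_before(order, first, second):
    """True if `first` appears earlier than `second` in the given order."""
    return order.index(first) < order.index(second)


if __name__ == "__main__":
    order = execution_order(dependencies)
    for name in order:
        print(name)
```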
Robert Hales
12/06/2021, 11:13 AM
Robert Hales
12/06/2021, 11:14 AM
duplicate_invoice_analysis_flow_task?
Anna Geller
Robert Hales
12/06/2021, 11:18 AM
duplicate_invoice_analysis_flow_task takes some time to complete, so I don't really want to run it twice if at all possible.
Anna Geller
import datetime
@task(cache_for=datetime.timedelta(hours=1))
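Conceptually, cache_for lets a task reuse a previously computed result while that result is younger than the given duration. The DurationCache class below is a hypothetical, simplified model of that idea, with an injectable clock so it can be tested. It keys entries on the function name and arguments, which may differ from how Prefect's own cache validators decide whether a cached result is still valid.

```python
import datetime
from typing import Any, Callable, Dict, Tuple


def _utc_now() -> datetime.datetime:
    return datetime.datetime.now(datetime.timezone.utc)


class DurationCache:
    """Hypothetical model of duration-based caching (not Prefect's implementation)."""

    def __init__(self, ttl: datetime.timedelta,
                 clock: Callable[[], datetime.datetime] = _utc_now):
        self.ttl = ttl
        self.clock = clock
        # (function name, *args) -> (time computed, result)
        self._store: Dict[Tuple[Any, ...], Tuple[datetime.datetime, Any]] = {}

    def call(self, fn: Callable[..., Any], *args: Any) -> Any:
        key = (fn.__name__,) + args
        now = self.clock()
        hit = self._store.get(key)
        if hit is not None and now - hit[0] < self.ttl:
            return hit[1]  # still fresh: skip the expensive call
        result = fn(*args)  # missing or expired: compute and remember
        self._store[key] = (now, result)
        return result


if __name__ == "__main__":
    cache = DurationCache(datetime.timedelta(hours=1))

    def slow_analysis(org_id: str) -> str:
        # Placeholder for a long-running task such as the duplicate invoice analysis.
        return f"buckets for {org_id}"

    print(cache.call(slow_analysis, "xyz"))  # computed
    print(cache.call(slow_analysis, "xyz"))  # served from the cache
```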
Robert Hales
12/06/2021, 11:28 AM
Anna Geller
Robert Hales
12/06/2021, 11:32 AM
Robert Hales
12/06/2021, 11:33 AM
Robert Hales
12/06/2021, 11:34 AM
Anna Geller
Robert Hales
12/06/2021, 11:40 AM
Anna Geller
Robert Hales
12/06/2021, 11:48 AM