
Robert Hales

12/06/2021, 10:03 AM
Hi there, had a few tasks finish with the state of Pending, which I think led to double runs. From what I understand, Pending shouldn't be a terminal state?

Anna Geller

12/06/2021, 10:09 AM
it should not, for sure 🙂 Can you share your flow? Usually I see double runs when you have something like:
• flow.run()
• flow.register()
in your flow file. If you remove that and use the CLI instead, e.g.:
• prefect run
• prefect register
the double-run issue should be fixed. LMK if this was the use case for you. If not, sharing your flow or a small reproducible example would make it easier for me to identify the issue.
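Roughly, the anti-pattern looks like this (the flow body here is just invented for illustration):
from prefect import task, Flow

@task
def say_hello():
    print("hello")

with Flow("example") as flow:
    say_hello()

# Problematic: these module-level calls execute every time the file is
# imported (including when the agent loads it), which can show up as a
# double run - drop them and use `prefect register` / `prefect run`.
flow.run()
flow.register(project_name="my-project")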

Robert Hales

12/06/2021, 10:27 AM
Hi Anna, the flow is registered in CI already and I don't use .run or .register anywhere. This happened in two different task runs (same flow run) last week, but I haven't seen it before or since, so a repro may be a bit tricky.
We are using the Dask executor with a persistent cluster, FYI.
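(Roughly like this - the scheduler address below is just a placeholder:)
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("Analysis") as flow:
    ...  # tasks snipped

# Point the flow at the already-running (persistent) Dask scheduler
# rather than letting Prefect spin up an ad-hoc cluster per run.
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")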

Anna Geller

12/06/2021, 10:28 AM
can you share your flow? Atm I don’t have enough information to give any recommendations

Robert Hales

12/06/2021, 10:30 AM
Yes sure thing

Anna Geller

12/06/2021, 10:34 AM
Are you on Prefect Cloud? There is a mechanism called version locking that ensures your flow and task runs are executed only once - you can enable it for this flow using GraphQL:
mutation {
  enable_flow_version_lock(input: { flow_id: "your-flow-id-here" }) {
    success
  }
}
or in UI:

Robert Hales

12/06/2021, 10:35 AM
No, on Server unfortunately.
with Flow("Analysis") as flow:
    organisation_id = Parameter("organisation_id")

    # Other tasks snipped...

    # Duplicate Invoice
    buckets = duplicate_invoice_analysis_flow_task(
        organisation_id=organisation_id, upstream_tasks=[number_analysis, reversal_analysis]
    )
    buckets_analysis = duplicate_invoice_bucket_analysis_flow_task.map(
        organisation_id=unmapped(organisation_id),
        audit_mode=unmapped(False),
        bucket_id=buckets,
    )
    duplicate_invoice_public_records_analysis_flow_task(
        organisation_id=organisation_id, audit_mode=False, upstream_tasks=[buckets_analysis]
    )
I'm not sure why, without version locking, a task can finish in Pending though?

Anna Geller

12/06/2021, 10:39 AM
I did not say that 🙂 I only said that there is a mechanism that would ensure that it runs only once.

Robert Hales

12/06/2021, 10:42 AM
sure 😄

Anna Geller

12/06/2021, 10:43 AM
can you share what you get when you run:
flow.visualize()

Robert Hales

12/06/2021, 10:44 AM
They are all distinct tasks, the names are slightly different

Anna Geller

12/06/2021, 10:44 AM
yeah I noticed too late.
Can you share your storage and run config too?

Robert Hales

12/06/2021, 10:45 AM
All defaults there

Anna Geller

12/06/2021, 10:45 AM
ok, so you just use local storage and local run and run it only locally for now?

Robert Hales

12/06/2021, 10:48 AM
This all runs on ECS; all the Dask workers and the agent share a Docker image with our codebase on it.

Anna Geller

12/06/2021, 10:50 AM
is this diagram correct? which of those tasks ran twice or ended in a Pending state?

Robert Hales

12/06/2021, 10:52 AM
Yup, that's right. duplicate_invoice_analysis_flow_task ran twice, as did one of the mapped tasks in duplicate_invoice_bucket_analysis_flow_task. I think the rerun of the mapped task is unrelated - due to a Dask worker OOM, which we account for in our tasks, but I think I missed this case. The duplicate_invoice_analysis_flow_task finishing in Pending is a mystery to me, however 😄

Anna Geller

12/06/2021, 11:10 AM
ditto. The only idea I have is that you could add a reduce step in between - could be that this task runs multiple times because of a lack of a reduce step after mapping:
mapping_output = print_mapped_output(buckets_analysis)
here is a full flow:
from prefect import task, Flow, Parameter, unmapped


@task
def number_analysis():
    pass


@task
def reversal_analysis():
    pass


@task
def duplicate_invoice_analysis_flow_task(organisation_id):
    pass


@task
def duplicate_invoice_bucket_analysis_flow_task(organisation_id, audit_mode, bucket_id):
    pass


@task
def duplicate_invoice_public_records_analysis_flow_task(
    organisation_id="organisation_id", audit_mode=False
):
    pass


@task(log_stdout=True)
def print_mapped_output(x):
    print(x)


with Flow("Analysis") as flow:
    organisation_id = Parameter("organisation_id", default="xyz")
    # Other tasks snipped...

    # Duplicate Invoice
    buckets = duplicate_invoice_analysis_flow_task(
        organisation_id=organisation_id,
        upstream_tasks=[number_analysis, reversal_analysis],
    )
    buckets_analysis = duplicate_invoice_bucket_analysis_flow_task.map(
        organisation_id=unmapped(organisation_id),
        audit_mode=unmapped(False),
        bucket_id=buckets,
    )
    mapping_output = print_mapped_output(buckets_analysis)
    duplicate_invoice_public_records_analysis_flow_task(
        organisation_id=organisation_id,
        audit_mode=False,
        upstream_tasks=[mapping_output],
    )

if __name__ == "__main__":
    flow.visualize()

Robert Hales

12/06/2021, 11:13 AM
Thank you for the suggestion, will try it out. However, I don't think that will solve anything related to duplicate_invoice_analysis_flow_task?

Anna Geller

12/06/2021, 11:16 AM
it could, because the output of it is used as a list of things to map over. But I can’t say for sure

Robert Hales

12/06/2021, 11:18 AM
Sure, but as duplicate_invoice_analysis_flow_task takes some time to complete, I don't really want to run it twice if at all possible.

Anna Geller

12/06/2021, 11:23 AM
I’m not aware of any mechanism on Server to ensure that. But it should not run twice. Maybe you can try setting up caching to avoid rerunning this task if it’s computationally expensive? e.g. cache_for:
@task(cache_for=datetime.timedelta(hours=1))
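As a fuller sketch (the one-hour window is arbitrary, just for illustration):
import datetime

from prefect import task

# Cache this expensive task's output for one hour, so a rerun within
# that window reuses the cached result instead of recomputing it.
@task(cache_for=datetime.timedelta(hours=1))
def duplicate_invoice_analysis_flow_task(organisation_id):
    ...  # expensive analysis snipped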

Robert Hales

12/06/2021, 11:28 AM
We have a decorator for our tasks which checks if the task has completed successfully before and reuses that result, as the time-based caching was too restrictive. In this case, however, the task didn't finish in a successful state; the task didn't even finish before it was re-run - I can see logs from the start of the task interspersed with ones from the middle of the task.
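In spirit the decorator is something like this (the in-memory store below is just a stand-in for our real persistence layer):
from functools import wraps

# Stand-in for a persistent result store; ours lives outside the process.
_result_store = {}

def reuse_previous_success(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        key = (fn.__name__, args, tuple(sorted(kwargs.items())))
        if key in _result_store:       # a prior run already succeeded
            return _result_store[key]  # reuse its result
        result = fn(*args, **kwargs)   # only stored if no exception is raised
        _result_store[key] = result
        return result
    return wrapper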

Anna Geller

12/06/2021, 11:30 AM
Did you check why it didn’t finish?

Robert Hales

12/06/2021, 11:32 AM
These are the logs from this task. The task was then picked up and run again; however, midway through this run another one was started, and the task was marked as successful twice. I'm imagining that a terminal state of Pending confused the scheduler? That's a guess though.

Anna Geller

12/06/2021, 11:37 AM
yeah, but I mean specifically the logs of the task, e.g. the Dask logs? Perhaps it makes sense to find out what is going on within this task, why it takes so long to complete, etc. I would try to break this task down into multiple ones if it takes so long, look at whether this task has enough resources (e.g. memory) to complete, and try to apply caching or store the output to some external store, e.g. S3 or a database, to avoid rerunning when not needed.
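For example, something like this sketch with Prefect's S3Result - the bucket name and target path are placeholders; with a target set, a rerun reuses the stored object if it already exists:
from prefect import task
from prefect.engine.results import S3Result

# Persist the return value to S3; checkpointing plus a target means a
# rerun loads the stored result instead of executing the task again.
@task(
    result=S3Result(bucket="my-prefect-results"),
    checkpoint=True,
    target="{flow_name}/duplicate_invoice_analysis.pickle",
)
def duplicate_invoice_analysis_flow_task(organisation_id):
    ...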

Robert Hales

12/06/2021, 11:40 AM
Once my code was running it did complete fine; note the timestamps on the logs - there is not even a second between the task starting and it being marked as Pending.

Anna Geller

12/06/2021, 11:42 AM
yeah I see that, but no idea why. Again, I would try to break this task down, try caching or external state persistence and see if this fixes the issue.

Robert Hales

12/06/2021, 11:48 AM
Sure, correct cache invalidation in this case will be very tricky 😄 Thanks for your help; if I can repro the Pending final state I will file a bug.