# prefect-server
r
Hi there, had a few tasks finish with the state `Pending`, which I think led to double runs. From what I understand, `Pending` shouldn't be a terminal state?
a
it should not, for sure 🙂 Can you share your flow? Usually I see double runs when you have something like:
• `flow.run()`
• `flow.register()`
in your flow file. If you remove that and use the CLI instead, e.g.:
• `prefect run`
• `prefect register`
the double run issue should be fixed. LMK if this was the use case for you. If not, sharing your flow, or a small reproducible example, would make it easier for me to identify the issue.
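For example, the difference is whether those calls run at module import time - a quick sketch (not your flow; the names and project are made up):
```python
from prefect import Flow, task


@task
def say_hello():
    print("hello")


with Flow("example") as flow:
    say_hello()

# Problematic: calling these at module level means they run every time the
# file is imported or executed (e.g. during registration), which can trigger
# an extra flow run:
# flow.run()
# flow.register(project_name="my-project")

# Safer: trigger runs/registration only from the CLI, or guard local testing:
if __name__ == "__main__":
    flow.run()
```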
r
Hi Anna, the flow is registered in CI already and I don't use `.run` or `.register` anywhere. This happened in two different task runs (same flow run) last week, but I haven't seen it before or since, so a repro may be a bit tricky
We are using the Dask executor with a persistent cluster, FYI
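(Roughly, the executor wiring looks like this - the scheduler address is a placeholder for our actual endpoint:)
```python
from prefect import Flow, task
from prefect.executors import DaskExecutor


@task
def noop():
    pass


with Flow("Analysis") as flow:
    noop()

# Connect to the persistent (long-running) Dask cluster instead of spinning
# up a temporary one; the address is a placeholder.
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")
```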
a
can you share your flow? Atm I don’t have enough information to give any recommendations
r
Yes sure thing
a
Are you on Prefect Cloud? There is a mechanism called version locking that ensures your flow and task runs are executed only once - you can enable it for this flow using GraphQL:
```graphql
mutation {
  enable_flow_version_lock(input: { flow_id: "your-flow-id-here" }) {
    success
  }
}
```
or in the UI.
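If you prefer to do it from Python, the same mutation can be sent through the Prefect Client - a sketch with a placeholder flow ID (version locking is a Cloud feature, so this only works against Cloud):
```python
from prefect import Client

# Send the version-locking mutation shown above via the Prefect Client.
# The flow ID is a placeholder; this is a Prefect Cloud feature.
client = Client()
client.graphql(
    """
    mutation {
      enable_flow_version_lock(input: { flow_id: "your-flow-id-here" }) {
        success
      }
    }
    """
)
```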
r
No on server unfortunately
```python
with Flow("Analysis") as flow:
    organisation_id = Parameter("organisation_id")

    # Other tasks snipped...

    # Duplicate Invoice
    buckets = duplicate_invoice_analysis_flow_task(
        organisation_id=organisation_id, upstream_tasks=[number_analysis, reversal_analysis]
    )
    buckets_analysis = duplicate_invoice_bucket_analysis_flow_task.map(
        organisation_id=unmapped(organisation_id),
        audit_mode=unmapped(False),
        bucket_id=buckets,
    )
    duplicate_invoice_public_records_analysis_flow_task(
        organisation_id=organisation_id, audit_mode=False, upstream_tasks=[buckets_analysis]
    )
```
I'm not sure why a task can finish in Pending without version locking, though?
a
I did not say that 🙂 I only said that there is a mechanism that would ensure that it runs only once.
r
sure 😄
a
can you share what you get when you run:
```python
flow.visualize()
```
r
They are all distinct tasks, the names are slightly different
a
yeah I noticed too late.
Can you share your storage and run config too?
r
All defaults there
a
ok, so you just use local storage and a local run config, and run it only locally for now?
r
This all runs on ECS; all the Dask workers and the agent share a Docker image with our codebase on it
a
is this diagram correct? Which of those tasks ran twice or ended in a Pending state?
r
Yup, that's right. `duplicate_invoice_analysis_flow_task` ran twice, as did one of the mapped tasks in `duplicate_invoice_bucket_analysis_flow_task`.
I think the rerun of the mapped task is unrelated - it was due to a Dask worker OOM, which we account for in our tasks, but I think I missed this case.
The `duplicate_invoice_analysis_flow_task` finishing in Pending is a mystery to me, however 😄
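(The usual way to account for a worker dying mid-task, e.g. from OOM, is to let Prefect retry it - a sketch with made-up values, not necessarily what our handling does:)
```python
import datetime

from prefect import task


# Sketch: retry the mapped task if a Dask worker dies mid-run (e.g. OOM);
# the retry count and delay are made-up values.
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=2))
def duplicate_invoice_bucket_analysis_flow_task(organisation_id, audit_mode, bucket_id):
    pass
```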
a
ditto. The only idea I have is that you could add a reduce step in between - it could be that this task runs multiple times because of the lack of a reduce step after mapping:
```python
mapping_output = print_mapped_output(buckets_analysis)
```
here is a full flow:
```python
from prefect import task, Flow, Parameter, unmapped


@task
def number_analysis():
    pass


@task
def reversal_analysis():
    pass


@task
def duplicate_invoice_analysis_flow_task(organisation_id):
    pass


@task
def duplicate_invoice_bucket_analysis_flow_task(organisation_id, audit_mode, bucket_id):
    pass


@task
def duplicate_invoice_public_records_analysis_flow_task(
    organisation_id="organisation_id", audit_mode=False
):
    pass


@task(log_stdout=True)
def print_mapped_output(x):
    print(x)


with Flow("Analysis") as flow:
    organisation_id = Parameter("organisation_id", default="xyz")
    # Other tasks snipped...

    # Duplicate Invoice
    buckets = duplicate_invoice_analysis_flow_task(
        organisation_id=organisation_id,
        upstream_tasks=[number_analysis, reversal_analysis],
    )
    buckets_analysis = duplicate_invoice_bucket_analysis_flow_task.map(
        organisation_id=unmapped(organisation_id),
        audit_mode=unmapped(False),
        bucket_id=buckets,
    )
    mapping_output = print_mapped_output(buckets_analysis)
    duplicate_invoice_public_records_analysis_flow_task(
        organisation_id=organisation_id,
        audit_mode=False,
        upstream_tasks=[mapping_output],
    )

if __name__ == "__main__":
    flow.visualize()
```
r
Thank you for the suggestion, will try it out
🙌 1
However, I don't think that will solve anything related to `duplicate_invoice_analysis_flow_task`?
a
it could, because the output of it is used as a list of things to map over. But I can’t say for sure
r
Sure, but as `duplicate_invoice_analysis_flow_task` takes some time to complete, I don't really want to run it twice if at all possible
a
I’m not aware of any mechanism on Server to ensure that. But it should not run twice. Maybe you can try setting up caching to avoid rerunning this task if it's computationally expensive? E.g. `cache_for`:
```python
@task(cache_for=datetime.timedelta(hours=1))
```
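A slightly fuller sketch - the duration and validator are placeholders you would tune, and the task body just stands in for yours:
```python
import datetime

from prefect import task
from prefect.engine import cache_validators


# Sketch: reuse the cached output for up to an hour, and only when the task
# inputs match the cached run. Duration and validator are placeholders.
@task(
    cache_for=datetime.timedelta(hours=1),
    cache_validator=cache_validators.all_inputs,
)
def duplicate_invoice_analysis_flow_task(organisation_id):
    pass
```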
r
We have a decorator for our tasks which checks if the task has completed successfully before and reuses that result, as the time-based caching was too restrictive. In this case, however, the task didn't finish in a successful state; the task didn't even finish before it was re-run - I can see logs from the start of the task interspersed with ones from the middle of the task
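(For reference, the "reuse the previous successful result" behaviour our decorator implements is roughly what Prefect targets provide out of the box - a sketch with a made-up bucket and template, not our actual code:)
```python
from prefect import task
from prefect.engine.results import S3Result


# Sketch: if the templated target already exists in the result location, the
# task is not re-run and the stored result is used instead. The bucket name
# and target template are made up.
@task(
    result=S3Result(bucket="my-analysis-results"),
    target="{flow_name}/{task_name}/{today}.pkl",
    checkpoint=True,
)
def duplicate_invoice_analysis_flow_task(organisation_id):
    pass
```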
a
Did you check why it didn’t finish?
r
These are the logs from this task.
The task was then picked up and run again; however, midway through this run another one was started and the task was marked as successful twice.
I'm imagining that a terminal state of Pending confused the scheduler? That's a guess tho
a
yeah, but I mean specifically the logs of the task, e.g. the Dask logs? Perhaps it makes sense to find out what is going on within this task - why it takes so long to complete, etc. I would try to break this task down into multiple ones if it takes so long, look at whether this task has enough resources (e.g. memory) to complete, and try to apply caching or store the output in some external store, e.g. S3 or a database, to avoid rerunning when not needed
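If it helps with debugging, a state handler can log every state transition with a timestamp, so you can see exactly when Pending/Running/Success get set on the task run - a minimal sketch, not specific to your flow:
```python
import prefect
from prefect import task


def log_state_transition(task_obj, old_state, new_state):
    # Log each transition so the Pending/Running/Success timeline shows up
    # in the task run logs.
    logger = prefect.context.get("logger")
    if logger:
        logger.info("%s: %s -> %s", task_obj.name, old_state, new_state)
    return new_state


# Attach the handler to the task you want to trace.
@task(state_handlers=[log_state_transition])
def duplicate_invoice_analysis_flow_task(organisation_id):
    pass
```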
r
Once my code was running it did complete fine. Note the timestamps on the logs: there is not even a second between the task starting and it being marked as Pending
a
yeah I see that, but no idea why. Again, I would try to break this task down, try caching or external state persistence and see if this fixes the issue.
r
Sure, though correct cache invalidation in this case will be very tricky 😄 Thanks for your help - if I can repro the Pending final state, I will file a bug.
👍 1