# ask-community
f
Hi all, we're running Prefect on Heroku. We've got some flows which share state between tasks. Namely, the first task partitions a set of ids into those which are still valid and those which are not (by calling out to a 3rd-party API). The second task then processes the invalid ones to determine what's happened to them. The first task returns a tuple of sets of ints; there are at most 1M ints across the two sets. At the moment it's taking multiple hours for the first task to return and be marked completed, and I wondered if anyone had any advice on how to deal with this and whether this is expected behaviour?
a
@Freddie can you elaborate on what exactly you mean by sharing state between tasks in your use case? If you could give me a simple flow example that I can reproduce, that would be helpful. I think what you perhaps mean is that you pass some data (the IDs you mentioned) from one task to another, not state, correct? A flow example would help to eliminate ambiguities here.
f
Hey @Anna Geller, here are a few snippets.
Flow definition
Copy code
from prefect import Flow


def get_persons_flow(schedule):
    with Flow('persons', schedule) as flow:
        _, invalid = download_persons()
        check_invalid_persons(invalid)

    return flow
Task definition
Copy code
from typing import Set, Tuple

from prefect import task

# nout=2 lets the flow unpack the two returned sets
@task(name='Download persons', nout=2)
def download_persons() -> Tuple[Set[int], Set[int]]:
    config = get_config()
    apis = Apis(config)
    downloader = Downloader(apis)
    return apis.run(downloader.download_persons())
The downloader.download_persons() call is what returns the tuple of sets. I'll try and reproduce locally too.
You're correct - we pass data between tasks, not state.
a
Thanks for the example! At first glance, it looks like the task check_invalid_persons takes the longest, correct? If so, perhaps you could redesign download_persons so that it returns a list of integers (or a list of whatever is required by this check_ function). Then you could use mapping to run that check for all IDs in parallel. Not sure what your API's constraints are, but to speed things up, parallel execution would be worth trying.
Copy code
from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow('persons', schedule, executor=LocalDaskExecutor()) as flow:
    list_invalid_ids = download_persons()
    check_invalid_persons.map(list_invalid_ids)
f
check_invalid_persons is a task too - it batch-processes the ids.
The problem we're having is that the download_persons task is taking too long to complete.
These are the last few lines of the download_persons function:
Copy code
self.logger.info('Completed download of persons.')

return seen_ids, unseen_ids
The log line is printed, then it takes hours for the task to be marked as successful and return.
I'm assuming that returning ~1M ids across these sets is OK - some of them will be bigints, but even if they are huge, we're still looking at something in the region of <50 MB, right?
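As a rough sanity check on that size estimate, here's an illustrative snippet (made-up id counts, assuming Prefect's default cloudpickle result serialization):
Copy code
import cloudpickle  # Prefect's default result serialization uses cloudpickle

# Made-up stand-ins for the two id sets: ~1M ids total, some of them big ints
seen_ids = set(range(900_000))
unseen_ids = {10**12 + i for i in range(100_000)}

payload = (seen_ids, unseen_ids)
size_mb = len(cloudpickle.dumps(payload)) / 1_000_000
print(f"serialized payload: ~{size_mb:.1f} MB")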
a
@Freddie do you believe this is an issue with Prefect? If you're just asking for advice on how to speed up this download process, perhaps you could redesign this task a bit to take advantage of multithreading and download those in parallel using mapping. Prefect could then help if you assign threads to the Dask executor:
Copy code
executor=LocalDaskExecutor(scheduler="threads")
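For example, plugged into the earlier snippet (the num_workers value is just illustrative):
Copy code
from prefect import Flow
from prefect.executors import LocalDaskExecutor

# a thread-based local Dask scheduler gives lightweight parallelism for IO-bound API calls
executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

with Flow('persons', schedule, executor=executor) as flow:
    list_invalid_ids = download_persons()
    check_invalid_persons.map(list_invalid_ids)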
f
I believe this is Prefect, yes. The download completes but the task isn't marked as successful.
The downloading takes a couple of minutes, and then once it has finished it takes multiple hours to return the sets and be marked as completed.
a
Perhaps this tutorial can be helpful - it shows how to start with a single IO operation and then scale out to Dask to run many such operations in parallel: https://docs.prefect.io/core/advanced_tutorials/advanced-mapping.html#advanced-features
f
We don't have any issue with parallelism in the task. What the task is supposed to do finishes; it's just that the task itself is not marked as completed in Prefect Cloud for several hours, so the next job doesn't start.
a
I see. Could you build an example that I could reproduce to debug the issue? It's hard to grasp why the task is not marked as finished even though it has finished 🙂 Perhaps you could also share more about your setup: which agent do you use? You said you use Prefect Cloud, but also that you run Prefect on Heroku - what exactly do you host on Heroku, an agent?
f
Yep, we have an agent on Heroku
a
A local agent or a Docker agent?
f
A local agent
We have a local agent on Heroku
I don't know if you have time, but I'd be more than happy to show you on Zoom or similar, just for complete transparency.
I'll also put some more logging on these jobs to see if there's anything else getting in the way.
And we're going to deploy the test flow I wrote that worked fine locally onto our different environments to see what happens there
a
Thanks for that! If you are interested in this kind of support, you can contact sales@prefect.io
f
👍
k
Hey @Freddie, I'm sure you understand, but we don't hop on calls here because it's quite involved to debug this kind of thing. We do sell enterprise support hours if you're interested. But about your problem: does this work well on a local agent on your laptop?
f
It does work on a local agent with the test flow that I've written to try and emulate the issue. I'm rolling that test flow out to our staging and production stacks to debug more deeply. I had reached out here first in case there was anything obviously silly I was doing with passing data around, but it wouldn't appear so, so I'll let you know if I find anything Prefect-related as I dig.
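For reference, the test flow is roughly along these lines (a simplified stand-in, not our real code - names and sizes are placeholders):
Copy code
from typing import Set, Tuple

from prefect import Flow, task


@task(nout=2)
def make_ids() -> Tuple[Set[int], Set[int]]:
    # stand-in for the real download: ~1M ids split into two sets
    return set(range(900_000)), set(range(900_000, 1_000_000))


@task
def check_invalid(invalid: Set[int]) -> int:
    return len(invalid)


with Flow('persons-repro') as flow:
    _, invalid = make_ids()
    check_invalid(invalid)

if __name__ == '__main__':
    flow.run()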
k
Yeah your flow seems pretty reasonable I think.