Freddie
11/03/2021, 10:36 AM
Anna Geller
Freddie
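11/03/2021, 10:46 AM
from prefect import Flow
def get_persons_flow(schedule):
    with Flow('persons', schedule) as flow:
        # download_persons returns (seen_ids, unseen_ids); only the second set is checked
        _, invalid = download_persons()
        check_invalid_persons(invalid)
    return flow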
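Task definition
from typing import Set, Tuple
from prefect import task
@task(name='Download persons')
def download_persons() -> Tuple[Set[int], Set[int]]:
    # get_config, Apis and Downloader come from the poster's own project code
    config = get_config()
    apis = Apis(config)
    downloader = Downloader(apis)
    # returns (seen_ids, unseen_ids)
    return apis.run(downloader.download_persons())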
The downloader.download_persons() call is what returns the tuple of sets. I'll try to reproduce it locally too.
Freddie
11/03/2021, 10:47 AM
Anna Geller
Is check_invalid_persons the step that takes the longest? If so, perhaps you could redesign download_persons so that it returns a list of integers (or a list of whatever this check_ function needs). Then you could use mapping to run this check for all IDs in parallel. I'm not sure what your API's constraints are, but parallel execution would be worth trying to speed things up.
from prefect import Flow
from prefect.executors import LocalDaskExecutor
with Flow('persons', schedule, executor=LocalDaskExecutor()) as flow:
    list_invalid_ids = download_persons()
    # one mapped run of check_invalid_persons per element of the list
    check_invalid_persons.map(list_invalid_ids)
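The redesign suggested above can be sketched without Prefect. This is plain Python, not Prefect's API: `concurrent.futures.ThreadPoolExecutor.map` stands in for `check_invalid_persons.map(...)` on a threaded executor. `to_id_list`, `check_person`, its validity rule, and the example IDs are all hypothetical; a real check would call the poster's API.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Set, Tuple


def to_id_list(result: Tuple[Set[int], Set[int]]) -> List[int]:
    """Turn the (seen_ids, unseen_ids) tuple into a sorted list of the second set.

    Hypothetical helper: mirrors `_, invalid = download_persons()` in the flow above.
    """
    _, unseen = result
    return sorted(unseen)


def check_person(person_id: int) -> Tuple[int, bool]:
    """Hypothetical per-ID check; the stand-in rule treats even IDs as valid."""
    return person_id, person_id % 2 == 0


def check_all(ids: List[int], max_workers: int = 4) -> List[Tuple[int, bool]]:
    # One call per ID, run concurrently in a thread pool; results come back
    # in input order, similar to one mapped child run per list element.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(check_person, ids))


if __name__ == "__main__":
    ids = to_id_list(({1, 2}, {5, 3, 4}))
    print(check_all(ids))  # [(3, False), (4, True), (5, False)]
```

Whether per-ID mapping pays off depends on the API's rate limits, the constraint raised above; very many tiny tasks also add scheduling overhead.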
Freddie
11/03/2021, 10:54 AM
check_invalid_persons is a task too. It batch-processes the IDs.
Freddie
11/03/2021, 10:54 AM
It's the download_persons task that is taking too long to complete.
Freddie
11/03/2021, 10:55 AM
The download_persons function ends with:
self.logger.info('Completed download of persons.')
return seen_ids, unseen_ids
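Since download_persons is the bottleneck, one option, assuming the download can be split into independent requests (the thread does not say whether it can), is to fetch those pieces concurrently and merge them into the two sets at the end. This is a stdlib sketch, not the poster's code: `fetch_page`, the page layout, and the `already_known` rule for what counts as "seen" are hypothetical, and the original `apis.run(...)` wrapper may already manage concurrency in its own way.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, List, Set, Tuple


def fetch_page(page: int) -> List[int]:
    """Hypothetical API call returning the person IDs on one page.

    Stand-in data: page n holds IDs 10*n, 10*n + 1 and 10*n + 2.
    """
    return [10 * page + i for i in range(3)]


def download_persons_parallel(
    pages: Iterable[int], already_known: Set[int], max_workers: int = 8
) -> Tuple[Set[int], Set[int]]:
    """Fetch pages concurrently and split the IDs into (seen_ids, unseen_ids).

    Hypothetical split rule: an ID is "seen" if it is in `already_known`.
    """
    seen_ids: Set[int] = set()
    unseen_ids: Set[int] = set()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Page requests run concurrently in the pool; merging happens here
        # in the calling thread, so the sets need no locking.
        for ids in pool.map(fetch_page, pages):
            for person_id in ids:
                if person_id in already_known:
                    seen_ids.add(person_id)
                else:
                    unseen_ids.add(person_id)
    return seen_ids, unseen_ids
```

The return shape matches the `Tuple[Set[int], Set[int]]` annotation on the task, so the downstream flow would not need to change.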
Freddie
11/03/2021, 10:55 AM
Freddie
11/03/2021, 10:58 AM
Anna Geller
executor=LocalDaskExecutor(scheduler="threads")
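With `scheduler="threads"`, LocalDaskExecutor runs mapped task runs in a thread pool, which suits I/O-bound API calls. Because check_invalid_persons already batch-processes IDs, a middle ground is to map over batches rather than single IDs. The sketch below uses the standard library, not Prefect's API: it chunks the IDs and checks each chunk in a thread. `check_batch`, its invalidity rule, and the default batch size are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Iterable, Iterator, List, Set


def chunked(ids: Iterable[int], size: int) -> Iterator[List[int]]:
    """Yield consecutive batches of at most `size` IDs."""
    it = iter(ids)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def check_batch(batch: List[int]) -> Set[int]:
    """Hypothetical batch check returning the IDs judged invalid.

    Stand-in rule: odd IDs are invalid. A real version would make one
    API request per batch.
    """
    return {person_id for person_id in batch if person_id % 2}


def check_in_batches(ids: Iterable[int], size: int = 100, max_workers: int = 4) -> Set[int]:
    invalid: Set[int] = set()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Each batch is one unit of work, roughly one mapped task run.
        for result in pool.map(check_batch, chunked(ids, size)):
            invalid |= result
    return invalid
```

In Prefect terms, this would correspond to download_persons returning a list of batches and mapping check_invalid_persons over that list; the batch size trades per-request overhead against parallelism.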
Freddie
11/03/2021, 11:01 AM
Freddie
11/03/2021, 11:02 AM
Anna Geller
Freddie
11/03/2021, 11:03 AM
Anna Geller
Freddie
11/03/2021, 11:07 AM
Anna Geller
Freddie
11/03/2021, 11:10 AM
Freddie
11/03/2021, 11:10 AM
Freddie
11/03/2021, 11:10 AM
Freddie
11/03/2021, 11:18 AM
Freddie
11/03/2021, 11:21 AM
Anna Geller
Freddie
11/03/2021, 11:47 AM
Kevin Kho
Freddie
11/03/2021, 2:05 PM
Kevin Kho