Sridhar
11/29/2021, 12:38 AM
The get_data_asynchronous() function below creates 10 threads and calls the API concurrently. I use this function in run_factset_api(). As standalone code this works fine locally, but when I schedule a run on Prefect, the run_factset_api() function exits before execution and returns a coroutine object (locally it returns the desired value). Is there something I should do to enable parallel runs on Prefect?
import asyncio
from concurrent.futures import ThreadPoolExecutor

import boto3
import pandas as pd
import requests
from prefect import Flow, task

# company, companies_to_fetch, company_info, and formulas are defined elsewhere in the module
async def get_data_asynchronous():
    master = pd.DataFrame()  # accumulates the records returned by each thread
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before the requests are made
            loop = asyncio.get_event_loop()
            tasks = [
                loop.run_in_executor(
                    executor,
                    company.get_company_records,
                    # Unpacking lets us pass multiple arguments through run_in_executor
                    *(session, [companies], {**company_info, **formulas}),
                )
                for companies in companies_to_fetch
            ]
            for response in await asyncio.gather(*tasks):
                master = master.append(response, ignore_index=True)
    return master
@task
def run_factset_api():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    master = loop.run_until_complete(future)
    return master
@task
def save_data_to_s3(emmi_reduction):
    # `bucket` and `csv_buffer` are defined elsewhere in the module
    s3_resource = boto3.resource('s3')
    s3_resource.Object(bucket, 'factset_output_data.csv').put(Body=csv_buffer.getvalue())
with Flow('api-flow', storage=STORAGE, run_config=RUN_CONFIG) as flow:
    response = run_factset_api()
    if response:
        save_data_to_s3(response)

flow.register('pipeline')
Anna Geller
from prefect.executors import LocalDaskExecutor
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
Then, you could use mapping to parallelize the operations (see the sketch below). It could be that the asyncio code in your task would need some logic to handle the Prefect Context in a thread-safe way; the executors above already have that logic.
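A minimal sketch of that mapping pattern, assuming a hypothetical fetch_batch helper that stands in for one chunk of the FactSet calls:

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def fetch_batch(companies):
    # Hypothetical placeholder: fetch the records for one chunk of
    # companies, e.g. via company.get_company_records(...)
    return companies

with Flow("api-flow-mapped") as flow:
    batches = [["AAPL", "MSFT"], ["GOOG", "AMZN"]]  # placeholder chunks
    results = fetch_batch.map(batches)  # one task run per chunk, run in parallel

flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)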
But based on this discussion, it looks like it can work when you do asyncio.run().
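A minimal sketch of that asyncio.run() variant, reusing the get_data_asynchronous() coroutine from the question:

import asyncio
from prefect import task

@task
def run_factset_api():
    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion, and closes the loop, so the task returns the DataFrame
    # instead of an unawaited coroutine object.
    return asyncio.run(get_data_asynchronous())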
Anna Geller