Sridhar
11/29/2021, 12:38 AM
The get_data_asynchronous() function below creates 10 threads and calls the API concurrently. I am using this function in run_factset_api(). As standalone code this works fine locally. But when I schedule a run on Prefect, run_factset_api() exits before execution and returns a coroutine object (although locally it returns the desired value). Is there something I should do to enable parallel runs on Prefect?
async def get_data_asynchronous():
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            loop = asyncio.get_event_loop()
            tasks = [
                loop.run_in_executor(
                    executor,
                    company.get_company_records,
                    *(session, [companies], {**company_info, **formulas})
                    # Allows us to pass in multiple arguments to `fetch`
                )
                for companies in companies_to_fetch
            ]
            for response in await asyncio.gather(*tasks):
                master = master.append(response, ignore_index=True)
            return master

@task
def run_factset_api():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    master = loop.run_until_complete(future)
    return master

@task
def save_data_to_s3(emmi_reduction):
    s3_resource = boto3.resource('s3')
    s3_resource.Object(bucket, 'factset_output_data.csv').put(Body=csv_buffer.getvalue())

with Flow('api-flow', storage=STORAGE, run_config=RUN_CONFIG) as flow:
    response = run_factset_api()
    if response:
        save_data_to_db(response)

flow.register('pipeline')
Anna Geller
from prefect.executors import LocalDaskExecutor
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
Then you could use mapping to parallelize the operations. The asyncio code in your task may need some extra logic to handle the Prefect context in a thread-safe way; the executors above already have that logic.
But based on this discussion, it looks like it can work when you use asyncio.run().
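A minimal sketch of the asyncio.run() approach mentioned above, using only the standard library. Here fetch_records is a hypothetical stand-in for company.get_company_records, and call_from_worker_thread only imitates a runner that calls the function outside the main thread; neither is Prefect's API. asyncio.run() creates and closes its own event loop, so it does not rely on a loop already being set in the calling thread. Whether this resolves the behaviour on a particular Prefect setup is an assumption to verify.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_records(company):
    # Hypothetical stand-in for company.get_company_records: a blocking call.
    time.sleep(0.05)
    return {"company": company, "records": len(company)}


async def get_data_asynchronous(companies, max_workers=10):
    # Fan the blocking calls out to a thread pool and await them together.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            loop.run_in_executor(executor, fetch_records, company)
            for company in companies
        ]
        # gather() returns the results in the same order as the inputs.
        return await asyncio.gather(*futures)


def run_factset_api(companies):
    # asyncio.run() starts a fresh event loop, runs the coroutine to
    # completion, and returns its result rather than a coroutine object.
    return asyncio.run(get_data_asynchronous(companies))


def call_from_worker_thread(companies):
    # Imitates a runner that executes the function outside the main thread.
    result = {}
    worker = threading.Thread(
        target=lambda: result.update(value=run_factset_api(companies))
    )
    worker.start()
    worker.join()
    return result["value"]
```

Calling call_from_worker_thread(["AAPL", "MSFT"]) returns a plain list of dictionaries, one per company, in input order. In a real task the body of run_factset_api would stay synchronous from the caller's point of view, which is the property the thread is asking about.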