# ask-community
Hi, I have a function that fetches data from an external API in parallel. The `get_data_asynchronous()` function below uses a pool of 10 worker threads to call the API concurrently, and I call it from `run_factset_api()`. Run locally as standalone code, this works fine. But when I schedule a run on Prefect, `run_factset_api()` exits before the asynchronous code executes and returns a coroutine object (locally it returns the desired value). Is there something I should do to make parallel runs work on Prefect?
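```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from io import StringIO

import boto3
import pandas as pd
import requests
from prefect import Flow, task

# `company`, `companies_to_fetch`, `company_info`, `formulas`, `bucket`,
# `STORAGE` and `RUN_CONFIG` are defined elsewhere (not shown).


async def get_data_asynchronous():
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `get_company_records`
            loop = asyncio.get_event_loop()
            tasks = [
                # run_in_executor forwards the remaining positional
                # arguments to `get_company_records`
                loop.run_in_executor(
                    executor,
                    company.get_company_records,
                    session,
                    [companies],
                    {**company_info, **formulas},
                )
                for companies in companies_to_fetch
            ]
            responses = await asyncio.gather(*tasks)
    # Combine the per-company results (assumed to be DataFrames) into one frame
    return pd.concat(responses, ignore_index=True)


@task
def run_factset_api():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    master = loop.run_until_complete(future)
    return master


@task
def save_data_to_s3(emmi_reduction):
    # Serialize the DataFrame to CSV in memory, then upload it
    csv_buffer = StringIO()
    emmi_reduction.to_csv(csv_buffer)
    s3_resource = boto3.resource('s3')
    s3_resource.Object(bucket, 'factset_output_data.csv').put(Body=csv_buffer.getvalue())


with Flow('api-flow', storage=STORAGE, run_config=RUN_CONFIG) as flow:
    response = run_factset_api()
    # A plain `if response:` would be evaluated while the flow is built,
    # not on the task's runtime result, so the save task is always added
    save_data_to_s3(response)

flow.register('pipeline')
```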
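For context on the symptom: calling an `async def` function does not run its body; it only creates a coroutine object, and the actual result appears once an event loop drives that coroutine. A minimal stdlib illustration, where `fetch_all` is a hypothetical stand-in rather than part of the code above:

```python
import asyncio


async def fetch_all():
    # Hypothetical stand-in for an async data-fetching coroutine
    await asyncio.sleep(0)
    return [1, 2, 3]


# Calling the coroutine function does not execute its body;
# it only creates a coroutine object.
pending = fetch_all()
print(type(pending).__name__)  # coroutine
pending.close()  # discard the un-awaited coroutine to avoid a RuntimeWarning

# Driving the coroutine on an event loop produces the real return value.
result = asyncio.run(fetch_all())
print(result)  # [1, 2, 3]
```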
@Sridhar Usually, in Prefect Core, you would attach a `LocalDaskExecutor` or `DaskExecutor` to your flow:
```python
from prefect.executors import LocalDaskExecutor
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
```
Then you could use mapping to parallelize the API calls across task runs. The asyncio code in your task may need some logic to handle the Prefect context in a thread-safe way; the executors above already include that logic. That said, based on this discussion, it looks like your approach can work if you run the coroutine with `asyncio.run()`.
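The `asyncio.run()` approach could look roughly like this. It is a minimal sketch, assuming a blocking per-company fetch function; `fetch_company_records`, `fetch_all`, and `run_api_sync` are hypothetical names, not from the original code or from Prefect's API. The synchronous wrapper is the part you would call from a task body:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_company_records(company_id):
    # Hypothetical blocking API call; replace with the real request
    time.sleep(0.01)
    return {"company": company_id}


async def fetch_all(company_ids, max_workers=10):
    # Run each blocking call in a worker thread and await them together
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            loop.run_in_executor(executor, fetch_company_records, cid)
            for cid in company_ids
        ]
        # gather returns results in the same order as company_ids
        return await asyncio.gather(*futures)


def run_api_sync(company_ids):
    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion, and closes the loop, so it works from plain synchronous
    # code without depending on get_event_loop() in the current thread
    return asyncio.run(fetch_all(company_ids))


records = run_api_sync(["A", "B", "C"])
print(records)  # [{'company': 'A'}, {'company': 'B'}, {'company': 'C'}]
```

One caveat: `asyncio.run()` raises a `RuntimeError` if it is called from a thread that already has a running event loop, so it fits synchronous task code, not code that is itself running inside a coroutine.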
@Sridhar BTW, Orion has first-class support for asyncio, which should make this easier in Prefect 2.0: https://orion-docs.prefect.io/tutorials/execution/#asynchronous-execution