Maity
10/23/2023, 6:25 AM
`prefect.utilities.asyncutils.run_sync_in_worker_thread`, then the `@sync_compatible` wrapper will not work, because the root caller function is async. Is this correct and expected? I would expect the `sync_compatible` wrapper to check whether its immediate calling function is sync or async, rather than the root function. What I want is something like this, which calls sync flows concurrently from an async root flow. It mostly works, but any function using the `@sync_compatible` wrapper that is called inside the sync functions behaves as if those sync functions could await it:
```python
import asyncio

from prefect import flow
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible


@sync_compatible
async def ingest_and_transform(ingest_func, transform_func):
    # Run the sync ingest function in a worker thread and wait for it to complete.
    data = await run_sync_in_worker_thread(ingest_func)
    # As soon as ingestion is done, run the transform function on its output.
    await run_sync_in_worker_thread(transform_func, data)


@flow
async def namely_dw_pipeline():
    from mad_data_namely.dw.profiles.ingest import ingest_profiles
    from mad_data_namely.dw.profiles.transform import transform_profiles
    from mad_data_namely.dw.reports.ingest import ingest_reports
    from mad_data_namely.dw.reports.transform import transform_reports

    # Create the coroutines for profiles and reports (nothing runs until they are awaited).
    task_profiles = ingest_and_transform(ingest_profiles, transform_profiles)
    task_reports = ingest_and_transform(ingest_reports, transform_reports)

    # Run both concurrently. Each starts its transform as soon as its ingest completes.
    await asyncio.gather(task_profiles, task_reports)


if __name__ == "__main__":
    asyncio.run(namely_dw_pipeline())
```
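For comparison, here is a minimal, Prefect-free sketch of the behavior being asked for, assuming Python 3.9+ (for `asyncio.to_thread`). `thread_aware_sync_compatible`, `save_result`, and the ingest/transform stand-ins are hypothetical names, not Prefect APIs. The decorator only checks whether an event loop is running in the *current* thread, which approximates "check the immediate caller"; it is not how Prefect's `sync_compatible` actually decides.

```python
import asyncio
import functools


def thread_aware_sync_compatible(async_fn):
    """Hypothetical decorator, NOT Prefect's sync_compatible.

    If an event loop is running in the current thread, the caller is async
    and gets a coroutine back; otherwise the caller is sync, so the coroutine
    is driven to completion with asyncio.run() in this thread.
    """

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop running in this thread: treat the caller as sync.
            return asyncio.run(async_fn(*args, **kwargs))
        # A loop is running in this thread: treat the caller as async.
        return async_fn(*args, **kwargs)

    return wrapper


@thread_aware_sync_compatible
async def save_result(name, value):
    # Hypothetical stand-in for a library helper that is async under the hood.
    await asyncio.sleep(0)
    return f"{name}={value}"


# Hypothetical sync ingest/transform steps. The transforms call the decorated
# helper without `await`, because they run in worker threads with no loop.
def ingest_numbers():
    return [1, 2, 3]


def transform_numbers(data):
    return save_result("numbers", sum(data))


def ingest_letters():
    return ["a", "b"]


def transform_letters(data):
    return save_result("letters", "".join(data))


async def ingest_and_transform(ingest_func, transform_func):
    # Run each sync step in a worker thread; transform starts once ingest returns.
    data = await asyncio.to_thread(ingest_func)
    return await asyncio.to_thread(transform_func, data)


async def pipeline():
    # Both ingest -> transform chains run concurrently.
    return await asyncio.gather(
        ingest_and_transform(ingest_numbers, transform_numbers),
        ingest_and_transform(ingest_letters, transform_letters),
    )


if __name__ == "__main__":
    print(asyncio.run(pipeline()))  # ['numbers=6', 'letters=ab']
```

The trade-off in this sketch is that each worker-thread call spins up its own short-lived event loop via `asyncio.run()`, which adds some overhead but lets the sync code call the helper as a plain function.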