If you run a sync function with `prefect.utilities...
# ask-community
If you run a sync function with `prefect.utilities.asyncutils.run_sync_in_worker_thread`, the `@sync_compatible` wrapper does not behave synchronously inside it, because the root caller function is async. Is this correct and expected? I would expect the `sync_compatible` wrapper to check whether its immediate calling function is sync or async, rather than the root function. That would let me do something like the following, which calls sync flows concurrently from an async root flow. It mostly works, but any function decorated with `@sync_compatible` that is called inside the sync functions still behaves as async, so the sync function is expected to `await` it, which it cannot do.
import asyncio

from prefect import flow
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible


@sync_compatible
async def ingest_and_transform(ingest_func, transform_func):
    # Run the ingest function and wait for it to complete.
    data = await run_sync_in_worker_thread(ingest_func)
    # As soon as ingestion is done, start the transform function.
    await run_sync_in_worker_thread(transform_func, data)


@flow
async def namely_dw_pipeline():
    from mad_data_namely.dw.profiles.ingest import ingest_profiles
    from mad_data_namely.dw.profiles.transform import transform_profiles
    from mad_data_namely.dw.reports.ingest import ingest_reports
    from mad_data_namely.dw.reports.transform import transform_reports

    # Set up the coroutines for profiles and reports (nothing runs until they are awaited)
    task_profiles = ingest_and_transform(ingest_profiles, transform_profiles)
    task_reports = ingest_and_transform(ingest_reports, transform_reports)

    # Run both tasks concurrently. Each will start its transform as soon as its ingest completes.
    await asyncio.gather(task_profiles, task_reports)


if __name__ == "__main__":
    asyncio.run(namely_dw_pipeline())
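
To make the situation easier to reproduce without Prefect, here is a minimal stdlib-only sketch of the same shape plus one possible workaround. It is not how Prefect implements `sync_compatible`. The names `fetch_config`, `resolve`, `ingest`, and `transform` are hypothetical, and `asyncio.to_thread` stands in for `run_sync_in_worker_thread`. The assumption is that a dual-mode helper hands back a coroutine inside the worker thread. `resolve` then runs that coroutine to completion with `asyncio.run`, which is allowed there because the worker thread has no running event loop of its own.

```python
import asyncio
import inspect


async def fetch_config():
    # Hypothetical async helper; stands in for a dual-mode function that
    # handed back a coroutine instead of a plain value.
    await asyncio.sleep(0)
    return {"source": "profiles"}


def resolve(value):
    # If a call returned a coroutine, run it to completion in this thread.
    # asyncio.run works here because a worker thread has no running loop.
    if inspect.iscoroutine(value):
        return asyncio.run(value)
    return value


def ingest():
    # Sync function executed in a worker thread; it cannot use `await`.
    config = resolve(fetch_config())
    return [config["source"], "raw"]


def transform(data):
    # Plain sync transform, also executed in a worker thread.
    return [item.upper() for item in data]


async def ingest_and_transform(ingest_func, transform_func):
    # Run ingest in a worker thread, then transform as soon as it finishes.
    data = await asyncio.to_thread(ingest_func)
    return await asyncio.to_thread(transform_func, data)


async def pipeline():
    # Both ingest/transform chains run concurrently.
    return await asyncio.gather(
        ingest_and_transform(ingest, transform),
        ingest_and_transform(ingest, transform),
    )


results = asyncio.run(pipeline())
print(results)
```

One caveat with this approach: the coroutine runs on a fresh event loop in the worker thread, so anything bound to the main loop (an open client or connection, for example) may not be usable from inside it.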