Madison Schott

11/08/2022, 5:23 PM
Getting this error with a fivetran task, even though I defined it exactly as in the docs (https://github.com/fivetran/prefect-fivetran). Any ideas?
sync = await fivetran_sync_flow(
                ^
SyntaxError: 'await' outside async function

Ryan Peden

11/08/2022, 5:35 PM
Hi Madison - it looks like that example in the README might need updating, because it uses await inside a non-async function, and prefect_fivetran.connectors doesn't appear to export anything named fivetran_sync_flow. Based on its function signature, trigger_fivetran_connector_sync_and_wait_for_completion looks like it may do what you want. Ideally you'd call it inside an async flow, so something like:
import asyncio
from prefect import flow
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import trigger_fivetran_connector_sync_and_wait_for_completion

@flow
async def my_flow():
    ...
    fivetran_credentials = FivetranCredentials(
        api_key="my_api_key",
        api_secret="my_api_secret",
    )
    fivetran_result = await trigger_fivetran_connector_sync_and_wait_for_completion(
        fivetran_credentials=fivetran_credentials,
        connector_id="my_connector_id",
        schedule_type="my_schedule_type",
        poll_status_every_n_seconds=30,
    )
    ...

if __name__ == "__main__":
    asyncio.run(my_flow())
Though if you want to use one of the async tasks from prefect_fivetran.connectors inside a sync flow, you could call it without await and then call .wait or .result to wait for the PrefectFuture to complete (examples here). I think this would be true of all the tasks in prefect_fivetran.connectors, since they are all async.
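
For anyone who wants to see the original error in isolation, here is a minimal stdlib-only sketch. It does not use Prefect or prefect_fivetran at all: fake_sync_task is a hypothetical stand-in for an async task, and the source string only imitates the shape of the README snippet. It shows that await inside a plain def is rejected at compile time, while the same call inside an async def runs fine.

```python
import asyncio


async def fake_sync_task() -> str:
    # Hypothetical stand-in for an async task; not a prefect_fivetran function.
    await asyncio.sleep(0)
    return "done"


# 'await' inside a regular (non-async) function is rejected when the code is
# compiled, before anything runs. This mirrors the shape of the failing snippet.
bad_source = (
    "def my_flow():\n"
    "    sync = await fake_sync_task()\n"
)
try:
    compile(bad_source, "<example>", "exec")
    error_message = None
except SyntaxError as exc:
    error_message = exc.msg


async def main() -> str:
    # Inside 'async def', the same 'await' is legal.
    return await fake_sync_task()


# asyncio.run drives the coroutine from ordinary synchronous code.
result = asyncio.run(main())
print(error_message)
print(result)
```

The point is only that the SyntaxError comes from Python itself rather than from Fivetran or Prefect, so the fix is about where the call lives, not about the connector settings.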

Madison Schott

11/08/2022, 6:01 PM
I think I would like it inside a sync flow. Would I need the .wait or .result at all? Does this just return whether the sync was successful or not?

Ryan Peden

11/08/2022, 6:21 PM
My apologies - I believe we make async tasks and flows sync-compatible, so you can call them as-is from within sync flows. So my previous example would become:
from prefect import flow
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import trigger_fivetran_connector_sync_and_wait_for_completion

@flow
def my_flow():
    ...
    fivetran_credentials = FivetranCredentials(
        api_key="my_api_key",
        api_secret="my_api_secret",
    )
    fivetran_result = trigger_fivetran_connector_sync_and_wait_for_completion(
        fivetran_credentials=fivetran_credentials,
        connector_id="my_connector_id",
        schedule_type="my_schedule_type",
        poll_status_every_n_seconds=30,
    )
    ...


my_flow()
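
As a rough illustration of what "sync-compatible" can mean in general, here is a stdlib-only sketch. This is an assumption for illustration, not Prefect's actual implementation: sync_compatible_sketch and trigger_sync are hypothetical names, and the real library may handle this quite differently. The idea is that the wrapper runs the coroutine to completion when there is no running event loop, and hands the coroutine back to be awaited when there is one.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine


def sync_compatible_sketch(
    async_fn: Callable[..., Coroutine[Any, Any, Any]],
) -> Callable[..., Any]:
    """Hypothetical decorator for illustration; not Prefect's implementation."""

    @functools.wraps(async_fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        coro = async_fn(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: the caller is synchronous, so run to completion.
            return asyncio.run(coro)
        # A loop is running: the caller is async and is expected to await this.
        return coro

    return wrapper


@sync_compatible_sketch
async def trigger_sync(connector_id: str) -> str:
    # Hypothetical stand-in for an async connector task.
    await asyncio.sleep(0)
    return f"synced {connector_id}"


def sync_caller() -> str:
    # No 'await' needed from synchronous code.
    return trigger_sync("my_connector_id")


async def async_caller() -> str:
    # From async code, the same function is awaited as usual.
    return await trigger_sync("my_connector_id")


sync_result = sync_caller()
async_result = asyncio.run(async_caller())
print(sync_result, async_result)
```

Both callers end up with the same value, which is the behavior described above: the same function can be called as-is from a sync flow or awaited from an async one.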

Madison Schott

11/09/2022, 10:06 PM
got it, thanks!