Madison Schott
11/08/2022, 5:23 PMsync = await fivetran_sync_flow(
^
SyntaxError: 'await' outside async function
Ryan Peden
11/08/2022, 5:35 PMimport asyncio
from prefect import flow
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import trigger_fivetran_connector_sync_and_wait_for_completion
@flow
async def my_flow():
...
fivetran_credentials = FivetranCredentials(
api_key="my_api_key",
api_secret="my_api_secret",
)
fivetran_result = await trigger_fivetran_connector_sync_and_wait_for_completion(
fivetran_credentials=fivetran_credentials,
connector_id="my_connector_id",
schedule_type="my_schedule_type",
poll_status_every_n_seconds=30,
)
...
if __name__ == "__main__":
asyncio.run(my_flow())
await
and then call .wait
or .result
to wait for the PrefectFuture to complete (examples here).
I think this would be true of all the tasks in prefect_fivetrain.connectors since they are all async.Madison Schott
11/08/2022, 6:01 PM.wait
or .result
at all? Does this just return whether the sync was successful or not?Ryan Peden
11/08/2022, 6:21 PMfrom prefect import flow
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import trigger_fivetran_connector_sync_and_wait_for_completion
@flow
def my_flow():
...
fivetran_credentials = FivetranCredentials(
api_key="my_api_key",
api_secret="my_api_secret",
)
fivetran_result = trigger_fivetran_connector_sync_and_wait_for_completion(
fivetran_credentials=fivetran_credentials,
connector_id="my_connector_id",
schedule_type="my_schedule_type",
poll_status_every_n_seconds=30,
)
...
my_flow()
Madison Schott
11/09/2022, 10:06 PM