Madison Schott
11/17/2022, 10:55 PM
`await` from the function, any ideas?
A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.
@task
def webevent_sync(fivetran_credentials):
    webevents_secret = Secret.load("webevents-connector_id")
    webevents_sync = trigger_fivetran_connector_sync_and_wait_for_completion(
        fivetran_credentials=fivetran_credentials,
        connector_id=webevents_secret.get(),
        schedule_type="manual",
        poll_status_every_n_seconds=30,
    )
`await` in front of the class name
Nate
11/18/2022, 12:42 AM
Madison Schott
11/18/2022, 10:00 PM
@flow
def winc_sync(fivetran_credentials):
    winc_secret = Secret.load("winc-connector-id")
    winc_sync = trigger_fivetran_connector_sync_and_wait_for_completion(
        fivetran_credentials=fivetran_credentials,
        connector_id=winc_secret.get(),
        schedule_type="manual",
        poll_status_every_n_seconds=30
    )

@flow
def data_pipeline_2():
    # run data syncs with Fivetran
    print("Pipeline 2.0 is working")
    fivetran_credentials = FivetranCredentials(
        api_key="my_api_key",
        api_secret="my_api_secret"
    )
    # sync Winc.com data
    # (result bound to a new name: reusing winc_sync here would shadow the flow and raise UnboundLocalError)
    winc_result = winc_sync(fivetran_credentials)
Nate
11/18/2022, 10:08 PM
> Ah that is a flow? The documentation doesn't make that super clear
yep - we can reach out to the maintainers at fivetran and propose some updates to the docs
> So a flow can be called within a flow?
yep! this was a big motivator for prefect 2, allowing flows to be called directly from other flows
and yes, that looks right
Madison Schott
11/18/2022, 10:09 PM
`await`? I'm still confused as to what that does and how it's different with or without.
Nate
11/18/2022, 10:23 PM
RuntimeError: Flows cannot be run from within tasks. Did you mean to call this flow in a flow?
but in general, I think it's safe to say:
• if your main @flow decorated function is defined as a sync function (not `async def`, just `def`) then you should treat everything inside as a callable (a normal function you don't have to `await`) because prefect knows how to handle things like `Secret.load` in a sync context
• if your main flow is async, you will have to await flows / tasks / functions that are defined as asynchronous functions
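To make the "with or without `await`" difference concrete, here is a minimal sketch in plain Python, using only the standard library. It does not use Prefect at all: `load_secret` is a hypothetical async function standing in for any async call, and it only shows the general language behavior behind the two bullets above. Per Nate's note, Prefect takes care of this for you when something like `Secret.load` is called from a sync flow, so this is background on the language, not a description of Prefect's internals.

```python
import asyncio
import inspect


async def load_secret(name: str) -> str:
    """Hypothetical async loader; stands in for any async call."""
    await asyncio.sleep(0)  # pretend to do some I/O
    return f"value-for-{name}"


def sync_caller() -> bool:
    # Without `await`, calling an async function only creates a coroutine
    # object; none of its body has run yet.
    pending = load_secret("winc-connector-id")
    is_coroutine = inspect.iscoroutine(pending)
    pending.close()  # discard it so Python does not warn "never awaited"
    return is_coroutine


async def async_caller() -> str:
    # Inside `async def`, `await` runs the coroutine to completion and
    # hands back its return value.
    return await load_secret("winc-connector-id")


not_awaited_is_coroutine = sync_caller()
awaited_value = asyncio.run(async_caller())
print(not_awaited_is_coroutine, awaited_value)
```

In other words, `await` is only valid inside `async def`, and it is what turns "a coroutine that could run" into "the value it returns". A plain `def` function cannot `await`, which is why a sync flow treats everything inside it as a normal call.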