https://prefect.io logo
Title
m

Madison Schott

11/17/2022, 10:55 PM
Getting this error with the fivetran syncs when I move
await
from the function, any ideas?
A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.
@task
def webevent_sync(fivetran_credentials):
    webevents_secret = Secret.load("webevents-connector_id")
    webevents_sync = trigger_fivetran_connector_sync_and_wait_for_completion(
        fivetran_credentials=fivetran_credentials,
        connector_id=webevents_secret.get(),
        schedule_type="manual",
        poll_status_every_n_seconds=30,
    )
I got rid of the
await
in front of the class name
I want to call the sync from a flow and have the sync run when the flow does
n

Nate

11/18/2022, 12:42 AM
It looks like you’re calling the “trigger_fivetran_connector_…” flow from within a task, where calling flows / tasks from tasks is not supported, can you try making your webevent_sync task a flow?
m

Madison Schott

11/18/2022, 10:00 PM
Ah that is a flow? The documentation doesn't make that super clear
So a flow can be called within a flow?
@flow
def winc_sync(fivetran_credentials):
    winc_secret = Secret.load("winc-connector-id")
    winc_sync = trigger_fivetran_connector_sync_and_wait_for_completion(
        fivetran_credentials=fivetran_credentials,
        connector_id=winc_secret.get(),
        schedule_type="manual",
        poll_status_every_n_seconds=30
    )
Is this way of calling it within the other flow correct?
@flow
def data_pipeline_2():
    # run data syncs with Fivetran
    print("Pipeline 2.0 is working")

    fivetran_credentials = FivetranCredentials(
        api_key="my_api_key",
        api_secret="my_api_secret"
    )

    # sync <http://Winc.com|Winc.com> data
    winc_sync = winc_sync(fivetran_credentials)
n

Nate

11/18/2022, 10:08 PM
Ah that is a flow? The documentation doesn't make that super clear
yep - we can reach out to the maintainers at fivetran and propose some updates to the docs
So a flow can be called within a flow?
yep! this was a big motivator for prefect 2, allowing flows to be called directly from other flows and yes, that looks right
m

Madison Schott

11/18/2022, 10:09 PM
Great, thanks! So this is what fixed that issue of not using
await
? I'm still confused as to what that does and how it's different with or without.
n

Nate

11/18/2022, 10:23 PM
I'm still looking into the exact reason you got that message, bc I would have expected an error like this in your case
RuntimeError: Flows cannot be run from within tasks. Did you mean to call this flow in a flow?
but in general, I think its safe to say: • if your main @flow decorated function is defined as a sync function (not
async def
just
def
) then you should treat everything inside as a callable (a normal function you don't have to
await
) because prefect knows how handle things like
Secret.load
in a sync context • if your main flow is async, you will have to await flows / tasks / functions that are defined as asynchronous functions