Madison Schott11/17/2022, 10:55 PM
from the function, any ideas?
A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.
@task def webevent_sync(fivetran_credentials): webevents_secret = Secret.load("webevents-connector_id") webevents_sync = trigger_fivetran_connector_sync_and_wait_for_completion( fivetran_credentials=fivetran_credentials, connector_id=webevents_secret.get(), schedule_type="manual", poll_status_every_n_seconds=30, )
in front of the class name
Nate11/18/2022, 12:42 AM
Madison Schott11/18/2022, 10:00 PM
@flow def winc_sync(fivetran_credentials): winc_secret = Secret.load("winc-connector-id") winc_sync = trigger_fivetran_connector_sync_and_wait_for_completion( fivetran_credentials=fivetran_credentials, connector_id=winc_secret.get(), schedule_type="manual", poll_status_every_n_seconds=30 )
@flow def data_pipeline_2(): # run data syncs with Fivetran print("Pipeline 2.0 is working") fivetran_credentials = FivetranCredentials( api_key="my_api_key", api_secret="my_api_secret" ) # sync <http://Winc.com|Winc.com> data winc_sync = winc_sync(fivetran_credentials)
Nate11/18/2022, 10:08 PM
Ah that is a flow? The documentation doesn't make that super clearyep - we can reach out to the maintainers at fivetran and propose some updates to the docs
So a flow can be called within a flow?yep! this was a big motivator for prefect 2, allowing flows to be called directly from other flows and yes, that looks right
Madison Schott11/18/2022, 10:09 PM
? I'm still confused as to what that does and how it's different with or without.
Nate11/18/2022, 10:23 PM
but in general, I think its safe to say: • if your main @flow decorated function is defined as a sync function (not
RuntimeError: Flows cannot be run from within tasks. Did you mean to call this flow in a flow?
) then you should treat everything inside as a callable (a normal function you don't have to
) because prefect knows how handle things like
in a sync context • if your main flow is async, you will have to await flows / tasks / functions that are defined as asynchronous functions