Madison Schott

11/15/2022, 9:23 PM
Getting this error with the Fivetran syncs when I remove await from the function, any ideas?
A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.

Zanie

11/15/2022, 9:43 PM
If you're going from async function -> sync function -> sync compatible function, you need to run the first sync function in a worker thread; otherwise the sync compatible function thinks it's still in an async function.
If you share a small example we’ll probably be able to give clearer guidance
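For anyone else unsure what "worker thread" means here, a minimal sketch using only the standard library. It is not Prefect's implementation: `has_running_loop` and `sync_step` are hypothetical names, and checking for a running event loop is only an assumed stand-in for how a sync compatible function might decide whether it is in an async context.

```python
import asyncio


def has_running_loop() -> bool:
    """Return True if the current thread has a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False


def sync_step() -> bool:
    # Hypothetical plain sync function that would go on to call a
    # sync compatible helper; here it only reports what it can see.
    return has_running_loop()


async def main() -> tuple:
    # Called directly: runs on the event loop thread, so it still looks async.
    called_directly = sync_step()
    # Called via a worker thread: no event loop is running in that thread.
    called_in_worker = await asyncio.to_thread(sync_step)
    return called_directly, called_in_worker


result = asyncio.run(main())
print(result)  # (True, False)
```

The second call is the "run the first sync function in a worker thread" case: inside the worker thread the function no longer appears to be in an async context.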

Madison Schott

11/15/2022, 9:43 PM
@task
def webevent_sync(fivetran_credentials):
    webevents_secret = Secret.load("webevents-connector_id")
    webevents_sync = trigger_fivetran_connector_sync_and_wait_for_completion(
        fivetran_credentials=fivetran_credentials,
        connector_id=webevents_secret.get(),
        schedule_type="manual",
        poll_status_every_n_seconds=30,
    )
not sure what worker thread means

Zanie

11/15/2022, 9:46 PM
Can you share the full traceback?

Madison Schott

11/15/2022, 9:53 PM
this is how I'm calling it in my flow:
webevent_sync = webevent_sync(fivetran_credentials)
is that what you mean by full traceback?

Zanie

11/15/2022, 9:59 PM
Ah no, I’m looking for all of the output from the error that shows what functions are called

Madison Schott

11/15/2022, 9:59 PM
whoops
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "data_pipeline_2.0_prod.py", line 20, in <module>
    flow=data_pipeline_2,
NameError: name 'data_pipeline_2' is not defined

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 201, in coroutine_wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 152, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/usr/local/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/Cellar/python@3.9/3.9.13_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/Cellar/python@3.9/3.9.13_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.9/site-packages/prefect/cli/deployment.py", line 754, in build
    flow = prefect.utilities.importtools.import_object(entrypoint)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/importtools.py", line 193, in import_object
    module = load_script_as_module(script_path)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'data_pipeline_2.0_prod.py' encountered an exception
An exception occurred.
sys:1: RuntimeWarning: coroutine 'Block.load' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Zanie

11/15/2022, 10:34 PM
Ah that looks like a different error
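The "coroutine 'Block.load' was never awaited" warning at the end of that output is a general Python behavior and can be reproduced without Prefect. A rough sketch, assuming a hypothetical async loader `load_secret` standing in for an async load call:

```python
import asyncio
import inspect


async def load_secret() -> str:
    # Hypothetical stand-in for an async loader; not Prefect's Block.load.
    await asyncio.sleep(0)
    return "connector-123"


async def main() -> tuple:
    pending = load_secret()  # no await: this is only a coroutine object
    was_coroutine = inspect.iscoroutine(pending)
    pending.close()  # close it so Python does not warn about it later
    value = await load_secret()  # with await: the coroutine runs and returns
    return was_coroutine, value


outcome = asyncio.run(main())
print(outcome)  # (True, 'connector-123')
```

Without the `close()` call, the unawaited coroutine object is what triggers the "was never awaited" RuntimeWarning when it is garbage collected.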

Madison Schott

11/15/2022, 10:45 PM
the flow name is defined in the file though, I thought it would be caused by the other one

Zanie

11/15/2022, 11:30 PM
I don’t even see the other error here though?
I think a more complete example would be helpful

Madison Schott

11/15/2022, 11:32 PM
@flow
def data_pipeline_2():
    # run data syncs with Fivetran
    print("Pipeline 2.0 is working")

    fivetran_credentials = FivetranCredentials(
        api_key="my_api_key",
        api_secret="my_api_secret",
    )

    # sync Winc.com data
    winc_sync = winc_sync(fivetran_credentials)

    # run Winc models
    user_details = user_detail_model_run(wait_for=[winc_sync])
this is what my flow looks like; the tasks are defined above it, outside the flow
any ideas?