# prefect-community
I'm just starting with Prefect v2, and when I try to run a deployment from Prefect Cloud on a local agent running on a Windows server, the agent raises a NotImplementedError.
Can you show a sample of your code?
Thanks for your question. Could you please move the traceback to the thread to help keep the main channel tidy? It looks like you have a similar issue to this SO question. Prefect uses FastAPI under the hood. It looks like setting an event loop policy worked in their case.
09:12:49.068 | INFO    | prefect.agent - Submitting flow run 'f42cb8fc-a952-4fa0-8b5f-3af70a33aaaa'
09:12:49.302 | INFO    | prefect.infrastructure.process - Opening process 'enigmatic-piculet'...
09:12:49.306 | ERROR   | prefect.agent - Failed to submit flow run 'f42cb8fc-a952-4fa0-8b5f-3af70a33aaaa' to infrastructure.
Traceback (most recent call last):
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\agent.py", line 206, in submit_run
    await self.task_group.start(submit_flow_run, flow_run, infrastructure)
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 807, in start
    return await future
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 702, in _run_wrapped_task
    await coro
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\infrastructure\submission.py", line 48, in submit_flow_run
    return await infrastructure.run(task_status=task_status)
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\infrastructure\process.py", line 79, in run
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\utilities\processutils.py", line 69, in run_process
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\contextlib.py", line 170, in __aenter__
    return await self.gen.__anext__()
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\utilities\processutils.py", line 28, in open_process
    process = await anyio.open_process(command, **kwargs)
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\anyio\_core\_subprocesses.py", line 135, in open_process
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 1102, in open_process
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\asyncio\subprocess.py", line 202, in create_subprocess_shell
    stderr=stderr, **kwds)
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\asyncio\base_events.py", line 1514, in subprocess_shell
    protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\asyncio\base_events.py", line 462, in _make_subprocess_transport
    raise NotImplementedError
(prefect2) C:\Windows\system32>prefect version
Version:             2.3.1
API version:         0.8.0
Python version:      3.7.12
Git commit:          1d485b1d
Built:               Thu, Sep 1, 2022 3:53 PM
OS/Arch:             win32/AMD64
Profile:             cloud-gestamp-us-default
Server type:         cloud
The flow I'm trying to run is a parent flow with a subflow:
import datetime

import pandas as pd
import sqlalchemy
from prefect import flow

# extract_sap_material_documents and transform_sap_material_documents are defined in the full file.
# The @flow decorators are assumed here; they appear to have been lost when the message was posted.

@flow
def sap_material_documents_by_plant_and_date(plant, query_date):
    mat_docs = extract_sap_material_documents(plant, query_date)
    if not mat_docs.empty:
        mat_docs = transform_sap_material_documents(mat_docs)

@flow
def sap_material_documents():
    engine = sqlalchemy.create_engine(r'mssql+pyodbc://localhost/data_warehouse?driver=ODBC+Driver+18+for+SQL+Server&trusted_connection=yes')
    sap_mat_docs_max_dates_df = pd.read_sql('SELECT plant, CAST(MAX(datetime_entered) AS DATE) AS "max_date_entered" FROM sap_material_documents GROUP BY plant', engine)
    date_extracted = datetime.date.today()
    for row_dict in sap_mat_docs_max_dates_df.to_dict(orient='records'):
        plant = row_dict['plant']
        query_date = row_dict['max_date_entered']
        # Run the subflow once per day, from the day after the last load up to yesterday.
        while True:
            query_date += datetime.timedelta(days=1)
            sap_material_documents_by_plant_and_date(plant=plant, query_date=query_date)
            if query_date >= date_extracted - datetime.timedelta(days=1):
                break
Here is the full file and the deployment:
I don’t think it’s something specific to your code - looks like it might be a Windows async issue that you can work around by setting the event loop policy - like in the above SO answer.
Unfortunately, I'm running on Windows and uvloop doesn't run on Windows, so that SO answer doesn't help.
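For reference, the standard library has a Windows-only event loop policy that does not involve uvloop. On Windows, Python 3.7 defaults to the selector event loop, which cannot start subprocesses, while Python 3.8 and later default to the proactor event loop, which can. Below is a minimal sketch of switching the policy by hand. The helper name `use_proactor_loop_on_old_windows` is made up for illustration, and the sketch assumes you control the entry point so the policy is set before any event loop is created. Nothing in this thread confirms that the Prefect agent, started from the CLI, would pick it up.

```python
import asyncio
import sys


def use_proactor_loop_on_old_windows() -> bool:
    """Switch to the proactor event loop policy where it is not the default.

    Hypothetical helper: returns True if the policy was changed, False otherwise.
    """
    # Only Windows on Python < 3.8 defaults to the selector loop,
    # which raises NotImplementedError for subprocesses.
    if sys.platform == "win32" and sys.version_info < (3, 8):
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
        return True
    return False


if __name__ == "__main__":
    changed = use_proactor_loop_on_old_windows()
    print("event loop policy changed:", changed)
```

On any other platform or Python version the function is a no-op, so it is safe to leave in a shared entry point.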
You might want to try upgrading to Python 3.8 if possible. I saw that as a suggestion somewhere.
It works on 3.10 for me. I figured out how to upgrade my dependency that was keeping me on 3.7 and upgraded to 3.10. It looks like anyio doesn't have a way to do subprocesses on Windows with Python 3.7 and that's the crux of the issue.
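For anyone who wants to check an environment before or after upgrading: the traceback ends in asyncio's base event loop, which raises NotImplementedError when the running loop has no subprocess support. Below is a small diagnostic sketch that tries the same kind of operation outside Prefect. The function names are made up for illustration, and it only tests plain asyncio, not anyio or the agent itself.

```python
import asyncio
import sys


async def can_spawn_subprocess() -> bool:
    """Return True if the running event loop can start a child process."""
    try:
        # Launch the current interpreter with a trivial no-op program.
        proc = await asyncio.create_subprocess_exec(sys.executable, "-c", "pass")
    except NotImplementedError:
        # Same failure mode as the traceback in this thread.
        return False
    await proc.wait()
    return proc.returncode == 0


def describe_environment() -> str:
    """Summarize the interpreter version and platform."""
    return f"Python {sys.version_info.major}.{sys.version_info.minor} on {sys.platform}"


if __name__ == "__main__":
    ok = asyncio.run(can_spawn_subprocess())
    print(describe_environment(), "-> subprocess support:", ok)
```

If this reports no subprocess support, the agent's process infrastructure can be expected to fail the same way in that environment.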