# prefect-community
a
I'm just starting with Prefect v2, and when I try to run a deployment from Prefect Cloud on a local agent running on a Windows server, the agent raises a NotImplementedError.
k
Can you show a sample of your code?
j
Thanks for your question. Could you please move the traceback to the thread to help keep the main channel tidy? It looks like you have a similar issue to this SO question. Prefect uses FastAPI under the hood. It looks like setting an event loop policy worked in their case.
a
Traceback:
09:12:49.068 | INFO    | prefect.agent - Submitting flow run 'f42cb8fc-a952-4fa0-8b5f-3af70a33aaaa'
09:12:49.302 | INFO    | prefect.infrastructure.process - Opening process 'enigmatic-piculet'...
09:12:49.306 | ERROR   | prefect.agent - Failed to submit flow run 'f42cb8fc-a952-4fa0-8b5f-3af70a33aaaa' to infrastructure.
Traceback (most recent call last):
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\agent.py", line 206, in submit_run
    await self.task_group.start(submit_flow_run, flow_run, infrastructure)
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 807, in start
    return await future
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 702, in _run_wrapped_task
    await coro
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\infrastructure\submission.py", line 48, in submit_flow_run
    return await infrastructure.run(task_status=task_status)
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\infrastructure\process.py", line 79, in run
    cwd=tmp_dir,
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\utilities\processutils.py", line 69, in run_process
    **kwargs,
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\contextlib.py", line 170, in __aenter__
    return await self.gen.__anext__()
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\utilities\processutils.py", line 28, in open_process
    process = await anyio.open_process(command, **kwargs)
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\anyio\_core\_subprocesses.py", line 135, in open_process
    start_new_session=start_new_session,
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 1102, in open_process
    start_new_session=start_new_session,
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\asyncio\subprocess.py", line 202, in create_subprocess_shell
    stderr=stderr, **kwds)
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\asyncio\base_events.py", line 1514, in subprocess_shell
    protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
  File "C:\ProgramData\Miniconda3\envs\prefect2\lib\asyncio\base_events.py", line 462, in _make_subprocess_transport
    raise NotImplementedError
NotImplementedError
(prefect2) C:\Windows\system32>prefect version
Version:             2.3.1
API version:         0.8.0
Python version:      3.7.12
Git commit:          1d485b1d
Built:               Thu, Sep 1, 2022 3:53 PM
OS/Arch:             win32/AMD64
Profile:             cloud-gestamp-us-default
Server type:         cloud
The flow I'm trying to run is a parent flow with a subflow:
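# Imports used by this excerpt; the extract/transform/load helpers are defined in the full file.
import datetime

import pandas as pd
import sqlalchemy
from prefect import flow

@flow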
def sap_material_documents_by_plant_and_date(plant, query_date):
    mat_docs = extract_sap_material_documents(plant, query_date)
    if not mat_docs.empty:
        mat_docs = transform_sap_material_documents(mat_docs)
        load_to_database(mat_docs)

@flow
def sap_material_documents():
    engine = sqlalchemy.create_engine(r'mssql+pyodbc://localhost/data_warehouse?driver=ODBC+Driver+18+for+SQL+Server&trusted_connection=yes')
    sap_mat_docs_max_dates_df = pd.read_sql('SELECT plant, CAST(MAX(datetime_entered) AS DATE) AS "max_date_entered" FROM sap_material_documents GROUP BY plant', engine)
    date_extracted = datetime.date.today()
    
    for row_dict in sap_mat_docs_max_dates_df.to_dict(orient='records'):    
        plant = row_dict['plant']
        query_date = row_dict['max_date_entered']
        while True:
            query_date += datetime.timedelta(days=1)
            sap_material_documents_by_plant_and_date(plant=plant, query_date=query_date)
            if query_date >= date_extracted - datetime.timedelta(days=1):
                break
Here is the full file and the deployment:
j
I don’t think it’s something specific to your code - looks like it might be a Windows async issue that you can work around by setting the event loop policy - like in the above SO answer.
a
Unfortunately, I'm running on Windows, and uvloop doesn't run on Windows, so that SO answer doesn't help.
j
You might want to try upgrading to Python 3.8 if possible. I saw that as a suggestion somewhere.
a
It works on 3.10 for me. I figured out how to upgrade my dependency that was keeping me on 3.7 and upgraded to 3.10. It looks like anyio doesn't have a way to do subprocesses on Windows with Python 3.7 and that's the crux of the issue.
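For anyone who has to stay on Python 3.7: the traceback above ends in asyncio's `_make_subprocess_transport`, which is what the selector event loop raises on Windows because it has no subprocess support. Python 3.8 made the proactor loop the Windows default, which would explain why upgrading fixes it. Below is a minimal sketch of the event loop policy workaround that does not need uvloop. The function names `use_proactor_loop_on_windows` and `run_echo` are made up for illustration, and I have not verified that the Prefect agent picks the policy up, since that depends on where it creates its loop.

```python
import asyncio
import sys


def use_proactor_loop_on_windows() -> bool:
    """Switch asyncio to the proactor event loop on Windows; do nothing elsewhere.

    Hypothetical helper: returns True if the policy was changed.
    """
    if sys.platform != "win32":
        return False
    # WindowsProactorEventLoopPolicy only exists on Windows builds of Python.
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    return True


async def run_echo() -> int:
    """Start a shell subprocess the same way the traceback does and return its exit code."""
    proc = await asyncio.create_subprocess_shell(
        "echo hello",
        stdout=asyncio.subprocess.PIPE,
    )
    await proc.communicate()
    return proc.returncode


if __name__ == "__main__":
    # The policy must be set before the event loop is created.
    use_proactor_loop_on_windows()
    print("exit code:", asyncio.run(run_echo()))
```

If this script runs cleanly on the Windows box under 3.7 but fails with the policy line commented out, that would point at the event loop type rather than at the flow code.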