Andrew Stewart
09/07/2022, 1:25 PMKhuyen Tran
09/07/2022, 3:03 PMJeff Hale
09/07/2022, 3:05 PMAndrew Stewart
09/07/2022, 3:12 PM09:12:49.068 | INFO | prefect.agent - Submitting flow run 'f42cb8fc-a952-4fa0-8b5f-3af70a33aaaa'
09:12:49.302 | INFO | prefect.infrastructure.process - Opening process 'enigmatic-piculet'...
09:12:49.306 | ERROR | prefect.agent - Failed to submit flow run 'f42cb8fc-a952-4fa0-8b5f-3af70a33aaaa' to infrastructure.
Traceback (most recent call last):
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\agent.py", line 206, in submit_run
await self.task_group.start(submit_flow_run, flow_run, infrastructure)
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 807, in start
return await future
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 702, in _run_wrapped_task
await coro
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\infrastructure\submission.py", line 48, in submit_flow_run
return await infrastructure.run(task_status=task_status)
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\infrastructure\process.py", line 79, in run
cwd=tmp_dir,
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\utilities\processutils.py", line 69, in run_process
**kwargs,
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\contextlib.py", line 170, in __aenter__
return await self.gen.__anext__()
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\prefect\utilities\processutils.py", line 28, in open_process
process = await anyio.open_process(command, **kwargs)
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\anyio\_core\_subprocesses.py", line 135, in open_process
start_new_session=start_new_session,
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 1102, in open_process
start_new_session=start_new_session,
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\asyncio\subprocess.py", line 202, in create_subprocess_shell
stderr=stderr, **kwds)
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\asyncio\base_events.py", line 1514, in subprocess_shell
protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
File "C:\ProgramData\Miniconda3\envs\prefect2\lib\asyncio\base_events.py", line 462, in _make_subprocess_transport
raise NotImplementedError
NotImplementedError
(prefect2) C:\Windows\system32>prefect version
Version: 2.3.1
API version: 0.8.0
Python version: 3.7.12
Git commit: 1d485b1d
Built: Thu, Sep 1, 2022 3:53 PM
OS/Arch: win32/AMD64
Profile: cloud-gestamp-us-default
Server type: cloud
Andrew Stewart
09/07/2022, 3:16 PM@flow
def sap_material_documents_by_plant_and_date(plant, query_date):
mat_docs = extract_sap_material_documents(plant, query_date)
if not mat_docs.empty:
mat_docs = transform_sap_material_documents(mat_docs)
load_to_database(mat_docs)
@flow
def sap_material_documents():
engine = sqlalchemy.create_engine(r'<mssql+pyodbc://localhost/data_warehouse?driver=ODBC+Driver+18+for+SQL+Server&trusted_connection=yes>')
sap_mat_docs_max_dates_df = pd.read_sql('SELECT plant, CAST(MAX(datetime_entered) AS DATE) AS "max_date_entered" FROM sap_material_documents GROUP BY plant', engine)
date_extracted = datetime.date.today()
for row_dict in sap_mat_docs_max_dates_df.to_dict(orient='records'):
plant = row_dict['plant']
query_date = row_dict['max_date_entered']
while True:
query_date += datetime.timedelta(days=1)
sap_material_documents_by_plant_and_date(plant=plant, query_date=query_date)
if query_date >= date_extracted - datetime.timedelta(days=1):
break
Andrew Stewart
09/07/2022, 3:20 PMJeff Hale
09/07/2022, 3:40 PMAndrew Stewart
09/07/2022, 5:39 PMJeff Hale
09/07/2022, 5:48 PMAndrew Stewart
09/08/2022, 2:17 AM