Jason
11/28/2023, 8:10 PMFinished in state Failed("Flow run encountered an exception. PicklingError: Can't pickle <function run_etl at 0x000002B511687CE0>: import of module '__prefect_loader__' failed")
The latest solution we saw was setting "checkpoint=False", but it looks like this parameter has been deprecated. Any help would be greatly appreciated.
Nate
11/28/2023, 11:27 PM
Jason
11/30/2023, 1:33 AMEncountered exception during execution:
Traceback (most recent call last):
File "C:\Users\jabadmin\AppData\Local\Programs\Python\Python311\Lib\site-packages\prefect\engine.py", line 829, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\jabadmin\AppData\Local\Programs\Python\Python311\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 291, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\jabadmin\AppData\Local\Programs\Python\Python311\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 315, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\jabadmin\jobs\python\PSKLOGS_ATMA_RUN.py", line 91, in main
p1.start()
File "C:\Users\jabadmin\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\process.py", line 121, in start
self._popen = self._Popen(self)
^^^^^^^^^^^^^^^^^
File "C:\Users\jabadmin\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\context.py", line 224, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\jabadmin\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\context.py", line 336, in _Popen
return Popen(process_obj)
^^^^^^^^^^^^^^^^^^
File "C:\Users\jabadmin\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\popen_spawn_win32.py", line 95, in __init__
reduction.dump(process_obj, to_child)
File "C:\Users\jabadmin\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function run_etl at 0x000001E409688220>: import of module '__prefect_loader__' failed
08:28:58 PM
prefect.flow_runs
Finished in state Failed("Flow run encountered an exception. PicklingError: Can't pickle <function run_etl at 0x000001E409688220>: import of module '__prefect_loader__' failed")
Jason
11/30/2023, 1:34 AM
@flow()
def main():
    """Run the ETL over the compared blob/SQL frame, split across four processes.

    Steps, in order:
      1. Call ``get_blob_files`` once per entry of ``startsWithList`` and
         concatenate the results into a single list.
      2. Fetch ``get_loaded_files(cs)`` and pass both to ``compare_blob_sql``.
      3. Split ``range(len(result))`` into four consecutive index ranges and
         start one ``Process(target=run_etl, ...)`` per range, each with its
         own ``d_pathN`` argument; then join all four.
      4. Print the elapsed wall-clock time.

    NOTE(review): ``Process.start()`` has to pickle ``run_etl`` when the
    ``spawn`` start method is used (the default on Windows), which requires
    the function to be importable by module name in the child process.
    Presumably that is why this fails when the file is loaded by Prefect
    under a synthetic module name -- confirm, and consider moving
    ``run_etl`` into a regular importable module.
    """
    # Accumulate the listing for every configured prefix.
    collected = []
    for prefix in startsWithList:
        collected.extend(get_blob_files(prefix, cont_name, blob_service_client))

    sql_df = get_loaded_files(cs)
    blob_df = compare_blob_sql(collected, sql_df)

    started_at = datetime.now()
    total = len(blob_df)

    # Manual switch kept from the original: False runs everything in-process.
    use_processes = True
    if not use_processes:
        run_etl(blob_df, 0, total)
    else:
        # Four consecutive [lo, hi) ranges; the last one absorbs the remainder.
        quarter = int(round(total / 4))
        bounds = (0, quarter, 2 * quarter, 3 * quarter, total)
        paths = (d_path1, d_path2, d_path3, d_path4)

        workers = []
        for lo, hi, path in zip(bounds[:-1], bounds[1:], paths):
            worker = Process(target=run_etl, args=(blob_df, lo, hi, path,))
            worker.start()
            workers.append(worker)

        # Wait for every worker, in the order they were started.
        for worker in workers:
            worker.join()

    elapsed = datetime.now() - started_at
    print('Runtime: {}'.format(elapsed))


if __name__ == "__main__":
    # Hand the flow to Prefect's serve() under the deployment name below.
    main.serve(name="Project Name")
Jason
11/30/2023, 1:35 AM