Tom Shaffner
12/08/2021, 10:23 PM
Kevin Kho
Kevin Kho
Tom Shaffner
12/08/2021, 10:32 PM
Tom Shaffner
12/08/2021, 10:33 PM
Kevin Kho
Tom Shaffner
12/08/2021, 10:35 PM
Tom Shaffner
12/08/2021, 10:36 PM
Tom Shaffner
12/08/2021, 10:37 PM
Kevin Kho
Tom Shaffner
12/08/2021, 10:45 PM
Kevin Kho
Tom Shaffner
12/08/2021, 10:48 PM
Tom Shaffner
12/08/2021, 10:49 PM
Tom Shaffner
12/08/2021, 11:02 PM
Zanie
Zanie
Kevin Kho
Tom Shaffner
12/08/2021, 11:12 PM
Zanie
Tom Shaffner
12/08/2021, 11:17 PM
Zanie
Zanie
Tom Shaffner
12/08/2021, 11:18 PM
Tom Shaffner
12/08/2021, 11:18 PM
Zanie
Zanie
Zanie
8 December 2021,06:09:51 ,prefect.CloudTaskRunner,DEBUG,"Task 'Pull Oracle Data': Execution successful."
it looks like we’re getting past this point.
Tom Shaffner
12/08/2021, 11:29 PM
Zanie
Zanie
Zanie
try:
    pickled_val = cloudpickle.dumps(return_val)
except Exception as exc:
    err_msg = (
        f"Failed to pickle result of type {type(return_val).__name__!r} with "
        f'exception: "{type(exc).__name__}: {str(exc)}". This timeout handler '
        "requires your function return value to be serializable with `cloudpickle`."
    )
    logger.error(f"{name}: {err_msg}")
    pickled_val = cloudpickle.dumps(RuntimeError(err_msg))
logger.debug(f"{name}: Passing result back to main process...")
try:
    queue.put(pickled_val)
except Exception:
    logger.error(
        f"{name}: Failed to put result in queue to main process!",
        exc_info=True,
    )
    raise
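A quick way to see what this handler demands of a task — purely illustrative, assuming pandas and cloudpickle are installed — is to round-trip the task's return value through cloudpickle yourself:

import cloudpickle
import pandas as pd

# Hypothetical sanity check mirroring the handler above: the spawned process
# can only hand a result back if cloudpickle can serialize it.
df = pd.DataFrame({"item_id": [1, 2], "list_price": [None, 9.99]})
restored = cloudpickle.loads(cloudpickle.dumps(df))
assert restored.equals(df)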
Zanie
None
instead of the dataframe, do you ever see timeout issues?
Tom Shaffner
12/08/2021, 11:33 PM
Tom Shaffner
12/08/2021, 11:34 PM
Tom Shaffner
12/08/2021, 11:35 PM
Tom Shaffner
12/08/2021, 11:37 PM
Zanie
Tom Shaffner
12/08/2021, 11:39 PM
Zanie
Tom Shaffner
12/08/2021, 11:40 PM
pip install -U prefect
I still get version 0.15.10
Tom Shaffner
12/08/2021, 11:40 PM
Zanie
pip install git+https://github.com/PrefectHQ/prefect@queue-empty-timeout#egg=prefect
Tom Shaffner
12/08/2021, 11:41 PM
Zanie
Zanie
Tom Shaffner
12/08/2021, 11:48 PM
Zanie
Kevin Kho
flow.run()
for this one
Zanie
CloudFlowRunner.
Zanie
Tom Shaffner
12/08/2021, 11:51 PM
Tom Shaffner
12/08/2021, 11:52 PM
# Set start schedule to Eastern time, via a start date of yesterday
start = pendulum.now("America/New_York").add(days=-1)
schedule = Schedule(clocks=[CronClock(CRON_SCHEDULE, start_date=start)])

with Flow(FLOW_NAME, result=LocalResult(dir='/Prefect_data_disk/results'), schedule=schedule) as flow:
    logger.info(f"{FLOW_NAME} Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=ORACLE_QUERY_SQL_PATH, prod=USE_ORACLE_PROD)
    df = set_data_types(df)
    create_data_summary_artifacts(df)
    u = upload_to_table(df, destination_table=DATA_DESTINATION_TABLE_NAME)

    logger.info("Initiating history upload process.")
    run_sql = f"SELECT * FROM {SUMMARY_VIEW_NAME}"
    summary_df = pull_summary_data_via(upstream_tasks=[u], sql=run_sql)
    delete_today_from_history_if_exists(upstream_tasks=[summary_df], df=df, history_table=HISTORY_TABLE_NAME)
    upload_to_history_table(df=summary_df, destination_table=HISTORY_TABLE_NAME, append=True)

# using https by default
flow.storage = Git(
    git_clone_url_secret_name="MY_REPO_CLONE_URL",
    repo='Org/Prefect_ETL',
    flow_path=__file__,  # location of flow file in repo
)
flow.run_config = LocalRun(
    env={'PREFECT__LOGGING__FORMAT': '[%(asctime)s-%(levelname)s - %(name)s]-[%(filename)s:%(funcName)s]-Line %(lineno)d: %(message)s',
         'PREFECT__LOGGING__LEVEL': 'DEBUG'},
    working_dir=file_path, labels=["normal-process"])
flow.executor = LocalDaskExecutor()
flow.register(project_name=PROJECT_NAME)
Tom Shaffner
12/08/2021, 11:52 PM
Tom Shaffner
12/08/2021, 11:53 PM
Tom Shaffner
12/08/2021, 11:54 PM
Tom Shaffner
12/08/2021, 11:54 PM
Zanie
git
installation I think.
Zanie
flow.run()
which might make it easier to diagnose where this install is failing.
Tom Shaffner
12/09/2021, 3:37 PM
[2021-12-09 10:36:16-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 10:36:16-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 10:36:16-0500-INFO - prefect.FlowRunner]-[flow_runner.py:run]-Line 245: Beginning Flow run for 'Items with No List Price Pull'
[2021-12-09 10:36:16-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Pull Oracle Data': Starting task run...
[2021-12-09 10:36:16-0500-WARNING - prefect.TaskRunner]-[task_runner.py:check_task_is_cached]-Line 798: Task 'Pull Oracle Data': Can't use cache because it is now invalid
[2021-12-09 10:36:17-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 10:36:17-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 10:36:17-0500-INFO - prefect.FlowRunner]-[flow_runner.py:run]-Line 245: Beginning Flow run for 'Items with No List Price Pull'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Pull Oracle Data': Starting task run...
[2021-12-09 10:36:17-0500-WARNING - prefect.TaskRunner]-[task_runner.py:check_task_is_cached]-Line 798: Task 'Pull Oracle Data': Can't use cache because it is now invalid
[2021-12-09 10:36:17-0500-ERROR - prefect.TaskRunner]-[task_runner.py:get_task_run_state]-Line 906: Task 'Pull Oracle Data': Exception encountered during task execution!
Traceback (most recent call last):
File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 483, in run_task_with_timeout
return run_with_multiprocess_timeout(
File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 400, in run_with_multiprocess_timeout
run_process.start()
File "/usr/lib/python3.8/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/usr/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/usr/lib/python3.8/multiprocessing/spawn.py", line 154, in get_preparation_data
_check_not_importing_main()
File "/usr/lib/python3.8/multiprocessing/spawn.py", line 134, in _check_not_importing_main
raise RuntimeError('''
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Pull Oracle Data': Finished task run for task with final state: 'Failed'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Set Data Types': Starting task run...
[2021-12-09 10:36:17-0500-WARNING - prefect.TaskRunner]-[task_runner.py:check_task_is_cached]-Line 798: Task 'Set Data Types': Can't use cache because it is now invalid
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Upload to table in Analytics DB': Starting task run...
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Flow Data Artifacts': Starting task run...
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Pull Summary Data': Starting task run...
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Delete Existing History Data': Starting task run...
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Upload to history table in Analytics DB': Starting task run...
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
[2021-12-09 10:36:17-0500-INFO - prefect.FlowRunner]-[flow_runner.py:determine_final_state]-Line 705: Flow run FAILED: some reference tasks failed.
[2021-12-09 10:36:17-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 14.88 seconds to complete.
Zanie
flow.run()
with if __name__ == '__main__':
as that error describes
Zanie
Zanie
d3f3162
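A minimal sketch of the guard being suggested here, assuming the flow object from the registration script above:

# Guard direct execution so the spawned worker process can re-import this
# module without re-running the flow (the RuntimeError above describes this).
if __name__ == "__main__":
    flow.run()

The flow.register(...) call at the bottom of the registration script would sit under the same guard.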
Tom Shaffner
12/09/2021, 4:43 PM
Tom Shaffner
12/09/2021, 5:17 PM
Zanie
flow.run()
Zanie
prefect run -p path-to-flow.py --log-level debug
if you’d like
Tom Shaffner
12/09/2021, 5:22 PM
Retrieving local flow...[2021-12-09 12:22:10-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:22:10-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
Done
Running flow locally...
└── 12:22:10 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 12:22:10 | DEBUG | Using executor type LocalDaskExecutor
└── 12:22:10 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:22:10 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 12:22:10 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:22:10 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:22:10 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:22:10 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:22:10 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:22:10 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 100s timeout...
Process SpawnProcess-1:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 287, in multiprocessing_safe_run_and_retrieve
request = cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'oracle_pull'
└── 12:22:11 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:22:12 | DEBUG | Task 'Pull Oracle Data': No result returned within the timeout period!
└── 12:22:12 | DEBUG | Task 'Pull Oracle Data': Handling state change from Running to TimedOut
└── 12:22:12 | INFO | Task 'Pull Oracle Data': Finished task run for task with final state: 'TimedOut'
└── 12:22:12 | INFO | Task 'Set Data Types': Starting task run...
└── 12:22:12 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:22:12 | DEBUG | Task 'Set Data Types': TRIGGERFAIL signal raised during execution.
└── 12:22:12 | DEBUG | Task 'Set Data Types': Handling state change from Pending to TriggerFailed
└── 12:22:12 | INFO | Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
└── 12:22:12 | INFO | Task 'Flow Data Artifacts': Starting task run...
└── 12:22:12 | INFO | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:22:12 | DEBUG | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:22:12 | DEBUG | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:22:12 | DEBUG | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:22:12 | DEBUG | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:22:12 | INFO | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:22:12 | INFO | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:22:12 | INFO | Task 'Pull Summary Data': Starting task run...
└── 12:22:12 | DEBUG | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:22:12 | DEBUG | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:22:12 | INFO | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:22:12 | INFO | Task 'Delete Existing History Data': Starting task run...
└── 12:22:12 | INFO | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:22:12 | DEBUG | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:22:12 | DEBUG | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:22:12 | DEBUG | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:22:12 | DEBUG | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:22:12 | INFO | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:22:12 | INFO | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:22:12 | INFO | Flow run FAILED: some reference tasks failed.
└── 12:22:12 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
Tom Shaffner
12/09/2021, 5:22 PM
Zanie
Zanie
Tom Shaffner
12/09/2021, 5:27 PM
Tom Shaffner
12/09/2021, 5:27 PM
Tom Shaffner
12/09/2021, 5:27 PM
Zanie
oracle_pull
your own module?
Tom Shaffner
12/09/2021, 5:28 PM
Tom Shaffner
12/09/2021, 5:28 PM
Zanie
Zanie
Zanie
processes=True
on your LocalDaskExecutor?
Tom Shaffner
12/09/2021, 5:30 PM
processes=True
gives:
Retrieving local flow...[2021-12-09 12:29:37-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:29:37-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
Done
Running flow locally...
└── 12:29:37 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 12:29:37 | DEBUG | Using executor type LocalDaskExecutor
└── 12:29:37 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:29:38 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 12:29:38 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:29:38 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:29:38 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:29:38 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:29:38 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:29:38 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 100s timeout...
Process SpawnProcess-1:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 287, in multiprocessing_safe_run_and_retrieve
request = cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'oracle_pull'
└── 12:29:38 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:29:39 | DEBUG | Task 'Pull Oracle Data': No result returned within the timeout period!
└── 12:29:39 | DEBUG | Task 'Pull Oracle Data': Handling state change from Running to TimedOut
└── 12:29:39 | INFO | Task 'Pull Oracle Data': Finished task run for task with final state: 'TimedOut'
└── 12:29:39 | INFO | Task 'Set Data Types': Starting task run...
└── 12:29:39 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:29:39 | DEBUG | Task 'Set Data Types': TRIGGERFAIL signal raised during execution.
└── 12:29:39 | DEBUG | Task 'Set Data Types': Handling state change from Pending to TriggerFailed
└── 12:29:39 | INFO | Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
└── 12:29:39 | INFO | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:29:39 | INFO | Task 'Flow Data Artifacts': Starting task run...
└── 12:29:39 | DEBUG | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:29:39 | DEBUG | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:29:39 | DEBUG | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:29:39 | DEBUG | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:29:39 | INFO | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:29:39 | INFO | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:29:39 | INFO | Task 'Pull Summary Data': Starting task run...
└── 12:29:39 | DEBUG | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:29:39 | DEBUG | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:29:39 | INFO | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:29:39 | INFO | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:29:39 | DEBUG | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:29:39 | INFO | Task 'Delete Existing History Data': Starting task run...
└── 12:29:39 | DEBUG | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:29:39 | DEBUG | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:29:39 | DEBUG | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:29:39 | INFO | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:29:39 | INFO | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:29:39 | INFO | Flow run FAILED: some reference tasks failed.
└── 12:29:39 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
Zanie
Tom Shaffner
12/09/2021, 5:31 PM
file_path = Path(__file__).resolve().parent
and then pass that path in as the working_dir
in LocalRun, so it should import fine
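Roughly how those pieces fit together — a sketch assembled from the registration snippet above, assuming Prefect 0.15's run-config API:

from pathlib import Path
from prefect.run_configs import LocalRun

# Resolve the directory containing this flow file and hand it to the run
# config, so agent-launched runs start in the right working directory.
file_path = str(Path(__file__).resolve().parent)
flow.run_config = LocalRun(working_dir=file_path, labels=["normal-process"])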
Tom Shaffner
12/09/2021, 5:32 PM
Retrieving local flow...[2021-12-09 12:31:40-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:31:40-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
Done
Running flow locally...
└── 12:31:40 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 12:31:40 | DEBUG | Using executor type LocalDaskExecutor
└── 12:31:40 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:31:40 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 12:31:40 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:31:40 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:31:40 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:31:40 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:31:40 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:31:40 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 100s timeout...
Process SpawnProcess-1:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 287, in multiprocessing_safe_run_and_retrieve
request = cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'oracle_pull'
└── 12:31:40 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:31:41 | DEBUG | Task 'Pull Oracle Data': No result returned within the timeout period!
└── 12:31:41 | DEBUG | Task 'Pull Oracle Data': Handling state change from Running to TimedOut
└── 12:31:41 | INFO | Task 'Pull Oracle Data': Finished task run for task with final state: 'TimedOut'
└── 12:31:41 | INFO | Task 'Set Data Types': Starting task run...
└── 12:31:41 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:31:41 | DEBUG | Task 'Set Data Types': TRIGGERFAIL signal raised during execution.
└── 12:31:41 | DEBUG | Task 'Set Data Types': Handling state change from Pending to TriggerFailed
└── 12:31:41 | INFO | Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
└── 12:31:41 | INFO | Task 'Flow Data Artifacts': Starting task run...
└── 12:31:41 | DEBUG | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:31:41 | DEBUG | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:31:41 | INFO | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:31:41 | INFO | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:31:41 | DEBUG | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:31:41 | DEBUG | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:31:41 | INFO | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:31:41 | INFO | Task 'Pull Summary Data': Starting task run...
└── 12:31:41 | DEBUG | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:31:41 | DEBUG | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:31:41 | INFO | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:31:41 | INFO | Task 'Delete Existing History Data': Starting task run...
└── 12:31:41 | INFO | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:31:41 | DEBUG | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:31:41 | DEBUG | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:31:41 | DEBUG | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:31:41 | DEBUG | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:31:41 | INFO | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:31:41 | INFO | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:31:41 | INFO | Flow run FAILED: some reference tasks failed.
└── 12:31:41 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
Zanie
└── 12:31:40 | DEBUG | Using executor type LocalDaskExecutor
Zanie
Tom Shaffner
12/09/2021, 5:34 PM
Tom Shaffner
12/09/2021, 5:39 PM
processes=True
fails with this error:
Retrieving local flow...[2021-12-09 12:38:12-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:38:12-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 12:38:12-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`, `git_token_username`, `use_ssh`, and `format_access_token`.
Done
Running flow locally...
└── 12:38:12 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 12:38:12 | DEBUG | Using executor type LocalDaskExecutor
└── 12:38:12 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:38:12 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 12:38:12 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:38:12 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:38:12 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:38:12 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:38:12 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:38:12 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 100s timeout...
Process SpawnProcess-1:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 287, in multiprocessing_safe_run_and_retrieve
request = cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'oracle_pull'
└── 12:38:13 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:38:14 | DEBUG | Task 'Pull Oracle Data': No result returned within the timeout period!
└── 12:38:14 | DEBUG | Task 'Pull Oracle Data': Handling state change from Running to TimedOut
└── 12:38:14 | INFO | Task 'Pull Oracle Data': Finished task run for task with final state: 'TimedOut'
└── 12:38:14 | INFO | Task 'Set Data Types': Starting task run...
└── 12:38:14 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:38:14 | DEBUG | Task 'Set Data Types': TRIGGERFAIL signal raised during execution.
└── 12:38:14 | DEBUG | Task 'Set Data Types': Handling state change from Pending to TriggerFailed
└── 12:38:14 | INFO | Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
└── 12:38:14 | INFO | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:38:14 | INFO | Task 'Flow Data Artifacts': Starting task run...
└── 12:38:14 | DEBUG | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:38:14 | DEBUG | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:38:14 | DEBUG | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:38:14 | DEBUG | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:38:14 | INFO | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:38:14 | INFO | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:38:14 | INFO | Task 'Pull Summary Data': Starting task run...
└── 12:38:14 | DEBUG | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:38:14 | DEBUG | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:38:14 | INFO | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:38:14 | INFO | Task 'Delete Existing History Data': Starting task run...
└── 12:38:14 | DEBUG | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:38:14 | DEBUG | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:38:14 | INFO | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:38:14 | DEBUG | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:38:14 | INFO | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:38:14 | DEBUG | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:38:14 | INFO | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:38:14 | INFO | Flow run FAILED: some reference tasks failed.
└── 12:38:14 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
Not having that processes flag in there also seems to return this error.
Tom Shaffner
12/09/2021, 5:39 PM
Zanie
processes=False?
Zanie
Tom Shaffner
12/09/2021, 5:41 PM
Retrieving local flow...[2021-12-09 12:40:58-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:40:58-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 12:40:58-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`, `git_token_username`, `use_ssh`, and `format_access_token`.
Done
Running flow locally...
└── 12:40:58 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 12:40:58 | DEBUG | Using executor type LocalDaskExecutor
└── 12:40:58 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:40:58 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 12:40:58 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:40:58 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:40:58 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:40:58 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:40:58 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:40:58 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 100s timeout...
Process SpawnProcess-1:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 287, in multiprocessing_safe_run_and_retrieve
request = cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'oracle_pull'
└── 12:40:59 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:41:00 | DEBUG | Task 'Pull Oracle Data': No result returned within the timeout period!
└── 12:41:00 | DEBUG | Task 'Pull Oracle Data': Handling state change from Running to TimedOut
└── 12:41:00 | INFO | Task 'Pull Oracle Data': Finished task run for task with final state: 'TimedOut'
└── 12:41:00 | INFO | Task 'Set Data Types': Starting task run...
└── 12:41:00 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:41:00 | DEBUG | Task 'Set Data Types': TRIGGERFAIL signal raised during execution.
└── 12:41:00 | DEBUG | Task 'Set Data Types': Handling state change from Pending to TriggerFailed
└── 12:41:00 | INFO | Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
└── 12:41:00 | INFO | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:41:00 | INFO | Task 'Flow Data Artifacts': Starting task run...
└── 12:41:00 | DEBUG | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:41:00 | DEBUG | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:41:00 | DEBUG | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:41:00 | DEBUG | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:41:00 | INFO | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:41:00 | INFO | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:41:00 | INFO | Task 'Pull Summary Data': Starting task run...
└── 12:41:00 | DEBUG | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:41:00 | DEBUG | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:41:00 | INFO | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:41:00 | INFO | Task 'Delete Existing History Data': Starting task run...
└── 12:41:00 | DEBUG | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:41:00 | DEBUG | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:41:00 | INFO | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:41:00 | INFO | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:41:00 | DEBUG | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:41:00 | DEBUG | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:41:00 | INFO | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:41:00 | INFO | Flow run FAILED: some reference tasks failed.
└── 12:41:00 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
Tom Shaffner
12/09/2021, 5:41 PM
Tom Shaffner
12/09/2021, 5:41 PM
Zanie
LocalRun
isn’t used when you’re running something locally
Zanie
Zanie
Tom Shaffner
12/09/2021, 5:43 PM
Zanie
LocalRun
here you’re using flow.run()
instead
Zanie
Tom Shaffner
12/09/2021, 5:43 PM
Zanie
PYTHONPATH=… prefect run …
and set the PYTHONPATH to include your module directory, we should get past this error
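For example (an illustrative command, assuming oracle_pull lives next to the flow file in /home/h452338/Prefect_ETL):

PYTHONPATH="/home/h452338/Prefect_ETL:$PYTHONPATH" prefect run -p f_items_no_list_price_pull.py --log-level debug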
Tom Shaffner
12/09/2021, 5:49 PM
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py --log-level debug
Retrieving local flow...[2021-12-09 12:46:39-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:46:39-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 12:46:39-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`, `git_token_username`, `use_ssh`, and `format_access_token`.
Done
Running flow locally...
└── 12:46:39 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 12:46:39 | DEBUG | Using executor type LocalDaskExecutor
└── 12:46:39 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:46:39 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 12:46:39 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:46:39 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:46:39 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:46:39 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:46:39 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:46:39 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 100s timeout...
[2021-12-09 12:46:40-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 19.88 seconds to complete.
└── 12:48:19 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
Is that helpful at all? It seems to just stop there
Zanie
Zanie
Tom Shaffner
12/09/2021, 5:51 PM
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect version
0.15.10+13.gd3f3162eb
Zanie
None
from your task?
Zanie
Tom Shaffner
12/09/2021, 5:54 PM
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py --log-level debug
Retrieving local flow...[2021-12-09 12:53:45-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:53:45-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 12:53:45-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`, `git_token_username`, `use_ssh`, and `format_access_token`.
Done
Running flow locally...
└── 12:53:45 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 12:53:45 | DEBUG | Using executor type LocalDaskExecutor
└── 12:53:45 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:53:46 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 12:53:46 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:53:46 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:53:46 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:53:46 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:53:46 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:53:46 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 1s timeout...
[2021-12-09 12:53:46-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
└── 12:53:47 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:53:48 | DEBUG | Task 'Pull Oracle Data': No result returned within the timeout period!
└── 12:53:48 | DEBUG | Task 'Pull Oracle Data': Handling state change from Running to TimedOut
└── 12:53:48 | INFO | Task 'Pull Oracle Data': Finished task run for task with final state: 'TimedOut'
└── 12:53:48 | INFO | Task 'Set Data Types': Starting task run...
└── 12:53:48 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:53:48 | DEBUG | Task 'Set Data Types': TRIGGERFAIL signal raised during execution.
└── 12:53:48 | DEBUG | Task 'Set Data Types': Handling state change from Pending to TriggerFailed
└── 12:53:48 | INFO | Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
└── 12:53:48 | INFO | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:53:48 | INFO | Task 'Flow Data Artifacts': Starting task run...
└── 12:53:48 | DEBUG | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:53:48 | DEBUG | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:53:48 | DEBUG | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:53:48 | DEBUG | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:53:48 | INFO | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:53:48 | INFO | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:53:48 | INFO | Task 'Pull Summary Data': Starting task run...
└── 12:53:48 | DEBUG | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:53:48 | DEBUG | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:53:48 | INFO | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:53:48 | INFO | Task 'Delete Existing History Data': Starting task run...
└── 12:53:48 | DEBUG | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:53:48 | INFO | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:53:48 | DEBUG | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:53:48 | DEBUG | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:53:48 | INFO | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:53:48 | DEBUG | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:53:48 | INFO | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:53:48 | INFO | Flow run FAILED: some reference tasks failed.
└── 12:53:48 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
Zanie
Tom Shaffner
12/09/2021, 5:56 PM
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py --log-level debug
Retrieving local flow...[2021-12-09 12:55:47-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:55:47-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 12:55:47-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`, `git_token_username`, `use_ssh`, and `format_access_token`.
Done
Running flow locally...
└── 12:55:47 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 12:55:47 | DEBUG | Using executor type LocalDaskExecutor
└── 12:55:47 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:55:47 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 12:55:47 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:55:47 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:55:47 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:55:47 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:55:47 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:55:47 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 12:55:48-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 16.65 seconds to complete.
└── 12:56:06 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:56:06 | DEBUG | Task 'Pull Oracle Data': Handling state change from Running to Success
└── 12:56:06 | DEBUG | Task 'Pull Oracle Data': Handling state change from Success to Cached
└── 12:56:06 | INFO | Task 'Pull Oracle Data': Finished task run for task with final state: 'Cached'
└── 12:56:06 | INFO | Task 'Set Data Types': Starting task run...
└── 12:56:06 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:56:06 | DEBUG | Task 'Set Data Types': Handling state change from Pending to Running
└── 12:56:06 | DEBUG | Task 'Set Data Types': Calling task.run() method...
└── 12:56:06 | INFO | Setting Formats...
└── 12:56:06 | ERROR | Task 'Set Data Types': Exception encountered during task execution!
Traceback (most recent call last):
File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 458, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "f_items_no_list_price_pull.py", line 28, in set_data_types
pre_drop_row_count = len(df.index)
AttributeError: 'NoneType' object has no attribute 'index'
└── 12:56:06 | DEBUG | Task 'Set Data Types': Handling state change from Running to Failed
└── 12:56:06 | INFO | Task 'Set Data Types': Finished task run for task with final state: 'Failed'
└── 12:56:06 | INFO | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:56:06 | INFO | Task 'Flow Data Artifacts': Starting task run...
└── 12:56:06 | DEBUG | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:56:06 | DEBUG | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:56:06 | DEBUG | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:56:06 | DEBUG | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:56:07 | INFO | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:56:07 | INFO | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:56:07 | INFO | Task 'Pull Summary Data': Starting task run...
└── 12:56:07 | DEBUG | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:56:07 | DEBUG | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:56:07 | INFO | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:56:07 | INFO | Task 'Delete Existing History Data': Starting task run...
└── 12:56:07 | INFO | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:56:07 | DEBUG | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:56:07 | DEBUG | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:56:07 | DEBUG | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:56:07 | DEBUG | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:56:07 | INFO | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:56:07 | INFO | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:56:07 | INFO | Flow run FAILED: some reference tasks failed.
└── 12:56:07 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
Tom Shaffner
12/09/2021, 5:56 PM
Tom Shaffner
12/09/2021, 5:56 PM
Zanie
Tom Shaffner
12/09/2021, 5:58 PM
Zanie
try:
    logger.debug(f"{name}: Pickling value of size {sys.getsizeof(return_val)}...")
    pickled_val = cloudpickle.dumps(return_val)
    logger.debug(f"{name}: Pickling successful!")
except Exception as exc:
    err_msg = (
        f"Failed to pickle result of type {type(return_val).__name__!r} with "
        f'exception: "{type(exc).__name__}: {str(exc)}". This timeout handler '
        "requires your function return value to be serializable with `cloudpickle`."
    )
    logger.error(f"{name}: {err_msg}")
    pickled_val = cloudpickle.dumps(RuntimeError(err_msg))
logger.debug(f"{name}: Passing result back to main process...")
Tom Shaffner
12/09/2021, 5:58 PM
@task(name='Pull Oracle Data', cache_for=timedelta(hours=20), timeout=(50), log_stdout=True)  # (3*60*60))  # ,max_retries=3,retry_delay=timedelta(minutes=20)
def pull_oracle_data_via(oracle_query_sql, prod=True) -> DataFrame:
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(prod=prod) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    return None
Zanie
Tom Shaffner
12/09/2021, 5:59 PM
Tom Shaffner
12/09/2021, 5:59 PM
Zanie
None
Zanie
Zanie
Tom Shaffner
12/09/2021, 6:02 PM
Tom Shaffner
12/09/2021, 6:03 PM
Zanie
flow.run()
you shouldn’t need either
Zanie
Tom Shaffner
12/09/2021, 6:03 PM
Tom Shaffner
12/09/2021, 6:04 PM
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py --log-level debug
Retrieving local flow...[2021-12-09 13:03:32-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 13:03:32-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 13:03:32-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`, `git_token_username`, `use_ssh`, and `format_access_token`.
Done
Running flow locally...
└── 13:03:32 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 13:03:32 | DEBUG | Using executor type LocalDaskExecutor
└── 13:03:32 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 13:03:32 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 13:03:32 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 13:03:32 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 13:03:32 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 13:03:32 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 13:03:32 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 13:03:32 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 13:03:33-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 17.55 seconds to complete.
└── 13:04:22 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
Retrying with None return next
Tom Shaffner
12/09/2021, 6:05 PM
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py --log-level debug
Retrieving local flow...[2021-12-09 13:05:07-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 13:05:07-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 13:05:07-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`, `git_token_username`, `use_ssh`, and `format_access_token`.
Done
Running flow locally...
└── 13:05:07 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 13:05:07 | DEBUG | Using executor type LocalDaskExecutor
└── 13:05:07 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 13:05:07 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 13:05:07 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 13:05:07 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 13:05:07 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 13:05:07 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 13:05:07 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 13:05:07 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 13:05:07-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 15.5 seconds to complete.
└── 13:05:24 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 13:05:24 | DEBUG | Task 'Pull Oracle Data': Handling state change from Running to Success
└── 13:05:24 | DEBUG | Task 'Pull Oracle Data': Handling state change from Success to Cached
└── 13:05:24 | INFO | Task 'Pull Oracle Data': Finished task run for task with final state: 'Cached'
└── 13:05:24 | INFO | Task 'Set Data Types': Starting task run...
└── 13:05:24 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 13:05:24 | DEBUG | Task 'Set Data Types': Handling state change from Pending to Running
└── 13:05:24 | DEBUG | Task 'Set Data Types': Calling task.run() method...
└── 13:05:24 | INFO | Setting Formats...
└── 13:05:24 | ERROR | Task 'Set Data Types': Exception encountered during task execution!
Traceback (most recent call last):
File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 461, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "f_items_no_list_price_pull.py", line 28, in set_data_types
pre_drop_row_count = len(df.index)
AttributeError: 'NoneType' object has no attribute 'index'
└── 13:05:24 | DEBUG | Task 'Set Data Types': Handling state change from Running to Failed
└── 13:05:24 | INFO | Task 'Set Data Types': Finished task run for task with final state: 'Failed'
└── 13:05:25 | INFO | Task 'Flow Data Artifacts': Starting task run...
└── 13:05:25 | DEBUG | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 13:05:25 | INFO | Task 'Upload to table in Analytics DB': Starting task run...
└── 13:05:25 | DEBUG | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 13:05:25 | DEBUG | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 13:05:25 | INFO | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 13:05:25 | DEBUG | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 13:05:25 | INFO | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 13:05:25 | INFO | Task 'Pull Summary Data': Starting task run...
└── 13:05:25 | DEBUG | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 13:05:25 | DEBUG | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 13:05:25 | INFO | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 13:05:25 | INFO | Task 'Delete Existing History Data': Starting task run...
└── 13:05:25 | DEBUG | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 13:05:25 | INFO | Task 'Upload to history table in Analytics DB': Starting task run...
└── 13:05:25 | DEBUG | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 13:05:25 | DEBUG | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 13:05:25 | INFO | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 13:05:25 | DEBUG | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 13:05:25 | INFO | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 13:05:25 | INFO | Flow run FAILED: some reference tasks failed.
└── 13:05:25 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
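The traceback above is the downstream symptom: 'Pull Oracle Data' finishes as Cached but its result arrives as None, and 'Set Data Types' then trips over df.index. A defensive guard would make that failure mode explicit instead of surfacing as an AttributeError; a minimal sketch, where the task body is illustrative and only the None check is the point:

from pandas import DataFrame
from prefect import task

@task(name='Set Data Types')
def set_data_types(df: DataFrame) -> DataFrame:
    # If the subprocess result is lost (e.g. to a timeout), Prefect hands
    # the downstream task None; fail fast with a clear message instead of
    # an opaque AttributeError on df.index.
    if df is None:
        raise ValueError(
            "Upstream 'Pull Oracle Data' returned None; "
            "its result was likely lost to the task timeout."
        )
    pre_drop_row_count = len(df.index)  # original logic continues from here
    return df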
Tom Shaffner
12/09/2021, 6:06 PMTom Shaffner
12/09/2021, 6:06 PMZanie
Zanie
Tom Shaffner
12/09/2021, 6:08 PM
@task(name='Pull Oracle Data', cache_for=timedelta(hours=20), timeout=50, log_stdout=True)  # timeout was (3*60*60)  # ,max_retries=3, retry_delay=timedelta(minutes=20)
def pull_oracle_data_via(oracle_query_sql, prod=True) -> DataFrame:
    # (timedelta, DataFrame, logger, sys, cloudpickle, Oracle_DB are defined
    # elsewhere in prefect_pull_framework.py)
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(prod=prod) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    try:
        # Note: f"{df}: ..." interpolates the entire dataframe into the log
        # line rather than a task name.
        logger.debug(f"{df}: Pickling value of size {sys.getsizeof(df)}...")
        pickled_val = cloudpickle.dumps(df)
        logger.debug(f"{df}: Pickling successful!")
    except Exception as exc:
        err_msg = (
            f"Failed to pickle result of type {type(df).__name__!r} with "
            f'exception: "{type(exc).__name__}: {str(exc)}". This timeout handler '
            "requires your function return value to be serializable with `cloudpickle`."
        )
        logger.error(f"{df}: {err_msg}")
        pickled_val = cloudpickle.dumps(RuntimeError(err_msg))
    logger.debug(f"{df}: Passing result back to main process...")
    return df
Zanie
pickled_val
at the end but 🤷 we should still see some info here
Tom Shaffner
12/09/2021, 6:10 PMTom Shaffner
12/09/2021, 6:10 PMTom Shaffner
12/09/2021, 6:10 PM
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py --log-level debug
Retrieving local flow...[2021-12-09 13:08:29-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 13:08:29-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 13:08:29-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`, `git_token_username`, `use_ssh`, and `format_access_token`.
Done
Running flow locally...
└── 13:08:29 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 13:08:29 | DEBUG | Using executor type LocalDaskExecutor
└── 13:08:29 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 13:08:29 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 13:08:29 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 13:08:29 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 13:08:29 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 13:08:29 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 13:08:29 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 13:08:29 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 13:08:29-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 20.0 seconds to complete.
└── 13:09:19 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
Tom Shaffner
12/09/2021, 6:12 PM
@task(name='Pull Oracle Data', cache_for=timedelta(hours=20), timeout=50, log_stdout=True)  # timeout was (3*60*60)  # ,max_retries=3, retry_delay=timedelta(minutes=20)
def pull_oracle_data_via(oracle_query_sql, prod=True) -> DataFrame:
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(prod=prod) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    logger.debug(f"{df}: Pickling value of size {sys.getsizeof(df)}...")
    pickled_val = cloudpickle.dumps(df)
    logger.debug(f"{df}: Pickling successful!")
    logger.debug(f"{df}: Passing result back to main process...")
    return df
That gives this:
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py --log-level debug
Retrieving local flow...[2021-12-09 13:11:09-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 13:11:09-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 13:11:09-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`, `git_token_username`, `use_ssh`, and `format_access_token`.
Done
Running flow locally...
└── 13:11:09 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 13:11:09 | DEBUG | Using executor type LocalDaskExecutor
└── 13:11:09 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 13:11:09 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 13:11:09 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 13:11:09 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 13:11:09 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 13:11:09 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 13:11:09 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 13:11:09 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 13:11:09-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 18.71 seconds to complete.
└── 13:11:59 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
Tom Shaffner
12/09/2021, 6:12 PMZanie
Zanie
Zanie
Tom Shaffner
12/09/2021, 6:15 PMTom Shaffner
12/09/2021, 6:16 PMTom Shaffner
12/09/2021, 6:19 PM
>>> sys.getsizeof(df)
4507188
>>> test=cloudpickle.dumps(df)
>>>
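The interactive check above shows the ~4.5 MB frame pickles cleanly, so serializability isn't the problem; timing the full round trip is a quick way to also rule out serialization speed. A small sketch, with a randomly generated stand-in for the real Oracle result (the logs later show the actual frame is 3317 rows x 36 columns):

import time
import cloudpickle
import numpy as np
import pandas as pd

# Stand-in for the Oracle result: roughly the shape of the real frame.
df = pd.DataFrame(np.random.choice(["foo", "bar", "baz"], size=(3317, 36)))

start = time.perf_counter()
payload = cloudpickle.dumps(df)
restored = cloudpickle.loads(payload)
print(f"{len(payload):,} bytes round-tripped in {time.perf_counter() - start:.3f}s")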
Tom Shaffner
12/09/2021, 6:19 PMZanie
Zanie
PREFECT__LOGGING__LEVEL=DEBUG prefect run -p ...
Zanie
Zanie
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
import pandas as pd
import numpy as np

@task(timeout=10)
def return_df():
    # A 100,000 x 100 frame of short strings: large enough to stress
    # pickling and inter-process transfer within the 10s timeout.
    return pd.DataFrame(np.random.choice(["foo", "bar", "baz"], size=(100000, 100)))

with Flow("example", executor=LocalDaskExecutor()) as flow:
    return_df()
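A usage note for the repro: it can be run directly (no registration needed) with flow.run(), as suggested earlier in the thread, with debug logging enabled via the environment. A minimal sketch, assuming the snippet is saved as repro.py:

# Run directly; pair with the env var for full debug logs:
#   PREFECT__LOGGING__LEVEL=DEBUG python repro.py
if __name__ == "__main__":
    flow.run()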
Zanie
Tom Shaffner
12/09/2021, 6:35 PM
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ PREFECT__LOGGING__LEVEL=DEBUG prefect run -p f_items_no_list_price_pull.py --log-level debug
Retrieving local flow...[2021-12-09 13:34:14-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 13:34:14-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 13:34:14-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`, `git_token_username`, `use_ssh`, and `format_access_token`.
Done
Running flow locally...
└── 13:34:14 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 13:34:14 | DEBUG | Using executor type LocalDaskExecutor
└── 13:34:14 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 13:34:14 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 13:34:14 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 13:34:14 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 13:34:14 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 13:34:14 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 13:34:14 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 13:34:14 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 13:34:15-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Pull Oracle Data': Executing...
[2021-12-09 13:34:15-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 14.23 seconds to complete.
[2021-12-09 13:34:30-0500-DEBUG - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 27: DATAFRAME HERE
[3317 rows x 36 columns]: Passing result back to main process...
[2021-12-09 13:34:31-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 300: Task 'Pull Oracle Data': Execution successful.
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Execution successful.
[2021-12-09 13:34:31-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 318: Task 'Pull Oracle Data': Pickling value of size 4475246...
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Pickling value of size 4475246...
[2021-12-09 13:34:31-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 320: Task 'Pull Oracle Data': Pickling successful!
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Pickling successful!
[2021-12-09 13:34:31-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 330: Task 'Pull Oracle Data': Passing result back to main process...
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Passing result back to main process...
└── 13:35:04 | DEBUG | Task 'Pull Oracle Data': Execution process closed, collecting result...
Tom Shaffner
12/09/2021, 6:36 PMTom Shaffner
12/09/2021, 6:36 PMZanie
Tom Shaffner
12/09/2021, 6:42 PMTom Shaffner
12/09/2021, 6:43 PMZanie
Tom Shaffner
12/09/2021, 6:44 PMZanie
Tom Shaffner
12/09/2021, 6:48 PM
Running flow locally...
└── 13:46:27 | INFO | Beginning Flow run for 'Items with No List Price Pull'
└── 13:46:27 | DEBUG | Using executor type LocalDaskExecutor
└── 13:46:27 | DEBUG | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 13:46:27 | INFO | Task 'Pull Oracle Data': Starting task run...
└── 13:46:27 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 13:46:27 | DEBUG | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 13:46:27 | DEBUG | Task 'Pull Oracle Data': Calling task.run() method...
└── 13:46:27 | DEBUG | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 13:46:27 | DEBUG | Task 'Pull Oracle Data': Sending execution to a new process...
└── 13:46:27 | DEBUG | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 13:46:28-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Pull Oracle Data': Executing...
[2021-12-09 13:46:28-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 14.73 seconds to complete.
[2021-12-09 13:46:45-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 300: Task 'Pull Oracle Data': Execution successful.
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Execution successful.
[2021-12-09 13:46:45-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 318: Task 'Pull Oracle Data': Pickling value of size 4475246...
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Pickling value of size 4475246...
[2021-12-09 13:46:45-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 320: Task 'Pull Oracle Data': Pickling successful!
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Pickling successful!
[2021-12-09 13:46:45-0500-INFO - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 330: Task 'Pull Oracle Data': Passing result back to main process...
INFO:prefect.TaskRunner:Task 'Pull Oracle Data': Passing result back to main process...
└── 13:46:45 | DEBUG | Task 'Pull Oracle Data': Result received from subprocess, unpickling...
└── 13:46:45 | DEBUG | Task 'Pull Oracle Data': Handling state change from Running to Success
└── 13:46:45 | DEBUG | Task 'Pull Oracle Data': Handling state change from Success to Cached
└── 13:46:45 | INFO | Task 'Pull Oracle Data': Finished task run for task with final state: 'Cached'
└── 13:46:45 | INFO | Task 'Set Data Types': Starting task run...
└── 13:46:45 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 13:46:45 | DEBUG | Task 'Set Data Types': Handling state change from Pending to Running
└── 13:46:45 | DEBUG | Task 'Set Data Types': Calling task.run() method...
└── 13:46:45 | INFO | Setting Formats...
└── 13:46:45 | DEBUG | Task 'Set Data Types': Handling state change from Running to Success
└── 13:46:45 | DEBUG | Task 'Set Data Types': Handling state change from Success to Cached
└── 13:46:45 | INFO | Task 'Set Data Types': Finished task run for task with final state: 'Cached'
└── 13:46:45 | INFO | Task 'Flow Data Artifacts': Starting task run...
└── 13:46:45 | DEBUG | Task 'Flow Data Artifacts': Handling state change from Pending to Running
└── 13:46:45 | DEBUG | Task 'Flow Data Artifacts': Calling task.run() method...
└── 13:46:45 | DEBUG | Task 'Flow Data Artifacts': Attaching process based timeout handler...
└── 13:46:45 | INFO | Task 'Upload to table in Analytics DB': Starting task run...
└── 13:46:45 | DEBUG | Task 'Upload to table in Analytics DB': Handling state change from Pending to Running
└── 13:46:45 | DEBUG | Task 'Upload to table in Analytics DB': Calling task.run() method...
└── 13:46:45 | DEBUG | Task 'Upload to table in Analytics DB': Attaching process based timeout handler...
└── 13:46:45 | DEBUG | Task 'Flow Data Artifacts': Sending execution to a new process...
└── 13:46:45 | DEBUG | Task 'Upload to table in Analytics DB': Sending execution to a new process...
└── 13:46:45 | DEBUG | Task 'Flow Data Artifacts': Waiting for process to return with 600s timeout...
└── 13:46:45 | DEBUG | Task 'Upload to table in Analytics DB': Waiting for process to return with 14400s timeout...
[2021-12-09 13:46:45-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Flow Data Artifacts': Executing...
[2021-12-09 13:46:46-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 300: Task 'Flow Data Artifacts': Execution successful.
[2021-12-09 13:46:46-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 318: Task 'Flow Data Artifacts': Pickling value of size 16...
[2021-12-09 13:46:46-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 320: Task 'Flow Data Artifacts': Pickling successful!
[2021-12-09 13:46:46-0500-INFO - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 330: Task 'Flow Data Artifacts': Passing result back to main process...
└── 13:46:46 | DEBUG | Task 'Flow Data Artifacts': Result received from subprocess, unpickling...
[2021-12-09 13:46:46-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Upload to table in Analytics DB': Executing...
[2021-12-09 13:46:46-0500-INFO - prefect]-[prefect_pull_framework.py:upload_to_table]-Line 56: Uploading to table Items_Missing_List_Price in SQL Server DB...
└── 13:46:46 | DEBUG | Task 'Flow Data Artifacts': Handling state change from Running to Success
└── 13:46:46 | INFO | Task 'Flow Data Artifacts': Finished task run for task with final state: 'Success'
Replacing line feed with carriage return in ERROR_MESSAGE column to avoid SQL errors.
[2021-12-09 13:46:48-0500-INFO - prefect]-[prefect_pull_framework.py:upload_to_table]-Line 59: Upload Complete!
[2021-12-09 13:46:48-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 300: Task 'Upload to table in Analytics DB': Execution successful.
[2021-12-09 13:46:48-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 318: Task 'Upload to table in Analytics DB': Pickling value of size 16...
[2021-12-09 13:46:48-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 320: Task 'Upload to table in Analytics DB': Pickling successful!
[2021-12-09 13:46:48-0500-INFO - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 330: Task 'Upload to table in Analytics DB': Passing result back to main process...
└── 13:46:48 | DEBUG | Task 'Upload to table in Analytics DB': Result received from subprocess, unpickling...
└── 13:46:48 | DEBUG | Task 'Upload to table in Analytics DB': Handling state change from Running to Success
└── 13:46:48 | INFO | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'Success'
└── 13:46:48 | INFO | Task 'Pull Summary Data': Starting task run...
└── 13:46:48 | DEBUG | Task 'Pull Summary Data': Handling state change from Pending to Running
└── 13:46:48 | DEBUG | Task 'Pull Summary Data': Calling task.run() method...
└── 13:46:48 | DEBUG | Task 'Pull Summary Data': Attaching process based timeout handler...
└── 13:46:48 | DEBUG | Task 'Pull Summary Data': Sending execution to a new process...
└── 13:46:48 | DEBUG | Task 'Pull Summary Data': Waiting for process to return with 1800s timeout...
[2021-12-09 13:46:49-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Pull Summary Data': Executing...
[2021-12-09 13:46:49-0500-INFO - prefect]-[prefect_pull_framework.py:pull_summary_data_via]-Line 63: Initiating Sql Server Pull via SELECT * FROM v_Items_Missing_List_Price_Summary...
[2021-12-09 13:46:51-0500-INFO - prefect]-[prefect_pull_framework.py:pull_summary_data_via]-Line 68: Rows returned in summary data query: 1651...
[2021-12-09 13:46:51-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 300: Task 'Pull Summary Data': Execution successful.
[2021-12-09 13:46:51-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 318: Task 'Pull Summary Data': Pickling value of size 908945...
[2021-12-09 13:46:51-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 320: Task 'Pull Summary Data': Pickling successful!
[2021-12-09 13:46:51-0500-INFO - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 330: Task 'Pull Summary Data': Passing result back to main process...
└── 13:46:51 | DEBUG | Task 'Pull Summary Data': Result received from subprocess, unpickling...
└── 13:46:51 | DEBUG | Task 'Pull Summary Data': Handling state change from Running to Success
└── 13:46:51 | INFO | Task 'Pull Summary Data': Finished task run for task with final state: 'Success'
└── 13:46:51 | INFO | Task 'Delete Existing History Data': Starting task run...
└── 13:46:51 | INFO | Task 'Upload to history table in Analytics DB': Starting task run...
└── 13:46:51 | DEBUG | Task 'Delete Existing History Data': Handling state change from Pending to Running
└── 13:46:51 | DEBUG | Task 'Upload to history table in Analytics DB': Handling state change from Pending to Running
└── 13:46:51 | DEBUG | Task 'Delete Existing History Data': Calling task.run() method...
└── 13:46:51 | DEBUG | Task 'Upload to history table in Analytics DB': Calling task.run() method...
└── 13:46:51 | DEBUG | Task 'Delete Existing History Data': Attaching process based timeout handler...
└── 13:46:51 | DEBUG | Task 'Upload to history table in Analytics DB': Attaching process based timeout handler...
└── 13:46:51 | DEBUG | Task 'Upload to history table in Analytics DB': Sending execution to a new process...
└── 13:46:51 | DEBUG | Task 'Delete Existing History Data': Sending execution to a new process...
└── 13:46:51 | DEBUG | Task 'Upload to history table in Analytics DB': Waiting for process to return with 7200s timeout...
└── 13:46:51 | DEBUG | Task 'Delete Existing History Data': Waiting for process to return with 1800s timeout...
[2021-12-09 13:46:52-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Upload to history table in Analytics DB': Executing...
[2021-12-09 13:46:52-0500-INFO - prefect]-[prefect_pull_framework.py:upload_to_history_table]-Line 92: Uploading to table Items_Missing_List_Price_History in SQL Server DB...
[2021-12-09 13:46:52-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Delete Existing History Data': Executing...
[2021-12-09 13:46:52-0500-INFO - prefect]-[prefect_pull_framework.py:delete_today_from_history_if_exists]-Line 75: Check if history summary already uploaded for today in Items_Missing_List_Price_History...
[2021-12-09 13:46:52-0500-INFO - prefect]-[prefect_pull_framework.py:delete_today_from_history_if_exists]-Line 85: Data found: Deleting one day of data from Items_Missing_List_Price_History before replacement...
Flow run succeeded!
I had to cut off the last part of the log as it got too long, but it worked! What did you change?
Tom Shaffner
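Zanie's explanation wasn't captured here, but the earlier timestamps point at the mechanism: the child process logged "Passing result back to main process..." within seconds of the query finishing, yet the parent only logged "Execution process closed, collecting result..." exactly 50 seconds after the task started, i.e. once the full timeout had elapsed. That pattern matches the documented multiprocessing pitfall where the parent joins the child before draining the queue, while the child's queue.put() of a large payload blocks until the parent reads. The branch name (queue-empty-timeout) hints the fix polls the result queue against the deadline instead; a sketch of that pattern, not Prefect's actual implementation:

import multiprocessing as mp
import queue
import time

def collect_result(proc: mp.Process, q: mp.Queue, timeout: float):
    # Poll the queue instead of joining the process first: the child's
    # queue.put() of a large payload cannot finish until the parent reads,
    # so join-before-get deadlocks until the timeout fires.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            return q.get(timeout=1.0)  # short poll; retry on queue.Empty
        except queue.Empty:
            if not proc.is_alive():
                raise RuntimeError("Worker exited without returning a result")
    raise TimeoutError(f"No result from subprocess within {timeout}s")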
12/09/2021, 6:48 PMTom Shaffner
12/09/2021, 6:49 PMTom Shaffner
12/09/2021, 6:50 PMZanie
Zanie
Tom Shaffner
12/09/2021, 6:52 PMZanie
Zanie
Tom Shaffner
12/09/2021, 6:54 PMZanie
Zanie
Zanie
Tom Shaffner
12/09/2021, 6:58 PMTom Shaffner
12/09/2021, 6:58 PMZanie
Tom Shaffner
12/09/2021, 6:59 PMZanie
Zanie
Zanie
Tom Shaffner
12/09/2021, 6:59 PMZanie
Tom Shaffner
12/09/2021, 7:02 PMTom Shaffner
12/09/2021, 7:03 PMTom Shaffner
01/05/2022, 7:27 PMZanie
Zanie
Tom Shaffner
01/05/2022, 8:23 PMZanie