# ask-community
t
Here's a puzzle for you. I have a number of flows with tasks that query Oracle. In most of these flows the task works fine. In one instance, though, the Oracle query simply never returns. The code to query Oracle is reused between the flows; it imports a library for talking to Oracle. Things I've checked:
• The query I'm using worked until recently, at which point I made a minor change and this behavior began. But the new query works fine in SQL Developer, and if I use my old (pre-Prefect) Python process it returns in 11.66 seconds.
• I've tweaked the query every way I can think of to fix it, to no avail.
• If I point the flow at a different .sql file, it works fine.
• If I have the flow load the SQL from file and print it, and then copy that query to SQL Developer, it works fine.
• If I manually set the same library to call the same query code using the same .sql file in the Python environment being used by Prefect, it works fine.
As best I can tell, then, this error only occurs in Prefect, and it manifests as a complete failure to return, so I don't even have an error code to check. Is there anything anyone can think of that might cause such behavior? Or anything else I could check that might be different?
k
Is it a particularly long query?
Does the Flow fail eventually?
t
I have a 3-hour timeout set on that task; I haven't seen it hit that limit yet as I've been testing (and 1 minute is enough for me to know it's not working), but presumably that would be the eventual failure? Though as I say, it hasn't actually gone to that status yet (I just introduced the limits a day or so ago)
The query is 98 lines long. One that works is 70 lines
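For reference, the timeout is set on the task decorator, roughly like this (a simplified sketch; the real task lives in my shared tasks file):
from prefect import task

# Simplified from my shared tasks module -- the real function takes the
# .sql path and a prod flag. `timeout` is in seconds (3 hours here).
@task(name="Pull Oracle Data", timeout=3 * 60 * 60)
def pull_oracle_data_via(oracle_query_sql, prod=False):
    ...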
k
Can you try this heartbeat config and see if it helps?
t
Hmm, I'm running this locally and using the LocalDaskExecutor; should I just add that to the LocalRun config as an environment variable? If I switch to debug logs I see a new log entry every ten seconds on the task, so the heartbeat should still be running, if that's what you were checking for.
I added it as an environment variable to LocalRun - not seeing any difference so far.
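Concretely, what I added looks roughly like this (a sketch; I'm assuming the suggested config is the thread-based heartbeat mode, since I don't have the snippet in front of me):
# Sketch: pass the heartbeat setting through LocalRun's env.
# PREFECT__CLOUD__HEARTBEAT_MODE = "thread" is my assumption of the
# suggested config; file_path and labels are from my existing setup.
flow.run_config = LocalRun(
    env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"},
    working_dir=file_path,
    labels=["normal-process"],
)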
Also, I think that would only matter if I were in Prefect Cloud, right? For a local run it probably isn't even relevant?
k
Yes, add it to LocalRun. And yes, for a local run it's probably not relevant.
t
Hmm, weirdly (and I don't know if this is related or not), if I drop the timeout to 100 seconds, the task still doesn't fail. I'm at 2.5 minutes and still going now.
k
This is not entirely unexpected, as timeouts are really a best-effort thing and sometimes they don't work, especially for operations involving connections to external systems.
t
hmm, but the task is being checked every 10 seconds, right? Shouldn't it at least cancel itself and be marked as failed?
My whole point of implementing Prefect was to try and be more robust to network issues; if timeouts don't work, how can I ever account for days with network problems?
I have to say, this becomes kind of a show-stopper. If timeouts don't work, our whole purpose with Prefect is sort of null and void; old tasks will back up every time there's a network delay or failure, and I can't build logic to deal with those failures since the status will never show up as failed. Is there no way to deal with this?
z
Hey Tom, can you share the debug level logs that indicate what type of timeout handler is being used?
Timeouts are quite complicated and their method of enforcement is dependent on the executor type, executor settings, and system platform.
k
Could you share how you queried the database also? Are you handling the closing of that connection?
t
I'd be happy to share whatever you like. I'm using cx_Oracle to query the database. I can share all the relevant pieces of code too. Is there not a way to have it force-end the task, even if it abandons the query? The log is here.
z
Great, thanks, that's helpful. So here we're running the task code in a subprocess so we can join it with a timeout. The process is returning before the timeout expires, at 11:14.
t
Here's the library I'm importing to query Oracle with. Kevin, your question makes me think I should add some cleanup code? I'd assumed the task would be auto-ended on timeout; I take it that's not the case?
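In the meantime, here's roughly what I'm thinking the cleanup would look like (a rough sketch, not my actual oracle_pull code; the connection details are placeholders):
import cx_Oracle
import pandas as pd

def pull_oracle_data(sql_path, user, password, dsn):
    with open(sql_path) as f:
        query = f.read()
    conn = cx_Oracle.connect(user=user, password=password, dsn=dsn)
    try:
        # read_sql accepts a DB-API connection and returns a dataframe
        return pd.read_sql(query, conn)
    finally:
        # Close the connection even if the query raises or hangs, so the
        # task process isn't left holding it open.
        conn.close()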
z
It looks like it may be frozen attempting to collect the result from the subprocess, which would be an actual bug.
Are you returning a large piece of data?
t
Oh, well that would be a relief for me, I've done everything I can to figure out what's going on here. And no, I'm returning a pandas dataframe which, in this case, should have a bit under 6k rows and <30 columns
I have MUCH larger dataframes I return at times; this one is not one of them
z
Interesting. So I’ve got an exploratory change at https://github.com/PrefectHQ/prefect/pull/5213 that you could try
I’ve never seen a freeze while collecting the result like this though.
I actually see, at
8 December 2021, 06:09:51, prefect.CloudTaskRunner, DEBUG, "Task 'Pull Oracle Data': Execution successful."
that it looks like we're getting past this point.
t
Yeah, I saw that, but the task neither finished nor errored, so I assumed that was an initiating flag rather than a completion one.
z
Oops sorry yeah that’s upstream of returning the result.
That log is within the timeout subprocess.
After that log, we should see some logs about the result being passed back to the parent
try:
    pickled_val = cloudpickle.dumps(return_val)
except Exception as exc:
    err_msg = (
        f"Failed to pickle result of type {type(return_val).__name__!r} with "
        f'exception: "{type(exc).__name__}: {str(exc)}". This timeout handler '
        "requires your function return value to be serializable with `cloudpickle`."
    )
    logger.error(f"{name}: {err_msg}")
    pickled_val = cloudpickle.dumps(RuntimeError(err_msg))

logger.debug(f"{name}: Passing result back to main process...")

try:
    queue.put(pickled_val)
except Exception:
    logger.error(
        f"{name}: Failed to put result in queue to main process!",
        exc_info=True,
    )
    raise
If you run your query, but just return None instead of the dataframe, do you ever see timeout issues?
t
Hmm, not sure. I didn't see the issue when I changed to a different query with the same calling code.
I should note too: in another instance, this query returned values with a carriage return in one of the pandas cells. Elsewhere in my code I have a check for that before uploading, and when I was testing manually I saw it. I'm wondering if my change caused the query to return a value with that, or some other special character, that's causing issues and that wasn't returned previously?
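That check is roughly like this (a sketch, not my exact code):
import pandas as pd

def contains_carriage_return(df: pd.DataFrame) -> bool:
    # Scan the string columns for embedded \r characters before uploading
    text_cols = df.select_dtypes(include="object")
    return bool(
        text_cols.apply(
            lambda col: col.astype(str).str.contains("\r", regex=False)
        ).any().any()
    )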
That other git test version above gave the same result; log here
I've got to leave soon, is there anything else I can provide now to help you?
z
I pushed a change that adds two more debug lines
t
Will that show up via a plain pip install, or is there a lag? Is there something else I need to do to install that version?
z
I think you’ll just need to pip install the git version again
t
If I do pip install -U prefect, I still get version 0.15.10.
Oh, or do you mean you pushed to the branch above?
z
Yeah:
pip install git+https://github.com/PrefectHQ/prefect@queue-empty-timeout#egg=prefect
t
Is 0.15.10+12.gbc349d374 the right version then?
z
Yep!
I added a couple more debug logs around pickling. It seems like there are two problems here:
• The result isn't being passed back to the parent
• The parent is waiting for a nonexistent result without raising a timeout
t
I'm not seeing anything new in the logs offhand. I even re-registered the flow again to verify that wasn't an issue, and the attached is what I get.
z
Hm, it kind of looks like your agent isn’t using the new code. Did you restart the agent? Is the agent on the machine where you are pip installing?
k
I think he is using flow.run() for this one
z
It's using the CloudFlowRunner.
We can look into this some more tomorrow. Unfortunately, I think we’ll have a hard time reproducing your issue ourselves.
t
It is the same machine and I rebooted the whole machine after install to verify everything was refreshed. I just restarted the prefect-agent again, running now....
# Set start schedule to Eastern time, via a start date of yesterday
start = pendulum.now("America/New_York").add(days=-1)
schedule = Schedule(clocks=[CronClock(CRON_SCHEDULE, start_date=start)])

with Flow(FLOW_NAME, result=LocalResult(dir='/Prefect_data_disk/results'), schedule=schedule) as flow:
    logger.info(f"{FLOW_NAME} Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=ORACLE_QUERY_SQL_PATH, prod=USE_ORACLE_PROD)
    df = set_data_types(df)
    create_data_summary_artifacts(df)
    u = upload_to_table(df, destination_table=DATA_DESTINATION_TABLE_NAME)

    logger.info("Initiating history upload process.")
    run_sql = f"SELECT * FROM {SUMMARY_VIEW_NAME}"
    summary_df = pull_summary_data_via(upstream_tasks=[u], sql=run_sql)
    delete_today_from_history_if_exists(upstream_tasks=[summary_df], df=df, history_table=HISTORY_TABLE_NAME)
    upload_to_history_table(df=summary_df, destination_table=HISTORY_TABLE_NAME, append=True)

# using https by default
flow.storage = Git(
    git_clone_url_secret_name="MY_REPO_CLONE_URL",
    repo='Org/Prefect_ETL',
    flow_path=__file__,  # location of flow file in repo
)

flow.run_config = LocalRun(
    env={
        'PREFECT__LOGGING__FORMAT': '[%(asctime)s-%(levelname)s - %(name)s]-[%(filename)s:%(funcName)s]-Line %(lineno)d: %(message)s',
        'PREFECT__LOGGING__LEVEL': 'DEBUG',
    },
    working_dir=file_path,
    labels=["normal-process"],
)

flow.executor = LocalDaskExecutor()
flow.register(project_name=PROJECT_NAME)
The setup for this flow in case it helps
Restarted agent, but I don't think this is different
@Zanie understood about the difficulty reproducing. What can I do to help?
Actually, that question is less pressing now, because that's all the time I have to debug today; if you leave any next steps here, I'll get on them first thing in the morning. Thanks so much for taking the time and being willing to support here!
z
No, I don't see the new logs there either, so something isn't working out with the git installation, I think.
You're welcome! You should be able to reproduce these logs using flow.run(), which might make it easier to diagnose where this install is failing.
t
@Zanie, flow.run() did help, here's what that gave me:
[2021-12-09 10:36:16-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 10:36:16-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 10:36:16-0500-INFO - prefect.FlowRunner]-[flow_runner.py:run]-Line 245: Beginning Flow run for 'Items with No List Price Pull'
[2021-12-09 10:36:16-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Pull Oracle Data': Starting task run...
[2021-12-09 10:36:16-0500-WARNING - prefect.TaskRunner]-[task_runner.py:check_task_is_cached]-Line 798: Task 'Pull Oracle Data': Can't use cache because it is now invalid
[2021-12-09 10:36:17-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 10:36:17-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 10:36:17-0500-INFO - prefect.FlowRunner]-[flow_runner.py:run]-Line 245: Beginning Flow run for 'Items with No List Price Pull'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Pull Oracle Data': Starting task run...
[2021-12-09 10:36:17-0500-WARNING - prefect.TaskRunner]-[task_runner.py:check_task_is_cached]-Line 798: Task 'Pull Oracle Data': Can't use cache because it is now invalid
[2021-12-09 10:36:17-0500-ERROR - prefect.TaskRunner]-[task_runner.py:get_task_run_state]-Line 906: Task 'Pull Oracle Data': Exception encountered during task execution!
Traceback (most recent call last):
  File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 483, in run_task_with_timeout
    return run_with_multiprocess_timeout(
  File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 400, in run_with_multiprocess_timeout
    run_process.start()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/usr/lib/python3.8/multiprocessing/spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "/usr/lib/python3.8/multiprocessing/spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Pull Oracle Data': Finished task run for task with final state: 'Failed'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Set Data Types': Starting task run...
[2021-12-09 10:36:17-0500-WARNING - prefect.TaskRunner]-[task_runner.py:check_task_is_cached]-Line 798: Task 'Set Data Types': Can't use cache because it is now invalid
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Upload to table in Analytics DB': Starting task run...
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Flow Data Artifacts': Starting task run...
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Pull Summary Data': Starting task run...
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Delete Existing History Data': Starting task run...
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 241: Task 'Upload to history table in Analytics DB': Starting task run...
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
[2021-12-09 10:36:17-0500-INFO - prefect.TaskRunner]-[task_runner.py:run]-Line 335: Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
[2021-12-09 10:36:17-0500-INFO - prefect.FlowRunner]-[flow_runner.py:determine_final_state]-Line 705: Flow run FAILED: some reference tasks failed.
[2021-12-09 10:36:17-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 14.88 seconds to complete.
z
You'll need to wrap flow.run() with if __name__ == '__main__': as that error describes.
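i.e., a minimal sketch:
# Guard anything that starts subprocesses so it only runs when the file
# is executed directly, not when the spawned child re-imports the module:
if __name__ == "__main__":
    flow.run()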
It looks like you’re not getting debug logs anymore there
I did push another commit last night; the latest commit on the PR is d3f3162.
t
Yeah, sorry, I clearly need my coffee; I was just checking what was printed to screen. I need to go pull the logs; I forgot where they were written.
Hi Michael, I hate to ask such a noob question, but where are the logs written by default? I've been all through the documentation and can't find it.
z
They should just be printing to stdout when using flow.run(). You can use prefect run -p path-to-flow.py --log-level debug if you'd like.
t
Retrieving local flow...[2021-12-09 12:22:10-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:22:10-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
 Done
Running flow locally...
└── 12:22:10 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 12:22:10 | DEBUG   | Using executor type LocalDaskExecutor
└── 12:22:10 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:22:10 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 12:22:10 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:22:10 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:22:10 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:22:10 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:22:10 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:22:10 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 100s timeout...
Process SpawnProcess-1:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 287, in multiprocessing_safe_run_and_retrieve
    request = cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'oracle_pull'
└── 12:22:11 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:22:12 | DEBUG   | Task 'Pull Oracle Data': No result returned within the timeout period!
└── 12:22:12 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Running to TimedOut
└── 12:22:12 | INFO    | Task 'Pull Oracle Data': Finished task run for task with final state: 'TimedOut'
└── 12:22:12 | INFO    | Task 'Set Data Types': Starting task run...
└── 12:22:12 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:22:12 | DEBUG   | Task 'Set Data Types': TRIGGERFAIL signal raised during execution.
└── 12:22:12 | DEBUG   | Task 'Set Data Types': Handling state change from Pending to TriggerFailed
└── 12:22:12 | INFO    | Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
└── 12:22:12 | INFO    | Task 'Flow Data Artifacts': Starting task run...
└── 12:22:12 | INFO    | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:22:12 | DEBUG   | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:22:12 | DEBUG   | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:22:12 | DEBUG   | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:22:12 | DEBUG   | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:22:12 | INFO    | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:22:12 | INFO    | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:22:12 | INFO    | Task 'Pull Summary Data': Starting task run...
└── 12:22:12 | DEBUG   | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:22:12 | DEBUG   | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:22:12 | INFO    | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:22:12 | INFO    | Task 'Delete Existing History Data': Starting task run...
└── 12:22:12 | INFO    | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:22:12 | DEBUG   | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:22:12 | DEBUG   | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:22:12 | DEBUG   | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:22:12 | DEBUG   | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:22:12 | INFO    | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:22:12 | INFO    | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:22:12 | INFO    | Flow run FAILED: some reference tasks failed.
└── 12:22:12 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
That did it
z
A real error! 😄
Here, the subprocess doesn’t have the dependency (which is pretty weird). Are you importing it within the task or something?
t
Within the task? No. I do have a more complex import situation, though. My flow actually imports a bunch of tasks from another file (since a lot of the tasks are reused between flows), and that file imports the oracle_pull library. Maybe something in that setup is the issue?
I'd thought, based on the documentation, that as long as I was doing a LocalRun this would be okay; all the files are in their appropriate local place, and I set the run folder to the current file's folder.
And this works fine in other flows.
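Roughly, the import structure is (a sketch; names approximate):
# f_items_no_list_price_pull.py (the flow file)
from prefect_pull_framework import pull_oracle_data_via  # shared tasks file

# prefect_pull_framework.py (tasks reused between flows)
import oracle_pull  # my library wrapping cx_Oracle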
z
Is oracle_pull your own module?
t
Yes
That's the module I shared yesterday
z
You might need to ensure it's on the PYTHONPATH.
It's feasible that the CWD is not being passed through when there's a subprocess (not sure why that would be, though).
Can you try setting processes=True on your LocalDaskExecutor?
t
Sent you a ping with the oracle_pull code above (couldn't find a way to point to it here). Trying with processes=True gives:
Retrieving local flow...[2021-12-09 12:29:37-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:29:37-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
 Done
Running flow locally...
└── 12:29:37 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 12:29:37 | DEBUG   | Using executor type LocalDaskExecutor
└── 12:29:37 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:29:38 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 12:29:38 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:29:38 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:29:38 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:29:38 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:29:38 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:29:38 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 100s timeout...
Process SpawnProcess-1:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 287, in multiprocessing_safe_run_and_retrieve
    request = cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'oracle_pull'
└── 12:29:38 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:29:39 | DEBUG   | Task 'Pull Oracle Data': No result returned within the timeout period!
└── 12:29:39 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Running to TimedOut
└── 12:29:39 | INFO    | Task 'Pull Oracle Data': Finished task run for task with final state: 'TimedOut'
└── 12:29:39 | INFO    | Task 'Set Data Types': Starting task run...
└── 12:29:39 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:29:39 | DEBUG   | Task 'Set Data Types': TRIGGERFAIL signal raised during execution.
└── 12:29:39 | DEBUG   | Task 'Set Data Types': Handling state change from Pending to TriggerFailed
└── 12:29:39 | INFO    | Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
└── 12:29:39 | INFO    | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:29:39 | INFO    | Task 'Flow Data Artifacts': Starting task run...
└── 12:29:39 | DEBUG   | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:29:39 | DEBUG   | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:29:39 | DEBUG   | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:29:39 | DEBUG   | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:29:39 | INFO    | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:29:39 | INFO    | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:29:39 | INFO    | Task 'Pull Summary Data': Starting task run...
└── 12:29:39 | DEBUG   | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:29:39 | DEBUG   | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:29:39 | INFO    | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:29:39 | INFO    | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:29:39 | DEBUG   | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:29:39 | INFO    | Task 'Delete Existing History Data': Starting task run...
└── 12:29:39 | DEBUG   | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:29:39 | DEBUG   | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:29:39 | DEBUG   | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:29:39 | INFO    | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:29:39 | INFO    | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:29:39 | INFO    | Flow run FAILED: some reference tasks failed.
└── 12:29:39 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
z
Hm, I expected that to use a different timeout handler. What happens if you don’t specify an executor at all?
t
In my flow I get file_path = Path(__file__).resolve().parent and then pass that path in as the working_dir in LocalRun, so it should import fine.
With no executor specified:
Retrieving local flow...[2021-12-09 12:31:40-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:31:40-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
 Done
Running flow locally...
└── 12:31:40 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 12:31:40 | DEBUG   | Using executor type LocalDaskExecutor
└── 12:31:40 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:31:40 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 12:31:40 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:31:40 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:31:40 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:31:40 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:31:40 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:31:40 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 100s timeout...
Process SpawnProcess-1:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 287, in multiprocessing_safe_run_and_retrieve
    request = cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'oracle_pull'
└── 12:31:40 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:31:41 | DEBUG   | Task 'Pull Oracle Data': No result returned within the timeout period!
└── 12:31:41 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Running to TimedOut
└── 12:31:41 | INFO    | Task 'Pull Oracle Data': Finished task run for task with final state: 'TimedOut'
└── 12:31:41 | INFO    | Task 'Set Data Types': Starting task run...
└── 12:31:41 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:31:41 | DEBUG   | Task 'Set Data Types': TRIGGERFAIL signal raised during execution.
└── 12:31:41 | DEBUG   | Task 'Set Data Types': Handling state change from Pending to TriggerFailed
└── 12:31:41 | INFO    | Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
└── 12:31:41 | INFO    | Task 'Flow Data Artifacts': Starting task run...
└── 12:31:41 | DEBUG   | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:31:41 | DEBUG   | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:31:41 | INFO    | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:31:41 | INFO    | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:31:41 | DEBUG   | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:31:41 | DEBUG   | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:31:41 | INFO    | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:31:41 | INFO    | Task 'Pull Summary Data': Starting task run...
└── 12:31:41 | DEBUG   | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:31:41 | DEBUG   | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:31:41 | INFO    | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:31:41 | INFO    | Task 'Delete Existing History Data': Starting task run...
└── 12:31:41 | INFO    | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:31:41 | DEBUG   | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:31:41 | DEBUG   | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:31:41 | DEBUG   | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:31:41 | DEBUG   | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:31:41 | INFO    | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:31:41 | INFO    | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:31:41 | INFO    | Flow run FAILED: some reference tasks failed.
└── 12:31:41 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
z
└── 12:31:40 | DEBUG   | Using executor type LocalDaskExecutor
Are your changes taking effect?
t
I'm wondering that too; something funky seems to be going on. This error seems to occur before that module is imported, but in the previous version the module actually began executing and returned a query result. I'm going to delete the flow via the UI and try again; I'm wondering if I'm somehow triggering the saved one.
Okay, weird: I had commented out the git code, but if I turn it back on and submit my changes it seems to work. First, trying it with no executor succeeds! Second, trying it with processes=True fails with this error:
Retrieving local flow...[2021-12-09 12:38:12-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:38:12-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 12:38:12-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`,  `git_token_username`, `use_ssh`, and `format_access_token`.
 Done
Running flow locally...
└── 12:38:12 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 12:38:12 | DEBUG   | Using executor type LocalDaskExecutor
└── 12:38:12 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:38:12 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 12:38:12 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:38:12 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:38:12 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:38:12 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:38:12 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:38:12 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 100s timeout...
Process SpawnProcess-1:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 287, in multiprocessing_safe_run_and_retrieve
    request = cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'oracle_pull'
└── 12:38:13 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:38:14 | DEBUG   | Task 'Pull Oracle Data': No result returned within the timeout period!
└── 12:38:14 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Running to TimedOut
└── 12:38:14 | INFO    | Task 'Pull Oracle Data': Finished task run for task with final state: 'TimedOut'
└── 12:38:14 | INFO    | Task 'Set Data Types': Starting task run...
└── 12:38:14 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:38:14 | DEBUG   | Task 'Set Data Types': TRIGGERFAIL signal raised during execution.
└── 12:38:14 | DEBUG   | Task 'Set Data Types': Handling state change from Pending to TriggerFailed
└── 12:38:14 | INFO    | Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
└── 12:38:14 | INFO    | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:38:14 | INFO    | Task 'Flow Data Artifacts': Starting task run...
└── 12:38:14 | DEBUG   | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:38:14 | DEBUG   | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:38:14 | DEBUG   | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:38:14 | DEBUG   | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:38:14 | INFO    | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:38:14 | INFO    | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:38:14 | INFO    | Task 'Pull Summary Data': Starting task run...
└── 12:38:14 | DEBUG   | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:38:14 | DEBUG   | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:38:14 | INFO    | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:38:14 | INFO    | Task 'Delete Existing History Data': Starting task run...
└── 12:38:14 | DEBUG   | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:38:14 | DEBUG   | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:38:14 | INFO    | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:38:14 | DEBUG   | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:38:14 | INFO    | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:38:14 | DEBUG   | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:38:14 | INFO    | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:38:14 | INFO    | Flow run FAILED: some reference tasks failed.
└── 12:38:14 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
Not having that processes flag in there also seems to return this error.
So I guess, somehow, the DaskExecutor is not getting the module to import
z
What about processes=False? Just to cover our bases here 🙂
t
Processes false:
Retrieving local flow...[2021-12-09 12:40:58-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:40:58-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 12:40:58-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`,  `git_token_username`, `use_ssh`, and `format_access_token`.
 Done
Running flow locally...
└── 12:40:58 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 12:40:58 | DEBUG   | Using executor type LocalDaskExecutor
└── 12:40:58 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:40:58 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 12:40:58 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:40:58 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:40:58 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:40:58 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:40:58 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:40:58 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 100s timeout...
Process SpawnProcess-1:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 287, in multiprocessing_safe_run_and_retrieve
    request = cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'oracle_pull'
└── 12:40:59 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:41:00 | DEBUG   | Task 'Pull Oracle Data': No result returned within the timeout period!
└── 12:41:00 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Running to TimedOut
└── 12:41:00 | INFO    | Task 'Pull Oracle Data': Finished task run for task with final state: 'TimedOut'
└── 12:41:00 | INFO    | Task 'Set Data Types': Starting task run...
└── 12:41:00 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:41:00 | DEBUG   | Task 'Set Data Types': TRIGGERFAIL signal raised during execution.
└── 12:41:00 | DEBUG   | Task 'Set Data Types': Handling state change from Pending to TriggerFailed
└── 12:41:00 | INFO    | Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
└── 12:41:00 | INFO    | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:41:00 | INFO    | Task 'Flow Data Artifacts': Starting task run...
└── 12:41:00 | DEBUG   | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:41:00 | DEBUG   | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:41:00 | DEBUG   | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:41:00 | DEBUG   | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:41:00 | INFO    | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:41:00 | INFO    | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:41:00 | INFO    | Task 'Pull Summary Data': Starting task run...
└── 12:41:00 | DEBUG   | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:41:00 | DEBUG   | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:41:00 | INFO    | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:41:00 | INFO    | Task 'Delete Existing History Data': Starting task run...
└── 12:41:00 | DEBUG   | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:41:00 | DEBUG   | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:41:00 | INFO    | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:41:00 | INFO    | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:41:00 | DEBUG   | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:41:00 | DEBUG   | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:41:00 | INFO    | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:41:00 | INFO    | Flow run FAILED: some reference tasks failed.
└── 12:41:00 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
This is strange, though; in other cases with the Dask executor, it works.
I feel like this is still something different in my setup.
z
LocalRun isn't used when you're running something locally (lol). It's a configuration for API-based runs that the agent handles. So perhaps that's it.
t
I know, but the point of LocalRun is really to have all the files there at runtime to be imported, right? That's how I thought it was working.
z
Yeah, but you're not using LocalRun here; you're using flow.run() instead.
That’s probably why you’re not getting this same error via Cloud runs
t
Ooooh, that's the difference. 😕 So is there a way for me to get the debug output from the LocalRun?
z
If you do PYTHONPATH=… prefect run … and set the PYTHONPATH to include your module directory, we should get past this error.
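For example (assuming your module directory is the repo root you're running from):
PYTHONPATH=/home/h452338/Prefect_ETL prefect run -p f_items_no_list_price_pull.py --log-level debug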
t
THERE we go. Log:
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py  --log-level debug
Retrieving local flow...[2021-12-09 12:46:39-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:46:39-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 12:46:39-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`,  `git_token_username`, `use_ssh`, and `format_access_token`.
 Done
Running flow locally...
└── 12:46:39 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 12:46:39 | DEBUG   | Using executor type LocalDaskExecutor
└── 12:46:39 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:46:39 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 12:46:39 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:46:39 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:46:39 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:46:39 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:46:39 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:46:39 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 100s timeout...
[2021-12-09 12:46:40-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 19.88 seconds to complete.
└── 12:48:19 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
Is that helpful at all? It seems to just stop there
z
😄 haha still none of these new logs I added
So perplexing..
t
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect version
0.15.10+13.gd3f3162eb
z
And if you return None from your task?
And what happens if you adjust the timeout to 1 second?
t
Interesting... 1-second timeout:
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py  --log-level debug
Retrieving local flow...[2021-12-09 12:53:45-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:53:45-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 12:53:45-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`,  `git_token_username`, `use_ssh`, and `format_access_token`.
 Done
Running flow locally...
└── 12:53:45 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 12:53:45 | DEBUG   | Using executor type LocalDaskExecutor
└── 12:53:45 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:53:46 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 12:53:46 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:53:46 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:53:46 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:53:46 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:53:46 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:53:46 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 1s timeout...
[2021-12-09 12:53:46-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
└── 12:53:47 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:53:48 | DEBUG   | Task 'Pull Oracle Data': No result returned within the timeout period!
└── 12:53:48 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Running to TimedOut
└── 12:53:48 | INFO    | Task 'Pull Oracle Data': Finished task run for task with final state: 'TimedOut'
└── 12:53:48 | INFO    | Task 'Set Data Types': Starting task run...
└── 12:53:48 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:53:48 | DEBUG   | Task 'Set Data Types': TRIGGERFAIL signal raised during execution.
└── 12:53:48 | DEBUG   | Task 'Set Data Types': Handling state change from Pending to TriggerFailed
└── 12:53:48 | INFO    | Task 'Set Data Types': Finished task run for task with final state: 'TriggerFailed'
└── 12:53:48 | INFO    | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:53:48 | INFO    | Task 'Flow Data Artifacts': Starting task run...
└── 12:53:48 | DEBUG   | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:53:48 | DEBUG   | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:53:48 | DEBUG   | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:53:48 | DEBUG   | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:53:48 | INFO    | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:53:48 | INFO    | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:53:48 | INFO    | Task 'Pull Summary Data': Starting task run...
└── 12:53:48 | DEBUG   | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:53:48 | DEBUG   | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:53:48 | INFO    | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:53:48 | INFO    | Task 'Delete Existing History Data': Starting task run...
└── 12:53:48 | DEBUG   | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:53:48 | INFO    | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:53:48 | DEBUG   | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:53:48 | DEBUG   | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:53:48 | INFO    | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:53:48 | DEBUG   | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:53:48 | INFO    | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:53:48 | INFO    | Flow run FAILED: some reference tasks failed.
└── 12:53:48 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
z
So basically that demonstrates to me that the timeout is behaving; something is going wrong with returning the data.
t
Return None with a 50-second timeout:
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py  --log-level debug
Retrieving local flow...[2021-12-09 12:55:47-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 12:55:47-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 12:55:47-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`,  `git_token_username`, `use_ssh`, and `format_access_token`.
 Done
Running flow locally...
└── 12:55:47 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 12:55:47 | DEBUG   | Using executor type LocalDaskExecutor
└── 12:55:47 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 12:55:47 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 12:55:47 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 12:55:47 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 12:55:47 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 12:55:47 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 12:55:47 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 12:55:47 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 12:55:48-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 16.65 seconds to complete.
└── 12:56:06 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 12:56:06 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Running to Success
└── 12:56:06 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Success to Cached
└── 12:56:06 | INFO    | Task 'Pull Oracle Data': Finished task run for task with final state: 'Cached'
└── 12:56:06 | INFO    | Task 'Set Data Types': Starting task run...
└── 12:56:06 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 12:56:06 | DEBUG   | Task 'Set Data Types': Handling state change from Pending to Running
└── 12:56:06 | DEBUG   | Task 'Set Data Types': Calling task.run() method...
└── 12:56:06 | INFO    | Setting Formats...
└── 12:56:06 | ERROR   | Task 'Set Data Types': Exception encountered during task execution!
Traceback (most recent call last):
  File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 458, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "f_items_no_list_price_pull.py", line 28, in set_data_types
    pre_drop_row_count = len(df.index)
AttributeError: 'NoneType' object has no attribute 'index'
└── 12:56:06 | DEBUG   | Task 'Set Data Types': Handling state change from Running to Failed
└── 12:56:06 | INFO    | Task 'Set Data Types': Finished task run for task with final state: 'Failed'
└── 12:56:06 | INFO    | Task 'Upload to table in Analytics DB': Starting task run...
└── 12:56:06 | INFO    | Task 'Flow Data Artifacts': Starting task run...
└── 12:56:06 | DEBUG   | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:56:06 | DEBUG   | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 12:56:06 | DEBUG   | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:56:06 | DEBUG   | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 12:56:07 | INFO    | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:56:07 | INFO    | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 12:56:07 | INFO    | Task 'Pull Summary Data': Starting task run...
└── 12:56:07 | DEBUG   | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 12:56:07 | DEBUG   | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 12:56:07 | INFO    | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:56:07 | INFO    | Task 'Delete Existing History Data': Starting task run...
└── 12:56:07 | INFO    | Task 'Upload to history table in Analytics DB': Starting task run...
└── 12:56:07 | DEBUG   | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 12:56:07 | DEBUG   | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 12:56:07 | DEBUG   | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 12:56:07 | DEBUG   | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 12:56:07 | INFO    | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 12:56:07 | INFO    | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 12:56:07 | INFO    | Flow run FAILED: some reference tasks failed.
└── 12:56:07 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
Agreed. Also, the more I've thought about it, the more I think the query change means some illegal character ended up in the dataframe
Not illegal to pandas, obviously, as no error is thrown there, but illegal for something else. The serialization, maybe?
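A quick way to test that hypothesis outside Prefect is to round-trip the dataframe through cloudpickle and scan its string columns for control characters. This is a minimal sketch, assuming `df` is the dataframe the Oracle pull returns; the column scan is illustrative, not from the original discussion:
Copy code
import re
import cloudpickle

# round-trip through cloudpickle; raises if the dataframe can't be serialized
restored = cloudpickle.loads(cloudpickle.dumps(df))

# scan string columns for control characters a downstream consumer might reject
ctrl = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
for col in df.select_dtypes(include="object"):
    hits = df[col].astype(str).str.contains(ctrl, na=False)
    if hits.any():
        print(f"{col}: {hits.sum()} rows contain control characters")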
z
So what’s interesting about that last one is we have none of the logs about returning the data from the subprocess
t
Well, that returned None. What logs would we get?
z
Copy code
try:
    logger.debug(f"{name}: Pickling value of size {sys.getsizeof(return_val)}...")
    pickled_val = cloudpickle.dumps(return_val)
    logger.debug(f"{name}: Pickling successful!")
except Exception as exc:
    err_msg = (
        f"Failed to pickle result of type {type(return_val).__name__!r} with "
        f'exception: "{type(exc).__name__}: {str(exc)}". This timeout handler "'
        "requires your function return value to be serializable with `cloudpickle`."
    )
    logger.error(f"{name}: {err_msg}")
    pickled_val = cloudpickle.dumps(RuntimeError(err_msg))

logger.debug(f"{name}: Passing result back to main process...")
t
Here's the function I used for that:
Copy code
@task(name='Pull Oracle Data',cache_for=timedelta(hours=20),timeout=(50),log_stdout=True) #(3*60*60)) # ,max_retries=3,retry_delay=timedelta(minutes=20)
def pull_oracle_data_via(oracle_query_sql, prod=True) -> DataFrame:
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(prod=prod) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    return None
z
We should have these pickling logs, then “passing result”
t
Oh. Hmm, even on a None object?
What if I return some dummy DB instead
z
Yeah, even with None
I think the logs aren’t flushing… let me investigate that
There’s a new commit that should flush the logs (I hope 🙂)
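A hedged illustration of the kind of change being described (not the actual commit): flushing logging handlers before the subprocess exits, so buffered records are not lost when the parent terminates the process.
Copy code
import logging

def flush_all_log_handlers() -> None:
    """Flush every handler on the prefect logger and the root logger."""
    for logger_name in ["prefect", ""]:  # "" is the root logger
        for handler in logging.getLogger(logger_name).handlers:
            handler.flush()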
t
Do I need to restart the server to use this or is it sufficient to just restart the agent?
I'll just do both to be sure
z
Since you’re using flow.run(), you shouldn’t need either.
Just a new pip install
t
Oh; well, good to know for next time. Rerunning now...
Normal log if I return something:
Copy code
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py  --log-level debug
Retrieving local flow...[2021-12-09 13:03:32-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 13:03:32-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 13:03:32-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`,  `git_token_username`, `use_ssh`, and `format_access_token`.
 Done
Running flow locally...
└── 13:03:32 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 13:03:32 | DEBUG   | Using executor type LocalDaskExecutor
└── 13:03:32 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 13:03:32 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 13:03:32 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 13:03:32 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 13:03:32 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 13:03:32 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 13:03:32 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 13:03:32 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 13:03:33-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 17.55 seconds to complete.
└── 13:04:22 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
Retrying with None return next
None return:
Copy code
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py  --log-level debug
Retrieving local flow...[2021-12-09 13:05:07-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 13:05:07-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 13:05:07-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`,  `git_token_username`, `use_ssh`, and `format_access_token`.
 Done
Running flow locally...
└── 13:05:07 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 13:05:07 | DEBUG   | Using executor type LocalDaskExecutor
└── 13:05:07 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 13:05:07 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 13:05:07 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 13:05:07 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 13:05:07 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 13:05:07 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 13:05:07 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 13:05:07 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 13:05:07-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 15.5 seconds to complete.
└── 13:05:24 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
└── 13:05:24 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Running to Success
└── 13:05:24 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Success to Cached
└── 13:05:24 | INFO    | Task 'Pull Oracle Data': Finished task run for task with final state: 'Cached'
└── 13:05:24 | INFO    | Task 'Set Data Types': Starting task run...
└── 13:05:24 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 13:05:24 | DEBUG   | Task 'Set Data Types': Handling state change from Pending to Running
└── 13:05:24 | DEBUG   | Task 'Set Data Types': Calling task.run() method...
└── 13:05:24 | INFO    | Setting Formats...
└── 13:05:24 | ERROR   | Task 'Set Data Types': Exception encountered during task execution!
Traceback (most recent call last):
  File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/h452338/prefect_env/lib/python3.8/site-packages/prefect/utilities/executors.py", line 461, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "f_items_no_list_price_pull.py", line 28, in set_data_types
    pre_drop_row_count = len(df.index)
AttributeError: 'NoneType' object has no attribute 'index'
└── 13:05:24 | DEBUG   | Task 'Set Data Types': Handling state change from Running to Failed
└── 13:05:24 | INFO    | Task 'Set Data Types': Finished task run for task with final state: 'Failed'
└── 13:05:25 | INFO    | Task 'Flow Data Artifacts': Starting task run...
└── 13:05:25 | DEBUG   | Task 'Flow Data Artifacts': TRIGGERFAIL signal raised during execution.
└── 13:05:25 | INFO    | Task 'Upload to table in Analytics DB': Starting task run...
└── 13:05:25 | DEBUG   | Task 'Flow Data Artifacts': Handling state change from Pending to TriggerFailed
└── 13:05:25 | DEBUG   | Task 'Upload to table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 13:05:25 | INFO    | Task 'Flow Data Artifacts': Finished task run for task with final state: 'TriggerFailed'
└── 13:05:25 | DEBUG   | Task 'Upload to table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 13:05:25 | INFO    | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 13:05:25 | INFO    | Task 'Pull Summary Data': Starting task run...
└── 13:05:25 | DEBUG   | Task 'Pull Summary Data': TRIGGERFAIL signal raised during execution.
└── 13:05:25 | DEBUG   | Task 'Pull Summary Data': Handling state change from Pending to TriggerFailed
└── 13:05:25 | INFO    | Task 'Pull Summary Data': Finished task run for task with final state: 'TriggerFailed'
└── 13:05:25 | INFO    | Task 'Delete Existing History Data': Starting task run...
└── 13:05:25 | DEBUG   | Task 'Delete Existing History Data': TRIGGERFAIL signal raised during execution.
└── 13:05:25 | INFO    | Task 'Upload to history table in Analytics DB': Starting task run...
└── 13:05:25 | DEBUG   | Task 'Delete Existing History Data': Handling state change from Pending to TriggerFailed
└── 13:05:25 | DEBUG   | Task 'Upload to history table in Analytics DB': TRIGGERFAIL signal raised during execution.
└── 13:05:25 | INFO    | Task 'Delete Existing History Data': Finished task run for task with final state: 'TriggerFailed'
└── 13:05:25 | DEBUG   | Task 'Upload to history table in Analytics DB': Handling state change from Pending to TriggerFailed
└── 13:05:25 | INFO    | Task 'Upload to history table in Analytics DB': Finished task run for task with final state: 'TriggerFailed'
└── 13:05:25 | INFO    | Flow run FAILED: some reference tasks failed.
└── 13:05:25 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Running to Failed
Flow run failed!
What if I take your pickle code above and put it in my pull function at the end?
I'm getting back the logger results from that code; the "Initiating Oracle Pull" line shows up there
z
You can try pickling the result beforehand, yeah.
We’re still not getting the value-return logs… weird.
t
The new function I'm now trying:
Copy code
@task(name='Pull Oracle Data',cache_for=timedelta(hours=20),timeout=(50),log_stdout=True) #(3*60*60)) # ,max_retries=3,retry_delay=timedelta(minutes=20)
def pull_oracle_data_via(oracle_query_sql, prod=True) -> DataFrame:
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(prod=prod) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)

    try:
        logger.debug(f"{df}: Pickling value of size {sys.getsizeof(df)}...")
        pickled_val = cloudpickle.dumps(df)
        logger.debug(f"{df}: Pickling successful!")
    except Exception as exc:
        err_msg = (
            f"Failed to pickle result of type {type(df).__name__!r} with "
            f'exception: "{type(exc).__name__}: {str(exc)}". This timeout handler "'
            "requires your function return value to be serializable with `cloudpickle`."
        )
        logger.error(f"{df}: {err_msg}")
        pickled_val = cloudpickle.dumps(RuntimeError(err_msg))

    logger.debug(f"{df}: Passing result back to main process...")
    
    return df
z
You’ll want to return pickled_val at the end, but 🤷 we should still see some info here
t
Yeah, I thought about that, but I figure I'm just trying to recreate the error here, and the function says it passes out a dataframe, so I'd keep passing the original one out
Weird; nothing more.
Copy code
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py  --log-level debug
Retrieving local flow...[2021-12-09 13:08:29-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 13:08:29-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 13:08:29-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`,  `git_token_username`, `use_ssh`, and `format_access_token`.
 Done
Running flow locally...
└── 13:08:29 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 13:08:29 | DEBUG   | Using executor type LocalDaskExecutor
└── 13:08:29 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 13:08:29 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 13:08:29 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 13:08:29 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 13:08:29 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 13:08:29 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 13:08:29 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 13:08:29 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 13:08:29-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 20.0 seconds to complete.
└── 13:09:19 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
Okay, I stripped the function down to remove the try block:
Copy code
@task(name='Pull Oracle Data',cache_for=timedelta(hours=20),timeout=(50),log_stdout=True) #(3*60*60)) # ,max_retries=3,retry_delay=timedelta(minutes=20)
def pull_oracle_data_via(oracle_query_sql, prod=True) -> DataFrame:
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(prod=prod) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)


    logger.debug(f"{df}: Pickling value of size {sys.getsizeof(df)}...")
    pickled_val = cloudpickle.dumps(df)
    logger.debug(f"{df}: Pickling successful!")

    logger.debug(f"{df}: Passing result back to main process...")
    
    return df
That gives this:
Copy code
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ prefect run -p f_items_no_list_price_pull.py  --log-level debug
Retrieving local flow...[2021-12-09 13:11:09-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 13:11:09-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 13:11:09-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`,  `git_token_username`, `use_ssh`, and `format_access_token`.
 Done
Running flow locally...
└── 13:11:09 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 13:11:09 | DEBUG   | Using executor type LocalDaskExecutor
└── 13:11:09 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 13:11:09 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 13:11:09 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 13:11:09 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 13:11:09 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 13:11:09 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 13:11:09 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 13:11:09 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 13:11:09-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 18.71 seconds to complete.
└── 13:11:59 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
Seems like maybe the cloudpickle dump is just never returning? No error, but no completion either?
z
I can reproduce the lack of extra logs locally, so I’ll poke around at that and get back to you
It does seem like a cloudpickle issue though
Can you cloudpickle the same dataframe outside of prefect?
t
Was just saving it to try that
It doesn't throw an error when I dump it
If I import the library manually, pull via the same SQL query, and then test the pickling line (and the preceding sys.getsizeof logging call), I get this:
Copy code
>>> sys.getsizeof(df)
4507188
>>> test=cloudpickle.dumps(df)
>>>
It seems to work fine there
z
Ah, the log level doesn’t propagate when using the CLI
Can you do
Copy code
PREFECT__LOGGING__LEVEL=DEBUG prefect run -p ...
The temporary override we use for the CLI isn’t magical enough for a subprocess
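For context, a hedged sketch of why the environment-variable form reaches the subprocess while the CLI flag does not: entries in os.environ are inherited by child processes, so a level set before prefect is imported applies to the flow process and to the timeout handler's spawned process alike.
Copy code
import os

# must be set before `prefect` is imported, since its config
# loader reads environment variables at import time
os.environ["PREFECT__LOGGING__LEVEL"] = "DEBUG"

import prefect  # picks up DEBUG; any spawned subprocess inherits the variable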
I’m testing with
Copy code
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

import pandas as pd
import numpy as np


@task(timeout=10)
def return_df():
    return pd.DataFrame(np.random.choice(["foo", "bar", "baz"], size=(100000, 100)))


with Flow("example", executor=LocalDaskExecutor()) as flow:
    return_df()
And seeing no issues
t
If I do that, it prints out the dataframe used; I replaced that with DATAFRAME HERE in the log. Otherwise, this is what you get:
Copy code
(prefect_env) h452338@PrefectETL-VM:~/Prefect_ETL$ PREFECT__LOGGING__LEVEL=DEBUG prefect run -p f_items_no_list_price_pull.py  --log-level debug
Retrieving local flow...[2021-12-09 13:34:14-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 67: Items with No List Price Pull Flow initiated, running in /home/h452338/Prefect_ETL
[2021-12-09 13:34:14-0500-INFO - prefect]-[f_items_no_list_price_pull.py:<module>]-Line 73: Initiating history upload process.
[2021-12-09 13:34:14-0500-WARNING - prefect.Git]-[git.py:__init__]-Line 107: Git storage initialized with a `git_clone_url_secret_name`. The value of this Secret will be used to clone the repository, ignoring `repo`, `repo_host`, `git_token_secret_name`,  `git_token_username`, `use_ssh`, and `format_access_token`.
 Done
Running flow locally...
└── 13:34:14 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 13:34:14 | DEBUG   | Using executor type LocalDaskExecutor
└── 13:34:14 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 13:34:14 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 13:34:14 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 13:34:14 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 13:34:14 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 13:34:14 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 13:34:14 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 13:34:14 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 13:34:15-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Pull Oracle Data': Executing...
[2021-12-09 13:34:15-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 14.23 seconds to complete.
[2021-12-09 13:34:30-0500-DEBUG - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 27:     DATAFRAME HERE
[3317 rows x 36 columns]: Passing result back to main process...
[2021-12-09 13:34:31-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 300: Task 'Pull Oracle Data': Execution successful.
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Execution successful.
[2021-12-09 13:34:31-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 318: Task 'Pull Oracle Data': Pickling value of size 4475246...
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Pickling value of size 4475246...
[2021-12-09 13:34:31-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 320: Task 'Pull Oracle Data': Pickling successful!
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Pickling successful!
[2021-12-09 13:34:31-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 330: Task 'Pull Oracle Data': Passing result back to main process...
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Passing result back to main process...
└── 13:35:04 | DEBUG   | Task 'Pull Oracle Data': Execution process closed, collecting result...
So, it seems like it's not a pickling issue
Somehow it gets passed back and then... nothing?
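One classic way a queue handoff can wedge like this, offered as a general multiprocessing pitfall rather than a confirmed diagnosis: a child process that has put a large object on a Queue cannot exit until the parent drains it, so waiting on the process before reading the queue deadlocks both sides.
Copy code
import multiprocessing as mp

def worker(q):
    # a large payload: the queue's feeder thread blocks until the
    # parent reads it, so the child can't exit on its own
    q.put(b"x" * 10_000_000)

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    p.join()        # deadlock: child waits on the pipe, parent waits on join
    data = q.get()  # never reached; get() must come before join()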
z
Well, at least we’ve crossed some things off here
t
So... what do we try next? If it's pickling correctly, is the handoff itself the issue, and is there somewhere we could look at it?
And at this point, do you have any sense of the timeframe for finding the issue? I'd planned to use today to set up some new flows for a real production test, and with this issue I'm wondering if it will go on long enough that I should create my own manual flows instead. 😞
z
I’ve pushed another change that inserts a new log
t
Pulled; rerunning
z
It’s not really making any sense to me, so I’m not sure what the timeline will be.
t
Copy code
Running flow locally...
└── 13:46:27 | INFO    | Beginning Flow run for 'Items with No List Price Pull'
└── 13:46:27 | DEBUG   | Using executor type LocalDaskExecutor
└── 13:46:27 | DEBUG   | Flow 'Items with No List Price Pull': Handling state change from Scheduled to Running
└── 13:46:27 | INFO    | Task 'Pull Oracle Data': Starting task run...
└── 13:46:27 | WARNING | Task 'Pull Oracle Data': Can't use cache because it is now invalid
└── 13:46:27 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Pending to Running
└── 13:46:27 | DEBUG   | Task 'Pull Oracle Data': Calling task.run() method...
└── 13:46:27 | DEBUG   | Task 'Pull Oracle Data': Attaching process based timeout handler...
└── 13:46:27 | DEBUG   | Task 'Pull Oracle Data': Sending execution to a new process...
└── 13:46:27 | DEBUG   | Task 'Pull Oracle Data': Waiting for process to return with 50s timeout...
[2021-12-09 13:46:28-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Pull Oracle Data': Executing...
[2021-12-09 13:46:28-0500-INFO - prefect]-[prefect_pull_framework.py:pull_oracle_data_via]-Line 21: Initiating Oracle Pull using sql/items_no_list_price.sql...
Successfully connected to Oracle Database. Prod: True
Successfully connected to Oracle Database. Prod: False
Initiating query...
Oracle query took 0.0 minutes and 14.73 seconds to complete.
[2021-12-09 13:46:45-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 300: Task 'Pull Oracle Data': Execution successful.
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Execution successful.
[2021-12-09 13:46:45-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 318: Task 'Pull Oracle Data': Pickling value of size 4475246...
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Pickling value of size 4475246...
[2021-12-09 13:46:45-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 320: Task 'Pull Oracle Data': Pickling successful!
DEBUG:prefect.TaskRunner:Task 'Pull Oracle Data': Pickling successful!
[2021-12-09 13:46:45-0500-INFO - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 330: Task 'Pull Oracle Data': Passing result back to main process...
INFO:prefect.TaskRunner:Task 'Pull Oracle Data': Passing result back to main process...
└── 13:46:45 | DEBUG   | Task 'Pull Oracle Data': Result received from subprocess, unpickling...
└── 13:46:45 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Running to Success
└── 13:46:45 | DEBUG   | Task 'Pull Oracle Data': Handling state change from Success to Cached
└── 13:46:45 | INFO    | Task 'Pull Oracle Data': Finished task run for task with final state: 'Cached'
└── 13:46:45 | INFO    | Task 'Set Data Types': Starting task run...
└── 13:46:45 | WARNING | Task 'Set Data Types': Can't use cache because it is now invalid
└── 13:46:45 | DEBUG   | Task 'Set Data Types': Handling state change from Pending to Running
└── 13:46:45 | DEBUG   | Task 'Set Data Types': Calling task.run() method...
└── 13:46:45 | INFO    | Setting Formats...
└── 13:46:45 | DEBUG   | Task 'Set Data Types': Handling state change from Running to Success
└── 13:46:45 | DEBUG   | Task 'Set Data Types': Handling state change from Success to Cached
└── 13:46:45 | INFO    | Task 'Set Data Types': Finished task run for task with final state: 'Cached'
└── 13:46:45 | INFO    | Task 'Flow Data Artifacts': Starting task run...
└── 13:46:45 | DEBUG   | Task 'Flow Data Artifacts': Handling state change from Pending to Running
└── 13:46:45 | DEBUG   | Task 'Flow Data Artifacts': Calling task.run() method...
└── 13:46:45 | DEBUG   | Task 'Flow Data Artifacts': Attaching process based timeout handler...
└── 13:46:45 | INFO    | Task 'Upload to table in Analytics DB': Starting task run...
└── 13:46:45 | DEBUG   | Task 'Upload to table in Analytics DB': Handling state change from Pending to Running
└── 13:46:45 | DEBUG   | Task 'Upload to table in Analytics DB': Calling task.run() method...
└── 13:46:45 | DEBUG   | Task 'Upload to table in Analytics DB': Attaching process based timeout handler...
└── 13:46:45 | DEBUG   | Task 'Flow Data Artifacts': Sending execution to a new process...
└── 13:46:45 | DEBUG   | Task 'Upload to table in Analytics DB': Sending execution to a new process...
└── 13:46:45 | DEBUG   | Task 'Flow Data Artifacts': Waiting for process to return with 600s timeout...
└── 13:46:45 | DEBUG   | Task 'Upload to table in Analytics DB': Waiting for process to return with 14400s timeout...
[2021-12-09 13:46:45-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Flow Data Artifacts': Executing...
[2021-12-09 13:46:46-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 300: Task 'Flow Data Artifacts': Execution successful.
[2021-12-09 13:46:46-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 318: Task 'Flow Data Artifacts': Pickling value of size 16...
[2021-12-09 13:46:46-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 320: Task 'Flow Data Artifacts': Pickling successful!
[2021-12-09 13:46:46-0500-INFO - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 330: Task 'Flow Data Artifacts': Passing result back to main process...
└── 13:46:46 | DEBUG   | Task 'Flow Data Artifacts': Result received from subprocess, unpickling...
[2021-12-09 13:46:46-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Upload to table in Analytics DB': Executing...
[2021-12-09 13:46:46-0500-INFO - prefect]-[prefect_pull_framework.py:upload_to_table]-Line 56: Uploading to table Items_Missing_List_Price in SQL Server DB...
└── 13:46:46 | DEBUG   | Task 'Flow Data Artifacts': Handling state change from Running to Success
└── 13:46:46 | INFO    | Task 'Flow Data Artifacts': Finished task run for task with final state: 'Success'
Replacing line feed with carriage return in ERROR_MESSAGE column to avoid SQL errors.
[2021-12-09 13:46:48-0500-INFO - prefect]-[prefect_pull_framework.py:upload_to_table]-Line 59: Upload Complete!
[2021-12-09 13:46:48-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 300: Task 'Upload to table in Analytics DB': Execution successful.
[2021-12-09 13:46:48-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 318: Task 'Upload to table in Analytics DB': Pickling value of size 16...
[2021-12-09 13:46:48-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 320: Task 'Upload to table in Analytics DB': Pickling successful!
[2021-12-09 13:46:48-0500-INFO - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 330: Task 'Upload to table in Analytics DB': Passing result back to main process...
└── 13:46:48 | DEBUG   | Task 'Upload to table in Analytics DB': Result received from subprocess, unpickling...
└── 13:46:48 | DEBUG   | Task 'Upload to table in Analytics DB': Handling state change from Running to Success
└── 13:46:48 | INFO    | Task 'Upload to table in Analytics DB': Finished task run for task with final state: 'Success'
└── 13:46:48 | INFO    | Task 'Pull Summary Data': Starting task run...
└── 13:46:48 | DEBUG   | Task 'Pull Summary Data': Handling state change from Pending to Running
└── 13:46:48 | DEBUG   | Task 'Pull Summary Data': Calling task.run() method...
└── 13:46:48 | DEBUG   | Task 'Pull Summary Data': Attaching process based timeout handler...
└── 13:46:48 | DEBUG   | Task 'Pull Summary Data': Sending execution to a new process...
└── 13:46:48 | DEBUG   | Task 'Pull Summary Data': Waiting for process to return with 1800s timeout...
[2021-12-09 13:46:49-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Pull Summary Data': Executing...
[2021-12-09 13:46:49-0500-INFO - prefect]-[prefect_pull_framework.py:pull_summary_data_via]-Line 63: Initiating Sql Server Pull via SELECT * FROM v_Items_Missing_List_Price_Summary...
[2021-12-09 13:46:51-0500-INFO - prefect]-[prefect_pull_framework.py:pull_summary_data_via]-Line 68: Rows returned in summary data query: 1651...
[2021-12-09 13:46:51-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 300: Task 'Pull Summary Data': Execution successful.
[2021-12-09 13:46:51-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 318: Task 'Pull Summary Data': Pickling value of size 908945...
[2021-12-09 13:46:51-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 320: Task 'Pull Summary Data': Pickling successful!
[2021-12-09 13:46:51-0500-INFO - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 330: Task 'Pull Summary Data': Passing result back to main process...
└── 13:46:51 | DEBUG   | Task 'Pull Summary Data': Result received from subprocess, unpickling...
└── 13:46:51 | DEBUG   | Task 'Pull Summary Data': Handling state change from Running to Success
└── 13:46:51 | INFO    | Task 'Pull Summary Data': Finished task run for task with final state: 'Success'
└── 13:46:51 | INFO    | Task 'Delete Existing History Data': Starting task run...
└── 13:46:51 | INFO    | Task 'Upload to history table in Analytics DB': Starting task run...
└── 13:46:51 | DEBUG   | Task 'Delete Existing History Data': Handling state change from Pending to Running
└── 13:46:51 | DEBUG   | Task 'Upload to history table in Analytics DB': Handling state change from Pending to Running
└── 13:46:51 | DEBUG   | Task 'Delete Existing History Data': Calling task.run() method...
└── 13:46:51 | DEBUG   | Task 'Upload to history table in Analytics DB': Calling task.run() method...
└── 13:46:51 | DEBUG   | Task 'Delete Existing History Data': Attaching process based timeout handler...
└── 13:46:51 | DEBUG   | Task 'Upload to history table in Analytics DB': Attaching process based timeout handler...
└── 13:46:51 | DEBUG   | Task 'Upload to history table in Analytics DB': Sending execution to a new process...
└── 13:46:51 | DEBUG   | Task 'Delete Existing History Data': Sending execution to a new process...
└── 13:46:51 | DEBUG   | Task 'Upload to history table in Analytics DB': Waiting for process to return with 7200s timeout...
└── 13:46:51 | DEBUG   | Task 'Delete Existing History Data': Waiting for process to return with 1800s timeout...
[2021-12-09 13:46:52-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Upload to history table in Analytics DB': Executing...
[2021-12-09 13:46:52-0500-INFO - prefect]-[prefect_pull_framework.py:upload_to_history_table]-Line 92: Uploading to table Items_Missing_List_Price_History in SQL Server DB...
[2021-12-09 13:46:52-0500-DEBUG - prefect.TaskRunner]-[executors.py:multiprocessing_safe_run_and_retrieve]-Line 298: Task 'Delete Existing History Data': Executing...
[2021-12-09 13:46:52-0500-INFO - prefect]-[prefect_pull_framework.py:delete_today_from_history_if_exists]-Line 75: Check if history summary already uploaded for today in Items_Missing_List_Price_History...
[2021-12-09 13:46:52-0500-INFO - prefect]-[prefect_pull_framework.py:delete_today_from_history_if_exists]-Line 85: Data found: Deleting one day of data from Items_Missing_List_Price_History before replacement...
Flow run succeeded!
I had to cut off the last part of the log as it got too long, but it worked! What did you change?
Clearly I should have asked about the timeline earlier! 😄
Is this a fluke? Or was there a change made that might make a difference?
I'm reregistering and testing in the UI too
z
I changed how results are retrieved too
Seemed like an irrelevant change… but 🤷
t
Working in the UI now too
z
Sweet. I’ll get that PR ready for a release then
What python version are you on btw?
t
3.8.10
z
👍 thanks
Oh, funny, I committed a sleep in there
Surprised that didn’t mess anything up
t
haha, should I pull again to remove the sleep? 🙂
maybe I'll get a few seconds of speed back
z
Naw haha the sleep actually doesn’t matter because we terminate the process once the queue is populated with a result
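A minimal sketch of that pattern, as a general illustration rather than Prefect's actual implementation: the parent blocks on the queue with the task's timeout and terminates the child once a payload (or the deadline) arrives, so trailing code in the child, a sleep included, never runs to completion.
Copy code
import multiprocessing as mp
import cloudpickle

def run_and_report(q):
    q.put(cloudpickle.dumps("result"))
    # anything after the put (a sleep, say) is moot; the parent
    # terminates this process as soon as it has read the payload

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=run_and_report, args=(q,))
    p.start()
    try:
        payload = q.get(timeout=50)  # raises queue.Empty on timeout
    finally:
        p.terminate()                # child is no longer needed either way
    print(cloudpickle.loads(payload))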
t
So... where does this leave me longer-term? For the moment, I take it I should pull this branch for all my agents and use it; will it get folded into the next release, or is more testing/verification needed?
z
I did just push a fix
It’ll be in the next release
You can use this branch until then
t
amazing. When is that set to go live?
z
I’ll probably cut a release next Tuesday
t
Fantastic
Well, I'm kicking off a BUNCH of new runs and can dive into developing more. Thank you so much for your help here! This was a lot of effort and I really appreciate the attention; I thought I was going to have to give up on Prefect over this. Very relieved!
@Zanie, I upgraded recently to the 0.15.11 release, but I now seem to be seeing the behavior where timeouts fail in local runs again. It took me a while to realize it was happening again; was the fix implemented above included in this release as planned?
z
It is included in 0.15.11
t
Hmm, okay, I'll watch and log more carefully to verify that the behavior really is persisting. If it is, I'll follow up again; I'm not yet sure enough that it isn't an error on my end to open a ticket, so to speak.
z
Okay! Just let me know