# ask-community
t
Hello guys, I need help here. I have a simple flow that extracts and loads data. In the extract step I get a result from cx_Oracle, basically ~70k rows and 77 columns. The result is passed to the load task (again using cx_Oracle). I think the process inside each task succeeds without problems (both in Prefect and when I try a basic Python function call in a notebook). But the problem is that in Prefect there is a gap between the end of the extract task and the start of the load task (will provide an image), and the gap is relatively big, sometimes even longer than the extract task itself. Is there some problem in passing data between tasks, or what? The flow runs as a kubernetes-job and I use an on-premise Prefect server and agent (all in the same cluster).
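For context, a minimal sketch of the kind of flow described above (Prefect 2.x); connection details, table names, and the bind list are placeholders, not the original code:
```python
from prefect import flow, task
import cx_Oracle

DSN = "db-host:1521/ORCLPDB1"  # assumed Easy Connect string, not the real one

@task
def extract_oracle():
    # Fetch the full result set (~70k rows x 77 columns) as a list of tuples.
    with cx_Oracle.connect("user", "password", DSN) as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM source_table")
        return cur.fetchall()

@task
def load_oracle(rows):
    # Bulk-insert the rows into the target table.
    with cx_Oracle.connect("user", "password", DSN) as conn:
        cur = conn.cursor()
        cur.executemany(
            "INSERT INTO target_table VALUES (:1, :2)",  # bind list abbreviated
            rows,
        )
        conn.commit()

@flow
def etl():
    rows = extract_oracle()   # the result is handed directly to the next task
    load_oracle(rows)
```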
I don't know if this can help, but I ran the agent in debug mode and got an exception from APILogWorkerThread. This is a snippet from the log:
```
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.8/queue.py", line 178, in get
    raise Empty
_queue.Empty
21:19:51.487 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - <AsyncCancelScope, name='get_task_call_return_value' RUNNING, runtime=0.00> entered
21:20:26.934 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fa4053d2a90> processing batch of size 401
21:20:26.959 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - Running call get(timeout=1.9999786205589771) in thread 'APILogWorkerThread'
21:20:26.959 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - <WatcherThreadCancelScope, name='get' RUNNING, runtime=0.00> entered
21:20:26.976 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fa4053d2a90> enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 'load_oracle-0' for task 'load_oracle'", 'timestamp': '2023-06-28T21:20:26.976475+00:00', 'flow_run_id': '8b0da7c4-e157-4e41-a2bd-b87fbb6f96de', 'task_run_id': None, '__payload_size__': 237}
21:20:26.976 | INFO    | Flow run 'skilled-galago' - Created task run 'load_oracle-0' for task 'load_oracle'
21:20:26.977 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - <WatcherThreadCancelScope, name='get' COMPLETED, runtime=0.02> exited
21:20:26.978 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fa4053d2a90> enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Executing 'load_oracle-0' immediately...", 'timestamp': '2023-06-28T21:20:26.977770+00:00', 'flow_run_id': '8b0da7c4-e157-4e41-a2bd-b87fbb6f96de', 'task_run_id': None, '__payload_size__': 222}
21:20:26.977 | INFO    | Flow run 'skilled-galago' - Executing 'load_oracle-0' immediately...
21:20:26.978 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - Finished call get(timeout=1.9999786205589771)
21:20:26.980 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fa4053d2a90> added item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 'load_oracle-0' for task 'load_oracle'", 'timestamp': '2023-06-28T21:20:26.976475+00:00', 'flow_run_id': '8b0da7c4-e157-4e41-a2bd-b87fbb6f96de', 'task_run_id': None} to batch (size 237/3000000)
21:20:26.980 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - Running call get(timeout=1.9786812588572502) in thread 'APILogWorkerThread'
21:20:26.981 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - <WatcherThreadCancelScope, name='get' RUNNING, runtime=0.00> entered
21:20:26.981 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - <WatcherThreadCancelScope, name='get' COMPLETED, runtime=0.00> exited
21:20:26.981 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - Finished call get(timeout=1.9786812588572502)
21:20:26.981 | DEBUG   | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.logging.handlers.APILogWorker object at 0x7fa4053d2a90> added item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Executing 'load_oracle-0' immediately...", 'timestamp': '2023-06-28T21:20:26.977770+00:00', 'flow_run_id': '8b0da7c4-e157-4e41-a2bd-b87fbb6f96de', 'task_run_id': None} to batch (size 459/3000000)
21:20:26.981 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - Running call get(timeout=1.9773342795670033) in thread 'APILogWorkerThread'
21:20:26.981 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - <WatcherThreadCancelScope, name='get' RUNNING, runtime=0.00> entered
21:20:28.964 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - <WatcherThreadCancelScope, name='get' COMPLETED, runtime=1.98> exited
21:20:28.965 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - Encountered exception in call get(timeout=1.9773342795670033)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.8/queue.py", line 178, in get
    raise Empty
_queue.Empty
```
I tried passing smaller data between the tasks, and the gap is smaller. But I don't think the original data is too big, just ~0.5 MB (?), and considering that the whole process runs in the same pod, I think the gap is too big.
n
hey @Thomas Agung Santoso can you write the data to disk from the extract task and pass the reference around as task parameters?
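A minimal sketch of what that could look like, assuming both tasks run in the same pod so a local scratch path is visible to both; the path, credentials, and table names are placeholders:
```python
import pickle

import cx_Oracle
from prefect import flow, task

DSN = "db-host:1521/ORCLPDB1"  # assumed connection string

@task
def extract_oracle() -> str:
    with cx_Oracle.connect("user", "password", DSN) as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM source_table")
        rows = cur.fetchall()
    path = "/tmp/extract_rows.pkl"  # assumed scratch location inside the pod
    with open(path, "wb") as f:
        pickle.dump(rows, f)
    return path  # only this small string crosses the task boundary

@task
def load_oracle(path: str) -> None:
    with open(path, "rb") as f:
        rows = pickle.load(f)
    with cx_Oracle.connect("user", "password", DSN) as conn:
        cur = conn.cursor()
        cur.executemany("INSERT INTO target_table VALUES (:1, :2)", rows)  # bind list abbreviated
        conn.commit()

@flow
def etl():
    load_oracle(extract_oracle())
```
This way the task run input/result that Prefect has to handle is just the small path string, rather than the full result set.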
t
Hey Nate, thank you for the response. Before I do that: I might have found a solution (I think I got it from some issue on GitHub while browsing for a solution), which is to pass the data as a pandas DataFrame. In the original flow the data is in the form of a list of tuples (the original form I got from the cx_Oracle cursor). Any ideas why the DataFrame doesn't have the same problem?
n
hmm, no I'm not exactly sure why that would make a difference. if it works for you great! I just know that at some point, extremely large task run inputs can slow things down and it may become useful to pass data by reference
t
Okay, I will remember that in the future. Just for the record, I checked the DataFrame's memory size, and it's really huge compared to the list of tuples (160 MB compared to 0.5 MB). For my flow, I changed the extract to pandas read_sql and pass the data along as a DataFrame, and in the load task I convert it back into a list of tuples (because I couldn't find a way to executemany in pandas using cx_Oracle 😅). Thank you!
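For the record, a rough sketch of what that final approach sounds like (read_sql into a DataFrame in the extract, convert back to tuples for executemany in the load); connection details and table names are placeholders, and pandas may warn that only SQLAlchemy connectables are fully supported when given a raw cx_Oracle connection:
```python
import cx_Oracle
import pandas as pd
from prefect import flow, task

DSN = "db-host:1521/ORCLPDB1"  # assumed connection string

@task
def extract_oracle() -> pd.DataFrame:
    # Read the source table straight into a DataFrame.
    with cx_Oracle.connect("user", "password", DSN) as conn:
        return pd.read_sql("SELECT * FROM source_table", conn)

@task
def load_oracle(df: pd.DataFrame) -> None:
    # cx_Oracle's executemany wants a sequence of row tuples, so convert back.
    rows = list(df.itertuples(index=False, name=None))
    with cx_Oracle.connect("user", "password", DSN) as conn:
        cur = conn.cursor()
        cur.executemany("INSERT INTO target_table VALUES (:1, :2)", rows)  # bind list abbreviated
        conn.commit()

@flow
def etl():
    load_oracle(extract_oracle())
```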
n
sure! 👍