Andor Tóth
04/06/2021, 4:01 PM
SQL_DIR = Path('sql')

@task
def list_query_names():
    return [f.name for f in SQL_DIR.glob('*.sql')]

@task(log_stdout=True, timeout=15, task_run_name='{name}-{date:%F_%T}', checkpoint=False)
def exec_query(name: str):
    sql = Path(SQL_DIR / name).read_text()
    print('Query name: %s' % name)
    engine = sqla.create_engine(DSN)
    rs = engine.execute(sql)
    return dict(keys=rs.keys(), rows=rs.fetchall())

@task
def save_results(rs, name):
    with (OUT_DIR / name).with_suffix('.txt').open('w') as f:
        csv_writer = csv.writer(f, delimiter="\t")
        csv_writer.writerow(rs['keys'])
        csv_writer.writerows(rs['rows'])

with Flow("Queries") as flow:
    query_names = list_query_names()
    results = exec_query.map(query_names)
    save_results.map(results, query_names)

flow.executor = LocalDaskExecutor(num_workers=2, schedule='processes')
flow.run()
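To see what save_results writes without touching the filesystem, here is a minimal sketch that feeds a dict of the same shape (keys plus rows) through csv.writer with a tab delimiter. The column names are taken from the logs in this thread; the row values and the helper name to_tsv are made up for illustration.

```python
import csv
import io


def to_tsv(rs):
    """Render a {'keys': [...], 'rows': [...]} dict as tab-separated text."""
    buf = io.StringIO()
    # lineterminator is set explicitly so the output is the same on every platform
    writer = csv.writer(buf, delimiter="\t", lineterminator="\n")
    writer.writerow(rs["keys"])   # header line
    writer.writerows(rs["rows"])  # one line per result row
    return buf.getvalue()


# Hypothetical sample data; only the column names come from the thread.
sample = {"keys": ["ct", "url"], "rows": [(3, "/a"), (1, "/b")]}
print(to_tsv(sample))
```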
Andor Tóth
04/06/2021, 4:05 PM
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
import csv
import sqlalchemy as sqla
from pathlib import Path
Andor Tóth
04/06/2021, 4:06 PM
Dylan
Dylan
Dylan
Andor Tóth
04/06/2021, 4:56 PM
python -i queries.py, but the result is the same if it is run by an agent
Andor Tóth
04/06/2021, 4:56 PM
prefect 3964559 0.3 1.2 585352 74788 pts/13 Sl+ 17:21 0:02 | \_ python -i queries.py
prefect 3964730 0.0 0.1 53468 11980 pts/13 S+ 17:21 0:00 | \_ /srv/prefect/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(8)
prefect 3964732 0.2 0.0 0 0 pts/13 Z+ 17:21 0:02 | \_ [python] <defunct>
andor.t+ 3936805 0.0 0.0 24112 3796 pts/15 Ss 17:14 0:00 \_ -bash
Andor Tóth
04/06/2021, 4:56 PM
Andor Tóth
04/06/2021, 4:56 PM
Andor Tóth
04/06/2021, 4:57 PM
prefect diagnostics follows:
{
"config_overrides": {},
"env_vars": [],
"system_information": {
"platform": "Linux-4.18.0-240.10.1.el8_3.x86_64-x86_64-with-centos-8",
"prefect_backend": "server",
"prefect_version": "0.14.15",
"python_version": "3.6.8"
}
}
Andor Tóth
04/06/2021, 4:59 PM
Andor Tóth
04/06/2021, 4:59 PM
Andor Tóth
04/06/2021, 4:59 PM
Dylan
Dylan
Andor Tóth
04/06/2021, 5:15 PM
[2021-04-06 19:13:59+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'Queries'
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'list_query_names': Starting task run...
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'list_query_names': Finished task run for task with final state: 'Success'
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'exec_query': Starting task run...
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'exec_query': Finished task run for task with final state: 'Mapped'
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'save_results': Starting task run...
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'save_results': Finished task run for task with final state: 'Mapped'
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'exec_query[0]': Starting task run...
[2021-04-06 19:14:00+0200] INFO - prefect.TaskRunner | Task 'exec_query[1]': Starting task run...
[2021-04-06 19:14:01+0200] INFO - prefect.exec_query[0] | Query name: menu_ab.sql
[2021-04-06 19:14:01+0200] INFO - prefect.exec_query[1] | Query name: zuzda_ct_by_url_last_hour_src.sql
[2021-04-06 19:14:04+0200] INFO - prefect.exec_query[1] | Columns: ['day', 'ts', 'source', 'campaign_id', 'row_id', 'url', 'ct']
[2021-04-06 19:14:04+0200] INFO - prefect.exec_query[0] | Columns: ['day', 'menu', 'submenu', 'page_version', 'ct', 'uc']
[2021-04-06 19:14:05+0200] INFO - prefect.TaskRunner | Task 'exec_query[0]': Finished task run for task with final state: 'Success'
[2021-04-06 19:14:05+0200] INFO - prefect.TaskRunner | Task 'save_results[0]': Starting task run...
[2021-04-06 19:14:05+0200] INFO - prefect.TaskRunner | Task 'save_results[0]': Finished task run for task with final state: 'Success'
[2021-04-06 19:14:05+0200] INFO - prefect.TaskRunner | Task 'exec_query[2]': Starting task run...
[2021-04-06 19:14:06+0200] INFO - prefect.exec_query[2] | Query name: aktualis_ct_by_url_15m.sql
[2021-04-06 19:14:08+0200] INFO - prefect.exec_query[2] | Columns: ['ct', 'url']
[2021-04-06 19:14:09+0200] INFO - prefect.TaskRunner | Task 'exec_query[2]': Finished task run for task with final state: 'Success'
[2021-04-06 19:14:09+0200] INFO - prefect.TaskRunner | Task 'save_results[2]': Starting task run...
[2021-04-06 19:14:09+0200] INFO - prefect.TaskRunner | Task 'save_results[2]': Finished task run for task with final state: 'Success'
Andor Tóth
04/06/2021, 5:16 PM
Andor Tóth
04/06/2021, 5:16 PM
Dylan
Andor Tóth
04/06/2021, 5:17 PM
Dylan
Dylan
Dylan
timeout parameter
Dylan
Andor Tóth
04/06/2021, 5:21 PM
Andor Tóth
04/06/2021, 5:21 PM
Andor Tóth
04/06/2021, 5:22 PM
Dylan
Dylan
Andor Tóth
04/06/2021, 5:23 PM
Andor Tóth
04/06/2021, 5:23 PM
^C[2021-04-06 19:22:36+0200] INFO - prefect.LocalDaskExecutor | Attempting to interrupt and cancel all running tasks...
^CTraceback (most recent call last):
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/executors/dask.py", line 542, in start
yield
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 657, in get_flow_run_state
s.map_states = executor.wait(mapped_children[t])
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/executors/dask.py", line 627, in wait
futures, scheduler=self.scheduler, pool=self._pool, optimize_graph=False
File "/srv/prefect/venv/lib/python3.6/site-packages/dask/base.py", line 565, in compute
results = schedule(dsk, keys, **kwargs)
File "/srv/prefect/venv/lib/python3.6/site-packages/dask/threaded.py", line 84, in get
**kwargs
File "/srv/prefect/venv/lib/python3.6/site-packages/dask/local.py", line 476, in get_async
key, res_info, failed = queue_get(queue)
File "/srv/prefect/venv/lib/python3.6/site-packages/dask/local.py", line 133, in queue_get
return q.get()
File "/usr/lib64/python3.6/queue.py", line 164, in get
self.not_empty.wait()
File "/usr/lib64/python3.6/threading.py", line 295, in wait
waiter.acquire()
KeyboardInterrupt
Andor Tóth
04/06/2021, 5:24 PM
Andor Tóth
04/06/2021, 5:24 PM
Dylan
Dylan
Andor Tóth
04/06/2021, 5:24 PM
Dylan
Dylan
Dylan
Andor Tóth
04/06/2021, 5:26 PM
Andor Tóth
04/06/2021, 5:26 PM
Dylan
Dylan
Andor Tóth
04/06/2021, 5:32 PM
Dylan
Dylan
Dylan
Dylan
Marvin
04/06/2021, 5:36 PM
Zanie
Zanie
Zanie
Zanie
waiter in dask itself that's blocking execution... hm..
Zanie
__main__ check. Does your issue go away if you do
if __name__ == "__main__":
    flow.run()
Andor Tóth
04/06/2021, 7:01 PM
Andor Tóth
04/06/2021, 7:01 PM
Andor Tóth
04/06/2021, 7:02 PM
Andor Tóth
04/06/2021, 7:03 PM
Andor Tóth
04/06/2021, 7:03 PM
Andor Tóth
04/06/2021, 7:04 PM
Andor Tóth
04/06/2021, 7:05 PM
select sleep(20000)), but it did not trigger the error
Andor Tóth
04/06/2021, 7:05 PM
Andor Tóth
04/06/2021, 7:13 PM
Zanie
import csv
from time import sleep
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

SLEEP = 20

@task
def list_query_names():
    return ["a", "b", "c"]

@task(log_stdout=True, timeout=15, task_run_name='{name}-{date:%F_%T}', checkpoint=False)
def exec_query(name: str):
    sleep(SLEEP)
    return dict(keys={"d", "e", "f"}, rows=["x", "y", "z"])

@task
def save_results(rs, name):
    with open(name, 'w') as f:
        csv_writer = csv.writer(f, delimiter="\t")
        csv_writer.writerow(rs['keys'])
        csv_writer.writerows(rs['rows'])

with Flow("Queries") as flow:
    query_names = list_query_names()
    results = exec_query.map(query_names)
    save_results.map(results, query_names)

flow.executor = LocalDaskExecutor(num_workers=2, schedule='processes')

if __name__ == "__main__":
    flow.run()
errors with a sleep of 20 and runs fine with <15
Andor Tóth
04/06/2021, 7:20 PM
Andor Tóth
04/06/2021, 7:20 PM
Andor Tóth
04/06/2021, 7:21 PM
select "a", sleep(10000)
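Zanie's reproduction script earlier in the thread fails once the task body outlasts timeout=15. As a rough illustration of a process-based timeout in general (this is not Prefect's actual implementation), the sketch below runs a sleep in a child Python process through the standard library's subprocess.run and reports whether it finished within the limit. The function name run_with_timeout and the short durations are illustrative assumptions.

```python
import subprocess
import sys


def run_with_timeout(sleep_seconds, timeout_seconds):
    """Run `time.sleep(sleep_seconds)` in a child process, bounded by a timeout."""
    cmd = [sys.executable, "-c", f"import time; time.sleep({sleep_seconds})"]
    try:
        # subprocess.run kills the child and raises TimeoutExpired when the limit is hit
        subprocess.run(cmd, timeout=timeout_seconds, check=True)
    except subprocess.TimeoutExpired:
        return "timed out"
    return "ok"


if __name__ == "__main__":
    print(run_with_timeout(0, 10))   # child finishes well inside the limit
    print(run_with_timeout(5, 0.5))  # child outlasts the limit
```

With the numbers from the thread, a 20 second sleep against a 15 second limit corresponds to the second call, and anything under 15 seconds to the first.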
Zanie
with per https://docs.sqlalchemy.org/en/14/core/connections.html#basic-usage ?
Andor Tóth
04/06/2021, 7:32 PM
Andor Tóth
04/06/2021, 7:35 PM
@task(log_stdout=True, timeout=15, task_run_name='{name}-{date:%F_%T}', checkpoint=False)
def exec_query(name: str):
    sql = Path(SQL_DIR / name).read_text()
    print('Query name: %s' % name)
    engine = sqla.create_engine(DSN)
    with engine.connect() as conn:
        rs = conn.execute(sql)
        results = dict(keys=rs.keys(), rows=rs.fetchall())
    print('Columns: %s' % results['keys'])
    return results
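The pattern in the revised task (open a connection in a with block, materialise keys and rows inside it, return plain data) can be tried without a database server. The sketch below swaps SQLAlchemy for the standard library's sqlite3 module purely so that it runs anywhere; the table and its contents are invented, and fetch_all is a hypothetical helper name.

```python
import sqlite3
from contextlib import closing


def fetch_all(conn, sql):
    """Execute `sql` and return column names and rows as plain Python objects."""
    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        keys = [col[0] for col in cur.description]  # column names
        rows = cur.fetchall()                       # list of tuples
    return {"keys": keys, "rows": rows}


# closing() guarantees conn.close() even if a query raises
with closing(sqlite3.connect(":memory:")) as conn:
    conn.execute("create table hits (ct integer, url text)")
    conn.execute("insert into hits values (3, '/a'), (1, '/b')")
    results = fetch_all(conn, "select ct, url from hits order by ct desc")

print("Columns: %s" % results["keys"])
```

Everything returned here is a list, tuple, int or str, so the result no longer depends on the connection once the with block has exited.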
Andor Tóth
04/06/2021, 7:38 PM
Zanie
print('Columns ... line or before?
Andor Tóth
04/06/2021, 7:39 PM
[2021-04-06 21:38:34+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'Queries'
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'list_query_names': Starting task run...
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'list_query_names': Finished task run for task with final state: 'Success'
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'exec_query': Starting task run...
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'exec_query': Finished task run for task with final state: 'Mapped'
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'save_results': Starting task run...
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'save_results': Finished task run for task with final state: 'Mapped'
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'exec_query[0]': Starting task run...
[2021-04-06 21:38:34+0200] INFO - prefect.TaskRunner | Task 'exec_query[1]': Starting task run...
Query name: aktualis_ct_by_url_15m.sql
Query name: menu_ab.sql
Columns: ['ct', 'url']
Columns: ['day', 'menu', 'submenu', 'page_version', 'ct', 'uc']
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'exec_query[0]': Finished task run for task with final state: 'Success'
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'save_results[0]': Starting task run...
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'save_results[0]': Finished task run for task with final state: 'Success'
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'exec_query[1]': Finished task run for task with final state: 'Success'
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'exec_query[2]': Starting task run...
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'save_results[1]': Starting task run...
[2021-04-06 21:38:37+0200] INFO - prefect.TaskRunner | Task 'save_results[1]': Finished task run for task with final state: 'Success'
Query name: zuzda_ct_by_url_last_hour_src.sql
Columns: ['day', 'ts', 'source', 'campaign_id', 'row_id', 'url', 'ct']
Andor Tóth
04/06/2021, 7:39 PM
Andor Tóth
04/06/2021, 7:40 PM
# ps faux | grep '[q]ueries' -A3
prefect 4183313 1.5 0.9 556492 55784 pts/9 Sl+ 21:38 0:01 | \_ python -i queries.py
prefect 4183320 0.0 0.2 53468 12172 pts/9 S+ 21:38 0:00 | \_ /srv/prefect/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(7)
prefect 4183331 2.1 0.0 0 0 pts/9 Z+ 21:38 0:01 | \_ [python] <defunct>
andor.t+ 909727 0.0 0.0 24092 8 pts/3 Ss Mar08 0:00 \_ -bash
Andor Tóth
04/06/2021, 7:41 PM
^C[2021-04-06 21:41:16+0200] INFO - prefect.LocalDaskExecutor | Attempting to interrupt and cancel all running tasks...
^CTraceback (most recent call last):
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/executors/dask.py", line 542, in start
yield
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 657, in get_flow_run_state
s.map_states = executor.wait(mapped_children[t])
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/executors/dask.py", line 627, in wait
futures, scheduler=self.scheduler, pool=self._pool, optimize_graph=False
File "/srv/prefect/venv/lib/python3.6/site-packages/dask/base.py", line 565, in compute
results = schedule(dsk, keys, **kwargs)
File "/srv/prefect/venv/lib/python3.6/site-packages/dask/threaded.py", line 84, in get
**kwargs
File "/srv/prefect/venv/lib/python3.6/site-packages/dask/local.py", line 476, in get_async
key, res_info, failed = queue_get(queue)
File "/srv/prefect/venv/lib/python3.6/site-packages/dask/local.py", line 133, in queue_get
return q.get()
File "/usr/lib64/python3.6/queue.py", line 164, in get
self.not_empty.wait()
File "/usr/lib64/python3.6/threading.py", line 295, in wait
waiter.acquire()
KeyboardInterrupt
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "queries.py", line 43, in <module>
flow.run()
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/core/flow.py", line 1266, in run
**kwargs,
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/core/flow.py", line 1087, in _run
**kwargs,
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 282, in run
executor=executor,
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/utilities/executors.py", line 71, in inner
return runner_method(self, *args, **kwargs)
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 661, in get_flow_run_state
assert isinstance(final_states, dict)
File "/usr/lib64/python3.6/contextlib.py", line 99, in __exit__
self.gen.throw(type, value, traceback)
File "/srv/prefect/venv/lib/python3.6/site-packages/prefect/executors/dask.py", line 552, in start
self._pool.join()
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 550, in join
p.join()
File "/usr/lib64/python3.6/threading.py", line 1056, in join
self._wait_for_tstate_lock()
File "/usr/lib64/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Andor Tóth
04/06/2021, 7:43 PM
Andor Tóth
04/06/2021, 7:43 PM
Zanie
PREFECT__LOGGING__LEVEL="DEBUG" and we'll get some more logs from the task runner
Zanie
[2021-04-06 14:43:36-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Calling task.run() method...
[2021-04-06 14:43:36-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Attaching process based timeout handler...
[2021-04-06 14:43:36-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Sending execution to a new process...
[2021-04-06 14:43:36-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[0]': Waiting for process to return with 15s timeout...
[2021-04-06 14:43:36-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Waiting for process to return with 15s timeout...
[2021-04-06 14:43:37-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[0]': Executing...
[2021-04-06 14:43:37-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Executing...
[2021-04-06 14:43:37-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[0]': Passing result back to main process...
[2021-04-06 14:43:37-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Passing result back to main process...
[2021-04-06 14:43:37-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[0]': Execution process closed, collecting result...
[2021-04-06 14:43:37-0500] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Execution process closed, collecting result...
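The DEBUG lines above appear once PREFECT__LOGGING__LEVEL is set in the environment of the process that runs the flow. One way to do that from Python, sketched here with the hypothetical helper debug_env, is to build the environment for a child process; queries.py is the script name used earlier in this thread.

```python
import os


def debug_env(base=None):
    """Return a copy of the environment with Prefect's log level set to DEBUG."""
    env = dict(os.environ if base is None else base)
    env["PREFECT__LOGGING__LEVEL"] = "DEBUG"
    return env


# Usage sketch (not executed here, since it needs queries.py and Prefect installed):
#   import subprocess, sys
#   subprocess.run([sys.executable, "queries.py"], env=debug_env())
print(debug_env({"PATH": "/usr/bin"}))
```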
Andor Tóth
04/06/2021, 7:44 PM
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Passing result back to main process...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[0]': Execution process closed, collecting result...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[0]': Handling state change from Running to Success
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'exec_query[0]': Finished task run for task with final state: 'Success'
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Execution process closed, collecting result...
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'save_results[0]': Starting task run...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'save_results[0]': Handling state change from Pending to Running
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'save_results[0]': Calling task.run() method...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[1]': Handling state change from Running to Success
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'save_results[0]': Handling state change from Running to Success
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'exec_query[1]': Finished task run for task with final state: 'Success'
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'save_results[0]': Finished task run for task with final state: 'Success'
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'save_results[1]': Starting task run...
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'exec_query[2]': Starting task run...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'save_results[1]': Handling state change from Pending to Running
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Handling state change from Pending to Running
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'save_results[1]': Calling task.run() method...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Calling task.run() method...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Attaching process based timeout handler...
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'save_results[1]': Handling state change from Running to Success
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Sending execution to a new process...
[2021-04-06 21:44:28+0200] INFO - prefect.TaskRunner | Task 'save_results[1]': Finished task run for task with final state: 'Success'
[2021-04-06 21:44:28+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Waiting for process to return with 15s timeout...
[2021-04-06 21:44:29+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Executing...
Query name: zuzda_ct_by_url_last_hour_src.sql
Columns: ['day', 'ts', 'source', 'campaign_id', 'row_id', 'url', 'ct']
[2021-04-06 21:44:32+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Passing result back to main process...
[2021-04-06 21:44:43+0200] DEBUG - prefect.TaskRunner | Task 'exec_query[2]': Execution process closed, collecting result...
Andor Tóth
04/06/2021, 7:46 PM
zuzda_ct_by_url_last_hour_src.sql
Andor Tóth
04/06/2021, 7:46 PM
Andor Tóth
04/06/2021, 7:47 PM
Andor Tóth
04/06/2021, 7:47 PM
Andor Tóth
04/06/2021, 7:52 PM
select * from tmp.zuzda_test
Andor Tóth
04/06/2021, 7:52 PM
Andor Tóth
04/06/2021, 7:52 PM
Zanie
Andor Tóth
04/06/2021, 7:54 PM
select "A", sleep(1000)
Andor Tóth
04/06/2021, 7:56 PM
Andor Tóth
04/06/2021, 7:57 PM
Andor Tóth
04/06/2021, 7:57 PM
Andor Tóth
04/06/2021, 7:58 PM
Andor Tóth
04/06/2021, 8:07 PM
Zanie
Andor Tóth
04/06/2021, 8:10 PM
Andor Tóth
04/06/2021, 8:10 PM
Andor Tóth
04/06/2021, 8:10 PM
Zanie
DaskExecutor instead of the LocalDaskExecutor -- Dask itself typically recommends using the distributed executor even for local work.