Fred Israel 01/30/2020, 6:56 PM
Fred Israel 01/30/2020, 7:06 PM
Fred Israel 01/30/2020, 7:12 PM
John Ramirez 01/30/2020, 8:28 PM
Nayeentara 01/31/2020, 7:36 AM
Alex Cano 01/31/2020, 6:12 PM
AttributeError: Context object has no attribute dict. Can anyone spot something wrong with the flow (like structurally or conceptually)?
from typing import List

from prefect import Flow, task
from prefect.environments.storage import Local

@task
def extract():
    return [1, 2, 3]

@task
def always_process_and_pass_downstream(element: int) -> int:
    return element * 2

@task
def load(elements: List[int]) -> None:
    total = sum(elements)

@task
def process_and_finish(element: int) -> None:
    avg = sum(element) / len(element)
    pass

with Flow("test client register flow 1", storage=Local()) as flow:
    extracted = extract()
    processed = always_process_and_pass_downstream(element=extracted, mapped=True)
    process_and_finish(element=processed)
    load(elements=processed)

result = flow.run()
Thanks!
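For reference, a minimal sketch of the mapping pattern the flow above appears to be reaching for in Prefect 0.x: the per-element task is created with .map() (equivalent to calling the task with mapped=True) and the downstream reduce task receives the whole list of mapped results. Task names mirror the snippet but are illustrative, and this sketch is not claimed to resolve the Context attribute error:

from typing import List

from prefect import Flow, task

@task
def extract() -> List[int]:
    return [1, 2, 3]

@task
def always_process_and_pass_downstream(element: int) -> int:
    # runs once per element of the mapped input
    return element * 2

@task
def load(elements: List[int]) -> None:
    # receives the full list of mapped results (an implicit reduce)
    total = sum(elements)
    print(total)

with Flow("mapping sketch") as flow:
    extracted = extract()
    processed = always_process_and_pass_downstream.map(extracted)  # same effect as mapped=True
    load(elements=processed)

state = flow.run()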
Dharhas Pothina 02/01/2020, 4:53 PM
from prefect import Flow, task
@task
def read_parquet(src_path):
    return df  # returns dask df

@task
def retrieve_img(x):
    return img

@task
def model(img):
    return result

@task
def write_db(result):
    # write to db
    pass

@task
def write_parquet(dst_path, df):
    # write to disk
    pass

with Flow('Test') as flow:
    df = read_parquet(path)
    images = retrieve_img.map(df.iterrows())
    results = model.map(images)
    write_db.map(results)
    write_parquet(df.join(dd.from_sequence(results)))
Essentially each row of my input dask dataframe needs to be processed and the results appended to the dataframe and saved in a database.
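A minimal sketch of one way this per-row pattern is often structured in Prefect 0.x. Inside the with Flow(...) block, df is a Task object at build time, so df.iterrows() won't work there; instead the rows are materialized as a plain list by an upstream task and mapped over. The pandas read and the task names are illustrative stand-ins, not the original code:

from prefect import Flow, task

@task
def read_rows(src_path):
    import pandas as pd
    df = pd.read_parquet(src_path)
    # return a plain list so Prefect can map over it
    return [row for _, row in df.iterrows()]

@task
def process_row(row):
    # stand-in for retrieve_img + model on a single row
    return {"id": row.name, "score": 0.0}

@task
def write_db(result):
    # stand-in for the per-row database write
    print(result)

with Flow("per-row sketch") as flow:
    rows = read_rows("input.parquet")
    results = process_row.map(rows)
    write_db.map(results)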
02/02/2020, 6:43 PMtask, make_node
in a LOOP
. The current implementation is a task
that sits inside a Flow
wrapped up in a looped
task
. I tried some quick stabs at the Dask Executor
. Errors arose. Is there a better pattern to achieve parallelizing make_node
? ThanksRomain
02/03/2020, 3:15 PMmap_fn = FunctionTask(lambda x: x + 1, tags=["dask-resource:GPU=1"])
with Flow('test') as flow:
    numbers = [1]
    map_fn.map(numbers)
While running such a flow on a dask cluster with 1 scheduler and 1 worker (started with --resources GPU=1), it turns out that the flow is blocked because there are 2 tasks that "reserved" the GPU resource (see attached pics from the dask dashboard).
It looks like the map task itself reserved 1 GPU, and then the map_fn task mapped over the elements of numbers cannot be processed, because dask waits for the resource to be released.
Is that the expected behavior of mapping with dask resources?
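For reference, a minimal sketch of the setup being described, assuming a scheduler and worker started out-of-band; the dask-resource: tag is the documented way Prefect turns a task tag into a dask resource restriction, and the addresses are placeholders. Whether the parent mapping task should also hold the resource is exactly the open question above:

# Start the cluster out-of-band, e.g.:
#   dask-scheduler
#   dask-worker tcp://127.0.0.1:8786 --resources "GPU=1"
from prefect import Flow
from prefect.tasks.core.function import FunctionTask
from prefect.engine.executors import DaskExecutor

# the dask-resource tag becomes a dask worker-resource restriction
map_fn = FunctionTask(lambda x: x + 1, tags=["dask-resource:GPU=1"])

with Flow("gpu-resource sketch") as flow:
    map_fn.map([1, 2, 3])

flow.run(executor=DaskExecutor(address="tcp://127.0.0.1:8786"))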
Jackson Maxfield Brown 02/03/2020, 10:20 PM
Tsang Yong 02/04/2020, 1:28 AM
Fabian Thomas 02/04/2020, 1:51 PM
Fabian Thomas 02/04/2020, 2:27 PM
Daniel Gazit 02/04/2020, 2:52 PM
RyanB 02/04/2020, 9:37 PM
[2020-02-04 05:17:43,587] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
I try to terminate the dask distributed cluster cleanly, but always get:
2020-02-04 05:17:44,467 INFO stopping Dask cluster
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
distributed.scheduler - INFO - Remove worker <Worker 'tcp://100.96.120.2:37227', name: tcp://100.96.120.2:37227, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://100.96.120.2:37227
distributed.batched - INFO - Batched Comm Closed: in <closed TCP>: Stream is closed
distributed.scheduler - INFO - Remove worker <Worker 'tcp://100.96.120.2:43873', name: tcp://100.96.120.2:43873, memory: 0, processing: 0>
followed by: distributed.scheduler - INFO - Lost all workers
2020-02-04 05:17:45,604 WARNING Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f3856212470>: Failed to establish a new connection: [Errno 111] Connection refused',)': /api/v1/namespaces/logflow/pods?labelSelector=app%3Ddask%2Ccomponent%3Dworker%2Cdask.org%2Fcluster-name%3Ddask-root-b81ddee0-5%2Cuser%3Droot
2020-02-04 05:17:45,604 WARNING Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f384f59eb38>: Failed to establish a new connection: [Errno 111] Connection refused',)': /api/v1/namespaces/logflow/pods?labelSelector=app%3Ddask%2Ccomponent%3Dworker%2Cdask.org%2Fcluster-name%3Ddask-root-b81ddee0-5%2Cuser%3Droot
2020-02-04 05:17:45,605 WARNING Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f384f59e0b8>: Failed to establish a new connection: [Errno 111] Connection refused',)': /api/v1/namespaces/logflow/pods?labelSelector=app%3Ddask%2Ccomponent%3Dworker%2Cdask.org%2Fcluster-name%3Ddask-root-b81ddee0-5%2Cuser%3Droot
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/urllib3/connection.py", line 157, in _new_conn
(self._dns_host, self.port), self.timeout, **extra_kw
File "/usr/local/lib/python3.6/dist-packages/urllib3/util/connection.py", line 84, in create_connection
raise err
File "/usr/local/lib/python3.6/dist-packages/urllib3/util/connection.py", line 74, in create_connection
sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 672, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 376, in _make_request
self._validate_conn(conn)
File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 994, in _validate_conn
conn.connect()
File "/usr/local/lib/python3.6/dist-packages/urllib3/connection.py", line 300, in connect
conn = self._new_conn()
File "/usr/local/lib/python3.6/dist-packages/urllib3/connection.py", line 169, in _new_conn
self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.VerifiedHTTPSConnection object at 0x7f3874be2fd0>: Failed to establish a new connection: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.6/weakref.py", line 624, in _exitfunc
f()
File "/usr/lib/python3.6/weakref.py", line 548, in __call__
return info.func(*info.args, **(info.kwargs or {}))
File "/usr/local/lib/python3.6/dist-packages/dask_kubernetes/core.py", line 623, in _cleanup_resources
pods = core_api.list_namespaced_pod(namespace, label_selector=format_labels(labels))
File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/apis/core_v1_api.py", line 12372, in list_namespaced_pod
(data) = self.list_namespaced_pod_with_http_info(namespace, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/apis/core_v1_api.py", line 12472, in list_namespaced_pod_with_http_info
collection_formats=collection_formats)
File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/api_client.py", line 334, in call_api
_return_http_data_only, collection_formats, _preload_content, _request_timeout)
File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/api_client.py", line 168, in __call_api
_request_timeout=_request_timeout)
File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/api_client.py", line 355, in request
headers=headers)
File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/rest.py", line 231, in GET
query_params=query_params)
File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/rest.py", line 205, in request
headers=headers)
File "/usr/local/lib/python3.6/dist-packages/urllib3/request.py", line 76, in request
method, url, fields=fields, headers=headers, **urlopen_kw
File "/usr/local/lib/python3.6/dist-packages/urllib3/request.py", line 97, in request_encode_url
return self.urlopen(method, url, **extra_kw)
File "/usr/local/lib/python3.6/dist-packages/urllib3/poolmanager.py", line 330, in urlopen
response = conn.urlopen(method, u.request_uri, **kw)
File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 760, in urlopen
**response_kw
File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 760, in urlopen
**response_kw
File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 760, in urlopen
**response_kw
File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 720, in urlopen
method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
File "/usr/local/lib/python3.6/dist-packages/urllib3/util/retry.py", line 436, in increment
raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='localhost', port=443): Max retries exceeded with url: /api/v1/namespaces/logflow/pods?labelSelector=app%3Ddask%2Ccomponent%3Dworker%2Cdask.org%2Fcluster-name%3Ddask-root-b81ddee0-5%2Cuser%3Droot (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f3874be2fd0>: Failed to establish a new connection: [Errno 111] Connection refused',))
Is there something specific I need to do to properly tear down the dask cluster?
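A minimal sketch, under the assumption that the cluster is a dask_kubernetes KubeCluster managed from the flow-runner process, of an explicit teardown order that avoids relying on atexit finalizers: close the Client, then the cluster, while the Kubernetes API is still reachable. flow, the pod spec file, and the scale factor are illustrative, and this is not claimed to be the fix for the traceback above:

from dask_kubernetes import KubeCluster
from distributed import Client

from prefect.engine.executors import DaskExecutor

cluster = KubeCluster.from_yaml("worker-spec.yml")  # hypothetical pod spec file
cluster.scale(4)
client = Client(cluster)

try:
    # `flow` is assumed to be an existing Prefect Flow
    flow.run(executor=DaskExecutor(address=cluster.scheduler_address))
finally:
    # shut down in order, while the Kubernetes API is still reachable
    client.close()
    cluster.close()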
Artem Andrienko 02/05/2020, 2:00 PM
with Flow("Data collection") as flow:
    params = task(params)
    another_task(**params)
but python raises an error
TypeError: FunctionTask object argument after ** must be a mapping, not FunctionTask
Is there any way to unpack a dict from a task?
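At flow-build time params is a Task object rather than the dict it will eventually return, which is why ** fails there. A minimal sketch of a common workaround is to pass the whole dict downstream and unpack it inside the receiving task at run time; task names here are illustrative:

from prefect import Flow, task

@task
def build_params():
    return {"start": "2020-01-01", "end": "2020-02-01"}

@task
def another_task(params: dict):
    # unpack at run time, when the real dict is available
    start, end = params["start"], params["end"]
    print(start, end)

with Flow("Data collection") as flow:
    params = build_params()
    another_task(params)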
itay livni 02/06/2020, 5:42 AM
Preston Marshall 02/06/2020, 2:47 PM
Preston Marshall 02/06/2020, 3:31 PM
John Ramirez 02/06/2020, 8:01 PM
Is there a way to use map to run a cross product? For example, task_one with inputs [a, b] and [c, d] will run [a, c], [a, d], [b, c], [b, d].
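Prefect's map pairs its iterable arguments element-wise rather than crossing them, so a minimal sketch of one way to get a cross product is to build the combinations in an upstream task and map over those pairs; task names are illustrative:

from itertools import product

from prefect import Flow, task

@task
def cross(xs, ys):
    # build all (x, y) combinations up front
    return list(product(xs, ys))

@task
def task_one(pair):
    x, y = pair
    return f"{x}-{y}"

with Flow("cross product sketch") as flow:
    pairs = cross(["a", "b"], ["c", "d"])
    results = task_one.map(pairs)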
Preston Marshall 02/06/2020, 10:36 PM
Preston Marshall 02/06/2020, 10:58 PM
Preston Marshall 02/07/2020, 3:44 PM
Nate Atkins 02/07/2020, 7:43 PM
executor = DaskExecutor(address="tcp://192.168.86.23:8786")
flow.run(executor=executor)
I get the KilledWorker error.
ERROR - prefect.Flow: x-files | Unexpected error occured in FlowRunner: KilledWorker('retrieve_url-a032a3d8-fa89-4131-ad4c-4f3c5fbd282c', <Worker 'tcp://192.168.86.23:58923', name: tcp://192.168.86.23:58923, memory: 0, processing: 1>)
By commenting things out I'm down to the offending line being
html = requests.get(url)
in the retrieve_url function. Any thoughts?
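A minimal sketch of one way to narrow this down, assuming the same scheduler is reachable: submit the suspect call through a plain dask Client, bypassing Prefect, to check whether the worker environment itself (for example its installed requests) is what kills the worker. The URL is a placeholder:

import requests
from distributed import Client

# connect to the same scheduler the DaskExecutor points at
client = Client("tcp://192.168.86.23:8786")

# run the suspect call on a worker, outside of Prefect entirely
future = client.submit(requests.get, "https://example.com")
print(future.result().status_code)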
Nate Atkins 02/07/2020, 7:59 PM
jorwoods 02/07/2020, 9:13 PM
1. python flow_x.py?
2. If I update the flow, I then need to stop the existing process running said flow, and restart it with the updated version. Is that correct? Is there a way to refresh this automatically? (Seems like this is part of prefect cloud, but wanted to check)
3. What if I have a branch in a flow where several mapped tasks share some common dependency that has costly start-up/tear-down? (e.g. several tasks in the flow can run, then start a shared EMR cluster, use the EMR cluster, then when all are complete, tear the cluster down)
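A minimal sketch of how that shared start-up/tear-down dependency is often wired in Prefect 0.x: one task starts the resource, the mapped work receives it via unmapped, and the teardown task is given the mapped result as an explicit upstream dependency so it only runs after everything finishes. The cluster calls are stand-ins, not a real EMR API:

from prefect import Flow, task, unmapped

@task
def start_cluster():
    # stand-in for spinning up the shared EMR cluster
    return "cluster-id"

@task
def process(item, cluster_id):
    # stand-in for mapped work that needs the cluster
    return f"{cluster_id}:{item}"

@task
def stop_cluster(cluster_id):
    # stand-in for tearing the cluster down
    print(f"stopping {cluster_id}")

with Flow("shared resource sketch") as flow:
    cluster_id = start_cluster()
    results = process.map([1, 2, 3], cluster_id=unmapped(cluster_id))
    # force teardown to wait for every mapped task
    stop_cluster(cluster_id, upstream_tasks=[results])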
Preston Marshall 02/07/2020, 11:14 PM
Preston Marshall 02/08/2020, 1:30 AM
Preston Marshall 02/08/2020, 1:31 AM
itay livni 02/08/2020, 4:35 PM
I am getting KilledWorker and dask-worker: error: unrecognized arguments: tcp://127.0.0.5:8786, with the second error always being any arguments I give the scheduler. Any thoughts on how to profile the DaskExecutor?
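A minimal sketch of one way to get more visibility into these failures, assuming local debugging is acceptable: create the cluster in-process with distributed's Client (which avoids hand-passing CLI arguments to dask-worker and exposes the dashboard) and point the DaskExecutor at it. flow is assumed to be defined elsewhere:

from dask.distributed import Client

from prefect.engine.executors import DaskExecutor

# in-process cluster; the dashboard helps show which task kills a worker
client = Client(processes=True, n_workers=2, threads_per_worker=1)
print(client.dashboard_link)

executor = DaskExecutor(address=client.scheduler.address)
state = flow.run(executor=executor)  # `flow` assumed to exist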
Chris White 02/08/2020, 4:36 PM
itay livni 02/08/2020, 4:38 PM
executor = DaskExecutor(address="tcp://127.0.0.5:8786", debug=True)
or debug=False
if make_nodes_executor == 'dask':
    from prefect.engine.executors import DaskExecutor
    # client = Client()
    executor = DaskExecutor(address="tcp://127.0.0.5:8786", debug=True)  # Does not work
else:  # make_nodes_executor == "local":
    from prefect.engine.executors import LocalExecutor
    executor = LocalExecutor()
Chris White 02/08/2020, 4:47 PM
itay livni 02/08/2020, 5:21 PM
A Flow within a LOOP task. That task is triggered by another flow run locally.
Chris White 02/08/2020, 5:25 PM
itay livni 02/08/2020, 5:27 PM
Chris White 02/08/2020, 5:29 PM
itay livni 02/08/2020, 5:33 PM
Chris White 02/08/2020, 5:35 PM
itay livni 02/08/2020, 5:39 PM
multiprocessing or some other library/method? That is how generate_knowledge_graph is put together. It works, but running it in the cloud or using storage does not. Basically anything that has a cloudpickle fails. I need to learn how to chain and update flows.
Chris White 02/08/2020, 7:46 PM
itay livni 02/08/2020, 9:30 PM