BK Lau
01/19/2021, 5:44 PM
Nomad as executor instead of Dask?? I want to use Prefect on the Cloud to drive a Nomad cluster installed on-premise
Matic Lubej
01/19/2021, 8:18 PM
dask_cloudprovider API. Running tutorials and sample code from dask, this works great, but for Prefect I have created a dedicated Docker image which I provide to the cluster initializer. The cluster gets created, but as soon as the flow starts, I get the following time-out error after 10 s:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 203, in start
    with Client(self.address, **self.client_kwargs) as client:
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 748, in __init__
    self.start(timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 953, in start
    sync(self.loop, self._start, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 340, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 324, in f
    result[0] = yield future
  File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1043, in _start
    await self._ensure_connected(timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1100, in _ensure_connected
    comm = await connect(
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to <tcp://172.31.40.184:8786> after 10 s
[2021-01-19 20:14:19+0000] ERROR - prefect.Execute process | Unexpected error occured in FlowRunner: OSError('Timed out trying to connect to <tcp://172.31.40.184:8786> after 10 s')
Traceback (most recent call last):
  File "s3_process_l2a_2019.py", line 114, in <module>
    assert status.is_successful()
AssertionError
Any ideas what is going on? Is the dask scheduler having issues connecting to the workers? Or might it be something else?
Thanks!
Billy McMonagle
01/19/2021, 9:29 PM
Chris Jordan
01/19/2021, 10:13 PM
tmpfile object between tasks, it'll work as expected, as in
@task(name="save records to a tmp file")
def save_some_data(result=PrefectResult()):
    logger = prefect.context.get("logger")
    the_file = tempfile.NamedTemporaryFile()
    for i in range(5000000):
        the_file.write(b"lorem ipsum\n")
    the_file.seek(0)
    logger.info("wrote the file")
    return the_file

@task(name="reread that data")
def read_some_data(the_file, result=PrefectResult()):
    logger = prefect.context.get("logger")
    output = the_file.read()
    logger.info("read the file")
    logger.info(f"length of file is {len(output)}")

with Flow("save_retrieve_file_flow") as flow:
    f = save_some_data()
    g = read_some_data(f)
but passing the name of the file won't work - the file will not be found
@task(name="save records to a tmp file")
def save_some_data(result=PrefectResult()):
    logger = prefect.context.get("logger")
    the_file = tempfile.NamedTemporaryFile()
    for i in range(500000):
        the_file.write(b"lorem ipsum\n")
    the_file.seek(0)
    logger.info(f"wrote the file to {the_file.name}")
    return the_file.name

@task(name="reread that data")
def read_some_data(the_file_name, result=PrefectResult()):
    logger = prefect.context.get("logger")
    logger.info(f"trying to open {the_file_name}")
    with open(the_file_name, 'r') as the_file:
        output = the_file.read()
    logger.info("read the file")
    logger.info(f"length of file is {len(output)}")

with Flow("save_retrieve_file_flow2",
          state_handlers=[cloud_only_slack_handler]
          ) as flow:
    f = save_some_data()
    g = read_some_data(f)
What's going on here? Is the file system being reset between tasks? Does Prefect clean up temporary files in the flow if they're not in memory directly? Something else?
Hui Zheng
01/20/2021, 12:58 AM
HTTPSConnectionPool(host='10.124.0.1', port=443): Max retries exceeded with url: /apis/batch/v1/namespaces/default/jobs (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7fa7a7705668>: Failed to establish a new connection: [Errno 111] Connection refused',))
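A note on Chris Jordan's temporary-file question above: a likely explanation, independent of Prefect, is that `tempfile.NamedTemporaryFile()` deletes its file as soon as the file object is closed, and on CPython the object is closed when the last reference to it goes away. Returning only `the_file.name` drops that reference at the end of the first task, so the path no longer exists by the time the second task opens it. The sketch below reproduces this in plain Python. The function names are hypothetical, and the `delete=False` variant is one possible workaround, not something suggested in the thread. If the tasks run on different machines (for example under a Dask executor), a local path would not carry over either way.

```python
import os
import tempfile


def save_and_return_name():
    # Mirrors the second flow: only the path leaves the function.
    the_file = tempfile.NamedTemporaryFile()
    the_file.write(b"lorem ipsum\n")
    the_file.flush()
    # On return, the last reference to the_file is dropped; CPython then
    # closes the object, and closing a NamedTemporaryFile deletes the file.
    return the_file.name


def save_with_delete_false():
    # delete=False keeps the file on disk after the object is closed.
    # The caller becomes responsible for removing it.
    with tempfile.NamedTemporaryFile(delete=False) as the_file:
        the_file.write(b"lorem ipsum\n")
    return the_file.name


vanished = save_and_return_name()
kept = save_with_delete_false()

vanished_exists = os.path.exists(vanished)
kept_exists = os.path.exists(kept)
print("path from default NamedTemporaryFile exists:", vanished_exists)
print("path from delete=False exists:", kept_exists)

os.remove(kept)  # manual cleanup for the delete=False file
```

Passing the file object itself between tasks keeps a live reference, which would explain why the first flow works when both tasks run in the same process.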
Riley Hun
01/20/2021, 1:06 AM
0.14.4 and I can no longer run flow.environment.executor = executor. I get the following error: AttributeError: 'NoneType' object has no attribute 'executor'. Does anyone know how to attach an executor to a flow registered to Prefect Server using Docker Storage in the latest version of Prefect? Had no issues with this in Prefect version 0.13.18.
In the meantime, I'll downgrade to 0.13.8.
SK
01/20/2021, 2:28 AM
SK
01/20/2021, 2:28 AM
Aiden Price
01/20/2021, 4:49 AM
latest tag and Always pull so I can be sure I'm getting the latest dependencies. I'd like to minimise the delay by changing to an IfNotPresent image pull policy, but then I'd need to pin the image tag to a specific version, and I'd still want to make sure that specific tag could be updated as I keep pushing updates.
Does anyone have any advice? Thanks.
Riley Hun
01/20/2021, 5:33 AM
Prefect 0.14.4. My flow, which is using Docker storage and Dask Gateway as the executor, is working fine after the transition. I finished reading the updated documentation and I'm trying to understand the concept of Run Configuration. I don't quite understand it, especially since my scheduled flow worked without it, so how do I know when a Run Config is applicable?
Vitaly Shulgin
01/20/2021, 1:31 PM
Paul Vanetti
01/20/2021, 1:35 PM
Marwan Sarieddine
01/20/2021, 1:48 PM
StartFlowRun - I just want to make sure I am doing this correctly - please see the example in the thread.
Vitaly Shulgin
01/20/2021, 2:34 PM
Matthew Blau
01/20/2021, 3:23 PM
Vipul
01/20/2021, 4:32 PM
J. Martins
01/20/2021, 5:19 PM
Luis Gallegos
01/20/2021, 6:22 PM
SK
01/20/2021, 7:44 PM
Zach
01/20/2021, 8:37 PM
Zach
01/20/2021, 8:37 PM
Zach
01/20/2021, 8:37 PM
BK Lau
01/20/2021, 8:55 PM
Alberto de Santos
01/20/2021, 9:06 PM
Sean Talia
01/20/2021, 9:15 PM
aws extras?
Vitaly Shulgin
01/20/2021, 9:26 PM
Matic Lubej
01/20/2021, 9:49 PM
Sai Srikanth
01/20/2021, 10:11 PM
Luis Gallegos
01/20/2021, 11:28 PM
with Flow("example-flow") as flow:
    example_param = Parameter('example')
    print(example_param)
Thanks for your help!!
Darshan
01/20/2021, 11:37 PM
@task
def task_1(some_param):
    pass  # Do something

@task
def task_2():
    pass  # Do something

@task
def task_3(some_other_param):
    pass  # Do something

flow.set_dependencies(task=task_1)
flow.set_dependencies(task=task_2, upstream_tasks=[task_1])
flow.set_dependencies(task=task_3, upstream_tasks=[task_2])
Michael Adkins
01/20/2021, 11:41 PM
keyword_tasks e.g. flow.set_dependencies(task=task_1, keyword_tasks={"some_param": Parameter("example_param")})
bind as shown in https://docs.prefect.io/core/getting_started/first-steps.html#imperative-api
Parameter("example_param") in my example can also be a task or a constant value, i.e. 10
Darshan
01/20/2021, 11:44 PM
Michael Adkins
01/20/2021, 11:45 PM
Darshan
01/20/2021, 11:46 PM
Michael Adkins
01/20/2021, 11:50 PM
downstream_task.set_upstream(table_creation) where needed
from prefect import task, Flow

@task(log_stdout=True)
def display(value):
    print(value)

@task
def create_table():
    pass

@task
def insert_data():
    pass

@task
def query_data():
    return "fake-data-from-table"

with Flow("non-data-dependencies") as flow:
    create = create_table()
    insert = insert_data()
    insert.set_upstream(create)
    data = query_data()
    data.set_upstream(insert)
    display(data)

flow.run()

with Flow("non-data-dependencies") as flow:
    create = create_table()
    insert = insert_data().set_upstream(create)
    data = query_data().set_upstream(insert)
    display(data)
Darshan
01/21/2021, 12:15 AM
from prefect import Flow, task
import prefect

@task(name="create db_table_1")
def task_1():
    # Create db_table_1
    print("Create db_table_1")

@task(name="create db_table_2 using db_table_1")
def task_2():
    # Create db_table_2
    print("Create db_table_2")

@task(name="create db_table_3 using db_table_2")
def task_3():
    # Create db_table_3
    print("Create db_table_3")

@task(name="create db_table_4 using db_table_2")
def task_4():
    # Create db_table_4
    print("Create db_table_4")

@task(name="create db_table_5 using db_table_3 and db_table_4")
def task_5():
    # Create db_table_5
    print("Create db_table_5")

@task(name="query data from db_table_5 and display")
def query_data():
    # Query data and print
    print("result of query")

with Flow('ETL') as flow:
    task_2.set_upstream(task_1)
    task_3.set_upstream(task_2)
    task_4.set_upstream(task_2)
    task_5.set_dependencies(upstream_tasks=[task_3, task_4])
    query_data.set_upstream(task_5)

flow.run()
flow.visualize()
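
For reference, the ordering that these upstream declarations imply can be checked without Prefect. The sketch below copies the same edges into Python's standard-library `graphlib` (Python 3.9+) and asks for one valid execution order. It only illustrates the dependency graph; it is not how Prefect schedules tasks, and the `upstream` dictionary is a hand-written transcription of the calls above.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of tasks that must finish before it can start,
# transcribed from the set_upstream / set_dependencies calls in the flow.
upstream = {
    "task_1": set(),
    "task_2": {"task_1"},
    "task_3": {"task_2"},
    "task_4": {"task_2"},
    "task_5": {"task_3", "task_4"},
    "query_data": {"task_5"},
}

# static_order() yields one valid order; task_3 and task_4 may appear
# in either order because neither depends on the other.
order = list(TopologicalSorter(upstream).static_order())
print(order)
```

Because `task_3` and `task_4` share only `task_2` as an upstream, they are the one pair in this graph that an executor could run in parallel.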