Lawrence Finn
11/06/2021, 1:07 PM
python3 ~/orion_example_flow.py
Traceback (most recent call last):
File "/Users/lawrencefinn/orion_example_flow.py", line 13, in <module>
def my_favorite_function(executor=exec):
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/flows.py", line 326, in flow
Flow(
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/flows.py", line 109, in __init__
self.parameters = parameter_schema(self.fn)
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/orion/utilities/functions.py", line 56, in parameter_schema
**pydantic.create_model("Parameters", **model_fields).schema()
File "pydantic/main.py", line 990, in pydantic.main.create_model
File "pydantic/main.py", line 299, in pydantic.main.ModelMetaclass.__new__
File "pydantic/fields.py", line 411, in pydantic.fields.ModelField.infer
File "pydantic/fields.py", line 342, in pydantic.fields.ModelField.__init__
File "pydantic/fields.py", line 445, in pydantic.fields.ModelField.prepare
File "pydantic/fields.py", line 473, in pydantic.fields.ModelField._set_default_and_type
File "pydantic/fields.py", line 345, in pydantic.fields.ModelField.get_default
File "pydantic/utils.py", line 630, in pydantic.utils.smart_deepcopy
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/copy.py", line 272, in _reconstruct
y.__setstate__(state)
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/executors.py", line 394, in __setstate__
self._client = distributed.get_client()
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/distributed/worker.py", line 3919, in get_client
raise ValueError("No global client found and no address provided")
ValueError: No global client found and no address provided
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) near ",": syntax error
[SQL: WITH intervals AS
(
-- recursive CTE to mimic the behavior of `generate_series`,
-- which is only available as a compiled extension
WITH RECURSIVE intervals(interval_start, interval_end, counter) AS (
VALUES(
strftime('%Y-%m-%d %H:%M:%f000', ?),
strftime('%Y-%m-%d %H:%M:%f000', ?, ?),
1
)
UNION ALL
SELECT interval_end, strftime('%Y-%m-%d %H:%M:%f000', interval_end, ?), counter + 1
FROM intervals
-- subtract interval because recursive where clauses are effectively evaluated on a t-1 lag
WHERE
interval_start < strftime('%Y-%m-%d %H:%M:%f000', ?, ?)
-- don't compute more than 500 intervals
AND counter < 500
)
SELECT * FROM intervals
)
SELECT counts.interval_start, counts.interval_end, coalesce(json_group_array(json(counts.state_agg)) FILTER (WHERE counts.state_agg IS NOT NULL), '[]') AS states
FROM (SELECT intervals.interval_start AS interval_start, intervals.interval_end AS interval_end, CASE WHEN (count(runs.id) = ?) THEN NULL ELSE json_object(?, runs.state_type, ?, runs.state_name, ?, count(runs.id), ?, sum(max(?, CAST(STRFTIME('%s', runs.estimated_run_time) AS INTEGER))), ?, sum(max(?, CAST(STRFTIME('%s', runs.estimated_start_time_delta) AS INTEGER)))) END AS state_agg
FROM intervals LEFT OUTER JOIN (SELECT flow_run.id AS id, flow_run.expected_start_time AS expected_start_time, (SELECT CASE WHEN (flow_run.state_type = ?) THEN strftime(?, (julianday(flow_run.total_run_time) + julianday(strftime(?, (? + julianday(strftime('%Y-%m-%d %H:%M:%f000', 'now'))) - julianday(flow_run_state.timestamp)))) - ?) ELSE flow_run.total_run_time END AS anon_1
WHERE flow_run.state_id = flow_run_state.id) AS estimated_run_time, CASE WHEN (flow_run.start_time > flow_run.expected_start_time) THEN strftime(?, (? + julianday(flow_run.start_time)) - julianday(flow_run.expected_start_time)) WHEN (flow_run.start_time IS NULL AND (flow_run.state_type NOT IN (?, ?, ?)) AND flow_run.expected_start_time < strftime('%Y-%m-%d %H:%M:%f000', 'now')) THEN strftime(?, (? + julianday(strftime('%Y-%m-%d %H:%M:%f000', 'now'))) - julianday(flow_run.expected_start_time)) ELSE ? END AS estimated_start_time_delta, flow_run_state.type AS state_type, flow_run_state.name AS state_name
FROM flow_run JOIN flow_run_state ON flow_run.state_id = flow_run_state.id
WHERE flow_run.state_type IN (?, ?, ?, ?, ?, ?) AND flow_run.expected_start_time <= ? AND flow_run.expected_start_time >= ?) AS runs ON runs.expected_start_time >= intervals.interval_start AND runs.expected_start_time < intervals.interval_end GROUP BY intervals.interval_start, intervals.interval_end, runs.state_type, runs.state_name) AS counts GROUP BY counts.interval_start, counts.interval_end ORDER BY counts.interval_start
LIMIT ? OFFSET ?]
[parameters: ('2021-11-06T11:39:36.738000+00:00', '2021-11-06T11:39:36.738000+00:00', '+480.0 seconds', '+480.0 seconds', '2021-11-06T13:39:36.738000+00:00', '-480.0 seconds', 0, 'state_type', 'state_name', 'count_runs', 'sum_estimated_run_time', 0, 'sum_estimated_lateness', 0, 'RUNNING', '%Y-%m-%d %H:%M:%f000', '%Y-%m-%d %H:%M:%f000', 2440587.5, 2440587.5, '%Y-%m-%d %H:%M:%f000', 2440587.5, 'FAILED', 'COMPLETED', 'CANCELLED', '%Y-%m-%d %H:%M:%f000', 2440587.5, '1970-01-01 00:00:00.000000', 'SCHEDULED', 'PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CANCELLED', '2021-11-06 13:39:36.741000', '2021-11-06 11:39:36.741000', 500, 0)]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)
Anna Geller
from prefect import flow, task
from prefect.executors import DaskExecutor
from typing import List
import time
@task
def plus_one(x):
return x + 1
@task
def plus_two(x):
return x + 2
@task
def sum_all(x, y):
result = x + y
print(result)
time.sleep(10)
return result
@flow(executor=DaskExecutor())
def run_flow(numbers: List[int] = None):
for number in numbers:
one = plus_one(number)
two = plus_two(number)
sum_all(one, two)
if __name__ == "__main__":
start = time.time()
run_flow(numbers=[1, 2, 3, 4])
end = time.time()
print(f"it took {end - start}")
Lawrence Finn
11/08/2021, 12:52 PM
cat ~/orion_example_flow.py
from prefect import flow, task
from prefect.executors import DaskExecutor, SequentialExecutor
exec = DaskExecutor(
cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}
)
@task
def say_it(stuff):
print(f"Saying {stuff}")
@flow
def my_favorite_function(executor=exec):
say_it("hello")
return 0
if __name__ == "__main__":
my_favorite_function()
Anna Geller
from prefect import flow, task
from prefect.executors import DaskExecutor
@task
def say_it(stuff):
print(f"Saying {stuff}")
@flow(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}))
def my_favorite_function():
say_it("hello")
return 0
if __name__ == "__main__":
my_favorite_function()
this should work.
Lawrence Finn
11/08/2021, 12:58 PM
Anna Geller
prefect orion reset-db
prefect orion start
Lawrence Finn
11/08/2021, 1:02 PM
Anna Geller
Lawrence Finn
11/08/2021, 1:33 PM
sqlite3 -version
3.28.0 2019-04-15 14:49:49 378230ae7f4b721c8b8d83c8ceb891449685cd23b1702a57841f1be40b5daapl
Anna Geller
(base) ➜ pyenv virtualenv 3.8.5 orion
pyenv-virtualenv: `3.8.5' is not installed in pyenv.
So I tried reproducing using the same Python version in conda:
conda create -n "orion" python=3.8.5
and it worked well. Would you give conda or venv a try?
Zanie
ValueError: No global client found and no address provided
- This error is definitely caused by passing the executor as a default argument of the flow function instead of to the `@flow` decorator, as Anna noted. After moving the executor definition into the decorator in your sample script, I no longer get this error and the flow runs as expected.
❯ conda install sqlite==3.28.0
Collecting package metadata (current_repodata.json): done
Solving environment: failed with initial frozen solve. Retrying with flexible solve.
Collecting package metadata (repodata.json): done
Solving environment: failed with initial frozen solve. Retrying with flexible solve.
Solving environment: |
Found conflicts! Looking for incompatible packages.
This can take several minutes. Press CTRL-C to abort.
failed
UnsatisfiableError: The following specifications were found to be incompatible with each other:
Output in format: Requested package -> Available versions
Package sqlite conflicts for:
python=3.8 -> sqlite[version='>=3.30.0,<4.0a0|>=3.30.1,<4.0a0|>=3.31.1,<4.0a0|>=3.32.3,<4.0a0|>=3.33.0,<4.0a0|>=3.35.4,<4.0a0|>=3.36.0,<4.0a0']
sqlite==3.28.0
Lawrence Finn
11/08/2021, 6:00 PM
pyenv install 3.8.5
Zanie
from prefect import flow, task
from prefect.executors import DaskExecutor
@task
def say_it(stuff):
print(f"Saying {stuff}")
@flow(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}))
def my_favorite_function():
say_it("hello")
return 0
if __name__ == "__main__":
my_favorite_function()
❯ python 1-orion.py
12:02:45.060 | Beginning flow run 'rousing-malkoha' for flow 'my-favorite-function'...
12:02:45.061 | Starting executor `DaskExecutor`...
12:02:45.061 | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
12:02:46.206 | The Dask dashboard is available at <http://127.0.0.1:8787/status>
12:02:46.256 | Submitting task run 'say_it-ded9b056-0' to executor...
12:02:46.328 | Shutting down executor `DaskExecutor`...
Saying hello
12:02:46.387 | Task run 'say_it-ded9b056-0' finished in state Completed(message=None, type=COMPLETED)
12:02:46.614 | Flow run 'rousing-malkoha' finished in state Completed(message=None, type=COMPLETED)
Lawrence Finn
11/08/2021, 6:03 PM
python3 ~/orion_example_flow.py
Traceback (most recent call last):
File "/Users/lawrencefinn/orion_example_flow.py", line 9, in <module>
def my_favorite_function(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1})):
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/flows.py", line 326, in flow
Flow(
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/flows.py", line 109, in __init__
self.parameters = parameter_schema(self.fn)
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/orion/utilities/functions.py", line 56, in parameter_schema
**pydantic.create_model("Parameters", **model_fields).schema()
File "pydantic/main.py", line 990, in pydantic.main.create_model
File "pydantic/main.py", line 299, in pydantic.main.ModelMetaclass.__new__
File "pydantic/fields.py", line 411, in pydantic.fields.ModelField.infer
File "pydantic/fields.py", line 342, in pydantic.fields.ModelField.__init__
File "pydantic/fields.py", line 445, in pydantic.fields.ModelField.prepare
File "pydantic/fields.py", line 473, in pydantic.fields.ModelField._set_default_and_type
File "pydantic/fields.py", line 345, in pydantic.fields.ModelField.get_default
File "pydantic/utils.py", line 630, in pydantic.utils.smart_deepcopy
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/copy.py", line 272, in _reconstruct
y.__setstate__(state)
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/executors.py", line 394, in __setstate__
self._client = distributed.get_client()
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/distributed/worker.py", line 3919, in get_client
raise ValueError("No global client found and no address provided")
ValueError: No global client found and no address provided
cat ~/orion_example_flow.py
from prefect import flow, task
from prefect.executors import DaskExecutor, SequentialExecutor
@task
def say_it(stuff):
print(f"Saying {stuff}")
@flow
def my_favorite_function(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1})):
say_it("hello")
return 0
if __name__ == "__main__":
my_favorite_function()
Zanie
`@flow` decorator
Lawrence Finn
11/08/2021, 6:03 PM
Zanie
Lawrence Finn
11/08/2021, 6:03 PM
Zanie
Lawrence Finn
11/08/2021, 6:04 PM
python3
Python 3.8.5 (default, Jun 21 2021, 11:47:43)
[Clang 11.0.3 (clang-1103.0.32.29)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import sqlite3
>>> sqlite3.version
'2.6.0'
>>> sqlite3.sqlite_version
'3.28.0'
Zanie
Lawrence Finn
11/08/2021, 6:49 PM
Zanie
`brew info sqlite`?
Lawrence Finn
11/08/2021, 6:50 PM
brew upgrade sqlite3
Warning: sqlite3 3.36.0 already installed
(deploy) Lawrences-MacBook-Pro-2:aiq lawrencefinn$ sqlite3 --version
3.28.0 2019-04-15 14:49:49 378230ae7f4b721c8b8d83c8ceb891449685cd23b1702a57841f1be40b5daapl
o_O
Zanie
==> Caveats
sqlite is keg-only, which means it was not symlinked into /opt/homebrew,
because macOS already provides this software and installing another version in
parallel can cause all kinds of trouble.
Lawrence Finn
11/08/2021, 6:52 PM
Zanie
Lawrence Finn
11/08/2021, 6:55 PM
Zanie
Lawrence Finn
11/08/2021, 7:33 PM
>>> import sqlite3
>>> sqlite3.sqlite_version
Zanie
Lawrence Finn
11/08/2021, 7:38 PM
Zanie
PREFECT_ORION_DATABASE_CONNECTION_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/orion
Lawrence Finn
11/08/2021, 8:16 PM
Zanie