l

Lawrence Finn

11/06/2021, 1:07 PM
I’ve been playing with Orion and running into some issues:
1. Lots of SQLite/SQLAlchemy errors
2. A temporary local Dask cluster doesn’t work
python3 ~/orion_example_flow.py
Traceback (most recent call last):
  File "/Users/lawrencefinn/orion_example_flow.py", line 13, in <module>
    def my_favorite_function(executor=exec):
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/flows.py", line 326, in flow
    Flow(
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/flows.py", line 109, in __init__
    self.parameters = parameter_schema(self.fn)
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/orion/utilities/functions.py", line 56, in parameter_schema
    **pydantic.create_model("Parameters", **model_fields).schema()
  File "pydantic/main.py", line 990, in pydantic.main.create_model
  File "pydantic/main.py", line 299, in pydantic.main.ModelMetaclass.__new__
  File "pydantic/fields.py", line 411, in pydantic.fields.ModelField.infer
  File "pydantic/fields.py", line 342, in pydantic.fields.ModelField.__init__
  File "pydantic/fields.py", line 445, in pydantic.fields.ModelField.prepare
  File "pydantic/fields.py", line 473, in pydantic.fields.ModelField._set_default_and_type
  File "pydantic/fields.py", line 345, in pydantic.fields.ModelField.get_default
  File "pydantic/utils.py", line 630, in pydantic.utils.smart_deepcopy
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/copy.py", line 272, in _reconstruct
    y.__setstate__(state)
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/executors.py", line 394, in __setstate__
    self._client = distributed.get_client()
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/distributed/worker.py", line 3919, in get_client
    raise ValueError("No global client found and no address provided")
ValueError: No global client found and no address provided
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) near ",": syntax error
[SQL: WITH intervals AS 
(
            -- recursive CTE to mimic the behavior of `generate_series`, 
            -- which is only available as a compiled extension 
            WITH RECURSIVE intervals(interval_start, interval_end, counter) AS (
                VALUES(
                    strftime('%Y-%m-%d %H:%M:%f000', ?), 
                    strftime('%Y-%m-%d %H:%M:%f000', ?, ?),
                    1
                    )
                
                UNION ALL
                
                SELECT interval_end, strftime('%Y-%m-%d %H:%M:%f000', interval_end, ?), counter + 1
                FROM intervals
                -- subtract interval because recursive where clauses are effectively evaluated on a t-1 lag
                WHERE 
                    interval_start < strftime('%Y-%m-%d %H:%M:%f000', ?, ?)
                    -- don't compute more than 500 intervals
                    AND counter < 500
            )
            SELECT * FROM intervals
            )
 SELECT counts.interval_start, counts.interval_end, coalesce(json_group_array(json(counts.state_agg)) FILTER (WHERE counts.state_agg IS NOT NULL), '[]') AS states 
FROM (SELECT intervals.interval_start AS interval_start, intervals.interval_end AS interval_end, CASE WHEN (count(runs.id) = ?) THEN NULL ELSE json_object(?, runs.state_type, ?, runs.state_name, ?, count(runs.id), ?, sum(max(?, CAST(STRFTIME('%s', runs.estimated_run_time) AS INTEGER))), ?, sum(max(?, CAST(STRFTIME('%s', runs.estimated_start_time_delta) AS INTEGER)))) END AS state_agg 
FROM intervals LEFT OUTER JOIN (SELECT flow_run.id AS id, flow_run.expected_start_time AS expected_start_time, (SELECT CASE WHEN (flow_run.state_type = ?) THEN strftime(?, (julianday(flow_run.total_run_time) + julianday(strftime(?, (? + julianday(strftime('%Y-%m-%d %H:%M:%f000', 'now'))) - julianday(flow_run_state.timestamp)))) - ?) ELSE flow_run.total_run_time END AS anon_1 
WHERE flow_run.state_id = flow_run_state.id) AS estimated_run_time, CASE WHEN (flow_run.start_time > flow_run.expected_start_time) THEN strftime(?, (? + julianday(flow_run.start_time)) - julianday(flow_run.expected_start_time)) WHEN (flow_run.start_time IS NULL AND (flow_run.state_type NOT IN (?, ?, ?)) AND flow_run.expected_start_time < strftime('%Y-%m-%d %H:%M:%f000', 'now')) THEN strftime(?, (? + julianday(strftime('%Y-%m-%d %H:%M:%f000', 'now'))) - julianday(flow_run.expected_start_time)) ELSE ? END AS estimated_start_time_delta, flow_run_state.type AS state_type, flow_run_state.name AS state_name 
FROM flow_run JOIN flow_run_state ON flow_run.state_id = flow_run_state.id 
WHERE flow_run.state_type IN (?, ?, ?, ?, ?, ?) AND flow_run.expected_start_time <= ? AND flow_run.expected_start_time >= ?) AS runs ON runs.expected_start_time >= intervals.interval_start AND runs.expected_start_time < intervals.interval_end GROUP BY intervals.interval_start, intervals.interval_end, runs.state_type, runs.state_name) AS counts GROUP BY counts.interval_start, counts.interval_end ORDER BY counts.interval_start
 LIMIT ? OFFSET ?]
[parameters: ('2021-11-06T11:39:36.738000+00:00', '2021-11-06T11:39:36.738000+00:00', '+480.0 seconds', '+480.0 seconds', '2021-11-06T13:39:36.738000+00:00', '-480.0 seconds', 0, 'state_type', 'state_name', 'count_runs', 'sum_estimated_run_time', 0, 'sum_estimated_lateness', 0, 'RUNNING', '%Y-%m-%d %H:%M:%f000', '%Y-%m-%d %H:%M:%f000', 2440587.5, 2440587.5, '%Y-%m-%d %H:%M:%f000', 2440587.5, 'FAILED', 'COMPLETED', 'CANCELLED', '%Y-%m-%d %H:%M:%f000', 2440587.5, '1970-01-01 00:00:00.000000', 'SCHEDULED', 'PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CANCELLED', '2021-11-06 13:39:36.741000', '2021-11-06 11:39:36.741000', 500, 0)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
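The recursive CTE at the top of that query stands in for `generate_series`. A simplified sketch of the same idea, using Python's built-in `sqlite3` module, might look like this. The function name `generate_intervals`, the use of `datetime()` rather than `strftime()`, and the exact stopping condition are illustrative assumptions, not the query Orion runs.

```python
import sqlite3


def generate_intervals(start, end, step, limit=500):
    """Return (interval_start, interval_end) rows covering start..end.

    `step` is an SQLite date modifier such as '+600 seconds'.
    """
    sql = """
    WITH RECURSIVE intervals(interval_start, interval_end, counter) AS (
        -- seed row: the first interval
        VALUES(datetime(:start), datetime(:start, :step), 1)
        UNION ALL
        -- each pass appends the interval that follows the previous one
        SELECT interval_end, datetime(interval_end, :step), counter + 1
        FROM intervals
        WHERE interval_end < datetime(:end)
          AND counter < :limit  -- cap the number of generated rows
    )
    SELECT interval_start, interval_end FROM intervals
    """
    conn = sqlite3.connect(":memory:")
    try:
        params = {"start": start, "end": end, "step": step, "limit": limit}
        return conn.execute(sql, params).fetchall()
    finally:
        conn.close()


rows = generate_intervals(
    "2021-11-06 11:00:00", "2021-11-06 11:30:00", "+600 seconds"
)
for row in rows:
    print(row)
```

The `counter` column plays the same role as in the original query: it bounds the recursion so a bad interval size cannot generate rows indefinitely.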
a

Anna Geller

11/08/2021, 9:53 AM
@Lawrence Finn can you share:
1. Which Orion version do you use? You can run prefect version to check; to use Dask, it should be 2.0a4.
2. Do you get the SQLAlchemy errors only when using Dask in your flow, or with all flows?
3. Can you share the flow that caused this error?
Here is an example that you could use to test whether the Dask executor is working. You can run it once with the Dask executor and once without:
from prefect import flow, task
from prefect.executors import DaskExecutor
from typing import List
import time


@task
def plus_one(x):
    return x + 1


@task
def plus_two(x):
    return x + 2


@task
def sum_all(x, y):
    result = x + y
    print(result)
    time.sleep(10)
    return result


@flow(executor=DaskExecutor())
def run_flow(numbers: List[int] = None):
    for number in numbers:
        one = plus_one(number)
        two = plus_two(number)
        sum_all(one, two)


if __name__ == "__main__":
    start = time.time()
    run_flow(numbers=[1, 2, 3, 4])
    end = time.time()
    print(f"it took {end - start}")
l

Lawrence Finn

11/08/2021, 12:52 PM
Many good questions 😛
1. 2.0a4
2. That SQLAlchemy error is from the Prefect server when I navigate to the UI
3.
cat ~/orion_example_flow.py
from prefect import flow, task
from prefect.executors import DaskExecutor, SequentialExecutor

exec = DaskExecutor(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}
)

@task
def say_it(stuff):
    print(f"Saying {stuff}")

@flow
def my_favorite_function(executor=exec):
    say_it("hello")
    return 0

if __name__ == "__main__":
    my_favorite_function()
a

Anna Geller

11/08/2021, 12:57 PM
ok, this makes sense 🙂 the executor should be passed to the @flow decorator, rather than set as a default argument of the flow function:
from prefect import flow, task
from prefect.executors import DaskExecutor


@task
def say_it(stuff):
    print(f"Saying {stuff}")


@flow(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}))
def my_favorite_function():
    say_it("hello")
    return 0


if __name__ == "__main__":
    my_favorite_function()
this should work
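Why the default-argument form fails can be read off the traceback earlier in the thread: building the parameter schema deep-copies each default value, and deep-copying the executor calls its `__setstate__`, which looks for a running Dask client. Below is a minimal stdlib-only sketch of that chain. `FakeExecutor` and `copy_parameter_defaults` are hypothetical stand-ins for illustration, not Prefect or pydantic code.

```python
import copy
import inspect


class FakeExecutor:
    """Hypothetical stand-in for an executor that needs a live client to be restored."""

    def __init__(self):
        self.cluster_kwargs = {"n_workers": 4, "threads_per_worker": 1}

    def __getstate__(self):
        return dict(self.__dict__)

    def __setstate__(self, state):
        # Mimics the traceback: restoring state requires a running client.
        raise ValueError("No global client found and no address provided")


def copy_parameter_defaults(fn):
    """Deep-copy every default in fn's signature, as a schema builder might."""
    return {
        name: copy.deepcopy(param.default)
        for name, param in inspect.signature(fn).parameters.items()
        if param.default is not inspect.Parameter.empty
    }


def my_favorite_function(executor=FakeExecutor()):
    return 0


def reproduce():
    """Return the error message raised while copying defaults, or None."""
    try:
        copy_parameter_defaults(my_favorite_function)
    except ValueError as exc:
        return str(exc)
    return None


print(reproduce())
```

With the executor passed to the decorator instead, it never appears among the function's parameters, so nothing tries to copy it while the schema is built.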
l

Lawrence Finn

11/08/2021, 12:58 PM
i got the same error
a

Anna Geller

11/08/2021, 12:58 PM
regarding the SQLAlchemy error: could you try to install Orion in a fresh virtual environment? Alternatively, you can reset the DB and try starting the server again:
prefect orion reset-db
prefect orion start
l

Lawrence Finn

11/08/2021, 1:02 PM
resetting the DB and recreating the virtualenv didn't help
a

Anna Geller

11/08/2021, 1:13 PM
can you share a bit more about how you create the environment? So you can run all flows, but when you start Orion to see the UI, you get those SQLAlchemy errors? Did you change any of the default configuration? I was trying to reproduce in a fresh conda environment, but in conda it worked well. I see you use pyenv?
l

Lawrence Finn

11/08/2021, 1:33 PM
Here’s what I did:
1. pyenv virtualenv 3.8.5 orion
2. pyenv activate 3.8.5/envs/orion
3. pip install -U "prefect>=2.0.0a"
4. prefect orion start
5. Navigate to http://localhost:4200 and see the errors
sqlite3 -version
3.28.0 2019-04-15 14:49:49 378230ae7f4b721c8b8d83c8ceb891449685cd23b1702a57841f1be40b5daapl
a

Anna Geller

11/08/2021, 1:45 PM
thx 🙌 will try to reproduce today
I had trouble configuring pyenv with the pyenv-virtualenv extension. In the end it gave me:
(base) ➜ pyenv virtualenv 3.8.5 orion
pyenv-virtualenv: `3.8.5' is not installed in pyenv.
So I tried reproducing using the same Python version in conda:
conda create -n "orion" python=3.8.5
and it worked well. Would you give conda or venv a try?
z

Zanie

11/08/2021, 4:16 PM
ValueError: No global client found and no address provided
- This error is definitely caused by passing the executor to the flow function instead of the @flow decorator, as Anna noted. After moving the definition in your sample script, I no longer get this error and the flow runs as expected.
It's possible that we are using a sqlite3 feature not available in that version; we can investigate that. Creating an environment with conda will generally get you the latest sqlite3 in your virtual environment.
In fact, anaconda will complain that 3.28 is too old to support python 3.8+
❯ conda install sqlite==3.28.0                    
Collecting package metadata (current_repodata.json): done
Solving environment: failed with initial frozen solve. Retrying with flexible solve.
Collecting package metadata (repodata.json): done
Solving environment: failed with initial frozen solve. Retrying with flexible solve.
Solving environment: | 
Found conflicts! Looking for incompatible packages.
This can take several minutes.  Press CTRL-C to abort.
failed                                                                                                               

UnsatisfiableError: The following specifications were found to be incompatible with each other:

Output in format: Requested package -> Available versions

Package sqlite conflicts for:
python=3.8 -> sqlite[version='>=3.30.0,<4.0a0|>=3.30.1,<4.0a0|>=3.31.1,<4.0a0|>=3.32.3,<4.0a0|>=3.33.0,<4.0a0|>=3.35.4,<4.0a0|>=3.36.0,<4.0a0']
sqlite==3.28.0
l

Lawrence Finn

11/08/2021, 6:00 PM
@Anna Geller you need to install the python version first, i think
pyenv install 3.8.5
@Zanie I changed it to the annotation and it didn’t fix the problem
z

Zanie

11/08/2021, 6:02 PM
from prefect import flow, task
from prefect.executors import DaskExecutor


@task
def say_it(stuff):
    print(f"Saying {stuff}")


@flow(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}))
def my_favorite_function():
    say_it("hello")
    return 0


if __name__ == "__main__":
    my_favorite_function()
❯ python 1-orion.py           
12:02:45.060 | Beginning flow run 'rousing-malkoha' for flow 'my-favorite-function'...
12:02:45.061 | Starting executor `DaskExecutor`...
12:02:45.061 | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
12:02:46.206 | The Dask dashboard is available at http://127.0.0.1:8787/status
12:02:46.256 | Submitting task run 'say_it-ded9b056-0' to executor...
12:02:46.328 | Shutting down executor `DaskExecutor`...
Saying hello
12:02:46.387 | Task run 'say_it-ded9b056-0' finished in state Completed(message=None, type=COMPLETED)
12:02:46.614 | Flow run 'rousing-malkoha' finished in state Completed(message=None, type=COMPLETED)
l

Lawrence Finn

11/08/2021, 6:03 PM
python3 ~/orion_example_flow.py
Traceback (most recent call last):
  File "/Users/lawrencefinn/orion_example_flow.py", line 9, in <module>
    def my_favorite_function(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1})):
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/flows.py", line 326, in flow
    Flow(
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/flows.py", line 109, in __init__
    self.parameters = parameter_schema(self.fn)
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/orion/utilities/functions.py", line 56, in parameter_schema
    **pydantic.create_model("Parameters", **model_fields).schema()
  File "pydantic/main.py", line 990, in pydantic.main.create_model
  File "pydantic/main.py", line 299, in pydantic.main.ModelMetaclass.__new__
  File "pydantic/fields.py", line 411, in pydantic.fields.ModelField.infer
  File "pydantic/fields.py", line 342, in pydantic.fields.ModelField.__init__
  File "pydantic/fields.py", line 445, in pydantic.fields.ModelField.prepare
  File "pydantic/fields.py", line 473, in pydantic.fields.ModelField._set_default_and_type
  File "pydantic/fields.py", line 345, in pydantic.fields.ModelField.get_default
  File "pydantic/utils.py", line 630, in pydantic.utils.smart_deepcopy
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/copy.py", line 272, in _reconstruct
    y.__setstate__(state)
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/executors.py", line 394, in __setstate__
    self._client = distributed.get_client()
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/distributed/worker.py", line 3919, in get_client
    raise ValueError("No global client found and no address provided")
ValueError: No global client found and no address provided
cat ~/orion_example_flow.py
from prefect import flow, task
from prefect.executors import DaskExecutor, SequentialExecutor

@task
def say_it(stuff):
    print(f"Saying {stuff}")

@flow
def my_favorite_function(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1})):
    say_it("hello")
    return 0

if __name__ == "__main__":
    my_favorite_function()
z

Zanie

11/08/2021, 6:03 PM
You need to pass it to the @flow decorator.
See my example vs yours.
l

Lawrence Finn

11/08/2021, 6:03 PM
OH
z

Zanie

11/08/2021, 6:03 PM
😄 yeah
l

Lawrence Finn

11/08/2021, 6:03 PM
hahaha
z

Zanie

11/08/2021, 6:03 PM
I'm curious if we can guard against that somehow
Like warn if you do it
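One way such a guard could look is to inspect the flow function's signature at decoration time and warn when a default value is an executor instance. This is only a sketch: `BaseExecutor`, `LocalDaskLikeExecutor`, and `warn_on_executor_defaults` are hypothetical names, and nothing here reflects what Prefect actually does.

```python
import inspect
import warnings


class BaseExecutor:
    """Hypothetical base class standing in for a real executor type."""


class LocalDaskLikeExecutor(BaseExecutor):
    """Hypothetical concrete executor used only for the demonstration."""


def warn_on_executor_defaults(fn, executor_type=BaseExecutor):
    """Warn for each parameter of fn whose default is an executor instance.

    Returns the list of offending parameter names.
    """
    offenders = [
        name
        for name, param in inspect.signature(fn).parameters.items()
        if isinstance(param.default, executor_type)
    ]
    for name in offenders:
        warnings.warn(
            f"Parameter {name!r} of {fn.__name__!r} defaults to an executor "
            "instance; pass the executor to the @flow decorator instead.",
            UserWarning,
            stacklevel=2,
        )
    return offenders


def my_favorite_function(executor=LocalDaskLikeExecutor()):
    return 0


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    flagged = warn_on_executor_defaults(my_favorite_function)

print(flagged, len(caught))
```

Running the check before any schema is built means the user would see the hint rather than the `ValueError` from deep inside a copy.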
l

Lawrence Finn

11/08/2021, 6:04 PM
I'm a dodo, but the sqlite issue still stands. I thought the sqlite driver is built into Python and you cannot actually choose the version. Does the sqlite version I have come from brew?
python3
Python 3.8.5 (default, Jun 21 2021, 11:47:43) 
[Clang 11.0.3 (clang-1103.0.32.29)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import sqlite3
>>> sqlite3.version
'2.6.0'
>>> sqlite3.sqlite_version
'3.28.0'
i tried upgrading to python 3.9, same issue
z

Zanie

11/08/2021, 6:48 PM
sqlite is on your machine, the binary is not bundled with Python
There's a sqlite module that interfaces with the binary in the python stdlib
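To see which SQLite library a given interpreter is actually using, as opposed to whichever `sqlite3` command is first on the PATH, the stdlib module can be asked directly. A small sketch follows; the helper name `describe_sqlite` is made up here, and reading `_sqlite3.__file__` relies on a CPython implementation detail that may not be present on every build.

```python
import sqlite3


def describe_sqlite():
    """Collect details about the SQLite library this interpreter is linked against."""
    info = {
        # Version of the SQLite library itself, e.g. '3.28.0'
        "library_version": sqlite3.sqlite_version,
        # Same version as a tuple, convenient for comparisons
        "version_info": sqlite3.sqlite_version_info,
    }
    try:
        import _sqlite3  # CPython's C extension behind the sqlite3 module

        info["extension_path"] = getattr(_sqlite3, "__file__", None)
    except ImportError:
        info["extension_path"] = None
    return info


for key, value in describe_sqlite().items():
    print(f"{key}: {value}")
```

If the reported version is older than the one Homebrew installed, the interpreter was most likely built against a different copy of the library.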
l

Lawrence Finn

11/08/2021, 6:49 PM
so what could it be? the sqlite binary version on my mac is the highest version available
z

Zanie

11/08/2021, 6:49 PM
3.28 is from 2019 and is definitely not current.
What does brew info sqlite show?
l

Lawrence Finn

11/08/2021, 6:50 PM
brew upgrade sqlite3
Warning: sqlite3 3.36.0 already installed
(deploy) Lawrences-MacBook-Pro-2:aiq lawrencefinn$ sqlite3 --version
3.28.0 2019-04-15 14:49:49 378230ae7f4b721c8b8d83c8ceb891449685cd23b1702a57841f1be40b5daapl
o_O
I'll try uninstalling and reinstalling
z

Zanie

11/08/2021, 6:51 PM
Note
==> Caveats
sqlite is keg-only, which means it was not symlinked into /opt/homebrew,
because macOS already provides this software and installing another version in
parallel can cause all kinds of trouble.
l

Lawrence Finn

11/08/2021, 6:52 PM
😐
z

Zanie

11/08/2021, 6:53 PM
We use macOS but haven't run into this, since we use conda, which installs sqlite in the virtual environment.
Hm..
You can try adding wherever brew installs sqlite to your path to override the builtin one
l

Lawrence Finn

11/08/2021, 6:55 PM
yeah let me give that a whirl
somehow i broke my pyenv 😕
ok all fixed!
had to add those env vars from brew info sqlite3 and recompile python3 😐
all is well, thanks!
z

Zanie

11/08/2021, 7:32 PM
Great!
Let me know if you can think of a way to make this easier. hmm.
l

Lawrence Finn

11/08/2021, 7:33 PM
mac and python is always a funny dance
perhaps adding a note on how to check your sqlite version from within python would help
>>> import sqlite3
>>> sqlite3.sqlite_version
z

Zanie

11/08/2021, 7:34 PM
We've actually added a warning / auto-detection for SQLite versions to Orion, but we were wrong about our minimum required version, so you didn't see it.
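A check like that could avoid a hard-coded minimum version by probing for the feature itself. The failing query earlier in the thread applies `FILTER` to an aggregate function; to the best of my knowledge SQLite only accepts that from 3.30.0 onward, which would fit the failure on 3.28.0, but the thread does not confirm that this is the cause. The helper name `supports_aggregate_filter` is made up for this sketch.

```python
import sqlite3


def supports_aggregate_filter():
    """Return True if the linked SQLite accepts FILTER on a plain aggregate."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "SELECT count(*) FILTER (WHERE x > 1) "
            "FROM (SELECT 1 AS x UNION ALL SELECT 2 AS x)"
        ).fetchone()
        return True
    except sqlite3.Error:
        # Older libraries reject the statement while parsing it.
        return False
    finally:
        conn.close()


if not supports_aggregate_filter():
    print(
        f"SQLite {sqlite3.sqlite_version} does not support FILTER on aggregates; "
        "consider upgrading the library Python is linked against."
    )
else:
    print(f"SQLite {sqlite3.sqlite_version} supports FILTER on aggregates.")
```

A probe of this kind reports what the library can actually do, so it stays correct even if the assumed minimum version turns out to be wrong.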
l

Lawrence Finn

11/08/2021, 7:38 PM
ah 😄 in theory i could use postgres, it’d just be more effort to set up locally?
z

Zanie

11/08/2021, 8:15 PM
Just a bit more effort!
I configure it with this environment variable and just have a brew postgres running
PREFECT_ORION_DATABASE_CONNECTION_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/orion
I think I had to grant some permissions in psql first.
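For anyone assembling that URL from separate credentials, a small sketch using only the standard library is shown below. The helper `build_connection_url` is hypothetical. The variable name and URL shape are taken from the message above, and it is an assumption that the variable has to be set before Prefect loads its settings.

```python
import os
from urllib.parse import quote, urlsplit


def build_connection_url(user, password, host="localhost", port=5432, database="orion"):
    """Build a postgresql+asyncpg URL, percent-encoding the credentials."""
    return (
        f"postgresql+asyncpg://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


url = build_connection_url("postgres", "postgres")

# Sanity-check the pieces before exporting the value.
parts = urlsplit(url)
print(parts.scheme, parts.hostname, parts.port, parts.path)

# Assumption: this must be set before Prefect reads its configuration.
os.environ["PREFECT_ORION_DATABASE_CONNECTION_URL"] = url
```

Percent-encoding matters mainly when the password contains characters such as `@` or `/`, which would otherwise change how the URL is parsed.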
l

Lawrence Finn

11/08/2021, 8:16 PM
I'm guessing mysql wouldn’t work? It looks like you're just using sqlalchemy under the hood, though I dunno if mysql has all the field types
oh theres also no async mysql
z

Zanie

11/08/2021, 8:18 PM
We have to add explicit support for each backend, yeah. Maybe mysql down the line, but not now.