Lawrence Finn

    10 months ago
    I've been playing with Orion and am running into some issues:
    1. lots of SQLite/SQLAlchemy errors
    2. a temporary local Dask cluster doesn't work
    python3 ~/orion_example_flow.py
    Traceback (most recent call last):
      File "/Users/lawrencefinn/orion_example_flow.py", line 13, in <module>
        def my_favorite_function(executor=exec):
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/flows.py", line 326, in flow
        Flow(
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/flows.py", line 109, in __init__
        self.parameters = parameter_schema(self.fn)
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/orion/utilities/functions.py", line 56, in parameter_schema
        **pydantic.create_model("Parameters", **model_fields).schema()
      File "pydantic/main.py", line 990, in pydantic.main.create_model
      File "pydantic/main.py", line 299, in pydantic.main.ModelMetaclass.__new__
      File "pydantic/fields.py", line 411, in pydantic.fields.ModelField.infer
      File "pydantic/fields.py", line 342, in pydantic.fields.ModelField.__init__
      File "pydantic/fields.py", line 445, in pydantic.fields.ModelField.prepare
      File "pydantic/fields.py", line 473, in pydantic.fields.ModelField._set_default_and_type
      File "pydantic/fields.py", line 345, in pydantic.fields.ModelField.get_default
      File "pydantic/utils.py", line 630, in pydantic.utils.smart_deepcopy
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/copy.py", line 172, in deepcopy
        y = _reconstruct(x, memo, *rv)
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/copy.py", line 272, in _reconstruct
        y.__setstate__(state)
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/executors.py", line 394, in __setstate__
        self._client = distributed.get_client()
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/distributed/worker.py", line 3919, in get_client
        raise ValueError("No global client found and no address provided")
    ValueError: No global client found and no address provided
    sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) near ",": syntax error
    [SQL: WITH intervals AS 
    (
                -- recursive CTE to mimic the behavior of `generate_series`, 
                -- which is only available as a compiled extension 
                WITH RECURSIVE intervals(interval_start, interval_end, counter) AS (
                    VALUES(
                        strftime('%Y-%m-%d %H:%M:%f000', ?), 
                        strftime('%Y-%m-%d %H:%M:%f000', ?, ?),
                        1
                        )
                    
                    UNION ALL
                    
                    SELECT interval_end, strftime('%Y-%m-%d %H:%M:%f000', interval_end, ?), counter + 1
                    FROM intervals
                    -- subtract interval because recursive where clauses are effectively evaluated on a t-1 lag
                    WHERE 
                        interval_start < strftime('%Y-%m-%d %H:%M:%f000', ?, ?)
                        -- don't compute more than 500 intervals
                        AND counter < 500
                )
                SELECT * FROM intervals
                )
     SELECT counts.interval_start, counts.interval_end, coalesce(json_group_array(json(counts.state_agg)) FILTER (WHERE counts.state_agg IS NOT NULL), '[]') AS states 
    FROM (SELECT intervals.interval_start AS interval_start, intervals.interval_end AS interval_end, CASE WHEN (count(runs.id) = ?) THEN NULL ELSE json_object(?, runs.state_type, ?, runs.state_name, ?, count(runs.id), ?, sum(max(?, CAST(STRFTIME('%s', runs.estimated_run_time) AS INTEGER))), ?, sum(max(?, CAST(STRFTIME('%s', runs.estimated_start_time_delta) AS INTEGER)))) END AS state_agg 
    FROM intervals LEFT OUTER JOIN (SELECT flow_run.id AS id, flow_run.expected_start_time AS expected_start_time, (SELECT CASE WHEN (flow_run.state_type = ?) THEN strftime(?, (julianday(flow_run.total_run_time) + julianday(strftime(?, (? + julianday(strftime('%Y-%m-%d %H:%M:%f000', 'now'))) - julianday(flow_run_state.timestamp)))) - ?) ELSE flow_run.total_run_time END AS anon_1 
    WHERE flow_run.state_id = flow_run_state.id) AS estimated_run_time, CASE WHEN (flow_run.start_time > flow_run.expected_start_time) THEN strftime(?, (? + julianday(flow_run.start_time)) - julianday(flow_run.expected_start_time)) WHEN (flow_run.start_time IS NULL AND (flow_run.state_type NOT IN (?, ?, ?)) AND flow_run.expected_start_time < strftime('%Y-%m-%d %H:%M:%f000', 'now')) THEN strftime(?, (? + julianday(strftime('%Y-%m-%d %H:%M:%f000', 'now'))) - julianday(flow_run.expected_start_time)) ELSE ? END AS estimated_start_time_delta, flow_run_state.type AS state_type, flow_run_state.name AS state_name 
    FROM flow_run JOIN flow_run_state ON flow_run.state_id = flow_run_state.id 
    WHERE flow_run.state_type IN (?, ?, ?, ?, ?, ?) AND flow_run.expected_start_time <= ? AND flow_run.expected_start_time >= ?) AS runs ON runs.expected_start_time >= intervals.interval_start AND runs.expected_start_time < intervals.interval_end GROUP BY intervals.interval_start, intervals.interval_end, runs.state_type, runs.state_name) AS counts GROUP BY counts.interval_start, counts.interval_end ORDER BY counts.interval_start
     LIMIT ? OFFSET ?]
    [parameters: ('2021-11-06T11:39:36.738000+00:00', '2021-11-06T11:39:36.738000+00:00', '+480.0 seconds', '+480.0 seconds', '2021-11-06T13:39:36.738000+00:00', '-480.0 seconds', 0, 'state_type', 'state_name', 'count_runs', 'sum_estimated_run_time', 0, 'sum_estimated_lateness', 0, 'RUNNING', '%Y-%m-%d %H:%M:%f000', '%Y-%m-%d %H:%M:%f000', 2440587.5, 2440587.5, '%Y-%m-%d %H:%M:%f000', 2440587.5, 'FAILED', 'COMPLETED', 'CANCELLED', '%Y-%m-%d %H:%M:%f000', 2440587.5, '1970-01-01 00:00:00.000000', 'SCHEDULED', 'PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CANCELLED', '2021-11-06 13:39:36.741000', '2021-11-06 11:39:36.741000', 500, 0)]
    (Background on this error at: https://sqlalche.me/e/14/e3q8)
    Anna Geller

    10 months ago
    @Lawrence Finn can you share:
    1. Which Orion version do you use? (You can check with: prefect version.) To use Dask, it should be 2.0a4.
    2. Do you get SQLAlchemy errors only when using Dask in your flow, or with all flows?
    3. Can you share the flow that caused this error? Here is an example you could use to test whether the Dask executor is working; run it once with the Dask executor and once without:
    from prefect import flow, task
    from prefect.executors import DaskExecutor
    from typing import List
    import time
    
    
    @task
    def plus_one(x):
        return x + 1
    
    
    @task
    def plus_two(x):
        return x + 2
    
    
    @task
    def sum_all(x, y):
        result = x + y
        print(result)
        time.sleep(10)
        return result
    
    
    @flow(executor=DaskExecutor())
    def run_flow(numbers: List[int] = None):
        for number in numbers:
            one = plus_one(number)
            two = plus_two(number)
            sum_all(one, two)
    
    
    if __name__ == "__main__":
        start = time.time()
        run_flow(numbers=[1, 2, 3, 4])
        end = time.time()
        print(f"it took {end - start}")
    Lawrence Finn

    10 months ago
    Many good questions 😛
    1. 2.0a4
    2. That SQLAlchemy error comes from the Prefect server when I navigate to the UI.
    3. Here's the flow:
    cat ~/orion_example_flow.py
    from prefect import flow, task
    from prefect.executors import DaskExecutor, SequentialExecutor
    
    exec = DaskExecutor(
        cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}
    )
    
    @task
    def say_it(stuff):
        print(f"Saying {stuff}")
    
    @flow
    def my_favorite_function(executor=exec):
        say_it("hello")
        return 0
    
    if __name__ == "__main__":
        my_favorite_function()
    Anna Geller

    10 months ago
    OK, this makes sense 🙂 The executor should be passed to the @flow decorator, not set as a default argument of the flow function:
    from prefect import flow, task
    from prefect.executors import DaskExecutor
    
    
    @task
    def say_it(stuff):
        print(f"Saying {stuff}")
    
    
    @flow(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}))
    def my_favorite_function():
        say_it("hello")
        return 0
    
    
    if __name__ == "__main__":
        my_favorite_function()
    this should work
    Lawrence Finn

    10 months ago
    I got the same error.
    Anna Geller

    10 months ago
    Regarding the SQLAlchemy error: could you try installing Orion in a fresh virtual environment? Alternatively, you can reset the DB and start the server again:
    prefect orion reset-db
    prefect orion start
    Lawrence Finn

    10 months ago
    Resetting the DB and recreating the virtualenv didn't help.
    Anna Geller

    10 months ago
    Can you share a bit more about how you create the environment? So you can run all flows, but when you start Orion to see the UI, you get those SQLAlchemy errors? Did you change any of the default configuration? I tried to reproduce this in a fresh conda environment, and in conda it worked well. I see you use pyenv?
    Lawrence Finn

    10 months ago
    Here's what I did:
    1. pyenv virtualenv 3.8.5 orion
    2. pyenv activate 3.8.5/envs/orion
    3. pip install -U "prefect>=2.0.0a"
    4. prefect orion start
    5. Navigate to http://localhost:4200 and see the errors
    sqlite3 -version
    3.28.0 2019-04-15 14:49:49 378230ae7f4b721c8b8d83c8ceb891449685cd23b1702a57841f1be40b5daapl
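One wrinkle when collecting environment details: `sqlite3 -version` reports the command-line tool found on PATH, which is not necessarily the SQLite library the Python interpreter uses. A small report run with the environment's own Python avoids that mismatch. This is a sketch; `environment_report` is a hypothetical helper, and the package list is simply the libraries that appear in the tracebacks in this thread.

```python
import importlib.metadata
import platform
import sqlite3


def environment_report(packages=("prefect", "sqlalchemy", "pydantic", "distributed")):
    """Collect versions worth pasting into a bug report (hypothetical helper)."""
    report = {
        "python": platform.python_version(),
        "platform": platform.platform(),
        # Version of the SQLite library this interpreter's sqlite3 module uses,
        # which can differ from the `sqlite3` command-line tool on PATH.
        "sqlite_library": sqlite3.sqlite_version,
    }
    for name in packages:
        try:
            report[name] = importlib.metadata.version(name)
        except importlib.metadata.PackageNotFoundError:
            report[name] = "not installed"
    return report


if __name__ == "__main__":
    for key, value in environment_report().items():
        print(f"{key}: {value}")
```

Running it with the virtualenv's `python` (not a system one) is what makes the SQLite line meaningful.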
    Anna Geller

    10 months ago
    thx 🙌 will try to reproduce today
    I had trouble configuring pyenv with the pyenv-virtualenv extension. In the end it gave me:
    (base) ➜ pyenv virtualenv 3.8.5 orion
    pyenv-virtualenv: `3.8.5' is not installed in pyenv.
    So I tried reproducing using the same Python version in conda:
    conda create -n "orion" python=3.8.5
    and it worked well. Would you give conda or venv a try?
    Michael Adkins

    10 months ago
    ValueError: No global client found and no address provided
    - This error is definitely caused by passing the executor to the flow instead of the decorator as Anna noted. Moving the definition in your sample script, I no longer get this error and the flow runs as expected.
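Why a default argument breaks, going by the traceback: when `@flow` builds the parameter schema, pydantic's `smart_deepcopy` deep-copies each default value, and `DaskExecutor.__setstate__` then calls `distributed.get_client()`, which fails because no Dask client exists yet. The sketch below reproduces that chain in plain Python. `FakeDaskExecutor` is a hypothetical stand-in (not Prefect's class), and `defaults_copy_error` is a hypothetical helper that approximates the schema step with `copy.deepcopy`.

```python
import copy
import inspect


class FakeDaskExecutor:
    """Hypothetical stand-in for prefect.executors.DaskExecutor (not the real class).

    Restoring its copied state tries to attach to an already-running Dask
    client, like the __setstate__ in the traceback.
    """

    running_client = None  # no Dask client has been started in this process

    def __init__(self, cluster_kwargs=None):
        self.cluster_kwargs = cluster_kwargs or {}

    def __getstate__(self):
        return {"cluster_kwargs": self.cluster_kwargs}

    def __setstate__(self, state):
        self.__dict__.update(state)
        if FakeDaskExecutor.running_client is None:
            # Mirrors distributed.get_client() failing when no client exists.
            raise ValueError("No global client found and no address provided")


def defaults_copy_error(fn):
    """Deep-copy each parameter default, roughly as schema generation did in the
    traceback, and return the error message (or None if every copy succeeds)."""
    try:
        for param in inspect.signature(fn).parameters.values():
            if param.default is not inspect.Parameter.empty:
                copy.deepcopy(param.default)  # calls __setstate__ on the copy
    except ValueError as exc:
        return str(exc)
    return None


def buggy_flow(executor=FakeDaskExecutor({"n_workers": 4, "threads_per_worker": 1})):
    return 0


def fixed_flow():  # the executor lives on the decorator, so it is not a parameter
    return 0


if __name__ == "__main__":
    print(defaults_copy_error(buggy_flow))  # No global client found and no address provided
    print(defaults_copy_error(fixed_flow))  # None
```

With the executor on the decorator, it never becomes a flow parameter, so nothing tries to copy it while the schema is built.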
    It's possible that we're using a sqlite3 feature that isn't available in that version; we can investigate that. Creating an environment with conda will generally get you the latest sqlite3 in your virtual environment.
    In fact, conda complains that 3.28 is too old to support Python 3.8+:
    ❯ conda install sqlite==3.28.0                    
    Collecting package metadata (current_repodata.json): done
    Solving environment: failed with initial frozen solve. Retrying with flexible solve.
    Collecting package metadata (repodata.json): done
    Solving environment: failed with initial frozen solve. Retrying with flexible solve.
    Solving environment: | 
    Found conflicts! Looking for incompatible packages.
    This can take several minutes.  Press CTRL-C to abort.
    failed                                                                                                               
    
    UnsatisfiableError: The following specifications were found to be incompatible with each other:
    
    Output in format: Requested package -> Available versions
    
    Package sqlite conflicts for:
    python=3.8 -> sqlite[version='>=3.30.0,<4.0a0|>=3.30.1,<4.0a0|>=3.31.1,<4.0a0|>=3.32.3,<4.0a0|>=3.33.0,<4.0a0|>=3.35.4,<4.0a0|>=3.36.0,<4.0a0']
    sqlite==3.28.0
    Lawrence Finn

    10 months ago
    @Anna Geller I think you need to install the Python version first:
    pyenv install 3.8.5
    @Michael Adkins I changed it to the decorator and it didn't fix the problem.
    Michael Adkins

    10 months ago
    from prefect import flow, task
    from prefect.executors import DaskExecutor
    
    
    @task
    def say_it(stuff):
        print(f"Saying {stuff}")
    
    
    @flow(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}))
    def my_favorite_function():
        say_it("hello")
        return 0
    
    
    if __name__ == "__main__":
        my_favorite_function()
    ❯ python 1-orion.py           
    12:02:45.060 | Beginning flow run 'rousing-malkoha' for flow 'my-favorite-function'...
    12:02:45.061 | Starting executor `DaskExecutor`...
    12:02:45.061 | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
    12:02:46.206 | The Dask dashboard is available at http://127.0.0.1:8787/status
    12:02:46.256 | Submitting task run 'say_it-ded9b056-0' to executor...
    12:02:46.328 | Shutting down executor `DaskExecutor`...
    Saying hello
    12:02:46.387 | Task run 'say_it-ded9b056-0' finished in state Completed(message=None, type=COMPLETED)
    12:02:46.614 | Flow run 'rousing-malkoha' finished in state Completed(message=None, type=COMPLETED)
    Lawrence Finn

    10 months ago
    python3 ~/orion_example_flow.py
    Traceback (most recent call last):
      File "/Users/lawrencefinn/orion_example_flow.py", line 9, in <module>
        def my_favorite_function(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1})):
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/flows.py", line 326, in flow
        Flow(
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/flows.py", line 109, in __init__
        self.parameters = parameter_schema(self.fn)
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/orion/utilities/functions.py", line 56, in parameter_schema
        **pydantic.create_model("Parameters", **model_fields).schema()
      File "pydantic/main.py", line 990, in pydantic.main.create_model
      File "pydantic/main.py", line 299, in pydantic.main.ModelMetaclass.__new__
      File "pydantic/fields.py", line 411, in pydantic.fields.ModelField.infer
      File "pydantic/fields.py", line 342, in pydantic.fields.ModelField.__init__
      File "pydantic/fields.py", line 445, in pydantic.fields.ModelField.prepare
      File "pydantic/fields.py", line 473, in pydantic.fields.ModelField._set_default_and_type
      File "pydantic/fields.py", line 345, in pydantic.fields.ModelField.get_default
      File "pydantic/utils.py", line 630, in pydantic.utils.smart_deepcopy
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/copy.py", line 172, in deepcopy
        y = _reconstruct(x, memo, *rv)
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/copy.py", line 272, in _reconstruct
        y.__setstate__(state)
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/prefect/executors.py", line 394, in __setstate__
        self._client = distributed.get_client()
      File "/Users/lawrencefinn/.pyenv/versions/3.8.5/envs/orion/lib/python3.8/site-packages/distributed/worker.py", line 3919, in get_client
        raise ValueError("No global client found and no address provided")
    ValueError: No global client found and no address provided
    cat ~/orion_example_flow.py
    from prefect import flow, task
    from prefect.executors import DaskExecutor, SequentialExecutor
    
    @task
    def say_it(stuff):
        print(f"Saying {stuff}")
    
    @flow
    def my_favorite_function(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1})):
        say_it("hello")
        return 0
    
    if __name__ == "__main__":
        my_favorite_function()
    Michael Adkins

    10 months ago
    You need to pass it to the @flow decorator, not set it as a default argument of the function.
    See my example vs. yours.
    Lawrence Finn

    10 months ago
    OH
    Michael Adkins

    10 months ago
    😄 yeah
    Lawrence Finn

    10 months ago
    hahaha
    Michael Adkins

    10 months ago
    I'm curious whether we can guard against that somehow,
    e.g. warn if you do it.
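One way such a guard could look, as a sketch: inspect the flow function's signature and warn when a parameter's default value is an executor instance. It would have to run before the parameter schema is built, since that is where the ValueError above surfaces. `BaseExecutor` and `FakeDaskExecutor` are hypothetical stand-ins so the example runs without Prefect, and `warn_on_executor_defaults` is not an existing Prefect function.

```python
import inspect
import warnings


class BaseExecutor:
    """Hypothetical stand-in for an executor base class (not Prefect's)."""


class FakeDaskExecutor(BaseExecutor):
    """Hypothetical stand-in for a Dask-backed executor."""


def warn_on_executor_defaults(fn, executor_type=BaseExecutor):
    """Warn about parameters of `fn` whose default is an executor instance.

    Returns the names of the offending parameters.
    """
    offending = [
        name
        for name, param in inspect.signature(fn).parameters.items()
        # Parameters without a default hold inspect.Parameter.empty, which is skipped.
        if isinstance(param.default, executor_type)
    ]
    for name in offending:
        warnings.warn(
            f"Parameter {name!r} of {fn.__name__!r} defaults to an executor; "
            "pass it to the @flow decorator instead, e.g. @flow(executor=...).",
            UserWarning,
            stacklevel=2,
        )
    return offending


# Usage: the mistake from this thread, caught before any schema is built.
def my_favorite_function(executor=FakeDaskExecutor()):
    return 0


if __name__ == "__main__":
    print(warn_on_executor_defaults(my_favorite_function))  # ['executor'], plus a UserWarning
```

Checking by type rather than by parameter name means a parameter that happens to be called `executor` but holds ordinary data would not trigger a false warning.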
    Lawrence Finn

    10 months ago
    I'm a dodo, but the SQLite issue still stands. I thought the SQLite driver was built into Python and you couldn't actually choose the version. Does the SQLite version I have come from brew?
    python3
    Python 3.8.5 (default, Jun 21 2021, 11:47:43) 
    [Clang 11.0.3 (clang-1103.0.32.29)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import sqlite3
    >>> sqlite3.version
    '2.6.0'
    >>> sqlite3.sqlite_version
    '3.28.0'
    I tried upgrading to Python 3.9; same issue.
    Michael Adkins

    10 months ago
    SQLite is on your machine; the library is not bundled with Python.
    The Python stdlib has a sqlite3 module that interfaces with that library.
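To see this from inside Python: `sqlite3.version` ('2.6.0' in the REPL above) is only the version of the Python module itself and is deprecated in recent Python releases, while `sqlite3.sqlite_version` is the version of the SQLite library in use. The stdlib package wraps CPython's compiled `_sqlite3` extension, which is built against whichever SQLite library the Python build found; that is why swapping libraries usually means rebuilding Python. The sketch below cross-checks `sqlite3.sqlite_version` against SQLite's own `sqlite_version()` SQL function and prints the extension's path, which you could then inspect with a tool such as `otool -L` on macOS. `describe_sqlite_binding` is a hypothetical helper name.

```python
import contextlib
import sqlite3


def describe_sqlite_binding():
    """Show which SQLite library the stdlib sqlite3 module is using (hypothetical helper)."""
    try:
        import _sqlite3  # CPython's compiled extension behind the sqlite3 package

        extension_path = getattr(_sqlite3, "__file__", None)  # None if compiled in
    except ImportError:
        extension_path = None
    with contextlib.closing(sqlite3.connect(":memory:")) as conn:
        # Ask the library itself which version it is.
        (library_version,) = conn.execute("SELECT sqlite_version()").fetchone()
    return {
        "sqlite3.sqlite_version": sqlite3.sqlite_version,
        "SELECT sqlite_version()": library_version,
        "extension": extension_path,
    }


if __name__ == "__main__":
    for key, value in describe_sqlite_binding().items():
        print(f"{key}: {value}")
```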
    Lawrence Finn

    10 months ago
    So what could it be? The SQLite version installed on my Mac is the latest one available.
    Michael Adkins

    10 months ago
    3.28 is from 2019 and is definitely not current.
    What does this show?
    brew info sqlite
    Lawrence Finn

    10 months ago
    brew upgrade sqlite3
    Warning: sqlite3 3.36.0 already installed
    (deploy) Lawrences-MacBook-Pro-2:aiq lawrencefinn$ sqlite3 --version
    3.28.0 2019-04-15 14:49:49 378230ae7f4b721c8b8d83c8ceb891449685cd23b1702a57841f1be40b5daapl
    o_O
    I'll try uninstalling and reinstalling.
    Michael Adkins

    10 months ago
    Note this caveat from brew:
    ==> Caveats
    sqlite is keg-only, which means it was not symlinked into /opt/homebrew,
    because macOS already provides this software and installing another version in
    parallel can cause all kinds of trouble.
    Lawrence Finn

    10 months ago
    😐
    Michael Adkins

    10 months ago
    We use macOS but haven't run into this, since we use conda, which installs SQLite into the virtual environment.
    Hm...
    You could try adding the directory where brew installs sqlite to your PATH to override the built-in one.
    Lawrence Finn

    10 months ago
    Yeah, let me give that a whirl.
    Somehow I broke my pyenv 😕
    OK, all fixed!
    I had to add the environment variables listed by brew info sqlite3 and recompile Python 3 😐
    All is well, thanks!
    Michael Adkins

    10 months ago
    Great!
    Hmm, let me know if you can think of a way to make this easier.
    Lawrence Finn

    10 months ago
    Mac and Python are always a funny dance.
    Perhaps adding a note on how to check your SQLite version from within Python would help:
    >>> import sqlite3
    >>> sqlite3.sqlite_version
    Michael Adkins

    10 months ago
    We've actually added a warning/auto-detection for SQLite versions to Orion, but we were wrong about our minimum required version, so you didn't see it.
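A version check only helps if the minimum is right. A sketch of a check that is harder to get wrong also probes the SQL feature directly. The failing query above attaches FILTER (WHERE ...) to an aggregate (json_group_array), and SQLite accepts FILTER on ordinary aggregate functions only from 3.30.0 on; that this is the exact cause of the "near ','" syntax error here is an assumption, not something confirmed in the thread. `MIN_SQLITE_VERSION`, `sqlite_supports_aggregate_filter`, and `check_sqlite` are hypothetical names, not Orion's API.

```python
import contextlib
import sqlite3
import warnings

# Placeholder minimum for this sketch; the thread does not state Orion's real minimum.
MIN_SQLITE_VERSION = (3, 30, 0)


def sqlite_supports_aggregate_filter():
    """Return True if the linked SQLite accepts FILTER (WHERE ...) on a plain aggregate."""
    with contextlib.closing(sqlite3.connect(":memory:")) as conn:
        try:
            conn.execute("SELECT count(*) FILTER (WHERE x > 0) FROM (SELECT 1 AS x)")
        except sqlite3.OperationalError:  # older libraries reject this as a syntax error
            return False
    return True


def check_sqlite(min_version=MIN_SQLITE_VERSION):
    """Warn and return False if the SQLite library looks too old for this sketch's assumptions."""
    ok = sqlite3.sqlite_version_info >= min_version and sqlite_supports_aggregate_filter()
    if not ok:
        required = ".".join(map(str, min_version))
        warnings.warn(
            f"This Python is linked against SQLite {sqlite3.sqlite_version}; "
            f"this check assumes {required}+ with aggregate FILTER support is needed.",
            RuntimeWarning,
            stacklevel=2,
        )
    return ok


if __name__ == "__main__":
    print(sqlite3.sqlite_version, check_sqlite())
```

Probing the feature means a wrong guess about the minimum version cannot silently let an incompatible library through, at least for the one construct being probed.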
    Lawrence Finn

    10 months ago
    Ah 😄 In theory I could use Postgres; it'd just be more effort to set up locally?
    Michael Adkins

    10 months ago
    Just a bit more effort!
    I configure it with this environment variable and just have a brew Postgres running:
    PREFECT_ORION_DATABASE_CONNECTION_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/orion
    I think I had to grant some permissions in psql first.
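If your Postgres password contains characters such as @, : or /, they need percent-encoding before they go into a connection URL like the one above. A minimal sketch that builds the string with the standard library; `orion_postgres_url` is a hypothetical helper, and its defaults simply mirror the example URL.

```python
from urllib.parse import quote


def orion_postgres_url(user, password, host="localhost", port=5432, database="orion"):
    """Build a postgresql+asyncpg URL, percent-encoding the credentials (hypothetical helper)."""
    # safe="" makes quote() also encode "/", which would otherwise break the URL.
    return (
        f"postgresql+asyncpg://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


if __name__ == "__main__":
    # Print a value suitable for exporting as PREFECT_ORION_DATABASE_CONNECTION_URL
    # before running `prefect orion start`.
    print(orion_postgres_url("postgres", "postgres"))
```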
    Lawrence Finn

    10 months ago
    I'm guessing MySQL wouldn't work? It looks like you're just using SQLAlchemy under the hood, though I don't know if MySQL has all the field types.
    Oh, and there's also the question of async MySQL support.
    Michael Adkins

    10 months ago
    We have to add explicit support for each backend, yeah. Maybe mysql down the line, but not now.