Evaluation of Orion (Alpha5 as well as the Dev-Ver...
# ask-community
v
Evaluation of Orion (Alpha5 as well as the dev version from GH lead to the same findings). We have some questions concerning Prefect Orion (Prefect Alpha5):
• There seems to be no .result() on flows any more. So how does one get the result of a subflow back?
• (Sub-)flow input parameters have to be JSON serializable, so object/instance parameters are no longer possible. Not even a simple numpy array works. Why not cloudpickle?
• Failed tasks are not displayed in Radar.
• There is no start/stop/restart functionality for flows. Will there be checkpointing in Orion?
• Each parallelized task instance produces a new box in the Radar. In Prefect this was visualized much better.
• The Radar is an interesting new view on the graph and can be of use. But what we would really like is the possibility to align a certain flow/task at a given coordinate. This is most interesting if one has production code that runs the same flows 24/7 for years to come. The graph is then frozen.

https://upload.wikimedia.org/wikipedia/commons/8/89/Kozloduy_Nuclear_Power_Plant_-_Control_Room_of_Unit_5.jpg

• There seems to be no way in Orion to store/retrieve self-serialized results any more? The docs state that results are stored in the SQLite DB. Is this really the future approach? Usual payloads are in the GB range.
• Even for trivial examples (inc/dec example with a flow of flows) with local parallelism (DaskExecutor) we get concurrent access to the SQLite DB, resulting in failures. These failures seem to heal themselves (retries are made that succeed). The collisions are not easily reproduced, which hints at a runtime (probably async) problem.
• In Prefect there were two APIs to address your problem: functional and imperative (base classes). The imperative API is not covered with a single word in the Orion documentation. We would really like to have an imperative API in Orion. What are your plans for the APIs in Orion?
Sorry for so much criticism. Orion is quite impressive. The engine does a good job at auto-parallelizing. With a few decorators one can optimize one's code at no cost. But Orion is IMHO quite far from production. Volker
k
No worries at all @volkerjaenisch, we definitely understand that Orion is still in alpha and not ready for some use cases. We appreciate the detailed feedback as we work on making it better. No need to apologize for the criticism, especially when it’s clear you spent a lot of time exploring it 😄 A lot of these questions are beyond me, so similar to last time, I will find someone to answer them. It may take a while as we have already reached end of day in the US and these people might be out. We’ll definitely get back to you though
v
Thank you for the warm welcome. Staying tuned.
z
Hi! I can address some of these.
As before, states have a result() method to retrieve their underlying data:
from prefect import flow


@flow
def child(y):
    return y


@flow
def parent(x):
    state = child(x)
    print(state.result())


parent(1)
Flow inputs need to be JSON serializable because they are stored in the database. Object/instance parameters are still possible, but require integration with pydantic. We’ll likely investigate a way to loosen this restriction in the future, but large data objects are intended to be passed from flow -> task not flow -> flow.
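To illustrate what "JSON serializable" means for parameters, here is a minimal sketch using only the standard library. `SimulationConfig` is a hypothetical example class, and a dataclass stands in for the pydantic integration mentioned above; this is an assumption for illustration, not Orion's actual mechanism.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class SimulationConfig:
    """Hypothetical parameter object (not part of Prefect)."""

    steps: int
    label: str


config = SimulationConfig(steps=3, label="demo")

# A plain instance cannot be encoded as JSON ...
try:
    json.dumps(config)
    instance_is_serializable = True
except TypeError:
    instance_is_serializable = False

# ... but its dict form can, and it survives a round trip.
encoded = json.dumps(asdict(config))
restored = SimulationConfig(**json.loads(encoded))
```

The same idea applies to arrays: a list of numbers encodes as JSON, while the array object itself does not.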
Failed tasks should be displayed in the graph. Can you provide a minimal example? I do not think we’ve seen this.
Yes absolutely there will be checkpointing. There is already task state caching. This is an important API/experience and we want to make large improvements over the old design.
We’ve discussed a “stable” or “frozen” graph. There will almost definitely be a concept of this. I’ll forward the radar feedback to the UI team.
Self-serialization will be available as well as persistence to customizable locations, results are being persisted to the local file system right now and the database stores a reference to the location.
We’ve definitely encountered some issues with concurrent access to SQLite. However, we haven’t seen the issue with non-trivial examples (aka your tasks are actually doing work) and, as you’ve said, they’re hard to reproduce and investigate. Can you share your example? We’ll generally recommend switching to Postgres to resolve these sort of problems at scale.
I’m not sure an imperative API makes sense when the flow’s graph is discovered at runtime. From my perspective (I definitely do not speak for the whole team / product here), splitting our framework into two APIs causes a lot of confusion. I’d be curious to hear more about what part of the imperative API is valuable and perhaps a suggestion on what it would look like in your ideal world.
Thanks for all the feedback! We appreciate it a lot.
v
Thank you so much for these really fine answers. I will discuss them tomorrow with my team members and come back to you with detailed comments. Ah, and BTW, just 5 minutes ago I succeeded in running our simulation in Orion for the first time. I used a really dirty hack to circumvent the JSON limit for parameters in flow-to-flow communication: I used jsonpickle to encode the instances before they go into the flow, and in the flow I decoded them back. As I said: dirty, inefficient, etc. But we are doing an evaluation and wanted a proof of concept. We have lots of flows nested in flows since we like to have reusable building blocks. We will have a whole bunch of similar but in detail crucially different simulations, and it is simply not effective or maintainable in the long term to build them up out of simple tasks.
Hi I have some new findings concerning our Orion evaluation. Shall I post them here or open up a new thread? E.G: other things that are broken or do not work, not mentioned in my first post.
k
Here definitely works. We’ll still see it
v
We noticed that Orion does not parallelize subflows. Prefect does this quite well.
z
This will be available when we add a mapping operator. They're still parallelizable when using async at the moment.
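As a rough illustration of the async approach, the sketch below runs several coroutines concurrently with asyncio.gather. Plain coroutines stand in for async subflows here; the Prefect decorators are deliberately left out, and `child` and `parent` are hypothetical names.

```python
import asyncio


async def child(y):
    # Stand-in for an async subflow; the sleep simulates waiting on I/O.
    await asyncio.sleep(0.01)
    return y * 2


async def parent(values):
    # gather schedules all children concurrently and keeps the input order.
    return await asyncio.gather(*(child(v) for v in values))


results = asyncio.run(parent([1, 2, 3]))
```

This gives concurrency on a single event loop, which helps for I/O-bound work; it is not the same as distributing work through an executor.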
v
Also, two nested flows that both use executor=DaskExecutor() crash with a really strange error message, claiming that it cannot serialize the function add since it is not the same as __main__.add. Attached is a simple code example reproducing it.
Why does one need a mapping operator? Orion serializes tasks in loops really well.
z
Hm. That shouldn't happen. I'll try to reproduce that this weekend.
v
I also do not understand why you keep the distinction between flows and tasks. Why not just work with tasks that can be nested?
Sorry I forgot the error.log file.
So, the witching hour closes in and I will start my weekend. CU.
z
Subflows are not submitted to the task executor which makes parallelization for them different. The distinction between tasks and flows is more important than it'd seem at first look. Arbitrary nesting creates a lot of execution and visualization complexity.
Have a nice weekend!
s
Hello, I am an employee of volkerjaenisch. I found another issue today:
import numpy as np
from prefect import flow, task


@task()
def generate_input():
    x = np.arange(100000)
    return x


@flow
def parent():
    input = generate_input()


parent()
11:17:01.911 | Flow run 'pastoral-mole' encountered exception:
Traceback (most recent call last):
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 377, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 48, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "/home/sandra/workspace/orion_eva/orion/test.py", line 14, in parent
    input = generate_input()
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/prefect/tasks.py", line 271, in __call__
    return enter_task_run_engine(
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 450, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 59, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/anyio/from_thread.py", line 35, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 847, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 440, in result
    return self.__get_result()
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 510, in create_and_submit_task_run
    future = await flow_run_context.executor.submit(
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/prefect/executors.py", line 180, in submit
    self._results[task_run.id] = await run_fn(**run_kwargs)
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/prefect/client.py", line 59, in wrapper
    return await fn(*args, **kwargs)
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 620, in orchestrate_task_run
    terminal_state = await user_return_value_to_state(
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 734, in user_return_value_to_state
    if is_state(result) or is_state_iterable(result):
  File "/home/sandra/workspace/venvs/orion-VYmp5l7m-py3.9/lib/python3.9/site-packages/prefect/orion/states.py", line 17, in is_state_iterable
    if isinstance(obj, IterableABC) and obj:
ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()
11:17:01.912 | Shutting down executor `SequentialExecutor`...
11:17:01.937 | Flow run 'pastoral-mole' finished in state Failed(message='Flow run encountered an exception.', type=FAILED)

Process finished with exit code 0
This is a simple example working with numpy arrays...
k
Hi @Sandra Rum, this was a similar issue and should be fixed in the next alpha as seen in the comments
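The traceback above ends in `isinstance(obj, IterableABC) and obj`, which asks the array for its truth value. A minimal sketch of the failure mode and one possible guard follows. `AmbiguousArray` is a hypothetical stand-in for a numpy array, and `is_nonempty_iterable` is an illustration only, not the fix that went into Prefect.

```python
from collections.abc import Iterable


class AmbiguousArray:
    """Hypothetical stand-in for an array whose truth value is undefined."""

    def __init__(self, items):
        self._items = list(items)

    def __iter__(self):
        return iter(self._items)

    def __bool__(self):
        raise ValueError("The truth value of an array is ambiguous.")


def naive_check(obj):
    # Mirrors the failing pattern: the second operand calls bool(obj).
    return isinstance(obj, Iterable) and bool(obj)


def is_nonempty_iterable(obj):
    # Avoids bool(obj) by probing for a first element instead.
    if not isinstance(obj, Iterable):
        return False
    sentinel = object()
    return next(iter(obj), sentinel) is not sentinel


try:
    naive_check(AmbiguousArray([1, 2]))
    naive_raised = False
except ValueError:
    naive_raised = True
```

Probing for a first element consumes one item, so this guard is only appropriate for re-iterable containers, not for one-shot generators.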
v
A simple short question. How/where to configure the DB connection string in Orion? Can someone give me a pointer, please.
k
Did you try this?
export PREFECT_ORION_DATABASE_CONNECTION_URL=sqlite+aiosqlite:///orion.db
v
ok found it. Is there a way to specify it from code.
k
Will ask the team about that
v
@Kevin Kho Thank you.
What is the correct syntax for an in-memory SQLite DB? PREFECT_ORION_DATABASE_CONNECTION_URL=sqlite+aiosqlite///file:memory does not work.
sqlite+aiosqlite:///file::memory:?cache=shared&uri=true&check_same_thread=false does not work either.
=> Table not found error.
/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/bin/python /home/volker/workspace/ORION/orion/orion/flows.py
Traceback (most recent call last):
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 100, in execute
    self._adapt_connection._handle_exception(error)
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 229, in _handle_exception
    raise error
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 82, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 76, in await_only
    return current.driver.switch(awaitable)
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 129, in greenlet_spawn
    value = await result
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/aiosqlite/core.py", line 102, in run
    result = function()
sqlite3.OperationalError: no such table: flow
Exactly the same thing happens when I try to use a PostgreSQL DB.
/net/zolxq00618/home/ref-data/conda_environments/vtt-experiments/bin/python /home/xyvjaeni/workspace/vtt-experiments/scripts/vtt_tasks.py
Traceback (most recent call last):
  File "/home/xyvjaeni/.local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 397, in _prepare_and_execute
    operation, self._invalidate_schema_cache_asof
  File "/home/xyvjaeni/.local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 638, in _prepare
    prepared_stmt = await self._connection.prepare(operation)
  File "/home/xyvjaeni/.local/lib/python3.7/site-packages/asyncpg/connection.py", line 571, in prepare
    record_class=record_class,
  File "/home/xyvjaeni/.local/lib/python3.7/site-packages/asyncpg/connection.py", line 589, in _prepare
    record_class=record_class,
  File "/home/xyvjaeni/.local/lib/python3.7/site-packages/asyncpg/connection.py", line 403, in _get_statement
    ignore_custom_codec=ignore_custom_codec,
  File "asyncpg/protocol/protocol.pyx", line 168, in prepare
asyncpg.exceptions.UndefinedTableError: relation "flow" does not exist
Are there any precautions to be taken before using any of these DB options, like initializing the DB?
d
Hi Volker, Orion should take care of initializing the database and supports both sqlite and postgres connections, can you walk through how you're starting the Orion server when configuring the postgres connection?
v
@Dustin Ngo The code is the following
from prefect import flow

@flow
def my_flow():
    return 1

print(my_flow())
And it is called the following way:
export PREFECT_ORION_DATABASE_CONNECTION_URL="sqlite+aiosqlite:///file::memory:?cache=shared&uri=true&check_same_thread=false"; python simpley.py
What puzzles me is that files named file::memory etc. are created.
I am using the alpha5 from PyPI, which has worked great for two weeks in several different setups (poetry/Anaconda, Ubuntu/Debian).
@Dustin Ngo I am not starting the Orion server explicitly. Do I have to?
(orion-2cnkrIXd-py3.9) volker@runner:~/workspace/ORION/orion/orion$ export PREFECT_ORION_DATABASE_CONNECTION_URL="postgresql+asyncpg://root:PW@127.0.0.1/orion"; prefect orion start
Starting Orion API server...
INFO:     Started server process [188892]
23:24:37.817 | Started server process [188892]
INFO:     Waiting for application startup.
23:24:37.817 | Waiting for application startup.
23:24:37.817 | Scheduler service scheduled to start in-app
23:24:37.817 | MarkLateRuns service scheduled to start in-app
INFO:     Application startup complete.
23:24:37.857 | Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:4200 (Press CTRL+C to quit)
23:24:37.858 | Uvicorn running on http://127.0.0.1:4200 (Press CTRL+C to quit)
23:24:37.953 | Unexpected error in: ProgrammingError("(sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError) <class 'asyncpg.exceptions.UndefinedTableError'>: relation "deployment" does not exist")
Traceback (most recent call last):
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 396, in _prepare_and_execute
    prepared_stmt, attributes = await adapt_connection._prepare(
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 638, in _prepare
    prepared_stmt = await self._connection.prepare(operation)
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/asyncpg/connection.py", line 566, in prepare
    return await self._prepare(
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/asyncpg/connection.py", line 584, in _prepare
    stmt = await self._get_statement(
  File "/home/volker/workspace/venvs/orion-2cnkrIXd-py3.9/lib/python3.9/site-packages/asyncpg/connection.py", line 398, in _get_statement
    statement = await self._protocol.prepare(
  File "asyncpg/protocol/protocol.pyx", line 168, in prepare
asyncpg.exceptions.UndefinedTableError: relation "deployment" does not exist
v
I think I had similar issue and I tried running: prefect orion reset-db (This will drop and create the table)
v
@Vipul Thank you so much! That was the solution, at least for Postgres. I think that should go straight into the handbook: there are precautions to take when using PostgreSQL.
For the SQLite in-memory DB this does not work.
I have the feeling that the problem is between aiosqlite and memory, but my debugging has not concluded yet.
So this has nothing to do with the initialization of the in-memory DB. The error comes way before that, when simply opening memory with sqlite3.
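For reference, a minimal sketch with the standard-library sqlite3 module (not aiosqlite or Orion) shows one way a "no such table" error can arise with in-memory databases: every plain `:memory:` connection gets its own private database. A named shared-cache URI lets several connections in one process see the same data. Whether this is what happens inside Orion is an assumption, and the database name `orion_demo` is made up.

```python
import sqlite3

# Two plain in-memory connections are two separate databases.
a = sqlite3.connect(":memory:")
b = sqlite3.connect(":memory:")
a.execute("CREATE TABLE flow (id INTEGER PRIMARY KEY)")
try:
    b.execute("SELECT * FROM flow")
    private_error = None
except sqlite3.OperationalError as exc:
    private_error = str(exc)

# A named in-memory database with a shared cache is visible to both.
# uri=True tells sqlite3 to parse the string as a URI, not a file name.
uri = "file:orion_demo?mode=memory&cache=shared"
c = sqlite3.connect(uri, uri=True)
d = sqlite3.connect(uri, uri=True)
c.execute("CREATE TABLE flow (id INTEGER PRIMARY KEY)")
c.commit()
shared_rows = d.execute("SELECT * FROM flow").fetchall()
```

With the standard module, omitting `uri=True` usually makes SQLite treat the whole string as an ordinary file name, which may be related to the stray files named file::memory mentioned earlier.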
d
hey! sorry for the delayed response: because Orion is still in a technical preview state we haven't finalized our database schemas yet and haven't built out migrations, if there's a breaking change to the schema you'll need to reset the DB for the time being
v
Thank you Dustin. And you do not have to apologize. It is simply work in progress. I would like to contribute to this progress, so it does not bother me if something is not implemented or broken. I just like to have clear facts about the state of Orion. As long as an error or a functionality is explicitly stated as "do not use, it is not implemented" or "you have to use the following workaround", things are heavenly.
However, I have not found a way to get the in-memory SQLite DB to work, and this is quite important. We see a HUGE overhead from Orion in our evaluation simulation. By huge I mean a factor of 60. Please do not worry: the base code without Orion runs in 5 seconds; with Orion it runs in 300 seconds. A 10-second run is definitely NOT the use case Orion is built for, but our customer is really restrictive concerning its code and we have quite limited possibilities for testing. Nevertheless, 290 seconds to manage 6 flows with 30 tasks (in total, not per flow!) in two nested loops seems a lot of time spent. We are working locally, not even parallelized. So where is the time wasted? We are tracking this HUGE factor down. It is probably our fault. Is it SQLite, is it the serialization, is it something else? To exclude HDD I/O as a cause, it would have been nice to switch SQLite to RAM. So we switched to Postgres instead. In our case PG reduced the overhead by a factor of 2.5. My colleague Sandra today came up with a different scheme for flow-to-flow data transfer and achieved a factor of 5 in demo cases. We will implement this in our customer's application combined with Postgres to get a new reading.
Runtime comparison. Orange: using our hack (JSON-pickle the params before we transfer them from flow to flow). Blue: using cloudpickle (the last task in a flow cloudpickles the result to the HDD; the first task of the following flow unpickles the result from the HDD).
One can clearly see that our hack is not really efficient. The serialization of the parameters in flow-to-flow communication goes via the SQLite DB. Using cloudpickle and two additional tasks we are O(n) faster. Test data were numpy arrays with 10 to 10,000,000 elements. Storing and reading data from the HDD has a nearly constant cost, which is high (around one second) but negligible for time-consuming workloads. So we conclude: never transfer lots of data from flow to flow. OK, you had warned us. But now we have measured it. The heavy lifting of data should be done via cloudpickle. For tasks Orion will do this for you. For flow-to-flow you will have to come up with your own home-brew solution ATM. But that is OK, at least for us.
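The pass-by-reference scheme described above can be sketched as follows. The standard-library pickle module stands in for cloudpickle, plain functions stand in for the last task of one flow and the first task of the next, and the names `persist_result` and `load_result` are hypothetical.

```python
import json
import pickle
import tempfile
from pathlib import Path


def persist_result(obj, directory):
    """Pickle obj to disk and return a small, JSON-serializable reference."""
    path = Path(directory) / "result.pkl"
    with path.open("wb") as fh:
        pickle.dump(obj, fh)
    return str(path)


def load_result(reference):
    """Load the object that a reference points to."""
    with open(reference, "rb") as fh:
        return pickle.load(fh)


with tempfile.TemporaryDirectory() as tmp:
    payload = {"values": list(range(1000))}
    reference = persist_result(payload, tmp)
    # Only the short path string would cross the flow boundary.
    reference_as_json = json.dumps(reference)
    restored = load_result(json.loads(reference_as_json))
```

The size of what crosses the flow boundary stays constant regardless of the payload, which matches the roughly constant cost reported for the cloudpickle variant.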
k
Thanks for the info and write-up @volkerjaenisch
z
Hey Volker! Thanks for sharing. Those are some good datapoints. As you’ve demonstrated, you shouldn’t pass data from flow to flow that way and are far better off passing data by reference if you need to 🙂 we’re very likely to expose some APIs that can help with this in the future. Even now, we have the beginnings of a data persistence API with OrionClient.persist_object.
v
@Zanie I finally found the source of the HUGE processing overhead of Orion. You remember the factor of 60. It is not the serialization, as we formerly expected. It is the DB. Switching from SQLite to PostgreSQL gained a factor of 2.5 in runtime performance, so the overhead is definitely I/O-related. To give this theory more support, we moved the PG tablespace from our remote HDD partition to a local HDD partition, and in fact we gained another 30% of performance. But we are still at 66 seconds of runtime with Orion vs. 5 seconds without. So we logged the SQL queries to PG and analyzed them. It took our DB specialist some time, but I think we found at least one problem with the Orion DB structure you should be aware of. The SQL code (generated by SQLAlchemy) shows, in nearly every operation, the pattern INSERT/SELECT, UPDATE/SELECT on the same table.
2021-12-07 23:04:20.544 CET [70132] root@orion LOG:  statement: BEGIN ISOLATION LEVEL READ COMMITTED;
2021-12-07 23:04:20.544 CET [70132] root@orion LOG:  duration: 0.061 ms
2021-12-07 23:04:20.544 CET [70132] root@orion LOG:  duration: 0.114 ms
2021-12-07 23:04:20.545 CET [70132] root@orion LOG:  duration: 0.401 ms
2021-12-07 23:04:20.545 CET [70132] root@orion LOG:  execute __asyncpg_stmt_f__: INSERT INTO task_run (id, created, updated, name, run_count, total_run_time, task_key, dynamic_key, cache_key, cache_expiration, task_version, empirical_policy, task_inputs, tags, flow_run_id) VALUES ($1::uuid, $2::timestamp with time zone, $3::timestamp with time zone, $4::varchar, $5::integer, $6::interval, $7::varchar, $8::varchar, $9::varchar, $10::timestamp with time zone, $11::varchar, $12::jsonb, $13::jsonb, $14::jsonb, $15::uuid) ON CONFLICT (flow_run_id, task_key, dynamic_key) DO NOTHING
2021-12-07 23:04:20.545 CET [70132] root@orion DETAIL:  parameters: $1 = 'ef312c3e-da62-4619-8089-a592fcce2d87', $2 = '2021-12-07 23:04:20.541589+01', $3 = '2021-12-07 23:04:20.541608+01', $4 = 'my_task-ec9685da-0', $5 = '0', $6 = '00:00:00', $7 = 'ec9685da70232eae82ef1a236e37c301', $8 = '0', $9 = NULL, $10 = NULL, $11 = NULL, $12 = '{"max_retries": 0, "retry_delay_seconds": 0.0}', $13 = '{}', $14 = '[]', $15 = '8d007cd6-b64d-48e5-8723-0c10a26988b8'
2021-12-07 23:04:20.548 CET [70132] root@orion LOG:  duration: 2.728 ms
2021-12-07 23:04:20.624 CET [70132] root@orion LOG:  duration: 0.181 ms
2021-12-07 23:04:20.626 CET [70132] root@orion LOG:  duration: 1.557 ms
2021-12-07 23:04:20.626 CET [70132] root@orion LOG:  execute __asyncpg_stmt_10__: SELECT task_run.id, task_run.created, task_run.updated, task_run.name, task_run.state_type, task_run.run_count, task_run.expected_start_time, task_run.next_scheduled_start_time, task_run.start_time, task_run.end_time, task_run.total_run_time, task_run.task_key, task_run.dynamic_key, task_run.cache_key, task_run.cache_expiration, task_run.task_version, task_run.empirical_policy, task_run.task_inputs, task_run.tags, task_run.flow_run_id, task_run.state_id, task_run_state_1.id AS id_1, task_run_state_1.created AS created_1, task_run_state_1.updated AS updated_1, task_run_state_1.type, task_run_state_1.timestamp, task_run_state_1.name AS name_1, task_run_state_1.message, task_run_state_1.state_details, task_run_state_1.data, task_run_state_1.task_run_id
FROM task_run LEFT OUTER JOIN task_run_state AS task_run_state_1 ON task_run_state_1.id = task_run.state_id
WHERE task_run.flow_run_id = $1::uuid AND task_run.task_key = $2::varchar AND task_run.dynamic_key = $3::varchar
LIMIT $4::integer
2021-12-07 23:04:20.626 CET [70132] root@orion DETAIL:  parameters: $1 = '8d007cd6-b64d-48e5-8723-0c10a26988b8', $2 = 'ec9685da70232eae82ef1a236e37c301', $3 = '0', $4 = '1'
2021-12-07 23:04:20.626 CET [70132] root@orion LOG:  duration: 0.045 ms
2021-12-07 23:04:20.632 CET [70132] root@orion LOG:  duration: 0.065 ms
2021-12-07 23:04:20.632 CET [70132] root@orion LOG:  duration: 0.098 ms
2021-12-07 23:04:20.632 CET [70132] root@orion LOG:  execute __asyncpg_stmt_11__: UPDATE task_run SET state_type=$1, expected_start_time=$2::timestamp with time zone WHERE task_run.id = $3::uuid
2021-12-07 23:04:20.632 CET [70132] root@orion DETAIL:  parameters: $1 = 'PENDING', $2 = '2021-12-07 23:04:20.538541+01', $3 = 'ef312c3e-da62-4619-8089-a592fcce2d87'
2021-12-07 23:04:20.633 CET [70132] root@orion LOG:  duration: 0.226 ms
2021-12-07 23:04:20.634 CET [70132] root@orion LOG:  duration: 0.175 ms
2021-12-07 23:04:20.634 CET [70132] root@orion LOG:  execute __asyncpg_stmt_d__: SELECT flow_run.id AS flow_run_id, flow_run.created AS flow_run_created, flow_run.updated AS flow_run_updated, flow_run.name AS flow_run_name, flow_run.state_type AS flow_run_state_type, flow_run.run_count AS flow_run_run_count, flow_run.expected_start_time AS flow_run_expected_start_time, flow_run.next_scheduled_start_time AS flow_run_next_scheduled_start_time, flow_run.start_time AS flow_run_start_time, flow_run.end_time AS flow_run_end_time, flow_run.total_run_time AS flow_run_total_run_time, flow_run.flow_version AS flow_run_flow_version, flow_run.parameters AS flow_run_parameters, flow_run.idempotency_key AS flow_run_idempotency_key, flow_run.context AS flow_run_context, flow_run.empirical_policy AS flow_run_empirical_policy, flow_run.empirical_config AS flow_run_empirical_config, flow_run.tags AS flow_run_tags, flow_run.auto_scheduled AS flow_run_auto_scheduled, flow_run.flow_id AS flow_run_flow_id, flow_run.deployment_id AS flow_run_deployment_id, flow_run.parent_task_run_id AS flow_run_parent_task_run_id, flow_run.state_id AS flow_run_state_id, flow_run_state_1.id AS flow_run_state_1_id, flow_run_state_1.created AS flow_run_state_1_created, flow_run_state_1.updated AS flow_run_state_1_updated, flow_run_state_1.type AS flow_run_state_1_type, flow_run_state_1.timestamp AS flow_run_state_1_timestamp, flow_run_state_1.name AS flow_run_state_1_name, flow_run_state_1.message AS flow_run_state_1_message, flow_run_state_1.state_details AS flow_run_state_1_state_details, flow_run_state_1.data AS flow_run_state_1_data, flow_run_state_1.flow_run_id AS flow_run_state_1_flow_run_id
FROM flow_run LEFT OUTER JOIN flow_run_state AS flow_run_state_1 ON flow_run_state_1.id = flow_run.state_id
This is due to the ORM, which wants to know the actual values (e.g. primary keys generated by the DB) after processing an INSERT/UPDATE. This would be the standard operation of SA for PG versions before 8.2. But since PG 8.2 the INSERT and UPDATE statements support a RETURNING clause, which SQLAlchemy supports. With this clause SQLAlchemy only needs a single INSERT ... RETURNING ... or UPDATE ... RETURNING ... statement to get the PG-created values. In other words, the SELECTs are superfluous. But although SQLAlchemy can use RETURNING with the PG dialect, it is not used for the Orion code, doubling the number of statements. I am not sure if this is due to a bug in SQLAlchemy or due to the questionable choice of your developers to use UUIDs as primary keys, which may hinder SA or PG from using RETURNING (the SA docs are not so clear about this, only talking about integer primary keys). I dug so deep at this point because for our Orion simulation example with 21 tasks and 6 flows and 5 seconds of runtime, the number of INSERTs is 6800 and the number of SELECTs is 17500. Since we have 66 seconds of runtime (wall time), this works out to about 100 INSERTs per second in the PG DB. Not bad for PG. But this looks a bit like overkill to me. No wonder the overhead of using Orion is so dramatic. Fixing the RETURNING problem mentioned above would dramatically reduce the number of SELECTs, since it is of the same order of magnitude as the number of INSERT and UPDATE statements. In other words, roughly 14000 SELECTs are not necessary. The problem with the number of statements is not only the runtime of the individual statements but also the (network/IO) latency involved with each statement. I also have the feeling that the UPDATE statement is completely redundant. It always happens in the same transaction as the INSERT and covers the same table. Part of the cause for this is in orm_models.py, lines 63ff:
# onupdate is only called when statements are actually issued
# against the database. until COMMIT is issued, this column
# will not be updated
updated = sa.Column(
    Timestamp(),
    nullable=False,
    index=True,
    server_default=now(),
    default=lambda: pendulum.now("UTC"),
    onupdate=now(),
)
Commenting out the onupdate handler removes the "updated" column from the UPDATE statement. But the update of "state_type" still remains. I did not find the source of the delayed "state_type", so I leave this for you as homework 🙂 These findings were consistent on Ubuntu 18/PG10 and Debian 12/PG14. Cheers, Volker
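The difference between INSERT followed by SELECT and a single INSERT ... RETURNING can be sketched with the standard-library sqlite3 module, used here only as a stand-in for PostgreSQL. The table is a simplified, hypothetical version of task_run, and the helper names are made up. SQLite supports RETURNING from version 3.35, so that part is guarded.

```python
import sqlite3
import uuid

# RETURNING needs SQLite 3.35 or newer.
SUPPORTS_RETURNING = sqlite3.sqlite_version_info >= (3, 35, 0)

statements = []  # every SQL statement we send, for counting


def run(conn, sql, params=()):
    statements.append(sql)
    return conn.execute(sql, params)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_run ("
    "id TEXT PRIMARY KEY, name TEXT, "
    "created TEXT DEFAULT CURRENT_TIMESTAMP)"
)


def insert_then_select(name):
    """Two statements: INSERT, then SELECT the server-generated value."""
    row_id = str(uuid.uuid4())
    run(conn, "INSERT INTO task_run (id, name) VALUES (?, ?)", (row_id, name))
    query = "SELECT id, created FROM task_run WHERE id = ?"
    return run(conn, query, (row_id,)).fetchone()


def insert_returning(name):
    """One statement: the INSERT hands back the generated value itself."""
    row_id = str(uuid.uuid4())
    sql = "INSERT INTO task_run (id, name) VALUES (?, ?) RETURNING id, created"
    return run(conn, sql, (row_id, name)).fetchone()


first = insert_then_select("my_task-0")
count_two_step = len(statements)

if SUPPORTS_RETURNING:
    statements.clear()
    second = insert_returning("my_task-1")
    count_returning = len(statements)
```

An in-process database has no network latency, so this only illustrates the statement count, not the timing effect discussed above.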
z
Thanks for the deep dive Volker! I’ll be sharing this with the rest of the team as well. We’ll definitely be digging into performance eventually as well, our focus so far has been designing the features. Maybe there are some quick wins now though.
v
We would really appreciate hearing back from your developers concerning this mess. We have canceled all Orion/Prefect plans for our customer until this topic is resolved.
We did a minimal run.
from prefect import flow, task


@task
def my_task():
    return 1


@flow
def my_flow():
    return my_task()


print(my_flow())
This results in 100 database operations. I would think that one INSERT per flow/subflow/task is reasonable. Then 9 UPDATEs for the state changes on the three nodes. Also 3 SELECTs to gain insight into what to do next. This would result in essentially 15 DB operations. So Orion uses nearly an order of magnitude more DB operations than needed.
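The estimate above can be written out as a short calculation. All numbers come from the message; the split of the 9 UPDATEs into three state changes per node is an assumption made for the sketch.

```python
# Numbers taken from the message above.
nodes = 3                # flow, subflow, task
inserts = nodes * 1      # one INSERT per node
updates = nodes * 3      # 9 UPDATEs, assumed three state changes per node
selects = 3              # SELECTs to decide what to do next
expected_ops = inserts + updates + selects

observed_ops = 100
ratio = observed_ops / expected_ops
```

The ratio comes out between 6 and 7, which is the "nearly an order of magnitude" mentioned in the message.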
z
Hi Volker, I am one of the developers of Orion. I’d like to reemphasize that Orion is in a “technical preview” stage and we do not recommend it for production yet (https://orion-docs.prefect.io/faq/#why-is-orion-a-technical-preview). A goal of the preview is to collect valuable feedback from our users as we build it into a fully featured product, so we appreciate your look at the database operations. I’ve already opened a PR to address some of your points and we’re doing additional investigation into RETURNING/SELECT optimization across different database backends.
v
@Zanie Thank you for the clarification. We are not using Orion in production but are researching the possibility of doing so in the future. Prefect does not fulfill our requirements, and Orion already fulfills many of them, so it will probably be our choice. So we are in the inconvenient situation of having to sit and wait until Orion is ready. We use part of this time to test Orion. Is there any information available on when you will release Orion? What would be really helpful is some kind of "not before" statement: not before mid-2022, or not before 2023...
z
I’m waiting for a response still, but a stable release is expected in 2022
v
Thank you for the time line.
Any news?
z
Sure, we’ve patched this RETURNING issue, which it turns out is a SQLAlchemy bug (https://github.com/sqlalchemy/sqlalchemy/issues/7438); we’ve contributed a documentation fix until the bug is resolved. We’ve also been exploring memory performance and resolved some issues with unused database session factories.
There’s lots of feature work going on as well, you can see the most recent release notes at https://github.com/PrefectHQ/prefect/blob/orion/RELEASE-NOTES.md
v
That's good to hear. Have you also addressed the UUID as primary key problem? This influences the performance (of joins, inserts) by 2-3 orders of magnitude, negatively.
I have to correct myself. Even if my statement is theoretically sound it does not seem to have a big effect.
import uuid
from timeit import timeit

from prefect.orion.utilities.database import GenerateUUID, UUID
from sqlalchemy import create_engine, Integer, Column
from sqlalchemy.orm import declarative_base, sessionmaker

engine = create_engine('postgresql+psycopg2://root:@localhost/test')
session_maker = sessionmaker()
session_maker.configure(bind=engine)
Base = declarative_base()

# Table with a UUID primary key
class UserUUID(Base):
    __tablename__ = 'user_uuid'
    id = Column(
        'id',
        UUID(),
        primary_key=True,
        server_default=GenerateUUID(),
        default=uuid.uuid4,
    )
    other_id = Column('other_id', Integer)

# Table with an integer primary key and a plain UUID column
class UserID(Base):
    __tablename__ = 'user_id'
    other_id = Column(
        'other_id',
        UUID(),
        server_default=GenerateUUID(),
        default=uuid.uuid4,
    )
    id = Column(Integer, primary_key=True)

Base.metadata.create_all(engine)
session = session_maker()
COUNT = 10000

def run_id():
    for i in range(COUNT):
        ui = UserID()
        session.add(ui)
    session.commit()  # one commit for the whole batch

def run_uuid():
    for i in range(COUNT):
        ui = UserUUID()
        session.add(ui)
    session.commit()  # one commit for the whole batch

print('id: {}'.format(timeit(stmt=run_id, number=3)))
print('uuid: {}'.format(timeit(stmt=run_uuid, number=3)))
Count      ID (s)  UUID (s)
10,000     3.9     4.0
100,000    41.8    46.4
1,000,000  424.3   454.2

I assume that the overhead of creating the UUIDs is much larger than the index calculation for the inserts. Why do you need UUIDs in the first place? They are just IDs, and they are generated by the DB itself, so global uniqueness cannot be the reason. I will look into the selects and joins tomorrow.
OK. It is not the UUID generation overhead. Postgres is a clever bastard: it just optimizes the inserts because they are all in one transaction. That is not the use case of Orion, where each state change runs in its own transaction. So I started testing the following pattern:
def run_id():
    for i in range(COUNT):
        ui = UserID()
        session.add(ui)
        session.commit()
The results are quite astonishing:
count: 10     id: 0.1719658919992071   uuid: 0.21290569899974798
count: 100    id: 1.7246540099986305   uuid: 1.5880974810006592
count: 1000   id: 18.680973778999032   uuid: 16.67814126200028
count: 10000  id: 172.91288137099946   uuid: 169.88319157000115
To be frank, I do not understand this one bit. Tomorrow I will check for errors. Perhaps someone else will spot them.
import uuid
from functools import partial
from timeit import timeit

from prefect.orion.utilities.database import GenerateUUID, UUID
from sqlalchemy import create_engine, Integer, Column
from sqlalchemy.orm import declarative_base, sessionmaker

engine = create_engine('postgresql+psycopg2://root@localhost/test')
session_maker = sessionmaker()
session_maker.configure(bind=engine)
Base = declarative_base()

# Table with a UUID primary key
class UserUUID(Base):
    __tablename__ = 'user_uuid'
    id = Column(
        'id',
        UUID(),
        primary_key=True,
        server_default=GenerateUUID(),
        default=uuid.uuid4,
    )
    other_id = Column('other_id', Integer)

# Table with an integer primary key and a plain UUID column
class UserID(Base):
    __tablename__ = 'user_id'
    other_id = Column(
        'other_id',
        UUID(),
        server_default=GenerateUUID(),
        default=uuid.uuid4,
    )
    id = Column(Integer, primary_key=True)

Base.metadata.create_all(engine)
session = session_maker()
COUNTS = [10, 100, 1000, 10000]

def run_id(count):
    for i in range(count):
        ui = UserID()
        session.add(ui)
        session.commit()  # one transaction per insert

def run_uuid(count):
    for i in range(count):
        ui = UserUUID()
        session.add(ui)
        session.commit()  # one transaction per insert

for count in COUNTS:
    print('count: {}'.format(count))
    print('id: {}'.format(timeit(stmt=partial(run_id, count), number=3)))
    print('uuid: {}'.format(timeit(stmt=partial(run_uuid, count), number=3)))
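The two commit patterns compared in this thread (one commit for the whole batch versus one commit per row) can also be tried without PostgreSQL or an ORM. The following is a minimal sketch using Python's built-in sqlite3 module with an in-memory database; the table layout and function names are hypothetical, and the absolute timings are not comparable to the PostgreSQL numbers above (an in-memory database never waits for a disk sync, so the gap between the two patterns is typically much smaller).

```python
import sqlite3
from timeit import timeit


def make_connection():
    # Fresh in-memory database with a single auto-incrementing integer key.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE user_id (id INTEGER PRIMARY KEY, other_id TEXT)")
    return conn


def insert_single_transaction(conn, count):
    # All inserts share one transaction; commit once at the end.
    for _ in range(count):
        conn.execute("INSERT INTO user_id (other_id) VALUES ('x')")
    conn.commit()


def insert_commit_per_row(conn, count):
    # Each insert is committed on its own, like one transaction per state change.
    for _ in range(count):
        conn.execute("INSERT INTO user_id (other_id) VALUES ('x')")
        conn.commit()


def row_count(conn):
    return conn.execute("SELECT COUNT(*) FROM user_id").fetchone()[0]


COUNT = 1000
batch_conn = make_connection()
per_row_conn = make_connection()

batch_time = timeit(lambda: insert_single_transaction(batch_conn, COUNT), number=1)
per_row_time = timeit(lambda: insert_commit_per_row(per_row_conn, COUNT), number=1)

print(f"single transaction: {batch_time:.4f}s")
print(f"commit per row:     {per_row_time:.4f}s")
```

Both variants insert the same number of rows; only the number of commits differs, which is the variable the benchmark in the thread changes between the first and second script.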
OK, problem found. The Python/ORM overhead is about a factor of 5 compared to the execution time in the database. Please forget all the measurements above, since they are rubbish.
z
Fun to follow though 🙂
I’m not surprised that Python is slowing things down.
v
So now I got measurements directly from the DB that are quite interesting:

test=# do $$
<<first_block>>
declare
  r integer := 0;
begin
  FOR i IN 1..1000000 LOOP
    INSERT INTO user_id (id) values (default) RETURNING id INTO r;
    INSERT INTO host_id (user_id) VALUES (r);
  END LOOP;
end first_block $$;

Doing 1,000,000 INSERTs:
Integer-PK  23548.779 ms
UUID-PK     31262.611 ms
VARCHAR-PK  38226.757 ms

I was really surprised by the not-so-bad performance of VARCHARs here (roughly a factor of 1.6). Now let's add 3 million more inserts and run some DB operations involving a join and a comparison, sorting the result at the end:

select count(u.id) from user_id as u, host_id as h where u.id = h.user_id and h.id < 48000000 order by u.id

Integer-PK  2436.605 ms
UUID-PK     3496.410 ms
VARCHAR-PK  7250.452 ms

Here the VARCHAR is really off, by a factor of 3.

Conclusion: UUID-PK vs. Integer-PK is around 30% slower on INSERTs and 40% slower on JOINs. VARCHAR-PK vs. Integer-PK is a factor of about 1.6 slower on INSERTs and a factor of 3 slower on JOINs. Since the Python/ORM overhead is around a factor of 5, only the VARCHAR-PK may make a difference in Python benchmarks. So I would like to apologize: your choice of UUID-PK is no fault.

After some deep thinking, it may even be good for a PK to be random. Due to the randomness, the shape of the btree indexes may be better balanced in the random UUID case. Btrees do not perform so well on sorted data like an INTEGER (autoincrement) PK. Comparing UUIDs costs twice as many CPU cycles as comparing integers, but the randomness may save btree rebalancing work, reducing the overall cost to only about 30% instead of 100%. Quite fascinating. 🖖
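The relative slowdowns quoted in the conclusion can be recomputed from the timings in the message. This is just a sketch of that arithmetic; the dictionary and function names are hypothetical, and the millisecond values are copied from the measurements above.

```python
# Timings in milliseconds, copied from the measurements above.
insert_ms = {"integer": 23548.779, "uuid": 31262.611, "varchar": 38226.757}
join_ms = {"integer": 2436.605, "uuid": 3496.410, "varchar": 7250.452}


def slowdown(timings):
    # Ratio of each key type's time to the integer primary key baseline.
    base = timings["integer"]
    return {key: round(value / base, 2) for key, value in timings.items()}


insert_ratio = slowdown(insert_ms)
join_ratio = slowdown(join_ms)

print("INSERT slowdown vs. integer PK:", insert_ratio)
print("JOIN slowdown vs. integer PK:  ", join_ratio)
```

The UUID ratios come out at roughly 1.3 for INSERTs and 1.4 for JOINs, in line with the "30%" and "40%" figures, and the VARCHAR JOIN ratio is close to 3. The VARCHAR INSERT ratio is nearer 1.6 than 2.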