I'm interested in running a prefect flow as a scri...
# ask-community
m
I'm interested in running a prefect flow as a script in a kubernetes cronjob. In prefect 3 is it possible to do this without creating a deployment?
I've got my toy flow in a docker image
from prefect import flow

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello, {name}!")

if __name__ == "__main__":
    my_flow()
but when I call
docker run  python-app-prefect
I get this sqlite error:
18:30:18.498 | INFO    | prefect - Starting temporary server on <http://127.0.0.1:8173>
See <https://docs.prefect.io/3.0/manage/self-host#self-host-a-prefect-server> for more information on running a dedicated Prefect server.
18:30:28.344 | ERROR   | uvicorn.error - Traceback (most recent call last):
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute
    cursor.execute(statement, parameters)
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 147, in execute
    self._adapt_connection._handle_exception(error)
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 298, in _handle_exception
    raise error
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 129, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
  File "/srv/app/.venv/lib/python3.10/site-packages/aiosqlite/cursor.py", line 48, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/srv/app/.venv/lib/python3.10/site-packages/aiosqlite/cursor.py", line 40, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/srv/app/.venv/lib/python3.10/site-packages/aiosqlite/core.py", line 132, in _execute
    return await future
  File "/srv/app/.venv/lib/python3.10/site-packages/aiosqlite/core.py", line 115, in run
    result = function()
sqlite3.OperationalError: near "FROM": syntax error
k
it looks like it's missing env vars about where your API is, and is trying to use an ephemeral server instead. are you hosting a server somewhere or using cloud?
m
I'm neither hosting a server nor using cloud. I'm trying to just run it as a script.
k
not sure why it's failing, but the server is a requirement, so if you're not pointing to one, a temporary server will be started in the local exec environment that'll also be destroyed when the run is finished
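A minimal sketch for checking, inside the container, whether an API location has been provided through the environment. It assumes the relevant variable is `PREFECT_API_URL` and only inspects environment variables, not Prefect profiles, so treat the result as a hint rather than a full picture of the configuration. The function name `describe_api_target` is made up for illustration.

```python
import os


def describe_api_target(env=None):
    """Say whether PREFECT_API_URL is present in the given environment mapping.

    Assumption: only the environment is inspected here; settings that come
    from a Prefect profile are not taken into account.
    """
    env = os.environ if env is None else env
    api_url = env.get("PREFECT_API_URL")
    if api_url:
        return f"PREFECT_API_URL is set: runs should be reported to {api_url}"
    return "PREFECT_API_URL is not set: expect a temporary local server"


if __name__ == "__main__":
    print(describe_api_target())
```

Running this with the same image and environment as the flow (for example as a one-off `docker run` command) shows whether the pod would talk to a dedicated server or fall back to the temporary one seen in the logs above.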
n
hi @Mark Harrison Mcdonald i'd recommend either of the following: • using
.serve()
(technically this does create a deployment, but you don't need to engage with the idea of workers, or work pools, or all that dynamic dispatch stuff) to just run it every so often
from prefect import flow

@flow
def foo(): pass

if __name__ == "__main__":
  foo.serve(cron="*/15 * * * *")
and then just
python this_file.py
in a pod in your cluster • if you don't want to do that for some reason, you can always just do this, which you can use like this, i.e. basically just call your function in a loop and sleep etc
👀 1
m
thanks for the suggestion. I'm still running into some errors. Here's more of the logs. It seems like sqlite is trying to perform a migration that it's failing on:
File "/srv/app/.venv/lib/python3.10/site-packages/alembic/runtime/environment.py", line 946, in run_migrations
    self.get_context().run_migrations(**kw)
  File "/srv/app/.venv/lib/python3.10/site-packages/alembic/runtime/migration.py", line 628, in run_migrations
    step.migration_fn(**kw)
  File "/srv/app/.venv/lib/python3.10/site-packages/prefect/server/database/migrations/versions/sqlite/2024_09_16_162719_4ad4658cbefe_add_deployment_to_global_concurrency_.py", line 51, in upgrade
    op.execute(sql)
  File "<string>", line 8, in execute
  File "<string>", line 3, in execute
  File "/srv/app/.venv/lib/python3.10/site-packages/alembic/operations/ops.py", line 2551, in execute
    return operations.invoke(op)
  File "/srv/app/.venv/lib/python3.10/site-packages/alembic/operations/base.py", line 442, in invoke
    return fn(self, operation)
  File "/srv/app/.venv/lib/python3.10/site-packages/alembic/operations/toimpl.py", line 236, in execute_sql
    operations.migration_context.impl.execute(
  File "/srv/app/.venv/lib/python3.10/site-packages/alembic/ddl/impl.py", line 217, in execute
    self._exec(sql, execution_options)
  File "/srv/app/.venv/lib/python3.10/site-packages/alembic/ddl/impl.py", line 210, in _exec
    return conn.execute(construct, params)
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1418, in execute
    return meth(
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement
    ret = self._execute_context(
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    return self._exec_single_context(
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
    self._handle_dbapi_exception(
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2355, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute
    cursor.execute(statement, parameters)
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 147, in execute
    self._adapt_connection._handle_exception(error)
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 298, in _handle_exception
    raise error
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 129, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
  File "/srv/app/.venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
  File "/srv/app/.venv/lib/python3.10/site-packages/aiosqlite/cursor.py", line 48, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/srv/app/.venv/lib/python3.10/site-packages/aiosqlite/cursor.py", line 40, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/srv/app/.venv/lib/python3.10/site-packages/aiosqlite/core.py", line 132, in _execute
    return await future
  File "/srv/app/.venv/lib/python3.10/site-packages/aiosqlite/core.py", line 115, in run
    result = function()
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) near "FROM": syntax error
[SQL: 
            WITH deployment_limit_mapping AS (
                SELECT d.id AS deployment_id, l.id AS limit_id
                FROM deployment d
                JOIN concurrency_limit_v2 l ON l.name = 'deployment:' || d.id
            )
            UPDATE deployment
            SET concurrency_limit_id = dlm.limit_id
            FROM deployment_limit_mapping dlm
            WHERE deployment.id = dlm.deployment_id;
    ]
(Background on this error at: <https://sqlalche.me/e/20/e3q8>)

Application startup failed. Exiting.
n
hmm yeah you're right, seems like something is going wrong with the deployment concurrency limit migration. can you share how you're attempting to run prefect / what version? i.e. image tag or however you're selecting the prefect version
m
[tool.poetry]
name = "prefect-cron"
version = "0.1.0"
description = ""
authors = ["Mark McDonald"]
readme = "README.md"

[tool.poetry.dependencies]
python = ">=3.10"
prefect = {extras = ["snowflake"], version = "^3.0.11"}

[tool.poetry.group.dev.dependencies]
pytest = "^8.3.3"
black = "^24.10.0"
isort = "^5.13.2"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
my flow file, "src/flow.py":
from prefect import flow

@flow
def foo(): pass

if __name__ == "__main__":
    foo.serve(cron="*/15 * * * *")
from my Dockerfile:
CMD ["python", "src/flow.py"]
n
hrm, interestingly I can't quite reproduce that with the same flow and this dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt

COPY serve.py .

CMD ["python", "serve.py"]
» docker build . -f normal.Dockerfile -t a && docker run a
[+] Building 2.1s (10/10) FINISHED                                                                          
.....
 => => writing image sha256:453f3b6a43047daf86c085b449b2e0737a4cfe5673caa5c74573f4119808c469                                

19:38:47.435 | INFO    | prefect - Starting temporary server on <http://127.0.0.1:8233>
See <https://docs.prefect.io/3.0/manage/self-host#self-host-a-prefect-server> for more information on running a dedicated Prefect server.
19:38:56.554 | WARNING | prefect.runner - Cannot schedule flows on an ephemeral server; run `prefect server start` to start the scheduler.
Your flow 'foo' is being served and polling for scheduled runs!

To trigger a run for this flow, use the following command:

        $ prefect deployment run 'foo/foo'
however, I did just realize that the scheduling won't work anyways, bc the scheduler runs on the server .. so if you don't want to run a server in your cluster, you might just while / sleep it
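A minimal sketch of the while / sleep approach mentioned above, written as plain Python so it does not depend on a scheduler. The helper name `run_every` and its `max_runs` and `sleep` parameters are hypothetical and exist only to make the loop easy to bound and test; they are not part of Prefect.

```python
import time


def run_every(fn, interval_seconds, max_runs=None, sleep=time.sleep):
    """Call fn repeatedly, sleeping interval_seconds between calls.

    max_runs=None loops forever; an integer stops after that many calls.
    The sleep argument is injectable so the loop can be tested without waiting.
    Returns the number of calls made (only reachable when max_runs is set).
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        fn()
        runs += 1
        # Skip the final sleep when we already know the loop is about to end.
        if max_runs is None or runs < max_runs:
            sleep(interval_seconds)
    return runs


if __name__ == "__main__":
    # Stand-in for a @flow-decorated function such as foo or my_flow.
    def say_hello():
        print("Hello, world!")

    # Three runs, one second apart, to keep the demo short.
    run_every(say_hello, interval_seconds=1, max_runs=3)
```

In the pod you would pass the flow function itself as `fn` and use something like `interval_seconds=15 * 60`. Note that this measures the gap between the end of one run and the start of the next, so it drifts relative to a true cron schedule, and an exception in `fn` ends the loop unless you catch it.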
m
alright, thanks for looking at this. I will test out your suggestions.
👍 1
j
The sqlite error is due to your python build not being compatible with the sqlite version expected by prefect. I hit this with python3.10. You can see this link to try to update the sqlite without changing python versions https://shuaib.org/technical-guide/how-to-update-or-upgrade-sqlite3-version-in-python/ or you could just use a newer python
🙌 1
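A quick way to see which SQLite library a given Python build is linked against, as a hedged sketch. The failing migration above uses `UPDATE ... FROM`, which SQLite supports from version 3.33.0 onward; that threshold is about the SQL syntax only and is not taken from Prefect's documentation, so the minimum version Prefect actually requires may differ. The function name `supports_update_from` is made up for illustration.

```python
import sqlite3

# SQLite gained UPDATE ... FROM in 3.33.0. Assumption: this is used here only
# as a proxy for "can run the migration shown in the traceback", not as
# Prefect's official minimum.
MIN_FOR_UPDATE_FROM = (3, 33, 0)


def supports_update_from(version_info=None):
    """Return True if the given (or linked) SQLite version has UPDATE ... FROM."""
    if version_info is None:
        version_info = sqlite3.sqlite_version_info
    return tuple(version_info) >= MIN_FOR_UPDATE_FROM


if __name__ == "__main__":
    print("SQLite library version:", sqlite3.sqlite_version)
    print("Supports UPDATE ... FROM:", supports_update_from())
```

Run it with the same interpreter as the flow (for example inside the image) to compare the Python 3.10 build against a newer one.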
m
thanks Janet
j
@Chris White Another example of someone having a sqlite problem
👍 1
n
hey @Janet Carson - i believe this is fixed in main. that PR also adds integration tests against a couple of older versions of sqlite, including our minimum required version. would you be able to try it out and verify that it now works for you?
j
@Nate I can try... can you give me some instructions on how to install a build of that branch?
n
yep, should be just uv pip install git+https://github.com/prefecthq/prefect.git (or omit uv to use plain pip), on mobile sorry
j
OK, will try by end of week - I have another thing that needs to get done first
👍 1
n
thanks!
j
Yes, this did fix the sqlite issue! thank you
🙌 1
n
great! thanks for the follow up