Edmund Tian
12/21/2022, 3:03 PM
sqlite3.OperationalError). Anyone know what’s going on?
Context:
I’m using the @flow and @task decorators for my Python functions in my app hosted on Google Cloud Run. I’m not using Prefect Cloud. I’m on the following prefect version: prefect==2.3.2. It’s hard for me to repro because this occurs during my cron job. I haven’t been able to repro locally.
Mark Nuttall-Smith
12/21/2022, 3:19 PM
David Steiner Sand
12/21/2022, 6:12 PM
Jessica Smith
12/21/2022, 7:15 PM
Paco Ibañez
12/21/2022, 9:46 PM
run_deployment. What would be the most efficient way of passing the dataframes around?
YD
12/21/2022, 9:50 PM
from prefect import task, flow, get_run_logger, allow_failure
from time import sleep

@task(retries=2, timeout_seconds=3)
def test_1(o, test_retries):
    if test_retries:
        sleep(5)
    else:
        raise IOError('Raising some error')
    return True

@flow(name='test allow_failure')
def run_test_flow():
    t = test_1.submit(True, True)
    t = test_1.submit(t, False, wait_for=allow_failure(t))

if __name__ == "__main__":
    run_test_flow()
But it does not appear to be working
it does not raise the IOError
Sam Werbalowsky
12/21/2022, 10:24 PM
submit? Even if the flow is written in a way that there are no dependencies between tasks?
Gows Shaik
12/22/2022, 4:37 AM
Jons Cyriac
12/22/2022, 5:03 AM
Chris Arderne
12/22/2022, 7:56 AM
v2 (more details in thread!)
1. 🏗️ Structuring project: We're using v2 with a GCS Storage block, a KubernetesJob block and a Flow Deployment (all three defined in Python files). Because we're using GCS Storage, the flow files are not pip installed, so we can only use relative imports in the flow like from .utils import foo, which can make local development and bug checking a bit more error-prone. We will probably shift to just using the Docker image's local files, but haven't yet experimented with this in Prefect v2. Are there any best practices around this? (Project structure in thread)
2. 🏃‍♀️ Running flows: Prefect v1 had a prefect run ... command in the CLI, which made it possible to run a local flow. Prefect v2 doesn't seem to have this? We can't add an if __name__ == "__main__" to the Flow file, because it has relative imports so running it doesn't work. As a workaround I've added a run.py in a parent directory that imports and runs it, but this isn't very ergonomic. In either case if I want to pass parameters, I must create my own CLI for doing so... Have I missed something in the CLI, or is there a plan to add this? Or is there a better pattern I should be using instead?
James Zhang
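Regarding the run.py workaround in Chris Arderne's point 2 above: one possible shape for such a launcher, as a minimal stdlib-only sketch. It assumes the flow lives in an importable package and that the decorated flow can be called like a normal function with keyword arguments. The "package.module:function" target syntax, the -p key=value option, and the helper names are all illustrative assumptions, not part of Prefect's CLI.

```python
# run.py (hypothetical): sits in the parent directory of the flow package.
import argparse
import importlib
import json


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="Run a flow function locally.")
    # Target in "package.module:function" form (an assumed convention).
    parser.add_argument("target")
    # Repeatable key=value pairs; each value is parsed as JSON when possible.
    parser.add_argument("-p", "--param", action="append", default=[])
    return parser.parse_args(argv)


def parse_params(pairs):
    params = {}
    for pair in pairs:
        key, sep, raw = pair.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {pair!r}")
        try:
            params[key] = json.loads(raw)
        except json.JSONDecodeError:
            params[key] = raw  # not valid JSON: keep the plain string
    return params


def load_target(target):
    module_name, sep, attr = target.partition(":")
    if not sep:
        raise ValueError(f"expected module:function, got {target!r}")
    # Importing by dotted name keeps the module inside its package,
    # so relative imports such as `from .utils import foo` still resolve.
    module = importlib.import_module(module_name)
    return getattr(module, attr)


def main(argv=None):
    args = parse_args(argv)
    flow_fn = load_target(args.target)
    # Call the loaded function with the parsed parameters as keyword arguments.
    return flow_fn(**parse_params(args.param))
```

With an `if __name__ == "__main__": main()` guard appended, this could be invoked as, for example, python run.py flows.my_flow:my_flow -p name=world, where flows.my_flow is a placeholder module path.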
12/22/2022, 10:14 AM
Nic
12/22/2022, 10:26 AM
Nic
12/22/2022, 10:38 AM
Mohit Singhal
12/22/2022, 12:46 PM
Mohit Singhal
12/22/2022, 12:48 PM
Justin Trautmann
12/22/2022, 2:05 PM
Daniel Komisar
12/22/2022, 2:39 PM
apply_map and what I'd really like to do is have each of those be the body of a loop. It looks like you can only loop one single task though. Am I missing something? Thanks!
aram
12/22/2022, 3:11 PM
PREFECT_DEBUG_MODE, PREFECT_LOGGING_LEVEL, PREFECT_LOGGING_SERVER_LEVEL.
None helps
Deepanshu Aggarwal
12/22/2022, 4:01 PM
Kristian Andersen Hole
12/22/2022, 5:19 PM
Laraib Siddiqui
12/22/2022, 5:45 PM
from prefect.tasks.shell import ShellTask
Laraib Siddiqui
12/22/2022, 5:47 PM
from prefect.tasks.shell import ShellTask
it throws an error
ModuleNotFoundError: No module named 'prefect.tasks.shell'; 'prefect.tasks' is not a package
I know I can use prefect-shell to overcome this and run Python scripts, but I couldn't find any resolution for the above error
Uday
12/22/2022, 7:25 PM
Kristian Andersen Hole
12/22/2022, 8:03 PM
Javier Ochoa
12/22/2022, 8:22 PM
with Flow(
    FLOW_NAME,
    run_config=UniversalRun(labels=LABELS),
    terminal_state_handler=workflow_terminal_state_handler,
) as flow:
    file_list = list_unprocessed_files(var)
    dataframes = get_dataframes.map(file_list)
    dataframes = filter_dataframe.map(
        dataframes, resource_type_name=unmapped(var)
    )
    # this is the final task
    print_process_summary_log()
Billy McMonagle
12/23/2022, 3:40 AM
Amruth VVKP
12/23/2022, 12:52 PM
2022-12-23 12:47:33
2022-12-23 12:47:33 ___ ___ ___ ___ ___ ___ _____ ___ ___ ___ ___ _ _
2022-12-23 12:47:33 | _ \ _ \ __| __| __/ __|_ _| / _ \| _ \_ _/ _ \| \| |
2022-12-23 12:47:33 | _/ / _|| _|| _| (__ | | | (_) | /| | (_) | .` |
2022-12-23 12:47:33 |_| |_|_\___|_| |___\___| |_| \___/|_|_\___\___/|_|\_|
2022-12-23 12:47:33
2022-12-23 12:47:33 Configure Prefect to communicate with the server with:
2022-12-23 12:47:33
2022-12-23 12:47:33 prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api
2022-12-23 12:47:33
2022-12-23 12:47:33 View the API reference documentation at http://0.0.0.0:4200/docs
2022-12-23 12:47:33
2022-12-23 12:47:33 Check out the dashboard at http://0.0.0.0:4200
2022-12-23 12:47:33
2022-12-23 12:47:33
2022-12-23 12:47:33
2022-12-23 12:47:35 Traceback (most recent call last):
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/script/base.py", line 233, in _catch_revision_errors
2022-12-23 12:47:35 yield
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/script/base.py", line 443, in _upgrade_revs
2022-12-23 12:47:35 for script in reversed(list(revs))
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/script/revision.py", line 799, in iterate_revisions
2022-12-23 12:47:35 revisions, heads = fn(
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/script/revision.py", line 1454, in _collect_upgrade_revisions
2022-12-23 12:47:35 current_revisions = self.get_revisions(lower)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/script/revision.py", line 527, in get_revisions
2022-12-23 12:47:35 return sum([self.get_revisions(id_elem) for id_elem in id_], ())
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/script/revision.py", line 527, in <listcomp>
2022-12-23 12:47:35 return sum([self.get_revisions(id_elem) for id_elem in id_], ())
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/script/revision.py", line 552, in get_revisions
2022-12-23 12:47:35 return tuple(
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/script/revision.py", line 553, in <genexpr>
2022-12-23 12:47:35 self._revision_for_ident(rev_id, branch_label)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/script/revision.py", line 624, in _revision_for_ident
2022-12-23 12:47:35 raise ResolutionError(
2022-12-23 12:47:35 alembic.script.revision.ResolutionError: No such revision or branch 'f7587d6c5776'
2022-12-23 12:47:35
2022-12-23 12:47:35 The above exception was the direct cause of the following exception:
2022-12-23 12:47:35
2022-12-23 12:47:35 Traceback (most recent call last):
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 671, in lifespan
2022-12-23 12:47:35 async with self.lifespan_context(app):
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 566, in __aenter__
2022-12-23 12:47:35 await self._router.startup()
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 648, in startup
2022-12-23 12:47:35 await handler()
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/prefect/orion/api/server.py", line 345, in run_migrations
2022-12-23 12:47:35 await db.create_db()
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/prefect/orion/database/interface.py", line 55, in create_db
2022-12-23 12:47:35 await self.run_migrations_upgrade()
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/prefect/orion/database/interface.py", line 63, in run_migrations_upgrade
2022-12-23 12:47:35 await run_sync_in_worker_thread(alembic_upgrade)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 69, in run_sync_in_worker_thread
2022-12-23 12:47:35 return await anyio.to_thread.run_sync(call, cancellable=True)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
2022-12-23 12:47:35 return await get_asynclib().run_sync_in_worker_thread(
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
2022-12-23 12:47:35 return await future
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
2022-12-23 12:47:35 result = context.run(func, *args)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/prefect/orion/database/alembic_commands.py", line 24, in wrapper
2022-12-23 12:47:35 return fn(*args, **kwargs)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/prefect/orion/database/alembic_commands.py", line 53, in alembic_upgrade
2022-12-23 12:47:35 alembic.command.upgrade(alembic_config(), revision, sql=dry_run)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/command.py", line 322, in upgrade
2022-12-23 12:47:35 script.run_env()
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/script/base.py", line 569, in run_env
2022-12-23 12:47:35 util.load_python_file(self.dir, "env.py")
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/util/pyfiles.py", line 94, in load_python_file
2022-12-23 12:47:35 module = load_module_py(module_id, path)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/util/pyfiles.py", line 110, in load_module_py
2022-12-23 12:47:35 spec.loader.exec_module(module) # type: ignore
2022-12-23 12:47:35 File "<frozen importlib._bootstrap_external>", line 883, in exec_module
2022-12-23 12:47:35 File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/prefect/orion/database/migrations/env.py", line 147, in <module>
2022-12-23 12:47:35 apply_migrations()
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 201, in coroutine_wrapper
2022-12-23 12:47:35 return run_async_from_worker_thread(async_fn, *args, **kwargs)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 152, in run_async_from_worker_thread
2022-12-23 12:47:35 return anyio.from_thread.run(call)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
2022-12-23 12:47:35 return asynclib.run_async_from_thread(func, *args)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
2022-12-23 12:47:35 return f.result()
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
2022-12-23 12:47:35 return self.__get_result()
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
2022-12-23 12:47:35 raise self._exception
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/prefect/orion/database/migrations/env.py", line 141, in apply_migrations
2022-12-23 12:47:35 await connection.run_sync(do_run_migrations)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/engine.py", line 548, in run_sync
2022-12-23 12:47:35 return await greenlet_spawn(fn, conn, *arg, **kw)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
2022-12-23 12:47:35 result = context.switch(value)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/prefect/orion/database/migrations/env.py", line 129, in do_run_migrations
2022-12-23 12:47:35 context.run_migrations()
2022-12-23 12:47:35 File "<string>", line 8, in run_migrations
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/runtime/environment.py", line 853, in run_migrations
2022-12-23 12:47:35 self.get_context().run_migrations(**kw)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/runtime/migration.py", line 611, in run_migrations
2022-12-23 12:47:35 for step in self._migrations_fn(heads, self):
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/command.py", line 311, in upgrade
2022-12-23 12:47:35 return script._upgrade_revs(revision, rev)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/script/base.py", line 431, in _upgrade_revs
2022-12-23 12:47:35 with self._catch_revision_errors(
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
2022-12-23 12:47:35 self.gen.throw(typ, value, traceback)
2022-12-23 12:47:35 File "/usr/local/lib/python3.10/site-packages/alembic/script/base.py", line 265, in _catch_revision_errors
2022-12-23 12:47:35 raise util.CommandError(resolution) from re
2022-12-23 12:47:35 alembic.util.exc.CommandError: Can't locate revision identified by 'f7587d6c5776'
2022-12-23 12:47:35
2022-12-23 12:47:35 Application startup failed. Exiting.
2022-12-23 12:47:36 Orion stopped!
Logs while trying to launch Prefect Orion on local Ubuntu terminal with 2.7.4 -
___ ___ ___ ___ ___ ___ _____ ___ ___ ___ ___ _ _
| _ \ _ \ __| __| __/ __|_ _| / _ \| _ \_ _/ _ \| \| |
| _/ / _|| _|| _| (__ | | | (_) | /| | (_) | .` |
|_| |_|_\___|_| |___\___| |_| \___/|_|_\___\___/|_|\_|
Configure Prefect to communicate with the server with:
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
View the API reference documentation at http://127.0.0.1:4200/docs
Check out the dashboard at http://127.0.0.1:4200
/usr/lib/python3.10/contextlib.py:142: SAWarning: Skipped unsupported reflection of expression-based index ix_flow_run__coalesce_start_time_expected_start_time_desc
next(self.gen)
/usr/lib/python3.10/contextlib.py:142: SAWarning: Skipped unsupported reflection of expression-based index ix_flow_run__coalesce_start_time_expected_start_time_asc
next(self.gen)
Chris Gunderson
12/23/2022, 3:29 PM
Robert Esteves
12/23/2022, 6:55 PM
Pekka
12/24/2022, 2:06 PM
Pekka
12/24/2022, 2:06 PM
Rob Freedy
12/24/2022, 5:18 PM
Anna Geller
12/25/2022, 1:59 PM