Anthony
11/13/2024, 7:44 PM
flow_run = await get_client().create_flow_run(
    flow=embed_pdf,
    parameters={"url": "test/sample.pdf"},
)
Nate
11/13/2024, 7:50 PM
if embed_pdf is already served elsewhere, you should be able to change this
flow_run = await get_client().create_flow_run(
    flow=embed_pdf,
    parameters={"url": "test/sample.pdf"},
)
to this
flow_run = await run_deployment("<flow-name>/embed-pdf", parameters={"url": "test/sample.pdf"})
where you can do
• prefect deployment ls to figure out what exactly this should be "<flow-name>/embed-pdf"
• run_deployment(..., timeout=0) if you don't want this call to block until the flow finishes
Nate
11/13/2024, 7:50 PM
you don't want create_flow_run, instead you'll want create_flow_run_from_deployment, where run_deployment is just a convenience method for the latter
Anthony
11/13/2024, 8:03 PM
flow code which i'm assuming is something you need to configure on the serve.
Here is my worker.py
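from core.workers.workflows.embed_pdf import embed_pdf
from prefect import serve


def deploy():
    # Build a deployment named "default" from the flow object
    embed_deployment = embed_pdf.to_deployment(
        name="default",
        version="1.0.0",
    )
    # Serve it from this process (blocks and polls for scheduled runs)
    serve(embed_deployment)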
when i execute the worker in terminal 1 i'm seeing the following in the terminal
$ uv run --package core run-workflows
Your deployments are being served and polling for scheduled runs!
Deployments
┌───────────────────┐
│ embed-pdf/default │
└───────────────────┘
To trigger any of these deployments, use the following command:
$ prefect deployment run [DEPLOYMENT_NAME]
You can also trigger your deployments via the Prefect UI: http://127.0.0.1:4200/deployments
client.py
from prefect.deployments import run_deployment
from loguru import logger
import asyncio

async def run_embed_pdf():
    logger.info("Running embed-pdf")
    flow_run = await run_deployment("embed-pdf/default", parameters={"url": "test/sample.pdf"}, timeout=0)
    logger.info(f"Created flow run {flow_run.id}")

def main():
    asyncio.run(run_embed_pdf())
I execute this in terminal 2 and the following prints
$ uv run --package core trigger-workflow
2024-11-13 11:58:19.069 | INFO | core.workers.workflow_client:run_embed_pdf:6 - Running embed-pdf
2024-11-13 11:58:19.164 | INFO | core.workers.workflow_client:run_embed_pdf:8 - Created flow run 4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c
Once I run the client.py, in terminal 1 I see
11:58:19.820 | INFO | prefect.flow_runs.runner - Runner 'runner-46745b13-c72c-4aec-9320-ab069d1d815b' submitting flow run '4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c'
11:58:19.863 | INFO | prefect.flow_runs.runner - Opening process...
11:58:19.872 | INFO | prefect.flow_runs.runner - Completed submission of flow run '4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c'
11:58:21.123 | INFO | Flow run 'mega-markhor' - Downloading flow code from storage at '.'
11:58:21.152 | ERROR | prefect.engine - Engine execution of flow run '4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c' exited with unexpected exception
Traceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 995, in exec_module
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/workflows/embed_pdf.py", line 4, in <module>
from core.workers.tasks.pdf_processing import convert_pdf_to_images
File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/tasks/pdf_processing.py", line 10, in <module>
from core.workers.tasks.storage import download_r2_bytes, upload_r2_bytes
File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/tasks/storage.py", line 3, in <module>
from core.env import settings
File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/env.py", line 49, in <module>
settings = Settings()
^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/pydantic_settings/main.py", line 167, in __init__
super().__init__(
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/pydantic/main.py", line 212, in __init__
validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 3 validation errors for Settings
PREFECT.flow_run_id
  Extra inputs are not permitted [type=extra_forbidden, input_value='4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c', input_type=str]
    For further information visit https://errors.pydantic.dev/2.9/v/extra_forbidden
PREFECT.storage_base_path
  Extra inputs are not permitted [type=extra_forbidden, input_value='/var/folders/92/shnvzlnn...-41ff-84eb-8a2e6e4c23fd', input_type=str]
    For further information visit https://errors.pydantic.dev/2.9/v/extra_forbidden
PREFECT.enable_cancellation_and_crashed_hooks
  Extra inputs are not permitted [type=extra_forbidden, input_value='false', input_type=str]
    For further information visit https://errors.pydantic.dev/2.9/v/extra_forbidden
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/engine.py", line 37, in <module>
flow_run, flow = load_flow_and_flow_run(flow_run_id=flow_run_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 104, in load_flow_and_flow_run
flow = run_coro_as_sync(
^^^^^^^^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 243, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/anthonysanchez/Library/Application Support/uv/python/cpython-3.12.4-macos-aarch64-none/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 225, in coroutine_wrapper
return await task
^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/client/utilities.py", line 75, in wrapper
return await func(client, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/flows.py", line 1949, in load_flow_from_flow_run
flow = await run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 269, in run_sync_in_worker_thread
result = await anyio.to_thread.run_sync(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2441, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 943, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 279, in call_with_mark
return call()
^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/flows.py", line 1742, in load_flow_from_entrypoint
flow = import_object(entrypoint)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/utilities/importtools.py", line 205, in import_object
module = load_script_as_module(script_path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/utilities/importtools.py", line 168, in load_script_as_module
raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'packages/core/src/core/workers/workflows/embed_pdf.py' encountered an exception: 3 validation errors for Settings
PREFECT.flow_run_id
  Extra inputs are not permitted [type=extra_forbidden, input_value='4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c', input_type=str]
    For further information visit https://errors.pydantic.dev/2.9/v/extra_forbidden
PREFECT.storage_base_path
  Extra inputs are not permitted [type=extra_forbidden, input_value='/var/folders/92/shnvzlnn...-41ff-84eb-8a2e6e4c23fd', input_type=str]
    For further information visit https://errors.pydantic.dev/2.9/v/extra_forbidden
PREFECT.enable_cancellation_and_crashed_hooks
  Extra inputs are not permitted [type=extra_forbidden, input_value='false', input_type=str]
    For further information visit https://errors.pydantic.dev/2.9/v/extra_forbidden
11:58:21.428 | ERROR | prefect.flow_runs.runner - Process for flow run 'mega-markhor' exited with status code: 1
11:58:21.449 | INFO | prefect.flow_runs.runner - Reported flow run '4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c' as crashed: Flow run process exited with non-zero status code 1.
11:58:21.495 | INFO | Flow run 'mega-markhor' - Downloading flow code from storage at '.'
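The three `PREFECT.*` errors in the traceback above are consistent with pydantic-settings grouping environment variables on the `__` delimiter: the input values (a flow run id, a temporary storage path, and `false`) look like variables the runner sets for the flow run subprocess, and a settings class with `env_nested_delimiter='__'` and a field named `PREFECT` would collect them as nested keys. The sketch below is a simplified, stdlib-only model of that grouping, not pydantic-settings' actual implementation. The `PREFECT` field with a single `API_URL` key, the helper names `group_nested_env` and `find_extras`, the `PREFECT__API_URL` entry, and the placeholder storage path are assumptions made for illustration.

```python
def group_nested_env(environ, field_name, delimiter="__"):
    """Collect variables named <field><delimiter><key> into one dict (case-insensitive)."""
    prefix = f"{field_name}{delimiter}".lower()
    nested = {}
    for name, value in environ.items():
        lowered = name.lower()
        if lowered.startswith(prefix):
            nested[lowered[len(prefix):]] = value
    return nested


def find_extras(nested, declared_fields):
    """Return the nested keys that do not match any declared field name."""
    declared = {field.lower() for field in declared_fields}
    return sorted(key for key in nested if key not in declared)


# Hypothetical environment of the flow run subprocess.
runner_env = {
    "PREFECT__FLOW_RUN_ID": "4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c",
    "PREFECT__STORAGE_BASE_PATH": "/tmp/runner-storage",  # placeholder path
    "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS": "false",
    "PREFECT__API_URL": "http://127.0.0.1:4200/api",  # assumed to come from the .env
}

nested = group_nested_env(runner_env, "PREFECT")
extras = find_extras(nested, declared_fields=["API_URL"])
for key in extras:
    # With extra="forbid" on the nested model, each of these would be rejected.
    print(f"PREFECT.{key}")
```

Under these assumptions only `api_url` matches a declared field, so the other three keys are the ones a nested model that forbids extras would reject.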
Is there a way I can set the deployment/serve in the worker.py to use the embed_pdf source code instead of trying to download remotely?
Nate
11/13/2024, 8:05 PM
embed_pdf directly (blocking) or create_task (background it)? or are you not doing that so it doesn't block inside of run_embed_pdf
Nate
11/13/2024, 8:05 PM
Nate
11/13/2024, 8:06 PM
Anthony
11/13/2024, 8:10 PM
embed_pdf will be a very heavy task, so I am going to deploy it in its own worker node
I have a separate server that I want to call embed_pdf similar to the client.py function I wrote.
I assume for this setup
1. I would need a prefect server that hosts the flows/deployments/ui/etc,
2. a worker node for the embed_pdf flow run to actually execute on
3. node server that will use a client to trigger the embed_pdf flow run
visual illustration of what I'm thinking: each is its own node
server --(run_deployment(..))--> prefect_server --(handled by prefect)--> worker (that runs the embed_pdf)
Nate
11/13/2024, 8:13 PM
prefect worker start or your embed_pdf.serve() process (listening for scheduled work and submitting to the runtime, which sounds like subprocesses in this case)
3. 👍
in the case of .serve() it won't have to pull code (Downloading flow code from storage at '.' just means the code is already "here", i.e. "`.`"), but if it were a worker then you'd define like a pull step that would clone a repo from somewhere else
i think conceptually you're on the right track and that you're just running into some settings issue
Nate
11/13/2024, 8:14 PM
Anthony
11/13/2024, 8:15 PM
Nate
11/13/2024, 8:15 PM
Nate
11/13/2024, 8:16 PM
Anthony
11/13/2024, 8:17 PM
prefect>=3.1.1 and the env are coming from a .env that is imported via Settings which is used within a @task
Anthony
11/13/2024, 8:23 PM
from pydantic_settings import BaseSettings, SettingsConfigDict


class PrefectSettings(BaseSettings):
    """Settings for Prefect"""
    API_URL: str = "http://127.0.0.1:4200/api"

    class Config:
        extra = "allow"


class Settings(BaseSettings):
    """Environment settings for recall package"""
    model_config = SettingsConfigDict(
        env_file='.env',
        env_file_encoding='utf-8',
        env_nested_delimiter='__',
        extra='ignore'
    )
Nate
11/13/2024, 8:25 PM
Anthony
11/13/2024, 8:31 PM
Anthony
11/13/2024, 8:34 PM
$ uv run --package core run-workflows
Traceback (most recent call last):
File "/Users/anthonysanchez/Repos/mk1/.venv/bin/run-workflows", line 5, in <module>
from core.workers.run_workflows import deploy
File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/run_workflows.py", line 1, in <module>
from core.workers.workflows.embed_pdf import embed_pdf
File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/workflows/embed_pdf.py", line 3, in <module>
from core.workers.tasks.pdf_processing import convert_pdf_to_images
File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/tasks/pdf_processing.py", line 10, in <module>
from core.workers.tasks.storage import download_r2_bytes, upload_r2_bytes
File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/tasks/storage.py", line 3, in <module>
from core.env import settings
File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/env.py", line 52, in <module>
settings = Settings()
^^^^^^^^^^
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/pydantic_settings/main.py", line 167, in __init__
super().__init__(
File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/pydantic/main.py", line 212, in __init__
validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 2 validation errors for Settings
prefect__api_url
  Extra inputs are not permitted [type=extra_forbidden, input_value='http://127.0.0.1:4200/api', input_type=str]
    For further information visit https://errors.pydantic.dev/2.9/v/extra_forbidden
prefect_api_url
  Extra inputs are not permitted [type=extra_forbidden, input_value='http://127.0.0.1:4200/api', input_type=str]
    For further information visit https://errors.pydantic.dev/2.9/v/extra_forbidden