Hello, any help on this would be greatly appreciat...
# ask-community
a
Hello, any help on this would be greatly appreciated. I think I may be using Prefect incorrectly. I have a job that consumes a URL: given that URL, it runs processing on it and embeds it. Is this how I should be calling that job from my client server (not the workflow server)?
flow_run = await get_client().create_flow_run(
            flow=embed_pdf,
            parameters={"url": "test/sample.pdf"},
        )
n
if `embed_pdf` is already served elsewhere, you should be able to change this
flow_run = await get_client().create_flow_run(
            flow=embed_pdf,
            parameters={"url": "test/sample.pdf"},
        )
to this
flow_run = await run_deployment("<flow-name>/embed-pdf", parameters={"url": "test/sample.pdf"})
where you can do
• `prefect deployment ls` to figure out what exactly this should be: `"<flow-name>/embed-pdf"`
• `run_deployment(..., timeout=0)` if you don't want this call to block until the flow finishes
for more context, you'll almost never want `create_flow_run`; instead you'll want `create_flow_run_from_deployment`, where `run_deployment` is just a convenience method for the latter
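A minimal sketch of the blocking and non-blocking call styles described above. The helper name `trigger_embed` and its `runner` parameter are hypothetical and exist only so the snippet can be exercised without a running Prefect server; when `runner` is omitted it falls back to Prefect's `run_deployment`. The deployment name is a placeholder to be filled in from `prefect deployment ls`.

```python
import asyncio


async def trigger_embed(deployment_name, url, wait=False, runner=None):
    """Create a flow run for a deployment that is already being served.

    `runner` is a hypothetical injection point for testing; by default
    Prefect's own `run_deployment` is used.
    """
    if runner is None:
        from prefect.deployments import run_deployment

        runner = run_deployment
    # timeout=0 returns as soon as the flow run has been created;
    # timeout=None keeps polling until the run reaches a final state.
    timeout = None if wait else 0
    return await runner(deployment_name, parameters={"url": url}, timeout=timeout)


# Example call (requires a reachable Prefect API and a served deployment):
# asyncio.run(trigger_embed("<flow-name>/<deployment-name>", "test/sample.pdf"))
```

With `wait=False` the caller gets the flow run metadata back immediately, which is usually what a request-handling server wants when the flow itself is heavy.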
a
Am I missing a step as far as serving the flow? I'm getting some issues with the `flow code`, which I'm assuming is something you need to configure on the serve. Here is my worker.py
from core.workers.workflows.embed_pdf import embed_pdf
from prefect import serve


def deploy():
    embed_deployment = embed_pdf.to_deployment(
        name="default",
        version="1.0.0",
    )
    serve(embed_deployment)
When I execute the worker in terminal 1, I see the following:
$ uv run --package core run-workflows
Your deployments are being served and polling for scheduled runs!

     Deployments     
┌───────────────────┐
│ embed-pdf/default │
└───────────────────┘

To trigger any of these deployments, use the following command:

        $ prefect deployment run [DEPLOYMENT_NAME]

You can also trigger your deployments via the Prefect UI: http://127.0.0.1:4200/deployments
client.py
from prefect.deployments import run_deployment
from loguru import logger
import asyncio

async def run_embed_pdf():
    logger.info("Running embed-pdf")
    flow_run = await run_deployment("embed-pdf/default", parameters={"url": "test/sample.pdf"}, timeout=0)
    logger.info(f"Created flow run {flow_run.id}")


def main():
    asyncio.run(run_embed_pdf())
I execute this in terminal 2 and the following prints
uv run --package core trigger-workflow
2024-11-13 11:58:19.069 | INFO     | core.workers.workflow_client:run_embed_pdf:6 - Running embed-pdf
2024-11-13 11:58:19.164 | INFO     | core.workers.workflow_client:run_embed_pdf:8 - Created flow run 4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c
Once I run the client.py, in terminal 1 I see
11:58:19.820 | INFO    | prefect.flow_runs.runner - Runner 'runner-46745b13-c72c-4aec-9320-ab069d1d815b' submitting flow run '4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c'
11:58:19.863 | INFO    | prefect.flow_runs.runner - Opening process...
11:58:19.872 | INFO    | prefect.flow_runs.runner - Completed submission of flow run '4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c'
11:58:21.123 | INFO    | Flow run 'mega-markhor' - Downloading flow code from storage at '.'
11:58:21.152 | ERROR   | prefect.engine - Engine execution of flow run '4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c' exited with unexpected exception
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 995, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/workflows/embed_pdf.py", line 4, in <module>
    from core.workers.tasks.pdf_processing import convert_pdf_to_images
  File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/tasks/pdf_processing.py", line 10, in <module>
    from core.workers.tasks.storage import download_r2_bytes, upload_r2_bytes
  File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/tasks/storage.py", line 3, in <module>
    from core.env import settings
  File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/env.py", line 49, in <module>
    settings = Settings()
               ^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/pydantic_settings/main.py", line 167, in __init__
    super().__init__(
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/pydantic/main.py", line 212, in __init__
    validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 3 validation errors for Settings
PREFECT.flow_run_id
  Extra inputs are not permitted [type=extra_forbidden, input_value='4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c', input_type=str]
    For further information visit <https://errors.pydantic.dev/2.9/v/extra_forbidden>
PREFECT.storage_base_path
  Extra inputs are not permitted [type=extra_forbidden, input_value='/var/folders/92/shnvzlnn...-41ff-84eb-8a2e6e4c23fd', input_type=str]
    For further information visit <https://errors.pydantic.dev/2.9/v/extra_forbidden>
PREFECT.enable_cancellation_and_crashed_hooks
  Extra inputs are not permitted [type=extra_forbidden, input_value='false', input_type=str]
    For further information visit <https://errors.pydantic.dev/2.9/v/extra_forbidden>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/engine.py", line 37, in <module>
    flow_run, flow = load_flow_and_flow_run(flow_run_id=flow_run_id)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 104, in load_flow_and_flow_run
    flow = run_coro_as_sync(
           ^^^^^^^^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 243, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/anthonysanchez/Library/Application Support/uv/python/cpython-3.12.4-macos-aarch64-none/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 225, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/client/utilities.py", line 75, in wrapper
    return await func(client, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/flows.py", line 1949, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 269, in run_sync_in_worker_thread
    result = await anyio.to_thread.run_sync(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2441, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 943, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 279, in call_with_mark
    return call()
           ^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/flows.py", line 1742, in load_flow_from_entrypoint
    flow = import_object(entrypoint)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/utilities/importtools.py", line 205, in import_object
    module = load_script_as_module(script_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/prefect/utilities/importtools.py", line 168, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'packages/core/src/core/workers/workflows/embed_pdf.py' encountered an exception: 3 validation errors for Settings
PREFECT.flow_run_id
  Extra inputs are not permitted [type=extra_forbidden, input_value='4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c', input_type=str]
    For further information visit <https://errors.pydantic.dev/2.9/v/extra_forbidden>
PREFECT.storage_base_path
  Extra inputs are not permitted [type=extra_forbidden, input_value='/var/folders/92/shnvzlnn...-41ff-84eb-8a2e6e4c23fd', input_type=str]
    For further information visit <https://errors.pydantic.dev/2.9/v/extra_forbidden>
PREFECT.enable_cancellation_and_crashed_hooks
  Extra inputs are not permitted [type=extra_forbidden, input_value='false', input_type=str]
    For further information visit <https://errors.pydantic.dev/2.9/v/extra_forbidden>
11:58:21.428 | ERROR   | prefect.flow_runs.runner - Process for flow run 'mega-markhor' exited with status code: 1
11:58:21.449 | INFO    | prefect.flow_runs.runner - Reported flow run '4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c' as crashed: Flow run process exited with non-zero status code 1.
11:58:21.495 | INFO    | Flow run 'mega-markhor' - Downloading flow code from storage at '.'
Is there a way I can set the deployment/serve in the worker.py to use the embed_pdf source code instead of trying to download remotely?
n
i mean do you need a deployment? can you just call `embed_pdf` directly (blocking) or `create_task` (background it)? or are you not doing that so it doesn't block inside of `run_embed_pdf`?
these settings-related "extra_forbidden" errors look suspicious
a
maybe I'm thinking about the architecture incorrectly? `embed_pdf` will be a very heavy task, so I am going to deploy it on its own `worker` node. I have a separate server from which I want to call `embed_pdf`, similar to the client.py function I wrote. I assume for this setup I would need:
1. a Prefect server that hosts the flows/deployments/UI/etc.
2. a `worker` node for the `embed_pdf` flow run to actually execute on
3. a `server` node that will use a client to trigger the `embed_pdf` flow run
visual illustration of what I'm thinking (each is its own node):
`server` --(run_deployment(..))--> `prefect_server` --(handled by prefect)--> `worker` (that runs the embed_pdf)
n
1. yep!
2. yep, and this can either be `prefect worker start` or your `embed_pdf.serve()` process (listening for scheduled work and submitting to the runtime, which sounds like subprocesses in this case)
3. 👍
in the case of `.serve()` it won't have to pull code (`Downloading flow code from storage at '.'` just means the code is already "here", i.e. "`.`"), but if it were a worker then you'd define like a `pull` step that would clone a repo from somewhere else
i think conceptually you're on the right track and that you're just running into some settings issue: all these "Extra inputs are not permitted" things
a
Ohh I think I see the issue, thank you! It's a pydantic thing. I guess when Prefect sets up the process, it's adding env variables that the settings don't allow for
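To illustrate why the error locations in the traceback read `PREFECT.flow_run_id`, here is a simplified, standard-library-only model of how a nested delimiter of `__` turns environment variable names into nested keys. This is not pydantic-settings' actual implementation; the helper `nest_env`, the `declared` set, and the exact variable names are assumptions inferred from the error locations in the traceback.

```python
def nest_env(env, delimiter="__"):
    """Split each variable name on `delimiter` and build a nested dict."""
    nested = {}
    for name, value in env.items():
        *parents, leaf = name.split(delimiter)
        node = nested
        for part in parents:
            node = node.setdefault(part, {})
        # Leaf keys are lower-cased to mirror the paths shown in the errors.
        node[leaf.lower()] = value
    return nested


# Variable names assumed from the error locations in the traceback.
env = {
    "PREFECT__FLOW_RUN_ID": "4d2b02dc-bbc0-4aab-9a52-a8b1abcc208c",
    "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS": "false",
}
nested = nest_env(env)

# A nested model that declares only `api_url` (hypothetical) and forbids
# extras would reject every other key found under "PREFECT".
declared = {"api_url"}
extras = sorted(set(nested["PREFECT"]) - declared)
print(extras)
```

Under this model, any variable that the runner exports with a `PREFECT__` prefix lands inside the application's own `PREFECT` settings group, which is where a strict model would complain.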
n
yes! if you don't mind me asking, what version of the SDK do you have? and do you know where those settings are coming from? i.e. .env or somewhere else?
we've made some changes to settings pretty recently, but we should ignore extras, not fail on them
a
here is my version
prefect>=3.1.1
and the env vars are coming from a .env that is loaded via Settings, which is used within a @task
I also have a Prefect settings class within my env.py
class PrefectSettings(BaseSettings):
    """Settings for Prefect"""
    API_URL: str = "http://127.0.0.1:4200/api"
    class Config:
        extra = "allow" 

class Settings(BaseSettings):
    """Environment settings for recall package"""
    model_config = SettingsConfigDict(
        env_file='.env',
        env_file_encoding='utf-8',
        env_nested_delimiter='__',
        extra='ignore'
    )
n
aha. i would nix that PrefectSettings class since our settings are already pydantic settings and so whether you export PREFECT_API_URL or set it in .env we should find and use that api url you set
a
nice, will do, ty
On that note actually, I get an error without that Prefect class. I think it's due to my env_nested_delimiter
uv run --package core run-workflows
Traceback (most recent call last):
  File "/Users/anthonysanchez/Repos/mk1/.venv/bin/run-workflows", line 5, in <module>
    from core.workers.run_workflows import deploy
  File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/run_workflows.py", line 1, in <module>
    from core.workers.workflows.embed_pdf import embed_pdf
  File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/workflows/embed_pdf.py", line 3, in <module>
    from core.workers.tasks.pdf_processing import convert_pdf_to_images
  File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/tasks/pdf_processing.py", line 10, in <module>
    from core.workers.tasks.storage import download_r2_bytes, upload_r2_bytes
  File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/workers/tasks/storage.py", line 3, in <module>
    from core.env import settings
  File "/Users/anthonysanchez/Repos/mk1/packages/core/src/core/env.py", line 52, in <module>
    settings = Settings()
               ^^^^^^^^^^
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/pydantic_settings/main.py", line 167, in __init__
    super().__init__(
  File "/Users/anthonysanchez/Repos/mk1/.venv/lib/python3.12/site-packages/pydantic/main.py", line 212, in __init__
    validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 2 validation errors for Settings
prefect__api_url
  Extra inputs are not permitted [type=extra_forbidden, input_value='http://127.0.0.1:4200/api', input_type=str]
    For further information visit <https://errors.pydantic.dev/2.9/v/extra_forbidden>
prefect_api_url
  Extra inputs are not permitted [type=extra_forbidden, input_value='http://127.0.0.1:4200/api', input_type=str]
    For further information visit <https://errors.pydantic.dev/2.9/v/extra_forbidden>