Yo all. Ran into an Orion issue and wondering if a...
# prefect-community
d
Yo all. Ran into an Orion issue and wondering if anyone had something similar. Basically I’m trying to create a flow + DeploymentSpec, then register it to Orion from within the same python file. (Currently have orion server running locally). @Darren was able to forward along this code snippet. But I’ve been getting an error getting it to work:
Copy code
from prefect import flow
from prefect.deployments import DeploymentSpec, create_deployment_from_spec


@flow
def hello_world(name):
    print(f"Hello {name}!")


spec = DeploymentSpec(
    flow=hello_world,
    name="inline-deployment",
    parameters={"name": "Marvin"},
    tags=["foo", "bar"],
)

create_deployment_from_spec(spec=spec)
🙌 1
Basically the error I’m getting is
expected str, bytes or os.PathLike object, not NoneType
because it seems the
create_deployment_from_spec
function is expecting 2 arguments
the second argument seems like it should be an Orion client, but I’ve been having trouble figuring out how to get that incorporated
z
The client will automatically be provided to that function
Can you share the traceback?
(This snippet runs successfully on my machine)
d
sure, what version are you on? maybe I’m behind or something
Copy code
In [4]: @flow
   ...: def hello_world(name):
   ...:     print(f"Hello {name}!")                                                                                                     ...:

In [5]: spec = DeploymentSpec(flow=hello_world, name="inline-deployment", parameters={"name": "Marvin"}, tags=["foo", "bar"],)
   ...: create_deployment_from_spec(spec=spec)
   ...:
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [5], in <cell line: 2>()
      1 spec = DeploymentSpec(flow=hello_world, name="inline-deployment", parameters={"name": "Marvin"}, tags=["foo", "bar"],)
----> 2 create_deployment_from_spec(spec=spec)

File ~/dev/hkg-data/env/lib/python3.9/site-packages/prefect/utilities/asyncio.py:120, in sync_compatible.<locals>.wrapper(*args, **kw
args)
    116     return run_async_from_worker_thread(async_fn, *args, **kwargs)
    117 else:
    118     # In a sync context and there is no event loop; just create an event loop
File ~/dev/hkg-data/env/lib/python3.9/site-packages/prefect/utilities/asyncio.py:67, in run_async_in_new_loop(__fn, *args, **kwargs)
     66 def run_async_in_new_loop(__fn: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any):
---> 67     return anyio.run(partial(__fn, *args, **kwargs))

File ~/dev/hkg-data/env/lib/python3.9/site-packages/anyio/_core/_eventloop.py:56, in run(func, backend, backend_options, *args)
     54 try:
     55     backend_options = backend_options or {}
---> 56     return asynclib.run(func, *args, **backend_options)
     57 finally:
     58     if token:

File ~/dev/hkg-data/env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:233, in run(func, debug, use_uvloop, policy, *args)
    230         del _task_states[task]
    232 _maybe_set_event_loop_policy(policy, use_uvloop)
--> 233 return native_run(wrapper(), debug=debug)

File /usr/lib/python3.9/asyncio/runners.py:44, in run(main, debug)
     42     if debug is not None:
     43         loop.set_debug(debug)
---> 44     return loop.run_until_complete(main)
     45 finally:
     46     try:

File /usr/lib/python3.9/asyncio/base_events.py:642, in BaseEventLoop.run_until_complete(self, future)
    639 if not future.done():
    640     raise RuntimeError('Event loop stopped before Future completed.')
--> 642 return future.result()

File ~/dev/hkg-data/env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:228, in run.<locals>.wrapper()
    225     task.set_name(task_state.name)
    227 try:
--> 228     return await func(*args)
    229 finally:
    230     del _task_states[task]

File ~/dev/hkg-data/env/lib/python3.9/site-packages/prefect/client.py:82, in inject_client.<locals>.with_injected_client(*args, **kwargs)
     80 async with client_context as client:
     81     kwargs.setdefault("client", client)
---> 82     return await fn(*args, **kwargs)

File ~/dev/hkg-data/env/lib/python3.9/site-packages/prefect/deployments.py:222, in create_deployment_from_spec(spec, client)
    219 flow_id = await client.create_flow(spec.flow)
    221 if spec.push_to_server:
--> 222     with open(spec.flow_location, "rb") as flow_file:
    223         flow_data = await client.persist_data(flow_file.read())
    224 else:

TypeError: expected str, bytes or os.PathLike object, not NoneType
my other guess would be I set up the orion local server incorrectly somehow
z
I just tested with the latest build. Here it seems like we've somehow failed to infer the flow location from the flow object
The complaint is that the spec flow location is null
d
it is null in this case, this is run within an ipython session
sorry, forgot to specify that part earlier
but it’s within the same directory as the env and where the server is run from
in prefect v1 flow.register() worked with this approach with no issue
(unless it’s supposed to infer location even in that context)
z
Oh if you're in an ipython session we can't infer a flow location from the object so yeah it'll break
d
yeah that makes sense. and in prefect v1. did flow.register do some pickle magic to make that work?
z
V1 pickles flows and stores them in the file system. V2 doesn't automatically pickle flows for you yet.
d
ah ok
how hard would it be for me to roll my own pickler?
z
Instead, we infer the path to the source code from python object metadata and try to read the source from there.
Easy, but it'll change in the near future.
d
any chance you could point me towards a short term pickle solution and I’ll update whenever it changes? (this would allow me to keep testing stuff without having to change my current setup too much)
(also if your team sold swag that said “We Do Pickle Magic” I’d buy it)
z
lol
So if we take a look at the source code for the function you’re working with
Copy code
@sync_compatible
@inject_client
async def create_deployment_from_spec(
    spec: DeploymentSpec, client: OrionClient
) -> UUID:
    """
    Create a deployment from a specification.
    """
    spec.load_flow()
    flow_id = await client.create_flow(spec.flow)

    if spec.push_to_server:
        with open(spec.flow_location, "rb") as flow_file:
            flow_data = await client.persist_data(flow_file.read())
    else:
        flow_data = DataDocument(encoding="file", blob=spec.flow_location.encode())

    deployment_id = await client.create_deployment(
        flow_id=flow_id,
        name=spec.name,
        schedule=spec.schedule,
        flow_data=flow_data,
        parameters=spec.parameters,
        tags=spec.tags,
        flow_runner=spec.flow_runner,
    )

    return deployment_id
You can see where the
flow_data
document gets created
You’ll want that to be a pickle document instead
Copy code
@sync_compatible
@inject_client
async def create_pickled_flow_deployment(
    spec: DeploymentSpec, client: OrionClient
) -> UUID:
    """
    Create a deployment from a specification.
    """
    spec.load_flow()
    flow_id = await client.create_flow(spec.flow)

    flow_data = DataDocument.encode("cloudpickle", spec.flow)

    deployment_id = await client.create_deployment(
        flow_id=flow_id,
        name=spec.name,
        schedule=spec.schedule,
        flow_data=flow_data,
        parameters=spec.parameters,
        tags=spec.tags,
        flow_runner=spec.flow_runner,
    )

    return deployment_id
d
super dope, I’ll try it out
z
Oops the encoder is
"cloudpickle"
not “pickle”
Edited to fix
This uses our pickle serializer
Copy code
@register_serializer("cloudpickle")
class PickleSerializer:
    """
    Serializes arbitrary objects using the pickle protocol.

    Wraps `cloudpickle` to encode bytes in base64 for safe transmission.
    """

    @staticmethod
    def dumps(data: Any) -> bytes:
        data_bytes = cloudpickle.dumps(data)
        return base64.encodebytes(data_bytes)

    @staticmethod
    def loads(blob: bytes) -> Any:
        return cloudpickle.loads(base64.decodebytes(blob))
d
quick Q (apologies if this is naive, all the python I know comes from needing it for data science, so missed some stuff) - can I drop the async prefix + decorators if I’m happy with it running serially?
Or should I keep those as is just to play nice with everything else
z
You need the async stuff for now
👍 1
There’s no synchronous client interface yet so anytime you want to use the API you need an async function
d
Does that mean I’ll also need to pass in the OrionClient excplicitly?
z
But the
@sync_compatible
decorator lets you just call it without awaiting
The
@inject_client
retrieves and passes a client for you
d
sweet
Still getting the same issue, maybe I’m calling the function incorrectly? (really appreciate your help btw)
Copy code
from prefect.utilities.asyncio import sync
from prefect import flow
from prefect.deployments import DeploymentSpec, create_deployment_from_spec, sync_compatible, inject_client, OrionClient


@sync_compatible
@inject_client
async def create_pickled_flow_deployment(
    spec: DeploymentSpec, client: OrionClient
):
    """
    Create a deployment from a specification.
    """
    spec.load_flow()
    flow_id = await client.create_flow(spec.flow)

    flow_data = DataDocument.encode("cloudpickle", spec.flow)

    deployment_id = await client.create_deployment(
        flow_id=flow_id,
        name=spec.name,
        schedule=spec.schedule,
        flow_data=flow_data,
        parameters=spec.parameters,
        tags=spec.tags,
        flow_runner=spec.flow_runner,
    )

    return deployment_id


@flow
def hello_world(name):
    print(f"Hello {name}!")

spec = DeploymentSpec(flow=hello_world, name="inline-deployment", parameters={"name": "Marvin"}, tags=["foo", "bar"],)
sync(create_deployment_from_spec(spec=spec))
z
You’re calling the old function
I named it something new
d
ha my bd
alright, I think I’m almost there. So I’m getting a UUID back from the function, but i’m not seeing it get picked up by my agents / server
I’ve tried a couple of variations of work-queues and agent runs
is there anything special I’ll need to do to shoot this up into orion? Maybe call the flow from the deployment object?
z
You shouldn’t
Are you creating the deployment with the same API your agents are using?
d
I’ve been calling them from the CLI within the same virtual env. Should I be calling them from the python session somehow?
z
Is this all on one machine?
d
yep
oh, should I set that push server field to false
z
Copy code
❯ python flow.py       
Created deployment hello-world/test
                                                                                                                                                                   orion-dev-38
❯ prefect deployment execute hello-world/test
18:19:26.788 | INFO    | prefect.engine - Created flow run 'russet-anaconda' for flow 'hello-world'
18:19:26.789 | INFO    | Flow run 'russet-anaconda' - Using task runner 'ConcurrentTaskRunner'
18:19:26.789 | DEBUG   | prefect.task_runner.concurrent - Starting task runner...
18:19:26.800 | DEBUG   | Flow run 'russet-anaconda' - Executing flow 'hello-world' for flow run 'russet-anaconda'...
18:19:26.801 | DEBUG   | Flow run 'russet-anaconda' - Beginning execution...
Hello Marvin!
/Users/mz/dev/orion/src/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
  warnings.warn(
18:19:26.831 | DEBUG   | prefect.task_runner.concurrent - Shutting down task runner...
18:19:26.831 | INFO    | Flow run 'russet-anaconda' - Finished in state Completed(None)
                                                                                                                                                                    orion-dev-38
❯ prefect deployment run hello-world/test                    
Created flow run 'fuzzy-wildebeest' (fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab)
                                                                                                                                                                   orion-dev-38
❯ prefect work-queue create default      
UUID('2b1b8923-8885-4630-a5de-2d63d655cd3b')
                                                                                                                                                                   orion-dev-38
❯ prefect agent start default      
Starting agent with ephemeral API...

  ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
 | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
 |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
 |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|


Agent started! Looking for work from queue 'default'...
18:19:53.055 | DEBUG   | prefect.agent - Checking for flow runs...
18:19:53.089 | INFO    | prefect.agent - Submitting flow run 'fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab'
18:19:53.105 | INFO    | prefect.flow_runner.subprocess - Opening subprocess for flow run 'fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab'...
18:19:53.105 | DEBUG   | prefect.flow_runner.subprocess - Using command: /opt/homebrew/Caskroom/miniconda/base/envs/orion-dev-38/bin/python -m prefect.engine fb6e4976a68e44a7b3d3c4f2e1f0b1ab
18:19:53.114 | INFO    | prefect.agent - Completed submission of flow run 'fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab'
18:19:54.806 | INFO    | prefect.flow_runner.subprocess - Subprocess for flow run 'fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab' exited cleanly.
Copy code
from prefect import flow
from prefect.deployments import (
    DataDocument,
    DeploymentSpec,
    OrionClient,
    inject_client,
    sync_compatible,
)


@sync_compatible
@inject_client
async def create_pickled_flow_deployment(spec: DeploymentSpec, client: OrionClient):
    """
    Create a deployment from a specification.
    """
    spec.load_flow()
    flow_id = await client.create_flow(spec.flow)

    flow_data = DataDocument.encode("cloudpickle", spec.flow)

    deployment_id = await client.create_deployment(
        flow_id=flow_id,
        name=spec.name,
        schedule=spec.schedule,
        flow_data=flow_data,
        parameters=spec.parameters,
        tags=spec.tags,
        flow_runner=spec.flow_runner,
    )

    return deployment_id


@flow
def hello_world(name):
    print(f"Hello {name}!")


spec = DeploymentSpec(
    flow=hello_world,
    name="test",
    parameters={"name": "Marvin"},
    tags=["foo", "bar"],
)

create_pickled_flow_deployment(spec)
print(f"Created deployment {spec.flow.name}/{spec.name}")
d
And we’re good! thanks so much
v
Thank you for useful thread, guys!
Instead, we infer the path to the source code from python object metadata and try to read the source from there.
Easy, but it’ll change in the near future.
Michael, what changes about exchange of flow data will be implemented in near future?
z
I’m working on improving flow storage / deployments
Basically making it easier to use and more powerful so we can support common patterns like GitHub storage