
Daniel Chapsky

03/22/2022, 9:32 PM
Yo all. Ran into an Orion issue and wondering if anyone has hit something similar. Basically I'm trying to create a flow + DeploymentSpec, then register it to Orion from within the same python file. (Currently have an Orion server running locally.) @Darren was able to forward along this code snippet, but I've been getting an error trying to run it:
from prefect import flow
from prefect.deployments import DeploymentSpec, create_deployment_from_spec


@flow
def hello_world(name):
    print(f"Hello {name}!")


spec = DeploymentSpec(
    flow=hello_world,
    name="inline-deployment",
    parameters={"name": "Marvin"},
    tags=["foo", "bar"],
)

create_deployment_from_spec(spec=spec)
Basically the error I'm getting is "expected str, bytes or os.PathLike object, not NoneType", because it seems the create_deployment_from_spec function is expecting 2 arguments.
The second argument seems like it should be an Orion client, but I've been having trouble figuring out how to get that incorporated.

Zanie

03/22/2022, 9:38 PM
The client will automatically be provided to that function
Can you share the traceback?
(This snippet runs successfully on my machine)

Daniel Chapsky

03/22/2022, 10:14 PM
sure, what version are you on? maybe I’m behind or something
In [4]: @flow
   ...: def hello_world(name):
   ...:     print(f"Hello {name}!")
   ...:

In [5]: spec = DeploymentSpec(flow=hello_world, name="inline-deployment", parameters={"name": "Marvin"}, tags=["foo", "bar"],)
   ...: create_deployment_from_spec(spec=spec)
   ...:
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [5], in <cell line: 2>()
      1 spec = DeploymentSpec(flow=hello_world, name="inline-deployment", parameters={"name": "Marvin"}, tags=["foo", "bar"],)
----> 2 create_deployment_from_spec(spec=spec)

File ~/dev/hkg-data/env/lib/python3.9/site-packages/prefect/utilities/asyncio.py:120, in sync_compatible.<locals>.wrapper(*args, **kwargs)
    116     return run_async_from_worker_thread(async_fn, *args, **kwargs)
    117 else:
    118     # In a sync context and there is no event loop; just create an event loop
File ~/dev/hkg-data/env/lib/python3.9/site-packages/prefect/utilities/asyncio.py:67, in run_async_in_new_loop(__fn, *args, **kwargs)
     66 def run_async_in_new_loop(__fn: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any):
---> 67     return anyio.run(partial(__fn, *args, **kwargs))

File ~/dev/hkg-data/env/lib/python3.9/site-packages/anyio/_core/_eventloop.py:56, in run(func, backend, backend_options, *args)
     54 try:
     55     backend_options = backend_options or {}
---> 56     return asynclib.run(func, *args, **backend_options)
     57 finally:
     58     if token:

File ~/dev/hkg-data/env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:233, in run(func, debug, use_uvloop, policy, *args)
    230         del _task_states[task]
    232 _maybe_set_event_loop_policy(policy, use_uvloop)
--> 233 return native_run(wrapper(), debug=debug)

File /usr/lib/python3.9/asyncio/runners.py:44, in run(main, debug)
     42     if debug is not None:
     43         loop.set_debug(debug)
---> 44     return loop.run_until_complete(main)
     45 finally:
     46     try:

File /usr/lib/python3.9/asyncio/base_events.py:642, in BaseEventLoop.run_until_complete(self, future)
    639 if not future.done():
    640     raise RuntimeError('Event loop stopped before Future completed.')
--> 642 return future.result()

File ~/dev/hkg-data/env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:228, in run.<locals>.wrapper()
    225     task.set_name(task_state.name)
    227 try:
--> 228     return await func(*args)
    229 finally:
    230     del _task_states[task]

File ~/dev/hkg-data/env/lib/python3.9/site-packages/prefect/client.py:82, in inject_client.<locals>.with_injected_client(*args, **kwargs)
     80 async with client_context as client:
     81     kwargs.setdefault("client", client)
---> 82     return await fn(*args, **kwargs)

File ~/dev/hkg-data/env/lib/python3.9/site-packages/prefect/deployments.py:222, in create_deployment_from_spec(spec, client)
    219 flow_id = await client.create_flow(spec.flow)
    221 if spec.push_to_server:
--> 222     with open(spec.flow_location, "rb") as flow_file:
    223         flow_data = await client.persist_data(flow_file.read())
    224 else:

TypeError: expected str, bytes or os.PathLike object, not NoneType
my other guess would be I set up the orion local server incorrectly somehow

Zanie

03/22/2022, 10:25 PM
I just tested with the latest build. Here it seems like we've somehow failed to infer the flow location from the flow object
The complaint is that the spec flow location is null

Daniel Chapsky

03/22/2022, 10:32 PM
it is null in this case; this is being run within an IPython session
sorry, forgot to specify that part earlier
but it’s within the same directory as the env and where the server is run from
in prefect v1 flow.register() worked with this approach with no issue
(unless it’s supposed to infer location even in that context)

Zanie

03/22/2022, 10:34 PM
Oh if you're in an ipython session we can't infer a flow location from the object so yeah it'll break

Daniel Chapsky

03/22/2022, 10:34 PM
yeah that makes sense. and in prefect v1, did flow.register do some pickle magic to make that work?

Zanie

03/22/2022, 10:34 PM
V1 pickles flows and stores them in the file system. V2 doesn't automatically pickle flows for you yet.
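(For illustration only, here's a minimal sketch of that V1-style approach using cloudpickle directly; the file path and function are hypothetical, and this isn't Prefect code:)

import cloudpickle

def hello_world(name):
    print(f"Hello {name}!")

# Serialize the function object itself, rather than a path to its source,
# and store the bytes on disk (hypothetical path).
with open("/tmp/hello_world.pkl", "wb") as f:
    f.write(cloudpickle.dumps(hello_world))

# Later, possibly in a different process, load and call it.
with open("/tmp/hello_world.pkl", "rb") as f:
    restored = cloudpickle.loads(f.read())

restored("Marvin")  # prints: Hello Marvin!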

Daniel Chapsky

03/22/2022, 10:34 PM
ah ok
how hard would it be for me to roll my own pickler?

Zanie

03/22/2022, 10:35 PM
Instead, we infer the path to the source code from python object metadata and try to read the source from there.
Easy, but it'll change in the near future.
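(To see why REPL-defined flows defeat that inference, here's a small standard-library sketch; the actual Prefect inference logic may differ:)

import inspect

def my_flow():
    pass

# In a saved .py script this prints a real, openable file path.
# In a plain REPL it prints None, and in IPython it prints a pseudo-name
# like <ipython-input-1-...>; neither can be opened and re-read, so any
# path-based source inference has nothing to work with.
print(inspect.getsourcefile(my_flow))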

Daniel Chapsky

03/22/2022, 10:36 PM
any chance you could point me towards a short term pickle solution and I’ll update whenever it changes? (this would allow me to keep testing stuff without having to change my current setup too much)
(also if your team sold swag that said “We Do Pickle Magic” I’d buy it)

Zanie

03/22/2022, 10:37 PM
lol
So if we take a look at the source code for the function you're working with:
@sync_compatible
@inject_client
async def create_deployment_from_spec(
    spec: DeploymentSpec, client: OrionClient
) -> UUID:
    """
    Create a deployment from a specification.
    """
    spec.load_flow()
    flow_id = await client.create_flow(spec.flow)

    if spec.push_to_server:
        with open(spec.flow_location, "rb") as flow_file:
            flow_data = await client.persist_data(flow_file.read())
    else:
        flow_data = DataDocument(encoding="file", blob=spec.flow_location.encode())

    deployment_id = await client.create_deployment(
        flow_id=flow_id,
        name=spec.name,
        schedule=spec.schedule,
        flow_data=flow_data,
        parameters=spec.parameters,
        tags=spec.tags,
        flow_runner=spec.flow_runner,
    )

    return deployment_id
You can see where the flow_data document gets created. You'll want that to be a pickle document instead:
@sync_compatible
@inject_client
async def create_pickled_flow_deployment(
    spec: DeploymentSpec, client: OrionClient
) -> UUID:
    """
    Create a deployment from a specification.
    """
    spec.load_flow()
    flow_id = await client.create_flow(spec.flow)

    flow_data = DataDocument.encode("cloudpickle", spec.flow)

    deployment_id = await client.create_deployment(
        flow_id=flow_id,
        name=spec.name,
        schedule=spec.schedule,
        flow_data=flow_data,
        parameters=spec.parameters,
        tags=spec.tags,
        flow_runner=spec.flow_runner,
    )

    return deployment_id

Daniel Chapsky

03/22/2022, 10:39 PM
super dope, I’ll try it out

Zanie

03/22/2022, 10:39 PM
Oops, the encoder is "cloudpickle", not "pickle". Edited to fix.
This uses our pickle serializer:
@register_serializer("cloudpickle")
class PickleSerializer:
    """
    Serializes arbitrary objects using the pickle protocol.

    Wraps `cloudpickle` to encode bytes in base64 for safe transmission.
    """

    @staticmethod
    def dumps(data: Any) -> bytes:
        data_bytes = cloudpickle.dumps(data)
        return base64.encodebytes(data_bytes)

    @staticmethod
    def loads(blob: bytes) -> Any:
        return cloudpickle.loads(base64.decodebytes(blob))
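(A quick round trip using the same dumps/loads logic as the serializer above, assuming cloudpickle is installed; the greet function is just an example:)

import base64

import cloudpickle

def greet(name):
    return f"Hello {name}!"

# dumps(): cloudpickle the object, then base64-encode for safe transmission
blob = base64.encodebytes(cloudpickle.dumps(greet))

# loads(): reverse the base64, then unpickle
restored = cloudpickle.loads(base64.decodebytes(blob))
print(restored("Marvin"))  # Hello Marvin!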

Daniel Chapsky

03/22/2022, 10:41 PM
quick Q (apologies if this is naive, all the python I know comes from needing it for data science, so I missed some stuff) - can I drop the async prefix + decorators if I'm happy with it running serially?
Or should I keep those as is just to play nice with everything else

Zanie

03/22/2022, 10:42 PM
You need the async stuff for now
There’s no synchronous client interface yet so anytime you want to use the API you need an async function

Daniel Chapsky

03/22/2022, 10:43 PM
Does that mean I'll also need to pass in the OrionClient explicitly?

Zanie

03/22/2022, 10:43 PM
But the @sync_compatible decorator lets you just call it without awaiting.
The @inject_client retrieves and passes a client for you.
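(For intuition, here's a simplified, hypothetical sketch of the pattern those two decorators implement; the real Prefect versions also handle worker threads and open an actual OrionClient context:)

import asyncio
from functools import wraps

class DummyClient:
    """Stand-in for OrionClient in this sketch."""

def inject_client(async_fn):
    # Simplified: default the `client` kwarg when the caller omits it.
    @wraps(async_fn)
    async def wrapper(*args, **kwargs):
        kwargs.setdefault("client", DummyClient())
        return await async_fn(*args, **kwargs)
    return wrapper

def sync_compatible(async_fn):
    # Simplified: run the coroutine to completion for synchronous callers;
    # hand the coroutine back to async callers so they can await it.
    @wraps(async_fn)
    def wrapper(*args, **kwargs):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(async_fn(*args, **kwargs))
        return async_fn(*args, **kwargs)
    return wrapper

@sync_compatible
@inject_client
async def demo(spec, client):
    return type(client).__name__

print(demo("my-spec"))  # DummyClient -- no await, no explicit client needed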

Daniel Chapsky

03/22/2022, 10:43 PM
sweet
Still getting the same issue, maybe I’m calling the function incorrectly? (really appreciate your help btw)
from prefect.utilities.asyncio import sync
from prefect import flow
from prefect.deployments import DeploymentSpec, create_deployment_from_spec, sync_compatible, inject_client, OrionClient, DataDocument


@sync_compatible
@inject_client
async def create_pickled_flow_deployment(
    spec: DeploymentSpec, client: OrionClient
):
    """
    Create a deployment from a specification.
    """
    spec.load_flow()
    flow_id = await client.create_flow(spec.flow)

    flow_data = DataDocument.encode("cloudpickle", spec.flow)

    deployment_id = await client.create_deployment(
        flow_id=flow_id,
        name=spec.name,
        schedule=spec.schedule,
        flow_data=flow_data,
        parameters=spec.parameters,
        tags=spec.tags,
        flow_runner=spec.flow_runner,
    )

    return deployment_id


@flow
def hello_world(name):
    print(f"Hello {name}!")

spec = DeploymentSpec(flow=hello_world, name="inline-deployment", parameters={"name": "Marvin"}, tags=["foo", "bar"],)
sync(create_deployment_from_spec(spec=spec))

Zanie

03/22/2022, 10:55 PM
You’re calling the old function
I named it something new

Daniel Chapsky

03/22/2022, 10:56 PM
ha, my bad
alright, I think I'm almost there. So I'm getting a UUID back from the function, but I'm not seeing it get picked up by my agents / server
I've tried a couple of variations of work-queues and agent runs
is there anything special I'll need to do to shoot this up into orion? Maybe call the flow from the deployment object?

Zanie

03/22/2022, 11:12 PM
You shouldn’t
Are you creating the deployment with the same API your agents are using?

Daniel Chapsky

03/22/2022, 11:14 PM
I’ve been calling them from the CLI within the same virtual env. Should I be calling them from the python session somehow?

Zanie

03/22/2022, 11:14 PM
Is this all on one machine?

Daniel Chapsky

03/22/2022, 11:14 PM
yep
oh, should I set that push_to_server field to false?

Zanie

03/22/2022, 11:20 PM
❯ python flow.py       
Created deployment hello-world/test
❯ prefect deployment execute hello-world/test
18:19:26.788 | INFO    | prefect.engine - Created flow run 'russet-anaconda' for flow 'hello-world'
18:19:26.789 | INFO    | Flow run 'russet-anaconda' - Using task runner 'ConcurrentTaskRunner'
18:19:26.789 | DEBUG   | prefect.task_runner.concurrent - Starting task runner...
18:19:26.800 | DEBUG   | Flow run 'russet-anaconda' - Executing flow 'hello-world' for flow run 'russet-anaconda'...
18:19:26.801 | DEBUG   | Flow run 'russet-anaconda' - Beginning execution...
Hello Marvin!
/Users/mz/dev/orion/src/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
  warnings.warn(
18:19:26.831 | DEBUG   | prefect.task_runner.concurrent - Shutting down task runner...
18:19:26.831 | INFO    | Flow run 'russet-anaconda' - Finished in state Completed(None)
❯ prefect deployment run hello-world/test                    
Created flow run 'fuzzy-wildebeest' (fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab)
❯ prefect work-queue create default      
UUID('2b1b8923-8885-4630-a5de-2d63d655cd3b')
❯ prefect agent start default      
Starting agent with ephemeral API...

  ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
 | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
 |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
 |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|


Agent started! Looking for work from queue 'default'...
18:19:53.055 | DEBUG   | prefect.agent - Checking for flow runs...
18:19:53.089 | INFO    | prefect.agent - Submitting flow run 'fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab'
18:19:53.105 | INFO    | prefect.flow_runner.subprocess - Opening subprocess for flow run 'fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab'...
18:19:53.105 | DEBUG   | prefect.flow_runner.subprocess - Using command: /opt/homebrew/Caskroom/miniconda/base/envs/orion-dev-38/bin/python -m prefect.engine fb6e4976a68e44a7b3d3c4f2e1f0b1ab
18:19:53.114 | INFO    | prefect.agent - Completed submission of flow run 'fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab'
18:19:54.806 | INFO    | prefect.flow_runner.subprocess - Subprocess for flow run 'fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab' exited cleanly.
from prefect import flow
from prefect.deployments import (
    DataDocument,
    DeploymentSpec,
    OrionClient,
    inject_client,
    sync_compatible,
)


@sync_compatible
@inject_client
async def create_pickled_flow_deployment(spec: DeploymentSpec, client: OrionClient):
    """
    Create a deployment from a specification.
    """
    spec.load_flow()
    flow_id = await client.create_flow(spec.flow)

    flow_data = DataDocument.encode("cloudpickle", spec.flow)

    deployment_id = await client.create_deployment(
        flow_id=flow_id,
        name=spec.name,
        schedule=spec.schedule,
        flow_data=flow_data,
        parameters=spec.parameters,
        tags=spec.tags,
        flow_runner=spec.flow_runner,
    )

    return deployment_id


@flow
def hello_world(name):
    print(f"Hello {name}!")


spec = DeploymentSpec(
    flow=hello_world,
    name="test",
    parameters={"name": "Marvin"},
    tags=["foo", "bar"],
)

create_pickled_flow_deployment(spec)
print(f"Created deployment {spec.flow.name}/{spec.name}")

Daniel Chapsky

03/23/2022, 12:47 AM
And we’re good! thanks so much

Vladimir Bolshakov

03/23/2022, 7:04 AM
Thank you for the useful thread, guys!
> Instead, we infer the path to the source code from python object metadata and try to read the source from there.
> Easy, but it'll change in the near future.
Michael, what changes to the exchange of flow data will be implemented in the near future?

Zanie

03/23/2022, 3:20 PM
I’m working on improving flow storage / deployments
Basically making it easier to use and more powerful so we can support common patterns like GitHub storage