Daniel Chapsky
03/22/2022, 9:32 PM
from prefect import flow
from prefect.deployments import DeploymentSpec, create_deployment_from_spec
# Minimal example flow: prints a greeting for the given `name`.
@flow
def hello_world(name):
    print(f"Hello {name}!")
# Default parameter values and tags attached to the deployment.
default_parameters = {"name": "Marvin"}
deployment_tags = ["foo", "bar"]

# Describe a deployment of the inline `hello_world` flow and register it.
# NOTE: as reported below, this call failed with
# "TypeError: expected str, bytes or os.PathLike object, not NoneType".
spec = DeploymentSpec(
    flow=hello_world,
    name="inline-deployment",
    parameters=default_parameters,
    tags=deployment_tags,
)
create_deployment_from_spec(spec=spec)
expected str, bytes or os.PathLike object, not NoneType
because it seems the create_deployment_from_spec
function is expecting 2 arguments
Zanie
03/22/2022, 9:38 PM
Daniel Chapsky
03/22/2022, 10:14 PM
In [4]: @flow
...: def hello_world(name):
...:     print(f"Hello {name}!")
...:
In [5]: spec = DeploymentSpec(flow=hello_world, name="inline-deployment", parameters={"name": "Marvin"}, tags=["foo", "bar"],)
...: create_deployment_from_spec(spec=spec)
...:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Input In [5], in <cell line: 2>()
1 spec = DeploymentSpec(flow=hello_world, name="inline-deployment", parameters={"name": "Marvin"}, tags=["foo", "bar"],)
----> 2 create_deployment_from_spec(spec=spec)
File ~/dev/hkg-data/env/lib/python3.9/site-packages/prefect/utilities/asyncio.py:120, in sync_compatible.<locals>.wrapper(*args, **kwargs)
116 return run_async_from_worker_thread(async_fn, *args, **kwargs)
117 else:
118 # In a sync context and there is no event loop; just create an event loop
File ~/dev/hkg-data/env/lib/python3.9/site-packages/prefect/utilities/asyncio.py:67, in run_async_in_new_loop(__fn, *args, **kwargs)
66 def run_async_in_new_loop(__fn: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any):
---> 67 return anyio.run(partial(__fn, *args, **kwargs))
File ~/dev/hkg-data/env/lib/python3.9/site-packages/anyio/_core/_eventloop.py:56, in run(func, backend, backend_options, *args)
54 try:
55 backend_options = backend_options or {}
---> 56 return asynclib.run(func, *args, **backend_options)
57 finally:
58 if token:
File ~/dev/hkg-data/env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:233, in run(func, debug, use_uvloop, policy, *args)
230 del _task_states[task]
232 _maybe_set_event_loop_policy(policy, use_uvloop)
--> 233 return native_run(wrapper(), debug=debug)
File /usr/lib/python3.9/asyncio/runners.py:44, in run(main, debug)
42 if debug is not None:
43 loop.set_debug(debug)
---> 44 return loop.run_until_complete(main)
45 finally:
46 try:
File /usr/lib/python3.9/asyncio/base_events.py:642, in BaseEventLoop.run_until_complete(self, future)
639 if not future.done():
640 raise RuntimeError('Event loop stopped before Future completed.')
--> 642 return future.result()
File ~/dev/hkg-data/env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:228, in run.<locals>.wrapper()
225 task.set_name(task_state.name)
227 try:
--> 228 return await func(*args)
229 finally:
230 del _task_states[task]
File ~/dev/hkg-data/env/lib/python3.9/site-packages/prefect/client.py:82, in inject_client.<locals>.with_injected_client(*args, **kwargs)
80 async with client_context as client:
81 kwargs.setdefault("client", client)
---> 82 return await fn(*args, **kwargs)
File ~/dev/hkg-data/env/lib/python3.9/site-packages/prefect/deployments.py:222, in create_deployment_from_spec(spec, client)
219 flow_id = await client.create_flow(spec.flow)
221 if spec.push_to_server:
--> 222 with open(spec.flow_location, "rb") as flow_file:
223 flow_data = await client.persist_data(flow_file.read())
224 else:
TypeError: expected str, bytes or os.PathLike object, not NoneType
Zanie
03/22/2022, 10:25 PM
Daniel Chapsky
03/22/2022, 10:32 PM
Zanie
03/22/2022, 10:34 PM
Daniel Chapsky
03/22/2022, 10:34 PM
Zanie
03/22/2022, 10:34 PM
Daniel Chapsky
03/22/2022, 10:34 PM
Zanie
03/22/2022, 10:35 PM
Daniel Chapsky
03/22/2022, 10:36 PM
Zanie
03/22/2022, 10:37 PM
@sync_compatible
@inject_client
async def create_deployment_from_spec(
    spec: DeploymentSpec, client: OrionClient
) -> UUID:
    """
    Create a deployment from a specification.

    Loads the flow referenced by the spec, registers the flow through the
    client, builds the flow data document from `spec.flow_location`, and
    creates the deployment.

    Args:
        spec: The deployment specification to register.
        client: The API client used for all requests.

    Returns:
        The value returned by `client.create_deployment`.

    Raises:
        ValueError: If `spec.flow_location` is None after the flow is loaded.
    """
    spec.load_flow()

    # Both branches below need a file location. Without this check a missing
    # location surfaces as an opaque "expected str, bytes or os.PathLike
    # object, not NoneType" from `open`, or an AttributeError from `.encode()`.
    # Checking before `create_flow` also avoids registering a flow for a
    # deployment that cannot be created.
    if spec.flow_location is None:
        raise ValueError(
            f"Deployment {spec.name!r} has no `flow_location`, so the flow's "
            "source file cannot be read. Set `flow_location` on the "
            "specification to the file that defines the flow."
        )

    flow_id = await client.create_flow(spec.flow)

    if spec.push_to_server:
        # Persist the contents of the flow file through the client.
        with open(spec.flow_location, "rb") as flow_file:
            flow_data = await client.persist_data(flow_file.read())
    else:
        # Store only the file location, encoded as a "file" data document.
        flow_data = DataDocument(encoding="file", blob=spec.flow_location.encode())

    deployment_id = await client.create_deployment(
        flow_id=flow_id,
        name=spec.name,
        schedule=spec.schedule,
        flow_data=flow_data,
        parameters=spec.parameters,
        tags=spec.tags,
        flow_runner=spec.flow_runner,
    )
    return deployment_id
`flow_data` document gets created
@sync_compatible
@inject_client
async def create_pickled_flow_deployment(
    spec: DeploymentSpec, client: OrionClient
) -> UUID:
    """
    Create a deployment whose flow data is the pickled flow object.

    Unlike `create_deployment_from_spec`, this does not read
    `spec.flow_location`: the flow object on the spec is encoded with the
    "cloudpickle" serializer and sent as the deployment's flow data.

    Args:
        spec: The deployment specification to register.
        client: The API client used for all requests.

    Returns:
        The value returned by `client.create_deployment`.
    """
    spec.load_flow()
    flow_id = await client.create_flow(spec.flow)

    # Embed the serialized flow itself rather than a pointer to a file.
    flow_data = DataDocument.encode("cloudpickle", spec.flow)

    deployment_id = await client.create_deployment(
        flow_id=flow_id,
        name=spec.name,
        schedule=spec.schedule,
        flow_data=flow_data,
        parameters=spec.parameters,
        tags=spec.tags,
        flow_runner=spec.flow_runner,
    )
    return deployment_id
Daniel Chapsky
03/22/2022, 10:39 PM
Zanie
03/22/2022, 10:39 PM
"cloudpickle"
not “pickle”
@register_serializer("cloudpickle")
class PickleSerializer:
    """
    Serializes arbitrary objects using the pickle protocol.

    Wraps `cloudpickle` to encode bytes in base64 for safe transmission.
    """

    @staticmethod
    def dumps(data: Any) -> bytes:
        """Pickle `data` with cloudpickle and return the bytes base64-encoded."""
        data_bytes = cloudpickle.dumps(data)
        return base64.encodebytes(data_bytes)

    @staticmethod
    def loads(blob: bytes) -> Any:
        """
        Decode a base64 blob produced by `dumps` and unpickle it.

        SECURITY: unpickling can execute arbitrary code, so only pass blobs
        that come from a trusted source.
        """
        return cloudpickle.loads(base64.decodebytes(blob))
Daniel Chapsky
03/22/2022, 10:41 PM
Zanie
03/22/2022, 10:42 PM
Daniel Chapsky
03/22/2022, 10:43 PM
Zanie
03/22/2022, 10:43 PM
`@sync_compatible` decorator lets you just call it without awaiting
`@inject_client` retrieves and passes a client for you
Daniel Chapsky
03/22/2022, 10:43 PM
from prefect.utilities.asyncio import sync
from prefect import flow
from prefect.deployments import DeploymentSpec, create_deployment_from_spec, sync_compatible, inject_client, OrionClient
@sync_compatible
@inject_client
async def create_pickled_flow_deployment(
    spec: DeploymentSpec, client: OrionClient
):
    """
    Create a deployment whose flow data is the pickled flow object.

    The flow object on the spec is encoded with the "cloudpickle" serializer
    and sent as the deployment's flow data; `spec.flow_location` is not read.

    Args:
        spec: The deployment specification to register.
        client: The API client used for all requests.

    Returns:
        The value returned by `client.create_deployment`.
    """
    # `DataDocument` is not among the names imported at the top of this
    # snippet, so bring it into scope here to avoid a NameError.
    from prefect.deployments import DataDocument

    spec.load_flow()
    flow_id = await client.create_flow(spec.flow)

    # Embed the serialized flow itself rather than a pointer to a file.
    flow_data = DataDocument.encode("cloudpickle", spec.flow)

    deployment_id = await client.create_deployment(
        flow_id=flow_id,
        name=spec.name,
        schedule=spec.schedule,
        flow_data=flow_data,
        parameters=spec.parameters,
        tags=spec.tags,
        flow_runner=spec.flow_runner,
    )
    return deployment_id
# Minimal example flow: prints a greeting for the given `name`.
@flow
def hello_world(name):
    print(f"Hello {name}!")
# Describe a deployment of the inline `hello_world` flow.
spec = DeploymentSpec(
    flow=hello_world,
    name="inline-deployment",
    parameters={"name": "Marvin"},
    tags=["foo", "bar"],
)

# Register through the pickling helper defined above, not
# `create_deployment_from_spec`, which reads `spec.flow_location`.
# The helper is decorated with `@sync_compatible`, so it is called directly
# without awaiting or wrapping.
create_pickled_flow_deployment(spec=spec)
Zanie
03/22/2022, 10:55 PM
Daniel Chapsky
03/22/2022, 10:56 PM
Zanie
03/22/2022, 11:12 PM
Daniel Chapsky
03/22/2022, 11:14 PM
Zanie
03/22/2022, 11:14 PM
Daniel Chapsky
03/22/2022, 11:14 PM
Zanie
03/22/2022, 11:20 PM
❯ python flow.py
Created deployment hello-world/test
orion-dev-38
❯ prefect deployment execute hello-world/test
18:19:26.788 | INFO | prefect.engine - Created flow run 'russet-anaconda' for flow 'hello-world'
18:19:26.789 | INFO | Flow run 'russet-anaconda' - Using task runner 'ConcurrentTaskRunner'
18:19:26.789 | DEBUG | prefect.task_runner.concurrent - Starting task runner...
18:19:26.800 | DEBUG | Flow run 'russet-anaconda' - Executing flow 'hello-world' for flow run 'russet-anaconda'...
18:19:26.801 | DEBUG | Flow run 'russet-anaconda' - Beginning execution...
Hello Marvin!
/Users/mz/dev/orion/src/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
warnings.warn(
18:19:26.831 | DEBUG | prefect.task_runner.concurrent - Shutting down task runner...
18:19:26.831 | INFO | Flow run 'russet-anaconda' - Finished in state Completed(None)
orion-dev-38
❯ prefect deployment run hello-world/test
Created flow run 'fuzzy-wildebeest' (fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab)
orion-dev-38
❯ prefect work-queue create default
UUID('2b1b8923-8885-4630-a5de-2d63d655cd3b')
orion-dev-38
❯ prefect agent start default
Starting agent with ephemeral API...
___ ___ ___ ___ ___ ___ _____ _ ___ ___ _ _ _____
| _ \ _ \ __| __| __/ __|_ _| /_\ / __| __| \| |_ _|
| _/ / _|| _|| _| (__ | | / _ \ (_ | _|| .` | | |
|_| |_|_\___|_| |___\___| |_| /_/ \_\___|___|_|\_| |_|
Agent started! Looking for work from queue 'default'...
18:19:53.055 | DEBUG | prefect.agent - Checking for flow runs...
18:19:53.089 | INFO | prefect.agent - Submitting flow run 'fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab'
18:19:53.105 | INFO | prefect.flow_runner.subprocess - Opening subprocess for flow run 'fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab'...
18:19:53.105 | DEBUG | prefect.flow_runner.subprocess - Using command: /opt/homebrew/Caskroom/miniconda/base/envs/orion-dev-38/bin/python -m prefect.engine fb6e4976a68e44a7b3d3c4f2e1f0b1ab
18:19:53.114 | INFO | prefect.agent - Completed submission of flow run 'fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab'
18:19:54.806 | INFO | prefect.flow_runner.subprocess - Subprocess for flow run 'fb6e4976-a68e-44a7-b3d3-c4f2e1f0b1ab' exited cleanly.
from prefect import flow
from prefect.deployments import (
DataDocument,
DeploymentSpec,
OrionClient,
inject_client,
sync_compatible,
)
@sync_compatible
@inject_client
async def create_pickled_flow_deployment(spec: DeploymentSpec, client: OrionClient):
    """
    Create a deployment whose flow data is the pickled flow object.

    The flow object on the spec is encoded with the "cloudpickle" serializer
    and sent as the deployment's flow data; `spec.flow_location` is not read.

    Args:
        spec: The deployment specification to register.
        client: The API client used for all requests.

    Returns:
        The value returned by `client.create_deployment`.
    """
    spec.load_flow()
    flow_id = await client.create_flow(spec.flow)

    # Embed the serialized flow itself rather than a pointer to a file.
    flow_data = DataDocument.encode("cloudpickle", spec.flow)

    deployment_id = await client.create_deployment(
        flow_id=flow_id,
        name=spec.name,
        schedule=spec.schedule,
        flow_data=flow_data,
        parameters=spec.parameters,
        tags=spec.tags,
        flow_runner=spec.flow_runner,
    )
    return deployment_id
# Minimal example flow: prints a greeting for the given `name`.
@flow
def hello_world(name):
    print(f"Hello {name}!")
# Default parameter values and tags attached to the deployment.
default_parameters = {"name": "Marvin"}
deployment_tags = ["foo", "bar"]

# Describe the "test" deployment of `hello_world`, register it with the
# pickling helper, then report the "<flow name>/<deployment name>" identifier.
spec = DeploymentSpec(
    flow=hello_world,
    name="test",
    parameters=default_parameters,
    tags=deployment_tags,
)
create_pickled_flow_deployment(spec)
print(f"Created deployment {spec.flow.name}/{spec.name}")
Daniel Chapsky
03/23/2022, 12:47 AM
Vladimir Bolshakov
03/23/2022, 7:04 AM
Instead, we infer the path to the source code from Python object metadata and try to read the source from there.
Easy, but it’ll change in the near future.
Michael, what changes to the exchange of flow data will be implemented in the near future?
Zanie
03/23/2022, 3:20 PM