@Marvin How do I run .deploy to a process typ...
# ask-marvin
c
@Marvin How do I run .deploy to a process type worker
m
To run a deployment using a process-type worker in Prefect, you need to set up a work pool and associate your deployment with it, then start a worker that matches the type of the work pool. Here are the steps you'd typically follow:

1. Define a work pool in Prefect that corresponds to the process execution environment. This work pool defines the configuration necessary to run flow runs within that environment.
2. Create a deployment associated with that work pool. Make sure to specify the appropriate `work_pool_name` in your deployment configuration.
3. Start a process-type worker to poll the work pool for scheduled runs:
```
prefect worker start -t process
```
This starts a worker that can execute flow runs within a subprocess on the machine where the worker is running.
4. Ensure your deployment is configured to use the work pool that this worker will poll by updating the deployment's `work_pool_name` to match the worker's pool.

Please see the documentation on workers and work pools for more detailed instructions and examples. Remember that workers must match the type of the work pool they are polling, so make sure your process-type worker is associated with a process-type work pool.
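For reference, a minimal sketch of creating the matching process-type work pool first (assuming Prefect 2.x; the pool name `my-process-work-pool` is a placeholder):
```bash
# Create a work pool of type "process" (the name is a placeholder)
prefect work-pool create "my-process-work-pool" --type process

# Start a worker that polls that specific pool
prefect worker start --pool "my-process-work-pool"
```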
c
@Marvin How to deploy a flow to the Prefect server to be picked up by a process type worker
m
To deploy a flow to Prefect Server and have it picked up by a process-type worker, follow these steps:

1. Ensure you have Prefect Server running. If not, launch Prefect Server by executing the following command in your terminal:
```
prefect server start
```
This will start Prefect Server along with the UI and all the necessary services.
2. Define your flow in Python using the Prefect API.
3. Specify the work pool that your deployment should use. This work pool should be of the process type to match your process-type worker. For example:
```python
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner

DeploymentSpec(
    flow_location="/path/to/flow.py",
    name="example-deployment",
    work_pool_name="my-process-work-pool",
    flow_runner=SubprocessFlowRunner(),
)
```
Replace `/path/to/flow.py` with the location of your flow file and `my-process-work-pool` with the name of your work pool.

4. Start your process-type worker that will pick up the flow runs from the specified work pool. Use the following command:
```
prefect worker start -t process
```
5. Use the Prefect UI or Prefect CLI to create and manage your deployment.

The command above assumes a work pool named `my-process-work-pool` has been set up to handle process-type work. If such a work pool does not exist, you will need to create it first. For more information, please refer to the Prefect deployment documentation and the workers and work pools documentation for detailed instructions and examples. Note that it is crucial to align the type of the work pool with the worker you're using so that your flow runs are correctly picked up and executed.
c
@Marvin Using .deploy() to a process type worker
m
It seems that the code example I retrieved doesn't directly address deploying to a process-type worker using `.deploy()`. My apologies for this oversight, although it does offer a glimpse at handling asynchronous functions within a flow. To properly address your question regarding deployment to a process-type worker using the `.deploy()` method, here's an example that includes steps for deployment. Let's assume you have a flow:
```python
from prefect import flow

@flow
def example_flow():
    print("This is an example flow.")
```
To deploy the `example_flow` to be picked up by a process-type worker, you would:
```python
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner

# Create a deployment for the flow
DeploymentSpec(
    flow=example_flow,
    name="process-worker-deployment",
    flow_runner=SubprocessFlowRunner(),
    work_pool_name="my-process-work-pool"
)

# The following code creates the deployment spec in the local file system instead of Prefect Cloud.
example_flow.deploy(
    name="process-worker-deployment",
    work_pool_name="my-process-work-pool",
    flow_runner=SubprocessFlowRunner()
)
```
Replace `my-process-work-pool` with the name of the process-type work pool you have set up in your Prefect Server or Prefect Cloud account. Make sure that the process-type worker(s) have been correctly started and configured to pull work from `my-process-work-pool` to execute the flow runs. For the latest information on deployment types and worker configuration, consult the Prefect documentation.
c
@Marvin ModuleNotFoundError: No module named 'prefect.flow_runners'
n
yeah this is not good output from marvin
m
It appears there was a misunderstanding regarding the module name for flow runners. The correct module for deploying with flow runners doesn't seem to be `prefect.flow_runners`; the GitHub issue related to a `ModuleNotFoundError` did not specifically address your issue, and the documentation did not provide a direct solution either.

To clarify the appropriate way to deploy a flow that will be picked up by a process-type worker, you should follow the documentation for the version of Prefect you are using. Since Prefect 2.x differs significantly from prior versions, ensure you are following the guides for Prefect 2.x only. Verify the import paths and module names against the official Prefect documentation that matches your version. If you are using a module or feature that no longer exists or has been replaced, the documentation will guide you to the new usage patterns and necessary imports. If you are currently working with Prefect 2.x, some modules from Prefect 1.x like `prefect.flow_runners` might no longer apply, and you might need to use new constructs like `flow.deploy()` with the proper parameters specific to Prefect 2.x.

If the problem persists, consider seeking assistance from the Prefect community on their Slack channel or forum, where developers and other users can help with such issues.
n
you should be able to just specify a process work pool as `work_pool_name` with `.deploy` - did you hit an issue with that?
c
I ran it like this:
```python
asyncio.run(
    test_100_worker.deploy(
        work_pool_name="default-worker-pool",
        work_queue_name="default",
        name="kdb_robustness_sleep_100_tasks",
        parameters={
            "testcase": "100 tasks concurrently",
        },
    )
)
```
But I get an error - `ValueError: Either an image or remote storage location must be provided when deploying a deployment.`
n
yep okay, that makes sense. you essentially need to tell the worker where to get the code from, since your process worker might not have direct access to the disk where the code is. do you keep code in github? this is what `from_source` is for
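A minimal sketch of that `from_source` approach (assuming Prefect 2.x; the repo URL, entrypoint, and pool name are placeholders):
```python
from prefect import flow

# Pull the flow code from a remote repo at runtime, then register a
# deployment against a process-type work pool. All names are placeholders.
remote_flow = flow.from_source(
    source="https://github.com/your-org/your-repo",  # where the worker pulls code from
    entrypoint="flows/example.py:example_flow",      # path/to/file.py:flow_function
)

remote_flow.deploy(
    name="process-worker-deployment",
    work_pool_name="my-process-work-pool",
)
```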
c
It's just kept on-prem in a folder. can I use `from_source` to specify the location of the code?
previously, I always just used this when I was using agents:
```python
deployment = Deployment.build_from_flow(
    flow=test_100,
    name="kdb_robustness_sleep_100_tasks",
    parameters={
        "testcase": "100 tasks concurrently",
    },
)
deployment.apply()
```
and it worked without needing to specify the location of the code. Wanted to see if this could be replicated
n
not exactly. do you use your worker for many deployments or just this one?
c
Many deployments - we essentially want to have these workers spun up so we can make sure we can handle our load in a production-level setting, and distribute tasks across the workers
n
gotcha. this issue is relevant to this. if you don't have a remote storage location, our recommendation is generally to build an image that bakes in your flow code and run a docker worker. i asked about one or many because another easy option would be `serve`, to avoid the need for a process worker at all, but with many deployments a worker is convenient. if you don't want to do docker, I would explore a remote storage location like github for your flow code (which can be private) so that you can write a `pull` step that your process worker can use to get the flow code wherever it happens to be running
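A minimal sketch of the image-baking option mentioned above (assuming Prefect 2.x and a Docker-type work pool; the image tag and pool name are placeholders):
```python
from prefect import flow

@flow(log_prints=True)
def example_flow():
    print("This is an example flow.")

if __name__ == "__main__":
    # Build a Docker image that bakes in this flow's code, then create a
    # deployment pointing at a Docker-type work pool. The image tag and
    # pool name are placeholders.
    example_flow.deploy(
        name="docker-worker-deployment",
        work_pool_name="my-docker-work-pool",
        image="my-registry/my-flows:latest",
    )
```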
c
Right. We use an ADO repo to connect to it, so I guess I'll have to configure it to pull from there when needed. We want this flow to be displayed in the UI, so that it can be triggered whenever via the UI. The files were located locally. This was a functionality that was present in agents that is now not present in workers, then?
Also one more question - when I run a deployment that contains 100 tasks run concurrently, and I have 10 workers or agents running, are we able to distribute the work amongst the 10 workers/agents? I ran this before with 10 agents set up, and only one agent picked it up, doing all of the tasks concurrently.
Correct me if I'm wrong, but I do remember that previously with agents, if we were running a single deployment with multiple agents, I could still have that deployment's job distributed amongst the agents. I can't seem to have that feature now that I've updated Prefect.
n
> This was a functionality that was present in Agents that is now not present in Workers then?
this specific functionality when you don't have remote storage is covered by `.serve`, but when serving you need to know what deployments you have when you start the serve process, like `serve(*list_of_deployments)`
> single deployment with multiple agents, I could still have that deployment's job distributed amongst the agents
workers don't necessarily load balance like that, and I don't believe agents were guaranteed to either, but background tasks do if you're curious. concretely I would recommend:
• push your code to a place you can `pull` it down at runtime (like github, gitlab, bitbucket, s3 etc) - then you could happily use `from_source().deploy()` etc
• keeping only local code, use `prefect.yaml` and define a pull step for deployments that sets the working directory to the right place for that flow, so the worker knows where to get your local flow code
• use `serve(*many_deployments)`, where you can set `my_flow.to_deployment(..., storage=LocalFilesystem(basepath="to/my/flow"))` (a sketch of this serve option follows the example below)
that second path would look like this
```python
# /Users/nate/github.com/prefecthq/prefect/flows/proofs/local_storage.py

from prefect import flow


@flow(log_prints=True)
def test():
    print("Hello, world!")
```
so i set an absolute path in the `pull` step so my worker is set up to read the `entrypoint` I give
```yaml
# /Users/nate/github.com/prefecthq/prefect/prefect.yaml

deployments:
- name: local_test
  entrypoint: flows/proofs/local_storage.py:test
  work_pool:
    name: local
  pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /Users/nate/github.com/prefecthq/prefect
```
```
(prefect) nate :: ~/github.com/prefecthq/prefect ‹main›
» prefect deploy -n local_test

...

» prefect worker start --pool 'local'

Discovered type 'process' for work pool 'local'.
Worker 'ProcessWorker 3f9243f5-827d-43b0-af34-e9ff02c6c94b' started!
19:26:14.317 | INFO    | prefect.flow_runs.worker - Worker 'ProcessWorker 3f9243f5-827d-43b0-af34-e9ff02c6c94b' submitting flow run 'c1690c0b-7613-4b05-9c50-3318ed8368e3'
 > Running set_working_directory step...
19:26:14.764 | INFO    | prefect.flow_runs.worker - Opening process...
19:26:14.926 | INFO    | prefect.flow_runs.worker - Completed submission of flow run 'c1690c0b-7613-4b05-9c50-3318ed8368e3'
19:26:17.619 | INFO    | Flow run 'peculiar-reindeer' - Hello, world!
19:26:17.756 | INFO    | Flow run 'peculiar-reindeer' - Finished in state Completed()
19:26:18.336 | INFO    | prefect.flow_runs.worker - Process 64343 exited cleanly
```
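And a minimal sketch of the third option, serving deployments with local storage, following the `to_deployment(..., storage=...)` suggestion above (assuming Prefect 2.x; the basepath and names are placeholders, and exact parameter support may vary by version):
```python
from prefect import flow, serve
from prefect.filesystems import LocalFilesystem

@flow(log_prints=True)
def test():
    print("Hello, world!")

if __name__ == "__main__":
    # Serve one or more deployments from this long-running process;
    # no work pool or worker is needed. The basepath is a placeholder.
    serve(
        test.to_deployment(
            name="local_test",
            storage=LocalFilesystem(basepath="/path/to/my/flows"),
        ),
    )
```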
c
Thanks Nate! I will try this out