Charles Leung
05/14/2024, 9:53 PM
Marvin
05/14/2024, 9:53 PM
work_pool_name in your deployment configuration.
3. Start a process-type worker to poll the work pool for scheduled runs. You can do that by running the following command:
prefect worker start -t process
This starts a worker that can execute flow runs within a subprocess on the machine where the worker is running.
4. Ensure your deployment is configured to use the work pool that this worker will poll by updating the deployment's work_pool_name to match the worker's pool.
Please see the documentation on workers and work pools for more detailed instructions and examples.
Remember that workers must match the work pool type they are polling, so make sure that your process-type worker is associated with a process-type work pool.
Charles Leung
05/14/2024, 9:54 PM
Marvin
05/14/2024, 9:55 PM
prefect server start
This will start Prefect Server along with the UI and all the necessary Prefect Core services.
2. Define your flow in Python using the Prefect API.
3. Specify the work pool that your deployment should use. This work pool should be of the process type to match with your process type worker. For example:
python
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner

DeploymentSpec(
    flow_location="/path/to/flow.py",
    name="example-deployment",
    work_pool_name="my-process-work-pool",
    flow_runner=SubprocessFlowRunner(),
)
Replace /path/to/flow.py with the location of your flow file and my-process-work-pool with the name of your work pool.
4. Start your process type worker that will pick up the flow runs from the specified work pool. Use the following command:
prefect worker start --pool my-process-work-pool -t process
5. Use the Prefect UI or Prefect CLI to create and manage your deployment.
In the command above, it is assumed that a work pool with the name my-process-work-pool has been set up to handle process type work. If such a work pool does not exist, you will need to create it first.
For more information, please refer to the Prefect Deployment documentation and workers and work pools for detailed instructions and examples.
Please note that it is crucial to align the type of the work pool with the worker you're using to ensure that your flow runs are correctly picked up and executed.
Charles Leung
05/14/2024, 10:12 PM
Marvin
05/14/2024, 10:12 PM
.deploy(). My apologies for this oversight, although it does offer a glimpse at handling asynchronous functions within a flow. To properly address your question regarding deployment to a process type worker using the .deploy() method, here's an example that includes steps for deployment:
Let's assume you have a flow:
python
from prefect import flow

@flow
def example_flow():
    print("This is an example flow.")
To deploy the example_flow to be picked up by a process type worker, you would:
python
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner

# Create a deployment for the flow
DeploymentSpec(
    flow=example_flow,
    name="process-worker-deployment",
    flow_runner=SubprocessFlowRunner(),
    work_pool_name="my-process-work-pool"
)

# The following code creates the deployment spec in the local file system instead of Prefect Cloud.
example_flow.deploy(
    name="process-worker-deployment",
    work_pool_name="my-process-work-pool",
    flow_runner=SubprocessFlowRunner()
)
Replace my-process-work-pool with the name of the process type work pool you have set up in your Prefect Server or Prefect Cloud account.
Make sure that the process-type worker(s) have been correctly started and configured to pull work from my-process-work-pool to execute the flow runs.
For the latest information on deployment types and worker configuration, consult the Prefect documentation.
Charles Leung
05/14/2024, 10:31 PM
Nate
05/14/2024, 10:31 PM
Marvin
05/14/2024, 10:31 PM
ModuleNotFoundError did not specifically address your issue, and the documentation did not provide a direct solution either.
To clarify the appropriate way to deploy a flow that will be picked up by a process type worker, you should follow documentation for the current version of Prefect you are using. Since Prefect 2.x is new and different from prior versions, ensure you are following the guides for Prefect 2.x only.
Please verify the import paths and module names from the official Prefect documentation that match your version.
If you are using a module or feature that no longer exists or has been replaced, the documentation will guide you to the new usage patterns and necessary imports. If you are currently working with Prefect 2.x, some modules from Prefect 1.x like prefect.flow_runners might no longer apply, and you might need to use new constructs like flow.deploy() with proper parameters specific to Prefect 2.x.
In case the problem persists, consider seeking assistance from the Prefect community on their Slack channel or forum, where developers and other users can help with such issues.
Nate
05/14/2024, 10:32 PM
work_pool_name with .deploy
did you hit an issue with that?
Charles Leung
05/14/2024, 10:36 PM
Nate
05/14/2024, 10:40 PM
from_source is for
Charles Leung
05/14/2024, 10:42 PM
Charles Leung
05/14/2024, 10:43 PM
deployment = Deployment.build_from_flow(
    flow=test_100,
    name="kdb_robustness_sleep_100_tasks",
    parameters={
        "testcase": "100 tasks concurrently",
    },
)
deployment.apply()
and it worked without needing to specify the location of the code. Wanted to see if this could be replicated
Nate
05/14/2024, 10:48 PM
Charles Leung
05/14/2024, 10:49 PM
Nate
05/14/2024, 10:55 PM
serve to avoid the need for a process worker at all, but with many deployments a worker is convenient
if you don't want to do docker, I would explore a remote storage location like github for your flow code (which can be private) so that you can write a pull step that your process worker can use to get the flow code wherever it happens to be running
Charles Leung
05/14/2024, 11:06 PM
Charles Leung
05/14/2024, 11:07 PM
Charles Leung
05/14/2024, 11:24 PM
Nate
05/15/2024, 12:28 AM
.serve but when serving you need to know what deployments you have when you start the serve process like serve(*list_of_deployments)
> single deployment with multiple agents, I could still have that deployment's job distributed amongst the agent
workers don't necessarily load balance like that, and I don't believe agents were guaranteed to either, but background tasks do if you're curious.
concretely I would recommend:
• push your code to a place you can pull it down at runtime (like github, gitlab, bitbucket, s3 etc) - then you could happily use from_source().deploy() etc
• keeping only local code, using prefect.yaml and define a pull step for deployments that sets the working directory to the right place for that flow so the worker knows where to get your local flow code
• use serve(*many_deployments) where you can set my_flow.to_deployment(..., storage=LocalFilesystem(basepath="to/my/flow"))
that second path would look like this
# /Users/nate/github.com/prefecthq/prefect/flows/proofs/local_storage.py
from prefect import flow

@flow(log_prints=True)
def test():
    print("Hello, world!")
so i set an absolute path in the pull step so my worker is set up to read the entrypoint I give
# /Users/nate/github.com/prefecthq/prefect/prefect.yaml
deployments:
  - name: local_test
    entrypoint: flows/proofs/local_storage.py:test
    work_pool:
      name: local
    pull:
      - prefect.deployments.steps.set_working_directory:
          directory: /Users/nate/github.com/prefecthq/prefect
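To see why the absolute directory in the pull step matters, here is a minimal Python sketch of how a relative entrypoint such as flows/proofs/local_storage.py:test can be anchored at the directory given to set_working_directory. The helper name resolve_entrypoint is hypothetical and this is not Prefect's actual loading code; it only illustrates the path arithmetic under that assumption.

```python
from pathlib import PurePosixPath
from typing import Tuple


def resolve_entrypoint(working_directory: str, entrypoint: str) -> Tuple[PurePosixPath, str]:
    """Split "relative/path.py:function" and anchor the path at working_directory.

    Hypothetical helper for illustration only; not part of Prefect.
    """
    # Everything after the last colon is treated as the flow function name.
    relative_path, separator, function_name = entrypoint.rpartition(":")
    if not separator or not relative_path or not function_name:
        raise ValueError(f"expected 'path.py:function', got {entrypoint!r}")
    return PurePosixPath(working_directory) / relative_path, function_name


# Values copied from the prefect.yaml in this thread.
flow_file, flow_name = resolve_entrypoint(
    "/Users/nate/github.com/prefecthq/prefect",
    "flows/proofs/local_storage.py:test",
)
print(flow_file)
print(flow_name)
```

With these inputs the resolved file is the same path shown in the comment at the top of local_storage.py, and the function name is test.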
(prefect) nate :: ~/github.com/prefecthq/prefect ‹main›
» prefect deploy -n local_test
...
» prefect worker start --pool 'local'
Discovered type 'process' for work pool 'local'.
Worker 'ProcessWorker 3f9243f5-827d-43b0-af34-e9ff02c6c94b' started!
19:26:14.317 | INFO | prefect.flow_runs.worker - Worker 'ProcessWorker 3f9243f5-827d-43b0-af34-e9ff02c6c94b' submitting flow run 'c1690c0b-7613-4b05-9c50-3318ed8368e3'
> Running set_working_directory step...
19:26:14.764 | INFO | prefect.flow_runs.worker - Opening process...
19:26:14.926 | INFO | prefect.flow_runs.worker - Completed submission of flow run 'c1690c0b-7613-4b05-9c50-3318ed8368e3'
19:26:17.619 | INFO | Flow run 'peculiar-reindeer' - Hello, world!
19:26:17.756 | INFO | Flow run 'peculiar-reindeer' - Finished in state Completed()
19:26:18.336 | INFO | prefect.flow_runs.worker - Process 64343 exited cleanly
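The "Discovered type 'process' for work pool 'local'" line in the log above suggests the worker took its type from the work pool, since no type flag was passed when starting it. A toy Python model of that rule, combined with the earlier advice in this thread that worker and pool types must match, might look like the following. WorkPool and resolve_worker_type are hypothetical names for illustration, not Prefect classes or functions, and raising ValueError on a mismatch is an assumption about how such an error could be surfaced.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class WorkPool:
    """Toy stand-in for a work pool: just a name and a type."""
    name: str
    type: str


def resolve_worker_type(pool: WorkPool, requested_type: Optional[str] = None) -> str:
    """Return the type a worker would run as for this pool (toy model, not Prefect code)."""
    if requested_type is None:
        # No explicit type given: adopt the pool's type.
        return pool.type
    if requested_type != pool.type:
        # Assumed behavior for this sketch: refuse mismatched worker and pool types.
        raise ValueError(
            f"worker type {requested_type!r} does not match "
            f"work pool {pool.name!r} of type {pool.type!r}"
        )
    return requested_type


local_pool = WorkPool(name="local", type="process")
print(resolve_worker_type(local_pool))
```

In this model, passing requested_type="docker" for the same pool raises ValueError, which mirrors the advice to keep worker and pool types aligned.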
Charles Leung
05/15/2024, 3:58 PM