Colton
04/17/2024, 9:10 PM
Marvin
04/17/2024, 9:10 PM
`DELETE FROM flow_run_notificaion_queue` query in the context of Prefect. This might be a less common error, or it could be something specific to your setup.
A "database is locked" error typically indicates that a SQLite database is in use, and another process is currently writing to it, which prevents your current process from executing the delete operation.
Here are a few steps you can try:
1. Ensure that no other processes are interacting with the database when the delete operation is attempted.
2. Check for any long-running transactions that may not have been committed or rolled back.
3. If possible, switch to a more robust database backend like PostgreSQL for production workloads to avoid such locking issues.
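To see how this kind of lock arises, here is a minimal sketch using only Python's standard `sqlite3` module. It is not Prefect code: the `demo_queue` table and the `demo_locked_delete` helper are hypothetical names chosen for illustration. One connection holds an open write transaction while a second one attempts a `DELETE` with a short timeout.

```python
import os
import sqlite3
import tempfile


def demo_locked_delete(timeout: float = 0.1) -> tuple[str, int]:
    """Reproduce a lock conflict between two connections to one SQLite file.

    Returns the outcome of the blocked DELETE and the row count after the retry.
    """
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None disables implicit transactions, so BEGIN/COMMIT are explicit.
        writer = sqlite3.connect(path, isolation_level=None)
        # `timeout` is how long this connection waits for a lock before giving up.
        other = sqlite3.connect(path, timeout=timeout, isolation_level=None)
        try:
            writer.execute("CREATE TABLE demo_queue (id INTEGER)")
            writer.execute("INSERT INTO demo_queue VALUES (1)")

            # Open a write transaction and leave it uncommitted (the "long-running" case).
            writer.execute("BEGIN IMMEDIATE")
            writer.execute("INSERT INTO demo_queue VALUES (2)")

            try:
                other.execute("DELETE FROM demo_queue")
                outcome = "deleted"
            except sqlite3.OperationalError as exc:
                outcome = str(exc)

            # Once the writer commits, the same DELETE goes through.
            writer.execute("COMMIT")
            other.execute("DELETE FROM demo_queue")
            remaining = other.execute("SELECT COUNT(*) FROM demo_queue").fetchone()[0]
        finally:
            other.close()
            writer.close()
    return outcome, remaining


if __name__ == "__main__":
    print(demo_locked_delete())
```

A larger `timeout` only lets the second connection wait longer for the lock; the underlying fix is still to keep write transactions short or to move to a server-based database such as PostgreSQL.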
If these general suggestions don't resolve your issue, you may want to provide more context or directly seek help from the Prefect community, possibly by creating a new issue on the GitHub repository if you suspect this is a bug.
Colton
04/17/2024, 9:22 PM
Marvin
04/17/2024, 9:22 PM
```python
import time

from prefect import flow, task


@task
def sleep(t: int):
    print(f"Going to sleep for {t} seconds...")
    time.sleep(t)
    print("...and I'm awake!")


@flow(log_prints=True)
def my_flow():
    """flows run submitted tasks concurrently by default"""
    sleep(3)  # block the flow for 3 seconds (not concurrently)

    future = sleep.submit(2)  # submit a task run to the task runner, doesn't block flow
    future.wait()  # block the flow until the submitted task run is complete

    sleep.map([5] * 10)  # submit 10 at once, each sleeps for 5 seconds, don't block

    # even though the flow is done, we won't exit until all the task runs are complete


if __name__ == "__main__":
    my_flow()
```
In this example, the initial `sleep(3)` enforces a sequential block in the flow for 3 seconds.
For sequential task runs, you can forgo `submit` or `map` for task execution. Simply call the task function with the required arguments, just as `sleep(3)` is called, and it will run sequentially.
Please note that `.submit()` and `.map()` are used for concurrent execution and generally don't guarantee order unless combined with additional synchronization mechanisms such as `future.wait()`, which would force sequential behavior but is usually not ideal for this purpose.
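The ordering difference can be illustrated without Prefect. The sketch below is an analogy built on the standard library's `ThreadPoolExecutor`, not Prefect's task runner, and the helper names (`sleep_and_record`, `run_sequential`, `run_concurrent`) are made up for this example. Direct calls finish in call order, while submitted work finishes in whatever order the sleeps end.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleep_and_record(t: float, log: list) -> float:
    """Sleep for t seconds, then record t so completion order can be inspected."""
    time.sleep(t)
    log.append(t)
    return t


def run_sequential(durations: list) -> list:
    """Call the function directly: each call blocks until the previous one is done."""
    log = []
    for t in durations:
        sleep_and_record(t, log)
    return log


def run_concurrent(durations: list) -> list:
    """Submit every call up front: completion order depends on the durations."""
    log = []
    with ThreadPoolExecutor(max_workers=len(durations)) as pool:
        futures = [pool.submit(sleep_and_record, t, log) for t in durations]
        for future in futures:
            future.result()  # wait for all submitted calls to finish
    return log


if __name__ == "__main__":
    print(run_sequential([0.4, 0.05]))  # completion order matches call order
    print(run_concurrent([0.4, 0.05]))  # the shorter sleep normally finishes first
```

If a later step depends on an earlier one, calling the function directly (or waiting on the earlier future before submitting the next) is the simplest way to keep the order fixed.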
Here is the link to the code example for your reference.
Colton
04/17/2024, 10:08 PM
Marvin
04/17/2024, 10:08 PM
Colton
04/17/2024, 11:20 PM
Marvin
04/17/2024, 11:20 PM
`prefect.deployments.Deployment` class. Below is an example of how you can create a new deployment using configuration defaults for an imported flow:
```python
from my_project.flows import my_flow
from prefect.deployments import Deployment

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="example-deployment",
    version=1,
    work_queue_name="demo",
    work_pool_name="default-agent-pool",
)
deployment.apply()
```
In this example, `my_flow` is a flow that you have defined in your project. You set the deployment name, version, work queue, and work pool name. After that, you use `deployment.apply()` to apply the deployment configuration.
If you have a pre-defined storage block, for instance, an S3 bucket, and you'd like to use it along with an infrastructure override, this is how you could define the deployment:
```python
from my_project.flows import my_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3

storage = S3.load("dev-bucket")  # load a pre-defined block

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="s3-example",
    version=2,
    work_queue_name="prod",
    storage=storage,
    # ... you can include other overrides like infrastructure, schedule, etc.
)
deployment.apply()
```
You can find detailed instructions on Deployments in the Prefect documentation.
Please replace `my_project.flows` and `my_flow` with the actual package and flow names from your project, and adjust the parameters such as `name`, `version`, `work_queue_name`, `work_pool_name`, and `storage` to your needs.
Colton
04/18/2024, 4:01 PM
Marvin
04/18/2024, 4:02 PM
`deployment.yaml` file, you need to add a `schedules` section to the file. Below is an example illustrating how to specify schedules in a `deployment.yaml`, which controls the scheduled runs of your deployment:
```yaml
deployments:
  ...
  schedules:
    - cron: "0 0 * * *"
      timezone: "America/Chicago"
      active: false
    - cron: "0 12 * * *"
      timezone: "America/New_York"
      active: true
    - cron: "0 18 * * *"
      timezone: "Europe/London"
      active: true
```
In this snippet, the `schedules` section is an array of schedule configurations with `cron` expressions that define when the deployment should run. Each schedule also specifies a `timezone` and whether the schedule is `active` or not. The `cron` syntax follows the standard five-field pattern of minute, hour, day of the month, month, and day of the week.
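As a quick way to read the five cron fields, here is a small standard-library sketch. `split_cron` and `CRON_FIELDS` are hypothetical names for this illustration and are not part of Prefect. The helper only labels the fields and does not validate their values.

```python
# Field names in the order they appear in a standard five-field cron expression.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expr: str) -> dict:
    """Label each whitespace-separated field of a five-field cron expression."""
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


if __name__ == "__main__":
    # "0 12 * * *" reads as minute 0 of hour 12, every day of every month.
    print(split_cron("0 12 * * *"))
```

Read this way, the three schedules in the snippet fire at 00:00, 12:00, and 18:00 in their respective time zones.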
For more information on creating schedules, you can refer to the following sections of the Prefect documentation:
- Schedules Overview
- Guide to Prefect Deploy commands and prefect.yaml
Remember to replace the cron expressions and timezones with the values that suit your scheduling needs.