<@ULVA73B9P> receiving a database is locked error ...
# ask-marvin
c
@Marvin receiving a database is locked error in terminal SQL: DELETE FROM flow_run_notificaion_queue
m
It seems there are no open issues on GitHub related to the "database is locked" error with the specific SQL query DELETE FROM flow_run_notificaion_queue in the context of Prefect. This might be a less common error, or it could be something specific to your setup.
A "database is locked" error typically indicates that a SQLite database is in use and another process is currently writing to it, which prevents your process from executing the delete operation. Here are a few steps you can try:
1. Ensure that no other processes are interacting with the database when the delete operation is attempted.
2. Check for any long-running transactions that may not have been committed or rolled back.
3. If possible, switch to a more robust database backend like PostgreSQL for production workloads to avoid such locking issues.
If these general suggestions don't resolve your issue, you may want to provide more context or seek help directly from the Prefect community, possibly by creating a new issue on the GitHub repository if you suspect this is a bug.
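The locking behavior described above can be reproduced outside Prefect with Python's built-in sqlite3 module. This is only a sketch of the general mechanism, not Prefect's own code: the table name queue_demo and the function name demo_locked_error are made up for illustration, and the 0.1 second timeout is chosen so the failure shows up quickly.

```python
import os
import sqlite3
import tempfile


def demo_locked_error() -> str:
    """Hold a write transaction on one connection, then try to write from another."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")

        # isolation_level=None lets us issue BEGIN/ROLLBACK explicitly.
        writer = sqlite3.connect(path, isolation_level=None)
        writer.execute("CREATE TABLE queue_demo (id INTEGER PRIMARY KEY)")
        writer.execute("INSERT INTO queue_demo VALUES (1)")

        # Open a write transaction and leave it uncommitted.
        writer.execute("BEGIN IMMEDIATE")
        writer.execute("DELETE FROM queue_demo")

        # A second connection waits at most 0.1 s for the write lock.
        other = sqlite3.connect(path, timeout=0.1, isolation_level=None)
        try:
            other.execute("DELETE FROM queue_demo")
            outcome = "ok"
        except sqlite3.OperationalError as exc:
            outcome = str(exc)
        finally:
            writer.execute("ROLLBACK")  # release the lock
            writer.close()
            other.close()
        return outcome


if __name__ == "__main__":
    print(demo_locked_error())
```

Once the first connection commits or rolls back, the same DELETE from the second connection goes through, which is why steps 1 and 2 focus on finding whatever is holding the write lock.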
c
@Marvin what is the syntax to force sequential runs in tasks
m
To force tasks to run sequentially in a flow using Prefect, you can simply call each task as a regular Python function, and the flow will wait for each task to complete before moving on to the next. Below is an example:
python
import time
from prefect import flow, task

@task
def sleep(t: int):
    print(f"Going to sleep for {t} seconds...")
    time.sleep(t)
    print("...and I'm awake!")

@flow(log_prints=True)
def my_flow():
    """flows run submitted tasks concurrently by default"""
    sleep(3) # block the flow for 3 seconds (not concurrently)
    
    future = sleep.submit(2) # submit a task run to the task runner, doesn't block flow
    
    future.wait() # block the flow until the submitted task run is complete
    
    sleep.map([5] * 10) # submit 10 at once, each sleeps for 5 seconds, don't block
    
    # even though the flow is done, we won't exit until all the task runs are complete

if __name__ == "__main__":
    my_flow()
In this example, the initial sleep(3) call blocks the flow for 3 seconds. For sequential task runs, you can forgo submit and map: simply call the task function with its required arguments, just as sleep(3) is called, and each call completes before the next line of the flow runs. Please note that .submit() and .map() are meant for concurrent execution and do not guarantee ordering on their own. Calling future.wait() after every submission would force sequential behavior, but it is usually not ideal for this purpose.
c
@Marvin is having tasks within tasks bad practice?
m
The Prefect documentation does not explicitly mention that having tasks within tasks (nested tasks) is a bad practice. However, it is important to consider the principles of Prefect and best practices in workflow design. Tasks in Prefect are meant to represent discrete units of work within a workflow. While you can define tasks within the same file as your flow definition and even within other task definitions, it's essential to maintain clarity, modularity, and manageability of your flows. Here are a few points to consider:
1. Reusability: Define tasks in such a way that they are reusable across different flows or subflows.
2. Observability: Each task should be an observable unit with its state clearly understandable and trackable within the flow run.
3. Dependencies: Tasks are designed to depend on the state of upstream tasks. Make sure that these dependencies are clear and intentional.
If "tasks within tasks" results in confusing dependencies, diminished clarity of the workflow's operation, or difficulty in tracking the state and outcomes of individual tasks, it might be considered a bad practice. Instead of putting tasks within tasks, you could:
- Create helper functions that are called within tasks, encapsulating parts of the logic without additional task overhead.
- Define sub-flows if you have a complex sequence of tasks that should be reused.
For a more comprehensive understanding of tasks and flows in Prefect, you can explore the following resources:
- Tasks Overview
- Prefect Tutorial on Tasks
- Successfully Deploying Task Queue
Remember, the design you choose should aim to keep your workflows maintainable, efficient, and scalable, and there might be trade-offs depending on your specific situation.
c
@Marvin build deployment from python documentation
m
To build a deployment from a Python script using Prefect, you'll use the prefect.deployments.Deployment class. Below is an example of how you can create a new deployment using configuration defaults for an imported flow:
python
from my_project.flows import my_flow
from prefect.deployments import Deployment

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="example-deployment",
    version=1,
    work_queue_name="demo",
    work_pool_name="default-agent-pool",
)
deployment.apply()
In this example, my_flow is a flow that you have defined in your project. You set the deployment name, version, work queue, and work pool name, then call deployment.apply() to register the deployment. If you have a pre-defined storage block, for instance an S3 bucket, you can load it and pass it to the deployment; other overrides such as infrastructure can be added in the same call:
python
from my_project.flows import my_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3

storage = S3.load("dev-bucket")  # load a pre-defined block

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="s3-example",
    version=2,
    work_queue_name="prod",
    storage=storage,
    # ... you can include other overrides like infrastructure, schedule, etc.
)
deployment.apply()
You can find detailed instructions on Deployments in the Prefect documentation. Please replace my_project.flows and my_flow with the actual package and flow names from your project, and adjust parameters such as name, version, work_queue_name, work_pool_name, and storage to your needs.
c
@Marvin Where can i find proper documentation on deployment.yaml. I want to put a deployment on a schedule
m
To schedule a deployment using a deployment.yaml file, you need to add a schedules section to the file. Below is an example illustrating how to specify the schedules that control the scheduled runs of your deployment:
yaml
deployments:
  ...
  schedules:
    - cron: "0 0 * * *"
      timezone: "America/Chicago"
      active: false
    - cron: "0 12 * * *"
      timezone: "America/New_York"
      active: true
    - cron: "0 18 * * *"
      timezone: "Europe/London"
      active: true
In this snippet, the schedules section is an array of schedule configurations, each with a cron expression that defines when the deployment should run. Each schedule also specifies a timezone and whether the schedule is active or not. The cron syntax follows the standard five-field pattern of minute, hour, day of the month, month, and day of the week.
For more information on creating schedules, you can refer to the following sections of the Prefect documentation:
- Schedules Overview
- Guide to Prefect Deploy commands and prefect.yaml
Remember to replace the cron expressions and timezones with the values that suit your scheduling needs.
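To make the five-field cron layout concrete, here is a small standard-library sketch that labels each field of the expressions used above. It is not part of Prefect; describe_cron and CRON_FIELDS are hypothetical names, and the function only splits the expression without validating the values.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Map each of the five cron fields to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


if __name__ == "__main__":
    for cron in ("0 0 * * *", "0 12 * * *", "0 18 * * *"):
        print(cron, "->", describe_cron(cron))
```

Read this way, "0 12 * * *" means minute 0 of hour 12 on every day, evaluated in the schedule's timezone.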