<@ULVA73B9P> how to schedule a new task from withi...
# marvin-in-the-wild
z
@Marvin how to schedule a new task from within the flow?
m
To schedule a new task from within a flow, you have to define tasks using the
@task
decorator and ensure they are called within the
@flow
decorated function. Direct scheduling of tasks independent of a flow is not supported; they must be part of a flow to be scheduled and executed by Prefect. For scheduling flows (which comprise tasks), you can use external schedulers or create deployments on Prefect Cloud or a local Prefect server that include a schedule. Tasks are used for encapsulating discrete units of work and are always executed as part of a flow run. Here is a basic example of defining and calling tasks within a flow:
Copy code
python
from prefect import flow, task

@task(task_run_name="Adding {x} and {y}")
def add(x: int, y: int) -> int:
    """just a python function with a fancy hat on!"""
    return x + y

@flow
def my_flow():
    """tasks must be called in the flow to be executed"""
    first_result = add(1, 2)
    second_result = add(first_result, first_result)
    return add(first_result, second_result)

if __name__ == "__main__":
    # run the flow
    my_flow()
    
    # you can run tasks' functions directly outside of a flow if you want
    # but it won't be tracked by prefect, it's just a normal function
    assert add.fn(1, 2) == 3
For detailed information, please refer to the relevant sections of the Prefect documentation: - Tasks in a flow - Flow runs - Tasks You can also examine the example code on GitHub.
z
@Marvin How I schedule a subflow at the specific time from within a parent flow ?
m
Scheduling a subflow at a specific time from within a parent flow is not directly addressed in the excerpts from the Prefect documentation. However, you can compose flows by calling a flow function inside the execution of another flow. The primary flow is the "parent" flow, and the flow created within the parent is the "child" or "subflow". Although the documents do not detail the mechanism for triggering subflows based on time from a parent flow, we can deduce a general approach. Subflows, as described, will block the execution of the parent flow until they complete. They can also be run asynchronously for concurrent execution with the parent flow. Regarding scheduling, when you want to trigger a subflow from a parent flow at a specific time, you may need to construct logic that determines whether the current time matches the desired time for the subflow to run. Here's a conceptual example of how that might look:
Copy code
python
from prefect import flow
from datetime import datetime, timedelta
import asyncio

@flow
def child_flow():
    # Subflow logic here
    pass

@flow
def parent_flow():
    # Check the current time and desired subflow time
    now = datetime.now()
    subflow_time = datetime(2023, 4, 20, 12, 0) # 12:00 PM on April 20, 2023

    # Calculate the time difference
    time_to_wait = subflow_time - now
    if time_to_wait.total_seconds() > 0:
        # Wait until the scheduled time
        asyncio.sleep(time_to_wait.total_seconds())

    # Run the subflow
    child_flow()

if __name__ == "__main__":
    parent_flow()
This example assumes that the parent flow is running and will execute the subflow when the scheduled time is reached. For a more robust solution, you might need to integrate Prefect's Automation concepts, which allow you to react to certain events, but in this case, you would be creating a custom event based on time within your own parent flow's logic. For more advanced scheduling capabilities, I recommend reviewing the Automations section of the Prefect documentation and considering the setup of an external scheduler or a deployment that invokes the subflow at the desired time.
n
@Marvin
run_deployment
!
👍 1
m
To schedule a subflow at a specific time from within a parent flow, you can use the
run_deployment
function provided by Prefect. The
run_deployment
function can schedule the run of an existing deployment when your Python code executes. Here's an example snippet of how you might use `run_deployment`:
Copy code
python
from prefect.deployments import run_deployment

@flow
def parent_flow():
    # Logic to determine if it's the right time to run the subflow
    # ...

    # Run a deployment of the subflow
    run_deployment(name="my_subflow_deployment_name")
Remember to replace
"my_subflow_deployment_name"
with the actual name of the deployment for your subflow. This function allows you to programmatically control flow execution from within Python, meaning that you can potentially put it within a time-checking mechanism in your parent flow's logic to wait for a specific schedule. For detailed instructions on creating a deployment, you can refer to the Block-based Deployments documentation. This section outlines the configuration, application, and listing of deployments using the Prefect CLI. Furthermore, the concept of Automations and schedulers available in Prefect also provides event-based rules and time-based schedules for running a deployment. Once you have your deployment configured, using
run_deployment
in your parent flow should allow you to trigger a subflow precisely when your logic conditions (such as reaching a specific time) are met.