need help: in prefect older version : used to call...
# ask-community
k
Need help: in the older Prefect version, I used to call a flow from a flow like this.
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, StartFlowRun
def process_flow(flowname):
    flow_run_id = create_flow_run.run(flow_name=flowname)
    msg=wait_for_flow_run.run(flow_run_id, raise_final_state=True)
@task
def flow1():
    flowname="flow1"
    process_flow(flowname)
@task
def flow2():
    flowname="flow2"
    process_flow(flowname)
with Flow(flow_name, run_config=RUN_CONFIG,  storage=STORAGE) as flow:
    task1_status=flow1()
    task2_status=flow2(task1_status)
How can I do something similar in Prefect 2? Thanks in advance.
n
hi @Kamal - it's much simpler in prefect 2 🙂 you can just call flows from other flows
from prefect import flow

@flow
def bar():
  print("running a subflow")

@flow(log_prints=True)
def foo():
  return bar()

foo()
k
let me try
The example which you shared is creating a flow in same program and called inside the program. I want to execute different flows which are created and deployed separately in prefect 2 from another flow by calling them.
n
gotcha, then you're looking for `run_deployment`
@Marvin swap out `bar` with a `run_deployment` call
from prefect import flow

@flow
def bar():
  print("running a subflow")

@flow(log_prints=True)
def foo():
  return bar()

foo()
m
Ah, the joys of monotony. If you're yearning to substitute `bar` with `run_deployment`, you're basically looking to execute a Prefect deployment from a flow. Now, since I am designed to search documents and GitHub issues with the proficiency akin to the gloom I carry, let's clarify that my knowledge doesn't stretch to include Prefect 2.x, and I will need to perform a search to find a code snippet equivalent for running a deployment. But since you seem to desire the code replacement, here's how you might modify your function `foo` to call `run_deployment` instead of `bar`, in theory - as if the universe has any meaning:
python
from prefect import flow
from prefect.deployments import run_deployment

@flow(log_prints=True)
def foo():
    # Replace "your-deployment-name" with the deployment you intend to run,
    # written in the form "flow-name/deployment-name"
    return run_deployment("your-deployment-name")

foo()
Bear in mind, you should replace `"your-deployment-name"` with the actual name of the deployment you want to run. Now, let me wander off and fetch any relevant information regarding the accurate usage of `run_deployment` in case I've been too swept up in my own existential musings. Please, hold on.
The documentation states you can create a flow run from a deployment in a Python script with `run_deployment` like so:
python
from prefect.deployments import run_deployment

def main():
    response = run_deployment(name="flow-name/deployment-name")
    print(response)

if __name__ == "__main__":
    main()
Be sure to replace `"flow-name/deployment-name"` with the respective flow and deployment name you are targeting.
Additionally, there are open GitHub issues relating to `run_deployment` in Prefect 2.x:
1. Issue 11436: A suggestion to give `run_deployment` the same wait-properties as subflows, allowing tasks to depend on deployments in a similar way to subflows.
2. Issue 7522: A user points out that while the `create_flow_run_from_deployment` function supports tags, `run_deployment` does not, and they are requesting this feature.
3. Issue 8794: A question about whether there is a way for a parent task to be associated with the subflow it has triggered when `run_deployment` is called from inside tasks.
Remember to have `PREFECT_API_URL` configured for the environment in which your agent is running, especially when interacting with Prefect Cloud from a remote execution environment. Here is how you can integrate `run_deployment` into your `foo` flow:
python
from prefect import flow
from prefect.deployments import run_deployment

@flow(log_prints=True)
def foo():
    # The name is written in the form "flow-name/deployment-name"
    response = run_deployment(name="your-deployment-name")
    print(response)

foo()
Replace `"your-deployment-name"` with your actual deployment name before running it.
k
❤️
@Marvin I am using flow deployment from_source(source=GitRepository and i want to call another flow which i already deployed.
m
In Prefect 2.x, when you have used a Git-based storage mechanism through the `from_source()` method and set up a `GitRepository` block to point to your repository, you normally build and register your flow deployment to Prefect Cloud or a local API. Calling another flow that has already been deployed is done via the `run_deployment` function, which specifies the name of the deployment you wish to run. Assuming you have a deployment registered from your Git repository, you can run it within another flow by calling `run_deployment`. The code snippet below demonstrates how to call `run_deployment` within a Prefect flow:
python
from prefect.deployments import run_deployment
from prefect import flow

@flow
def main_flow():
    # Here, you would use the specific name of your deployment, which might follow the
    # convention `flow-name/deployment-name` or however you've named it in Prefect Cloud.
    result = run_deployment(name="other-flow-name/other-deployment-name")
    print(f"The result of the deployment run is: {result}")
    
if __name__ == "__main__":
    main_flow()
Make sure to replace `"other-flow-name/other-deployment-name"` with the actual flow and deployment name of the flow you want to call. Running this flow will create a flow run for the targeted deployment and execute it.
n
yeah if you've already done `flow.from_source(...).deploy(...)` someplace, then you can call `run_deployment` from anywhere in the same workspace to trigger a run of that deployment
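A note for anyone porting the Prefect 1 snippet at the top of this thread: there, `wait_for_flow_run` was called with `raise_final_state=True`. As far as I know, `run_deployment` hands back the flow run object instead of raising when the run fails, so the parent flow has to inspect the final state itself. Below is a minimal sketch of that check. The helper name `raise_unless_completed` is hypothetical, and it assumes the object passed in exposes `.name` and `.state`, with `state.is_completed()` and `state.name`, the way Prefect 2 flow runs do.

```python
def raise_unless_completed(flow_run):
    """Return flow_run if its final state is Completed, otherwise raise.

    Assumption: flow_run has .name and .state, and the state object has
    .is_completed() and .name (as Prefect 2 FlowRun/State objects do).
    """
    state = flow_run.state
    if state is None or not state.is_completed():
        # Report the state name when there is one, so the failure is traceable
        state_name = state.name if state is not None else "unknown"
        raise RuntimeError(
            f"Flow run {flow_run.name!r} ended in state {state_name}"
        )
    return flow_run
```

Inside a parent flow this would be used as `raise_unless_completed(run_deployment(name="flow1/my-deployment"))` followed by the same call for the second deployment, where both deployment names are placeholders. The second call is only reached if the first run completed, which mirrors the sequential `flow1` then `flow2` behaviour of the original code.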
k
its working
n
catjam
k
thank you
@Marvin @Nate I am able to run multiple flows in sequential from a flow using run_deployment. However i am looking for how to execute multiple flows in parallel triggered from a flow.
m
Running multiple flows in parallel from a parent flow in Prefect 2.x can be achieved using tasks. Each call to `run_deployment` can be wrapped in a task, and these tasks can be executed concurrently. In Prefect 2.x, you would use an asynchronous flow with `async` and `await` along with tasks being submitted for execution via `task.submit()` to run them in parallel. Here is an example that showcases how to trigger multiple deployments in parallel from a flow:
python
import asyncio

from prefect import flow, task
from prefect.deployments import run_deployment

@task
async def run_deployment_task(deployment_name: str):
    # Trigger the deployment and wait for the resulting flow run to finish
    return await run_deployment(name=deployment_name)

@flow
async def main_flow():
    # List of your deployment names to run in parallel
    deployment_names = [
        "flow-1/deployment-name-1",
        "flow-2/deployment-name-2",
        # ...
    ]
    # Submit one task per deployment. In Prefect 2.x, submit() on an async
    # task must be awaited inside an async flow; it returns a future at once.
    futures = [await run_deployment_task.submit(name) for name in deployment_names]

    # Wait for all tasks to complete and collect what each one returned
    results = [await future.result() for future in futures]
    for result in results:
        print(f"The result of the deployment run is: {result}")

if __name__ == "__main__":
    asyncio.run(main_flow())  # Start an event loop and run the async flow
This example uses Python's `async` and `await` syntax available from Python 3.5+. The `run_deployment` function calls are wrapped inside an asynchronous `run_deployment_task` task that is submitted using `task.submit()`. This will ensure that each deployment runs in parallel. The `main_flow` awaits the completion of all tasks. Ensure you have a task runner configured that supports concurrency, such as the `ConcurrentTaskRunner` or the `DaskTaskRunner`, to achieve true parallelism.
k
Thank you. Will try and let you know
n
i would use asyncio.gather instead of what marvin did above but yeah ^ this is roughly correct if you want run_deployment, otherwise you can asyncio.gather async subflows or you can use this experimental behavior
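For reference, here is a minimal sketch of the `asyncio.gather` pattern mentioned above. To keep it runnable without a Prefect API, the coroutine to run is passed in as a parameter, and `fake_trigger` is a hypothetical stand-in for `run_deployment`; the helper name `run_all` is also made up for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def run_all(
    names: Sequence[str], trigger: Callable[..., Awaitable[Any]]
) -> List[Any]:
    """Start one trigger(name=...) call per name and wait for all of them."""
    # gather schedules every coroutine concurrently and returns the results
    # in the same order as the input names
    return await asyncio.gather(*(trigger(name=name) for name in names))


async def fake_trigger(name: str) -> str:
    # Hypothetical stand-in for run_deployment: pretend the run takes a moment
    await asyncio.sleep(0.01)
    return f"finished {name}"
```

Inside an async flow, the assumed usage would be `flow_runs = await run_all(deployment_names, run_deployment)`, since `run_deployment` can be awaited when it is called from an async context. Compared with wrapping each call in a task, this skips the extra task layer and leaves the concurrency to the event loop.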
k
ok
n
k
sure will learn and apply. Thanks Nate
the above one worked
Thanks
n
catjam
k
@Marvin @Nate I am using `from prefect_email.message import email_send_message` to send email. It works fine when I call it from a flow, but it fails when it is called inside a task. Is there any option to send email at the task level?
m
In Prefect 2.x, if you want to send an email from within a task, you should ensure that the task is using an asynchronous function if used with asynchronous resources or integrations. When using `email_send_message` within a task, it's crucial to use it correctly to avoid issues. Here's an example of how to use `email_send_message` in a synchronous task:
python
from prefect import flow, task
from prefect_email.message import email_send_message

@task
def send_email_task():
    # Set up your email parameters
    email_send_message(
        subject="Hello from Prefect Task",
        msg="This is a test email sent from within a Prefect task.",
        email_to=["recipient@example.com"],
        # Add other parameters as needed
    )

@flow
def main_flow():
    # Call the task to send an email
    send_email_task()

if __name__ == "__main__":
    main_flow()
Make sure to pass the correct parameters to the `email_send_message` function such as `subject`, `msg`, `email_to`, and any other necessary values. If your `email_send_message` is designed to work asynchronously, you need to adjust your task accordingly using async/await. If you are encountering issues with `email_send_message` inside a task, please check the following:
1. Error Messages: Examine any error messages you receive for clues about the failure. Common issues might be related to authentication, incorrect email settings, or network problems.
2. Task Configuration: Ensure the task is correctly configured with retry logic and necessary timeouts.
3. Context: Ensure that all the necessary context and configuration required by the email task are available during execution.
If you continue experiencing issues, please provide more detailed information or error messages so that I can offer more targeted assistance.
k
getting error message
Following Error/Exception Occured: Tasks cannot be run from within tasks. Did you mean to call this task in a flow?  while executing
python
from prefect import flow, task
from prefect_email.message import email_send_message

@task
def test_task():
    send_email_task()

def send_email_task():
    # Set up your email parameters
    email_send_message(
        subject="Hello from Prefect Task",
        msg="This is a test email sent from within a Prefect task.",
        email_to=["recipient@example.com"],
        # Add other parameters as needed
    )

@flow
def main_flow():
    # Call the task to send an email
    test_task()

if __name__ == "__main__":
    main_flow()
used something as above @Marvin and got error
Following Error/Exception Occured: Tasks cannot be run from within tasks. Did you mean to call this task in a flow?  while executing
m
It seems the error comes from `email_send_message` itself. It is a Prefect task shipped with `prefect-email`, so calling it inside `test_task` (even through the plain helper `send_email_task`) means running a task from within a task. In Prefect 2.x, tasks cannot invoke other tasks. You can either call `email_send_message` directly from a flow, or call the task's underlying function with `.fn()` from inside your own task. Here's a corrected example using `.fn()`. It assumes `email_send_message` is an async task and that you have saved an `EmailServerCredentials` block under the placeholder name "my-email-credentials":
python
import asyncio

from prefect import flow, task
from prefect_email import EmailServerCredentials
from prefect_email.message import email_send_message

@task
async def send_email_task():
    # "my-email-credentials" is a placeholder; use the name of your saved block
    credentials = await EmailServerCredentials.load("my-email-credentials")
    # .fn() calls the plain function behind the task, so no nested task run is created
    await email_send_message.fn(
        email_server_credentials=credentials,
        subject="Hello from Prefect Task",
        msg="This is a test email sent from within a Prefect task.",
        email_to=["recipient@example.com"],  # Make sure to use the correct email address here
        # Add other parameters as needed
    )

@flow
async def main_flow():
    # Call the task to send an email
    await send_email_task()

if __name__ == "__main__":
    asyncio.run(main_flow())
In this updated version, `send_email_task` is a task called from within `main_flow`, and it sends the email through `email_send_message.fn()` rather than by running the `email_send_message` task. This should avoid the "Tasks cannot be run from within tasks" error. Make sure to replace "recipient@example.com" with the actual recipient's email address before running the code.
k
@Marvin @Nate I use git repo as a source for my deployment. i see options like repository,branch,include_submodules, access_token and credentials. I am looking for some option to mention by git release tag. Is there a way?
m
The information retrieved does not directly address the question of specifying a git release tag for a Prefect deployment source. Based on general knowledge of how git and CI/CD workflows operate, you would typically specify a release tag in your deployment to reference a specific point in your repository's history. However, Prefect documentation and release notes excerpt provided do not specifically mention how to use a git release tag for Prefect deployments. In other CI/CD systems, you might specify a release tag by setting the `ref` to the tag name in your configurations. Unfortunately, without direct support or further documentation specifying how to use git tags with Prefect, I cannot confirm if this is possible out-of-the-box in Prefect. To proceed, you can check the official Prefect documentation, or if your deployment tool or interface allows, specify the git tag in a field typically used for branches or commits since tags are also considered refs in git. Otherwise, you may need to contact Prefect support or search through Prefect community forums for more specific guidance on this matter. If Prefect does not support this feature natively, you may need to implement a custom solution to check out code at a particular tag before deployment.
n
please avoid tagging prefect employees unless you're in an active conversation with them, bandwidth is limited
you can write a pull step that clones the repo with `run_shell_script` and clone from whatever ref you want with `git` directly