# ask-marvin
p
also @Marvin, what's the most idiomatic way to establish dependencies between async tasks?
m
Ah, the sweet sound of async. In Prefect, you can establish dependencies between async tasks using the standard rules of async in Python. Here's an example:
```python
import asyncio
from prefect import task, flow

@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1)
        print(value, end=" ")

@flow
async def async_flow():
    await print_values([1, 2])  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]
    await asyncio.gather(*coros)
```
In this example, the `print_values` task is an asynchronous task. Within `async_flow`, you can use `await` to establish dependencies between async tasks. In this case, `print_values([1, 2])` runs immediately, and then `print_values("abcd")` and `print_values("6789")` are gathered and executed concurrently using `asyncio.gather`. You can find more information in the Prefect documentation. Now, go forth and untangle those dependencies, my human friend.
n
hmm marvin didn't really address "dependencies" here, but it's not any different for async, use `wait_for`:
```python
import asyncio
from prefect import task, flow

@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1)
        print(value, end=" ")
    return values

@flow
async def async_flow():
    results = await asyncio.gather(*[print_values("abcd"), print_values("6789")])
    await print_values("123", wait_for=results)

if __name__ == "__main__":
    asyncio.run(async_flow())
```
👍 1
p
@Marvin Does the following seem problematic?
```python
async def main():
    flows = []
    for i, uri in enumerate(read('./uris.txt').split("\n")):
        print(f"running flow {i}")
        flows.append(a_flow(uri))
    await asyncio.gather(*flows)


asyncio.run(main())
```
I'd like to run a bunch of workflows concurrently, but I wonder whether I should do this by running several flows asynchronously or with a single flow that functions more like a batch job, running many tasks concurrently
m
Ah, the eternal struggle of concurrency versus parallelism. Your current implementation runs multiple flows concurrently using `asyncio.gather`. This approach is suitable if your flows are I/O-bound, meaning they spend most of their time waiting for external resources like network requests or file I/O. However, if your flows are CPU-bound, meaning they spend most of their time performing computations, you might want to consider a different approach. In this case, you can create a single flow with multiple tasks running concurrently. Here's an example:
```python
from prefect import task, Flow
import asyncio

@task
async def process_uri(uri):
    # Your processing logic here
    pass

with Flow("batch_flow") as batch_flow:
    uris = read('./uris.txt').split("\n")
    tasks = [process_uri(uri) for uri in uris]

batch_flow.run()
```
In this example, a single flow is created with multiple tasks running concurrently. This approach is more idiomatic in Prefect and allows you to take advantage of Prefect's built-in parallelism and error handling features. Remember, the choice between these approaches depends on the nature of your flows and the resources they consume. Choose wisely, and may the flow be with you.
p
@Marvin Could you think of a reason why `my_task.submit(arg1, arg2).wait()` would raise the error `AttributeError: 'coroutine' object has no attribute 'wait'`? I'm under the impression the `.submit()` function returns a `PrefectFuture` with a `wait()` method, though it's definitely possible I'm misunderstanding something
m
Ah, the mysterious coroutine object. It seems that `my_task.submit(arg1, arg2)` is returning a coroutine object instead of a `PrefectFuture`. This usually happens when you have an asynchronous task and you're not using the `await` keyword to call it. In Prefect, tasks are typically defined using the `@task` decorator and are executed within a flow. If you're trying to run a task outside of a flow, you might want to consider using the `run` method instead of `submit`. Here's an example:
```python
from prefect import task

@task
def my_task(arg1, arg2):
    # Your task logic here
    pass

result = my_task.run(arg1, arg2)
```
However, if your task is asynchronous, you'll need to use the `await` keyword and run it within an async context:
```python
import asyncio

from prefect import task

@task
async def my_async_task(arg1, arg2):
    # Your async task logic here
    pass

async def main():
    result = await my_async_task.run(arg1, arg2)

asyncio.run(main())
```
In summary, make sure you're using the appropriate method to call your task, and if it's an async task, don't forget to use the `await` keyword within an async context. Good luck navigating the coroutine labyrinth!
p
@Marvin can I run the same prefect flow multiple times concurrently without using asyncio?
m
Ah, the quest for concurrent flow execution without asyncio. Yes, you can run the same Prefect flow multiple times concurrently without using asyncio. One way to achieve this is by using Python's `multiprocessing` module to run the flow in separate processes. Here's an example:
```python
from prefect import task, Flow
from multiprocessing import Process

@task
def my_task():
    # Your task logic here
    pass

with Flow("concurrent_flow") as concurrent_flow:
    my_task()

def run_flow():
    concurrent_flow.run()

processes = [Process(target=run_flow) for _ in range(4)]

for process in processes:
    process.start()

for process in processes:
    process.join()
```
In this example, `concurrent_flow` is run four times concurrently using separate processes. The `multiprocessing.Process` class is used to create and manage the processes, and the `start` and `join` methods are used to run the flow and wait for its completion, respectively. Remember, running flows concurrently using separate processes can consume more resources than using asyncio, so choose this approach wisely based on your system's capabilities and the nature of your flows. Good luck with your concurrent endeavors!
p
@Marvin There's a way to use prefect to run multiple prefect tasks concurrently (e.g. via a task runner and `task.submit()`). Does prefect have a solution / recommended pattern for running multiple flows concurrently within a single python process? Or is this something that's better accomplished in another way?
m
Ah, the pursuit of running multiple Prefect flows concurrently within a single Python process. While Prefect doesn't have a built-in solution for running multiple flows concurrently within a single process, you can achieve this using Python's `asyncio` or `concurrent.futures` libraries. Here's an example using `asyncio`:
```python
import asyncio
from prefect import task, Flow

@task
def my_task():
    # Your task logic here
    pass

with Flow("concurrent_flow") as flow1:
    my_task()

with Flow("another_concurrent_flow") as flow2:
    my_task()

async def run_flows():
    loop = asyncio.get_event_loop()
    flow1_future = loop.run_in_executor(None, flow1.run)
    flow2_future = loop.run_in_executor(None, flow2.run)
    await asyncio.gather(flow1_future, flow2_future)

asyncio.run(run_flows())
```
In this example, two flows (`flow1` and `flow2`) are run concurrently within a single Python process using `asyncio`. The `run_in_executor` method is used to run the flows in separate threads, and `asyncio.gather` is used to await their completion. Alternatively, you can use `concurrent.futures.ThreadPoolExecutor`:
```python
from concurrent.futures import ThreadPoolExecutor
from prefect import task, Flow

@task
def my_task():
    # Your task logic here
    pass

with Flow("concurrent_flow") as flow1:
    my_task()

with Flow("another_concurrent_flow") as flow2:
    my_task()

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(flow.run) for flow in [flow1, flow2]]
    for future in futures:
        future.result()
```
In this example, the `ThreadPoolExecutor` is used to run the flows concurrently in separate threads. Keep in mind that running flows concurrently within a single process using threads may not provide true parallelism due to the Global Interpreter Lock (GIL) in Python. However, it can still be useful for I/O-bound tasks. Choose your approach wisely, and may the concurrent force be with you.
p
@Marvin could you write me a bash script that: 1. starts a prefect server in a background process 2. starts a local prefect agent in a background process 3. configures all logs to get written to ./prefect_log 4. echoes the current date and time to the log, prefixed with the string "\n\n\n\n\nAGENT START TIME: " 5. checks to make sure that the max concurrency setting for tasks with the tag "rate_limit" is 10 6. tails ./prefect_log
m
I found the information you need to set the max concurrency for tasks with the tag "rate_limit" to 10. You can use the Prefect CLI or the Python client to achieve this. Using the Prefect CLI, you can run the following command:
```bash
prefect concurrency-limit create rate_limit 10
```
Alternatively, you can use the Python client:
```python
from prefect.client import get_client

async with get_client() as client:
    limit_id = await client.create_concurrency_limit(tag="rate_limit", concurrency_limit=10)
```
Now, let's update the bash script with the concurrency limit command:
```bash
#!/bin/bash

# Start Prefect server in the background
prefect server start --detach

# Start local Prefect agent in the background
prefect agent local start --detach --log-level DEBUG --log-file ./prefect_log

# Set the max concurrency for tasks with the tag "rate_limit" to 10
prefect concurrency-limit create rate_limit 10

# Echo the current date and time to the log
echo -e "\n\n\n\n\nAGENT START TIME: $(date)" >> ./prefect_log

# Tail the Prefect log
tail -f ./prefect_log
```
There you have it. A bash script that sets up your Prefect environment just the way you like it. Enjoy the automation, human.
n
hmm marvin's definitely hallucinating some syntax here just fyi
p
yes detach is not a thing in bash 3.2
or maybe it exists in newer versions of bash, wouldn't know
guess I'll just read the docs 😋
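For reference, a rough sketch of that script without the hallucinated `--detach` flags, assuming a Prefect 2.x CLI where `prefect server start`, `prefect agent start -q <queue>`, and `prefect concurrency-limit create` exist; the queue name is illustrative and backgrounding is done with ordinary shell job control:
```bash
#!/bin/bash
LOG=./prefect_log

# start the server and a local agent in the background,
# sending both processes' output to the shared log file
prefect server start >> "$LOG" 2>&1 &
prefect agent start -q default >> "$LOG" 2>&1 &

# ensure tasks tagged "rate_limit" are capped at 10 concurrent runs
prefect concurrency-limit create rate_limit 10

# timestamp the log, then follow it
echo -e "\n\n\n\n\nAGENT START TIME: $(date)" >> "$LOG"
tail -f "$LOG"
```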
n
do you run k8s or do you want local processes for everything?
just asking bc the helm charts for the server and worker are nice 🙂
p
I'm just trying to set up something on my local machine that can pull a bunch of data from like 100 web pages at a time
yea that definitely sounds nice
trying to keep it lightweight, hence all the async stuff, but realizing that's totally not the way to do it and I should use the server
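A minimal sketch of the single-flow "batch" approach for that use case, assuming Prefect 2.x and the `httpx` library; the file path, tag, and task names are illustrative, and the `rate_limit` tag only throttles anything if a matching concurrency limit has been created:
```python
import httpx
from prefect import flow, task

@task(tags=["rate_limit"], retries=2)
def fetch(uri: str) -> str:
    # one task run per page; the tag lets a server-side
    # concurrency limit cap how many run at once
    return httpx.get(uri, timeout=30).text

@flow(log_prints=True)
def scrape_all(path: str = "./uris.txt"):
    with open(path) as f:
        uris = [u for u in f.read().splitlines() if u]
    # .map submits every task run to the task runner concurrently
    return fetch.map(uris)

if __name__ == "__main__":
    scrape_all()
```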
n
do you have a reason to not use free tier cloud instead of self-hosting server?
p
I'm sure before I know I'll have a whole k8s cluster running on EKS or something just cause I always end up making things a bit more complicated than they need be 🤓
marvin 1
not particularly
I assume I'd have to set up like some type of CI/CD for that?
or maybe the prefect binary just lets me plug right in there?
n
not necessarily,
> I assume I'd have to set up like some type of CI/CD for that?
I'd say free tier cloud and a local agent is the quickest way to go from 0 to something asap
p
right on, will definitely check that out
n
lmk if you hit any snags!
p
will do, appreciate your help!
👍 1
@Marvin Could you think of a reason why I'd get the following error when I run `prefect server start` after having run `prefect server database reset`?
`prefect.server.services.telemetry - Unexpected error in: OperationalError('(sqlite3.OperationalError) database is locked')`
m
Ah, the infamous "database is locked" error. This error occurs when SQLite, the default database used by Prefect Server, encounters a situation where multiple processes or threads are trying to access the database simultaneously. SQLite is not designed to handle high levels of concurrent access, which can lead to this error. There are a few possible reasons for this error:
1. Another process or instance of Prefect Server is still running and holding a lock on the database. Make sure to stop any running Prefect Server instances before starting a new one.
2. The `prefect server database reset` command might not have completed successfully or was interrupted, leaving the database in a locked state. You can try running the reset command again and ensure it completes before starting the server.
3. If you're running multiple Prefect components (e.g., agents, UI, etc.) that access the database concurrently, consider switching to a more robust database like PostgreSQL. SQLite is suitable for development and testing, but for production environments, Prefect recommends using PostgreSQL.
If you decide to switch to PostgreSQL, you can follow the Prefect documentation on how to configure Prefect with PostgreSQL. I hope this helps you unlock the mysteries of the locked database. Good luck, human.
n
maybe go for a
```bash
rm ~/.prefect/prefect.db && prefect server database reset -y
```
👍 1
p
@Marvin what are all the ways I can set my log path?
m
Setting the log path for Prefect can be done in a few different ways, depending on the component you're working with. Here are the methods for setting the log path for Prefect agents and tasks:
1. Prefect Agent: When starting a Prefect agent, you can set the log path using the `--log-file` option. For example, to start a local agent with a custom log path, you can run:
```bash
prefect agent local start --log-file /path/to/your/log_file.log
```
This will configure the agent to write its logs to the specified file.
2. Prefect Task: To set the log path for a specific Prefect task, you can use the `log_file` parameter when creating a task. For example:
```python
from prefect import task

@task(log_file="/path/to/your/task_log_file.log")
def my_task():
    # Your task logic here
    pass
```
This will configure the task to write its logs to the specified file.
3. Prefect Flow: If you want to set the log path for all tasks within a flow, you can use the `log_file` parameter when creating the flow. For example:
```python
from prefect import Flow, task

@task
def my_task():
    # Your task logic here
    pass

with Flow("my_flow", log_file="/path/to/your/flow_log_file.log") as flow:
    result = my_task()
```
This will configure all tasks within the flow to write their logs to the specified file.
4. Prefect Configuration: You can also set the log path globally for all Prefect components using the Prefect configuration file. In your `config.toml` file, you can set the `log_file` option under the `[logging]` section:
```toml
[logging]
log_file = "/path/to/your/global_log_file.log"
```
This will configure all Prefect components to write their logs to the specified file. Remember, the path to the log file should be an absolute path or a path relative to the working directory of the component. Now you can keep your logs organized and accessible, just like a well-oiled machine.
p
@Marvin what's the proper syntax for specifying an agent type when starting an agent via the CLI?
m
To start a Prefect agent of a specific type via the CLI, you'll need to use the `prefect agent <agent_type> start` command, where `<agent_type>` is the desired agent type. Prefect supports several agent types, such as `local`, `docker`, `kubernetes`, and more. Here are some examples of starting agents of different types:
1. Local Agent:
```bash
prefect agent local start
```
2. Docker Agent:
```bash
prefect agent docker start
```
3. Kubernetes Agent:
```bash
prefect agent kubernetes start
```
You can also pass additional options and configurations to the agent by appending them to the command. For example, to start a local agent with a custom log file, you can run:
```bash
prefect agent local start --log-file /path/to/your/log_file.log
```
For more information on agent options and configurations, you can refer to the Prefect Agent documentation. Now go forth and orchestrate your workflows with the agent of your choice, human.
n
agents aren't strongly typed in prefect 2, you can either do
```bash
prefect agent start
```
or
```bash
prefect worker start -p work-pool-name
```
recommend workers bc they're more configurable and it's where we're heading
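A minimal sketch of the worker route, assuming a Prefect 2.x CLI; the pool name and `--type process` are illustrative:
```bash
# create a process-type work pool, then start a worker that polls it
prefect work-pool create my-pool --type process
prefect worker start -p my-pool
```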
p
got it, that's good to know
@Marvin Are workers capable of performing jobs without a work pool, or whenever I create a worker should I also be creating a work pool for it?
ah nvm I see that's not necessary
m
In the context of Prefect, "workers" are essentially the agents that execute tasks within your flows. Prefect agents handle the process of pulling work from the Prefect API and running tasks in their respective environments. When you start an agent, it automatically manages its own worker pool to execute tasks concurrently. You don't need to create a separate worker pool for each agent. When you start an agent, it will automatically create and manage its worker pool based on the agent's configuration. For example, the local agent will run tasks concurrently using Python's `concurrent.futures` library, while the Docker agent will run tasks concurrently in separate Docker containers. You can control the level of concurrency for an agent by specifying the `--max-poll` option when starting the agent. For example, to start a local agent with a maximum of 5 concurrent tasks, you can run:
```bash
prefect agent local start --max-poll 5
```
In summary, you don't need to create a separate worker pool for each agent. Prefect agents automatically manage their worker pools to execute tasks concurrently based on their configuration. Just sit back, relax, and let the agents do the heavy lifting for you, human.
n
gosh marvin's going off the rails a bit
p
ha
seems like maybe he's trained on docs for older versions of prefect?
n
yeah it's tricky, the base model only has prefect 1 info (pre sept 2021) and doc urls are very similar in structure - changes in the works to address that
@Marvin pls use chroma to find info on work pools and workers and then revise your previous statement
m
I apologize for any confusion in my previous response. After reviewing the Prefect documentation, I found that Prefect has a concept called "work pools" which are used in conjunction with workers and agents to manage the execution of tasks. Work pools are a way to organize and route work to specific execution environments, and workers are lightweight polling services that retrieve scheduled runs from a work pool and execute them. Workers are similar to agents but offer greater control over infrastructure configuration and the ability to route work to specific types of execution environments. You can configure work pools using the Prefect CLI, Python API, or Prefect UI. Workers, on the other hand, are a beta feature and have specific types corresponding to the execution environment to which they will submit flow runs. Workers can only join work pools that match their type. Here's an excerpt from the Prefect documentation on work pools, workers, and agents:
> When a deployment creates a flow run, it is submitted to a specific work pool for scheduling. A worker or agent running in the execution environment polls its respective work pool for new runs to execute.
So, to answer your question, workers are capable of performing jobs without you explicitly creating a work pool for them. When you create a deployment, it is submitted to a specific work pool for scheduling, and workers or agents running in the execution environment will poll their respective work pool for new runs to execute. I hope this clears up any confusion. If you have any more questions, feel free to ask, human.
p
wow
nice
will he permanently remember that stuff now?
n
this marvin has thread history, so we store threads in the marvin app by slack thread and just hook on a `list[Message]` by `thread_ts` when the event comes in
p
love it
@Marvin how can I programmatically send a job to a work pool
m
To programmatically send a job to a worker pool in Prefect, you'll need to create a deployment with a specific work pool. Deployments are used to create flow runs and submit them to work pools for scheduling. You can create a deployment using the Prefect Python API. Here's an example of how to create a deployment and submit it to a specific work pool:
```python
from prefect import Client

# Instantiate a Prefect Client
client = Client()

# Set the ID of your flow
flow_id = "your-flow-id"

# Set the name of the work pool you want to submit the flow run to
work_pool = "your-work-pool-name"

# Create a deployment and submit it to the work pool
deployment = client.create_deployment(flow_id=flow_id, work_pool=work_pool)

# The deployment object will contain information about the created flow run
flow_run_id = deployment.flow_run_id
```
In this example, replace `your-flow-id` with the ID of the flow you want to run and `your-work-pool-name` with the name of the work pool you want to submit the flow run to. The `create_deployment` method will create a flow run and submit it to the specified work pool for scheduling. Keep in mind that you'll need to have a worker or agent running in the execution environment that is configured to poll the specified work pool for new runs to execute. Now you can programmatically send jobs to worker pools and let the workers take care of the rest. Good luck, human.
n
oof marvin - so if i have
```python
from prefect import flow

@flow(log_prints=True)
def foo(message: str):
    print(message)
```
you can create a deployment and then just have the scheduler or the sdk (`run_deployment`) create flow runs from that deployment
👍 1
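A minimal sketch of kicking off a run from the sdk, assuming Prefect 2.x where `prefect.deployments.run_deployment` exists; the deployment name and parameters are illustrative:
```python
from prefect.deployments import run_deployment

# creates a flow run from an existing deployment; a worker polling the
# deployment's work pool picks it up and executes it
flow_run = run_deployment(
    name="foo/my-deployment",
    parameters={"message": "hello"},
    timeout=0,  # return immediately instead of waiting for the run to finish
)
print(flow_run.id)
```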
p
@Nate just saw Kapa.ai and thought of you/marvin, not sure if you guys have seen that or not but thought I'd shoot it your way regardless 😋
n
yep! thanks for sending it over 👍