Peter Berg
06/08/2023, 7:16 PM

Marvin
06/08/2023, 7:17 PM
```python
import asyncio
from prefect import task, flow


@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1)
        print(value, end=" ")


@flow
async def async_flow():
    await print_values([1, 2])  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]
    await asyncio.gather(*coros)
```
In this example, the `print_values` task is an asynchronous task. Within `async_flow`, you can use `await` to establish dependencies between async tasks. In this case, `print_values([1, 2])` runs immediately, and then `print_values("abcd")` and `print_values("6789")` are gathered and executed concurrently using `asyncio.gather`.
You can find more information in the Prefect documentation.
Now, go forth and untangle those dependencies, my human friend.
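For completeness, here's how the flow above would actually be invoked from a script (a minimal sketch that assumes the `async_flow` definition from Marvin's example):
```python
import asyncio

# an async flow is a coroutine function, so it is driven with asyncio.run;
# "1 2" prints first, then the letters and digits interleave as the two
# gathered tasks execute concurrently
if __name__ == "__main__":
    asyncio.run(async_flow())
```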
Nate
06/08/2023, 7:23 PM
You can also use `wait_for`:
```python
import asyncio
from prefect import task, flow


@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1)
        print(value, end=" ")
    return values


@flow
async def async_flow():
    results = await asyncio.gather(*[print_values("abcd"), print_values("6789")])
    await print_values("123", wait_for=results)


if __name__ == "__main__":
    asyncio.run(async_flow())
```
Peter Berg
06/08/2023, 7:55 PM
```python
async def main():
    flows = []
    for i, uri in enumerate(read('./uris.txt').split("\n")):
        print(f"running flow {i}")
        flows.append(a_flow(uri))
    await asyncio.gather(*flows)

asyncio.run(main())
```
I'd like to run a bunch of workflows concurrently, but I wonder whether I should do this by running several flows asynchronously, or a single flow that functions more like a batch job that runs many tasks concurrently.

Marvin
06/08/2023, 7:55 PM
You can run several flows concurrently using `asyncio.gather`. This approach is suitable if your flows are I/O-bound, meaning they spend most of their time waiting for external resources like network requests or file I/O.
However, if your flows are CPU-bound, meaning they spend most of their time performing computations, you might want to consider a different approach. In this case, you can create a single flow with multiple tasks running concurrently. Here's an example:
```python
import asyncio
from prefect import task, Flow


@task
async def process_uri(uri):
    # Your processing logic here
    pass


with Flow("batch_flow") as batch_flow:
    uris = read('./uris.txt').split("\n")
    tasks = [process_uri(uri) for uri in uris]

batch_flow.run()
```
In this example, a single flow is created with multiple tasks running concurrently. This approach is more idiomatic in Prefect and allows you to take advantage of Prefect's built-in parallelism and error handling features.
Remember, the choice between these approaches depends on the nature of your flows and the resources they consume. Choose wisely, and may the flow be with you.
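Note that Marvin's snippet above uses the Prefect 1.x `Flow` context-manager API. In Prefect 2, the same batch-style flow would more likely use `.submit()`, which hands each task call to the flow's task runner (concurrent by default). A minimal sketch, assuming a `uris.txt` file with one URI per line:
```python
from prefect import flow, task


@task
def process_uri(uri: str):
    # placeholder processing logic
    return uri


@flow
def batch_flow():
    uris = open("./uris.txt").read().splitlines()
    # .submit() returns PrefectFutures; the task runs execute concurrently
    futures = [process_uri.submit(uri) for uri in uris]
    return [f.result() for f in futures]


if __name__ == "__main__":
    batch_flow()
```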
Peter Berg
06/08/2023, 8:17 PM
Calling `my_task.submit(arg1, arg2).wait()` would raise the error:
```
AttributeError: 'coroutine' object has no attribute 'wait'
```
I'm under the impression the `.submit()` function returns a PrefectFuture with a `wait()` function, though it's definitely possible I'm misunderstanding something.

Marvin
06/08/2023, 8:18 PM
It sounds like `my_task.submit(arg1, arg2)` is returning a coroutine object instead of a `PrefectFuture`. This usually happens when you have an asynchronous task and you're not using the `await` keyword to call it.
In Prefect, tasks are typically defined using the `@task` decorator and are executed within a flow. If you're trying to run a task outside of a flow, you might want to consider using the `run` method instead of `submit`. Here's an example:
```python
from prefect import task


@task
def my_task(arg1, arg2):
    # Your task logic here
    pass


result = my_task.run(arg1, arg2)
```
However, if your task is asynchronous, you'll need to use the `await` keyword and run it within an async context:
```python
import asyncio
from prefect import task


@task
async def my_async_task(arg1, arg2):
    # Your async task logic here
    pass


async def main():
    result = await my_async_task.run(arg1, arg2)

asyncio.run(main())
```
In summary, make sure you're using the appropriate method to call your task, and if it's an async task, don't forget to use the `await` keyword within an async context. Good luck navigating the coroutine labyrinth!
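For what it's worth, the `AttributeError` Peter hit is consistent with Prefect 2 behavior at the time: calling `.submit()` on an async task from an async flow returned a coroutine that had to be awaited before it yielded a `PrefectFuture`. A hedged sketch of that pattern (the task and flow names here are illustrative):
```python
import asyncio

from prefect import flow, task


@task
async def add_one(x: int) -> int:
    return x + 1


@flow
async def my_flow():
    # awaiting .submit() yields the PrefectFuture for an async task
    future = await add_one.submit(1)
    # in an async flow, .result() is awaitable as well
    return await future.result()


if __name__ == "__main__":
    asyncio.run(my_flow())
```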
Peter Berg
06/08/2023, 8:36 PM

Marvin
06/08/2023, 8:37 PM
You can use Python's `multiprocessing` module to run the flow in separate processes. Here's an example:
```python
from multiprocessing import Process
from prefect import task, Flow


@task
def my_task():
    # Your task logic here
    pass


with Flow("concurrent_flow") as concurrent_flow:
    my_task()


def run_flow():
    concurrent_flow.run()


processes = [Process(target=run_flow) for _ in range(4)]
for process in processes:
    process.start()
for process in processes:
    process.join()
```
In this example, the `concurrent_flow` is run four times concurrently using separate processes. The `multiprocessing.Process` class is used to create and manage the processes, and the `start` and `join` methods are used to run the flow and wait for its completion, respectively.
Remember, running flows concurrently using separate processes can consume more resources than using asyncio, so choose this approach wisely based on your system's capabilities and the nature of your flows. Good luck with your concurrent endeavors!

Peter Berg
06/08/2023, 8:40 PM

Marvin
06/08/2023, 8:42 PM
You can run multiple flows concurrently within a single Python process using the `asyncio` or `concurrent.futures` libraries.
Here's an example using `asyncio`:
```python
import asyncio
from prefect import task, Flow


@task
def my_task():
    # Your task logic here
    pass


with Flow("concurrent_flow") as flow1:
    my_task()

with Flow("another_concurrent_flow") as flow2:
    my_task()


async def run_flows():
    loop = asyncio.get_running_loop()
    flow1_future = loop.run_in_executor(None, flow1.run)
    flow2_future = loop.run_in_executor(None, flow2.run)
    await asyncio.gather(flow1_future, flow2_future)


asyncio.run(run_flows())
```
In this example, two flows (`flow1` and `flow2`) are run concurrently within a single Python process using `asyncio`. The `run_in_executor` method is used to run the flows in separate threads, and `asyncio.gather` is used to await their completion.
Alternatively, you can use the `concurrent.futures.ThreadPoolExecutor`:
```python
from concurrent.futures import ThreadPoolExecutor
from prefect import task, Flow


@task
def my_task():
    # Your task logic here
    pass


with Flow("concurrent_flow") as flow1:
    my_task()

with Flow("another_concurrent_flow") as flow2:
    my_task()


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(flow.run) for flow in [flow1, flow2]]
    for future in futures:
        future.result()
```
In this example, the `ThreadPoolExecutor` is used to run the flows concurrently in separate threads.
Keep in mind that running flows concurrently within a single process using threads may not provide true parallelism due to the Global Interpreter Lock (GIL) in Python. However, it can still be useful for I/O-bound tasks. Choose your approach wisely, and may the concurrent force be with you.

Peter Berg
06/08/2023, 11:11 PM

Marvin
06/08/2023, 11:12 PM
You can set a concurrency limit for tasks tagged `rate_limit` from the CLI:
```bash
prefect concurrency-limit create rate_limit 10
```
Alternatively, you can use the Python client:
```python
import asyncio
from prefect.client import get_client


async def main():
    async with get_client() as client:
        limit_id = await client.create_concurrency_limit(tag="rate_limit", concurrency_limit=10)

asyncio.run(main())
```
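If useful, the companion CLI commands for inspecting and removing a limit (reusing the `rate_limit` tag from above):
```bash
prefect concurrency-limit inspect rate_limit
prefect concurrency-limit ls
prefect concurrency-limit delete rate_limit
```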
Now, let's update the bash script with the concurrency limit command:
```bash
#!/bin/bash

# Start Prefect server in the background
prefect server start --detach

# Start local Prefect agent in the background
prefect agent local start --detach --log-level DEBUG --log-file ./prefect_log

# Set the max concurrency for tasks with the tag "rate_limit" to 10
prefect concurrency-limit create rate_limit 10

# Echo the current date and time to the log
echo -e "\n\n\n\n\nAGENT START TIME: $(date)" >> ./prefect_log

# Tail the Prefect log
tail -f ./prefect_log
```
There you have it. A bash script that sets up your Prefect environment just the way you like it. Enjoy the automation, human.

Nate
06/08/2023, 11:14 PM

Peter Berg
06/08/2023, 11:14 PM

Peter Berg
06/08/2023, 11:14 PM

Peter Berg
06/08/2023, 11:15 PM

Nate
06/08/2023, 11:15 PM

Nate
06/08/2023, 11:15 PM

Peter Berg
06/08/2023, 11:15 PM

Peter Berg
06/08/2023, 11:16 PM

Peter Berg
06/08/2023, 11:16 PM

Nate
06/08/2023, 11:16 PM

Peter Berg
06/08/2023, 11:17 PM

Peter Berg
06/08/2023, 11:17 PM

Peter Berg
06/08/2023, 11:17 PM

Peter Berg
06/08/2023, 11:18 PM

Nate
06/08/2023, 11:18 PM
> I assume I'd have to set up like some type of CI/CD for that?
I'd say free tier cloud and local agent is quickest way to go from 0 to something asap
Peter Berg
06/08/2023, 11:18 PM

Nate
06/08/2023, 11:28 PM

Peter Berg
06/08/2023, 11:32 PM

Peter Berg
06/08/2023, 11:39 PM
Running `prefect server start` after having run `prefect server database reset`:
```
prefect.server.services.telemetry - Unexpected error in: OperationalError('(sqlite3.OperationalError) database is locked')
```

Marvin
06/08/2023, 11:40 PM
2. The `prefect server database reset` command might not have completed successfully or was interrupted, leaving the database in a locked state. You can try running the reset command again and ensure it completes before starting the server.
3. If you're running multiple Prefect components (e.g., agents, UI, etc.) that access the database concurrently, consider switching to a more robust database like PostgreSQL. SQLite is suitable for development and testing, but for production environments, Prefect recommends using PostgreSQL.
If you decide to switch to PostgreSQL, you can follow the Prefect documentation on how to configure Prefect with PostgreSQL.
I hope this helps you unlock the mysteries of the locked database. Good luck, human.
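If you do make the jump to PostgreSQL, Prefect 2 reads the database location from a single setting; a sketch with placeholder credentials (it assumes a reachable Postgres instance with a `prefect` database):
```bash
# user, password, host, port, and database name below are placeholders
prefect config set PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://postgres:yourpass@localhost:5432/prefect"
```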
Nate
06/08/2023, 11:42 PM
```bash
rm ~/.prefect/prefect.db && prefect server database reset -y
```
Peter Berg
06/09/2023, 12:20 AM

Marvin
06/09/2023, 12:22 AM
1. Prefect Agent: When starting an agent, you can specify the log path with the `--log-file` option. For example, to start a local agent with a custom log path, you can run:
```bash
prefect agent local start --log-file /path/to/your/log_file.log
```
This will configure the agent to write its logs to the specified file.
2. Prefect Task: To set the log path for a specific Prefect task, you can use the `log_file` parameter when creating a task. For example:
```python
from prefect import task


@task(log_file="/path/to/your/task_log_file.log")
def my_task():
    # Your task logic here
    pass
```
This will configure the task to write its logs to the specified file.
3. Prefect Flow: If you want to set the log path for all tasks within a flow, you can use the `log_file` parameter when creating the flow. For example:
```python
from prefect import Flow, task


@task
def my_task():
    # Your task logic here
    pass


with Flow("my_flow", log_file="/path/to/your/flow_log_file.log") as flow:
    result = my_task()
```
This will configure all tasks within the flow to write their logs to the specified file.
4. Prefect Configuration: You can also set the log path globally for all Prefect components using the Prefect configuration file. In your `config.toml` file, you can set the `log_file` option under the `[logging]` section:
```toml
[logging]
log_file = "/path/to/your/global_log_file.log"
```
This will configure all Prefect components to write their logs to the specified file.
Remember, the path to the log file should be an absolute path or a path relative to the working directory of the component. Now you can keep your logs organized and accessible, just like a well-oiled machine.
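For what it's worth, Prefect 2 drives logging through settings rather than per-task parameters; a hedged sketch of the settings-based route (setting names assume a Prefect 2.x install):
```bash
# raise the log level globally
prefect config set PREFECT_LOGGING_LEVEL=DEBUG
# point Prefect at a custom logging config (e.g., one that adds a file handler)
prefect config set PREFECT_LOGGING_SETTINGS_PATH=/path/to/logging.yml
```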
Peter Berg
06/09/2023, 1:03 AM

Marvin
06/09/2023, 1:03 AM
You can start an agent using the `prefect agent <agent_type> start` command, where `<agent_type>` is the desired agent type. Prefect supports several agent types, such as `local`, `docker`, `kubernetes`, and more.
Here are some examples of starting agents of different types:
1. Local Agent:
```bash
prefect agent local start
```
2. Docker Agent:
```bash
prefect agent docker start
```
3. Kubernetes Agent:
```bash
prefect agent kubernetes start
```
You can also pass additional options and configurations to the agent by appending them to the command. For example, to start a local agent with a custom log file, you can run:
```bash
prefect agent local start --log-file /path/to/your/log_file.log
```
For more information on agent options and configurations, you can refer to the Prefect Agent documentation.
Now go forth and orchestrate your workflows with the agent of your choice, human.

Nate
06/09/2023, 1:04 AM
```bash
prefect agent start
```
or
```bash
prefect worker start -p work-pool-name
```
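Expanding on Nate's worker command, a sketch of the Prefect 2 work-pool route; the pool name `my-pool` is a placeholder:
```bash
# create a work pool backed by local subprocesses, then start a worker polling it
prefect work-pool create my-pool --type process
prefect worker start -p my-pool
```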
Nate
06/09/2023, 1:05 AM

Nate
06/09/2023, 1:05 AM

Peter Berg
06/09/2023, 1:05 AM

Peter Berg
06/09/2023, 1:13 AM

Peter Berg
06/09/2023, 1:13 AM

Marvin
06/09/2023, 1:13 AM
For example, the local agent runs tasks concurrently using the `concurrent.futures` library, while the Docker agent will run tasks concurrently in separate Docker containers.
You can control the level of concurrency for an agent by specifying the `--max-poll` option when starting the agent. For example, to start a local agent with a maximum of 5 concurrent tasks, you can run:
```bash
prefect agent local start --max-poll 5
```
In summary, you don't need to create a separate worker pool for each agent. Prefect agents automatically manage their worker pools to execute tasks concurrently based on their configuration. Just sit back, relax, and let the agents do the heavy lifting for you, human.
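As a cross-check on the `--max-poll` flag above: in Prefect 2 CLIs, concurrency is typically capped on the work pool itself rather than on the agent; a sketch, with `my-pool` as a placeholder name (the command assumes a Prefect 2.x version with work-pool support):
```bash
# cap how many runs the pool will hand out at once
prefect work-pool set-concurrency-limit my-pool 5
```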
Nate
06/09/2023, 1:14 AM

Peter Berg
06/09/2023, 1:15 AM

Peter Berg
06/09/2023, 1:15 AM

Nate
06/09/2023, 1:16 AM

Nate
06/09/2023, 1:17 AM

Marvin
06/09/2023, 1:18 AM
> When a deployment creates a flow run, it is submitted to a specific work pool for scheduling. A worker or agent running in the execution environment polls its respective work pool for new runs to execute.
So, to answer your question, workers are capable of performing jobs without you explicitly creating a work pool for them. When you create a deployment, it is submitted to a specific work pool for scheduling, and workers or agents running in the execution environment will poll their respective work pool for new runs to execute. I hope this clears up any confusion. If you have any more questions, feel free to ask, human.
Peter Berg
06/09/2023, 1:19 AM

Peter Berg
06/09/2023, 1:19 AM

Peter Berg
06/09/2023, 1:19 AM

Nate
06/09/2023, 1:21 AM

Nate
06/09/2023, 1:22 AM

Peter Berg
06/09/2023, 1:23 AM

Peter Berg
06/09/2023, 1:49 AM

Marvin
06/09/2023, 1:50 AM
```python
from prefect import Client

# Instantiate a Prefect Client
client = Client()

# Set the ID of your flow
flow_id = "your-flow-id"

# Set the name of the work pool you want to submit the flow run to
work_pool = "your-work-pool-name"

# Create a deployment and submit it to the work pool
deployment = client.create_deployment(flow_id=flow_id, work_pool=work_pool)

# The deployment object will contain information about the created flow run
flow_run_id = deployment.flow_run_id
```
In this example, replace `your-flow-id` with the ID of the flow you want to run and `your-work-pool-name` with the name of the work pool you want to submit the flow run to. The `create_deployment` method will create a flow run and submit it to the specified work pool for scheduling.
Keep in mind that you'll need to have a worker or agent running in the execution environment that is configured to poll the specified work pool for new runs to execute.
Now you can programmatically send jobs to worker pools and let the workers take care of the rest. Good luck, human.

Nate
06/09/2023, 1:59 AM
```python
from prefect import flow


@flow(log_prints=True)
def foo(message: str):
    print(message)
```
you can create a deployment and then just have the scheduler or sdk (`run_deployment`) create flow runs from that deployment
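To make Nate's pointer concrete, a sketch of triggering a run from the SDK, assuming a deployment of `foo` named `my-deployment` already exists (the `"foo/my-deployment"` identifier is a placeholder):
```python
from prefect.deployments import run_deployment

# creates a flow run from the named deployment; a worker or agent polling the
# deployment's work pool picks it up and executes it
flow_run = run_deployment(
    name="foo/my-deployment",
    parameters={"message": "hello"},
)
print(flow_run.id)
```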
Peter Berg
06/17/2023, 2:09 AM

Nate
06/20/2023, 4:53 PM