Raffael Campos
10/30/2024, 6:54 PM
I have a question related to concurrency, mutexes, semaphores, and rate limiting. My flow deals with web3 transactions:
- Parallelizing transaction writes creates nonce errors.
- Waiting for transactions in parallel is fine, since it is only reading. However, this operation is slower, taking at least 6 seconds after the tx is submitted.
Sending transactions is really fast. Using tag-based concurrency creates a bottleneck, even if the delay is set to 1 second. Ideally, a queue system would be best. Pseudocode:
```python
# limiting concurrency by tag would lead to a long run time
@task(tags=["send-tx"])
def task_send_tx(tx_params: dict):
    # might fail
    tx_hash = web3.eth.send_raw_transaction(tx)
    return tx_hash

@task
def wait_for_tx(tx_hash: str):
    return web3.eth.wait_for_transaction_receipt(tx_hash)

@task(retries=3, retry_delay_seconds=10)
def send_tx_and_wait(tx_params: dict):
    tx_hash = task_send_tx.submit(tx_params)
    return wait_for_tx.submit(tx_hash)

@flow
def my_flow():
    # suppose this returns 600 items; with a 1-second delay it would take 600 seconds to run
    many_params = gather_params()
    all_tasks = [send_tx_and_wait.submit(param) for param in many_params]
    wait(all_tasks)
```
Ideally, if I were free to pass anything as params, I could use semaphores:
```python
@task
def task_send_tx(tx_params: dict, semaphore: asyncio.Semaphore):
    with semaphore:
        return web3.eth.send_raw_transaction(tx)
```
But it seems I can't. Or can I? I received an error about not being able to get from context.
I would prefer to keep using separate tasks for each step to leverage Prefect features such as retries.
Do you have any suggestions for how to do this?
Marvin
10/30/2024, 6:54 PM
Nate
10/30/2024, 6:55 PM
Nate
10/30/2024, 6:56 PM
Marvin
10/30/2024, 6:56 PM
`concurrency` context manager to control how many of certain tasks or operations, such as database connections, are allowed at one time.
- For example, to limit database connections, you can use:
```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def database_query(query):
    # Occupies one slot of the "database" limit; with a limit of 1, queries run one at a time
    with concurrency("database", occupy=1):
        # Perform database operations here
        pass
```
- More details can be found here.
2. Prefect 3.x:
- Prefect 3.x also provides a similar mechanism. You can define concurrency limits through the CLI and use functions such as `rate_limit` to throttle task submissions. This is particularly useful for scenarios requiring controlled resource utilization.
- An example of task submission control:
```python
from prefect import flow, task
from prefect.concurrency.sync import rate_limit

@task
def my_task(i):
    return i

@flow
def my_flow():
    for _ in range(100):
        rate_limit("slow-my-flow", occupy=1)
        my_task.submit(1)
```
- More information is available here.
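For the transaction scenario in this thread, the same idea can be pictured without Prefect: hold a lock only around the fast send step and let the slow receipt waits overlap. The following is a minimal standard-library sketch under stated assumptions, not Prefect's implementation. `fake_send`, `fake_wait`, `run_all`, the in-memory nonce counter, and the 0.05-second wait are all hypothetical stand-ins for the web3 calls and timings mentioned in the question.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Module-level lock, so it never has to be passed around as a parameter.
send_lock = threading.Lock()
next_nonce = 0        # hypothetical stand-in for the account nonce
WAIT_SECONDS = 0.05   # hypothetical stand-in for the multi-second receipt wait


def fake_send(tx_params: dict) -> int:
    """Stand-in for sending a transaction: consumes the next nonce."""
    global next_nonce
    nonce = next_nonce
    next_nonce += 1
    return nonce


def fake_wait(tx_hash: int) -> dict:
    """Stand-in for waiting on a receipt: slow, but safe to run in parallel."""
    time.sleep(WAIT_SECONDS)
    return {"tx": tx_hash, "status": 1}


def send_tx_and_wait(tx_params: dict) -> dict:
    # Only the send is serialized, so no two sends can race on the nonce.
    with send_lock:
        tx_hash = fake_send(tx_params)
    # The lock is released before waiting, so waits overlap across threads.
    return fake_wait(tx_hash)


def run_all(many_params: list, max_workers: int = 32) -> list:
    """Run send-then-wait for every parameter set on a thread pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(send_tx_and_wait, many_params))
```

Calling `run_all([{"i": i} for i in range(20)])` returns 20 receipts with distinct, consecutive nonces. Given enough workers, the total time is roughly the sum of the send times plus one wait, rather than one wait per item. In a Prefect task, the analogous shape would be to keep the `concurrency` block around the send call only, assuming a limit of 1 has been created for that name.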
These tools allow you to ensure that your tasks do not exceed certain limits, thereby preventing overloading resources or violating any external rate limits. If you have further questions or specific use-cases in mind, feel free to ask!
Raffael Campos
10/30/2024, 7:18 PM
Nate
10/30/2024, 7:18 PM