<@ULVA73B9P> I have a question related to: concurr...
# ask-marvin
r
@Marvin I have a question related to: concurrency, mutexes, semaphores, rate limiting. My flow is related to web3 transactions. - parallelizing transaction writes creates nonce errors - but we're ok to wait for transactions in parallel, since it's just reading. However, this operation is slower, taking at least 6 seconds from tx submit. Sending transactions is really fast. Using tag-based concurrency creates a bottleneck, even if the delay is set to 1 second. Ideally, a queue system would be best. pseudocode:
# limiting concurrency of tag would lead to a long time

@task(tags=["send-tx"])
def task_send_tx(tx_params: dict):
    # might fail
    tx_hash = web3.eth.send_raw_transaction(tx)
    return tx_hash

@task
def wait_for_tx(tx_hash: str):
    return web3.eth.wait_for_transaction_receipt(tx_hash)

@task(retries=3, retry_delay_seconds=10)
def send_tx_and_wait(tx_params: dict):
    tx_hash = task_send_tx.submit(tx_params)
    return wait_for_tx.submit(tx_hash)

@flow
def my_flow():
    # suppose this returns 600 items. It would take 600 seconds to run with a 1 second delay
    many_params = gather_params()

    all_tasks = [send_tx_and_wait.submit(param) for param in many_params]
    wait(all_tasks)
ideally, if I was free to pass anything as params, I could use semaphores:
@task
def task_send_tx(tx_params: dict, semaphore: asyncio.Semaphore):
    with semaphore:
        return web3.eth.send_raw_transaction(tx)
but it seems I can't. Or can I? I received an error about not being able to get from context. I would prefer to keep using separate tasks for each step to leverage Prefect features such as retries, etc. Do you have any suggestions on how to do this?
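A minimal sketch of the pattern described in the question, outside Prefect and using only the Python standard library: a mutex serializes the fast send step so nonces never collide, while the slow receipt waits overlap in a thread pool. `fake_send_tx`, `fake_wait_for_receipt`, and the nonce counter are hypothetical stand-ins for the web3 calls, and the sleep durations are made up for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the web3 calls; nothing here touches a network.
_send_lock = threading.Lock()  # mutex: only one send may run at a time
_next_nonce = 0


def fake_send_tx(tx_params: dict) -> str:
    """Pretend to submit a transaction. Fast, but must never overlap."""
    global _next_nonce
    with _send_lock:
        nonce = _next_nonce      # read-modify-write is safe under the lock
        time.sleep(0.001)        # simulated (short) submit latency
        _next_nonce = nonce + 1
    return f"tx-{nonce}"


def fake_wait_for_receipt(tx_hash: str) -> dict:
    """Pretend to wait for a receipt. Slow, but read-only, so it can overlap."""
    time.sleep(0.05)
    return {"tx_hash": tx_hash, "status": 1}


def send_and_wait(tx_params: dict) -> dict:
    tx_hash = fake_send_tx(tx_params)      # serialized by the lock
    return fake_wait_for_receipt(tx_hash)  # runs concurrently across threads


def run_all(many_params: list, max_workers: int = 20) -> list:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(send_and_wait, many_params))


receipts = run_all([{"value": i} for i in range(20)])
```

Only the send is held behind the lock, so the total runtime is dominated by the receipt waits rather than by the number of transactions multiplied by a per-send delay.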
m
Your message was too long by 103 tokens - please shorten it and try again.
n
hi @Raffael Campos i might check out global concurrency limits
@Marvin tldr global concurrency limits usage in SDK, using the context manager
m
To manage global concurrency limits in Prefect, the SDK offers a way to control the number of concurrent tasks or operations using context managers. Here's a brief explanation for both Prefect 2.x and 3.x:
1. Prefect 2.x:
- You can use the `concurrency` context manager to control the number of certain tasks or operations, such as database connections, allowed at one time.
- For example, to limit database connections, you can use:
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def database_query(query):
    # Occupies one slot of the "database" limit while the block runs
    with concurrency("database", occupy=1):
        # Perform database operations here
        pass
- More details can be found here.
2. Prefect 3.x:
- Prefect 3.x also provides a similar mechanism. You can define concurrency limits through the CLI, and use functions such as `rate_limit` to throttle task submissions. This is particularly useful for scenarios requiring controlled resource utilization.
- An example of task submission control:
from prefect import flow, task
from prefect.concurrency.sync import rate_limit

@task
def my_task(i):
    return i

@flow
def my_flow():
    for _ in range(100):
        rate_limit("slow-my-flow", occupy=1)
        my_task.submit(1)
- More information is available here.
These tools allow you to ensure that your tasks do not exceed certain limits, thereby preventing overloading resources or violating any external rate limits. If you have further questions or specific use-cases in mind, feel free to ask!
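To illustrate why throttling every submission at one per second adds up for 600 items, here is a rough standard-library sketch of an interval-based limiter. It is not Prefect's `rate_limit` implementation; `IntervalRateLimiter` and `FakeClock` are hypothetical names, and the fake clock exists only so the example runs instantly.

```python
import time


class IntervalRateLimiter:
    """Hypothetical helper: allow at most one acquire() per `interval` seconds."""

    def __init__(self, interval, clock=time.monotonic, sleep=time.sleep):
        self.interval = interval
        self._clock = clock
        self._sleep = sleep
        self._next_free = clock()  # earliest time the next slot is available

    def acquire(self):
        now = self._clock()
        if now < self._next_free:
            # Slot not free yet: block until it is.
            self._sleep(self._next_free - now)
            now = self._next_free
        self._next_free = now + self.interval


class FakeClock:
    """Simulated clock so the demo does not actually sleep."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
limiter = IntervalRateLimiter(1.0, clock=clock.time, sleep=clock.sleep)

# 600 throttled submissions, as in the question's example
for _ in range(600):
    limiter.acquire()

total_wait_seconds = clock.now
```

On the simulated clock the 600 acquisitions accumulate 599 seconds of waiting, in line with the roughly 600 second estimate in the question. Under this model, applying the limit only around the send step, rather than around the whole send-and-wait task, keeps the slow receipt waits from being throttled as well.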
r
hey @Nate, thanks for your prompt response, it's more than enough for this case. I misread the global part as being something else, I totally missed it was exactly what I needed
n
catjam
🙌 1