hey team! I have a question related to: concurrenc...
# ask-community
r
hey team! I have a question related to: concurrency, mutexes, semaphores, rate limiting. My flow is related to web3 transactions. - paralellizing transaction writes create nonce errors - but we're ok to wait for transactions in parallel, it's just reading. However this operation is slower, taking at least 6 seconds since tx submit. Sending transactions are really fast. Using tag based concurrency creates a bottleneck, even if the delay is set to be 1 second. Ideally, a queue system would be the best. pseudocode:
Copy code
# limiting concurrency of tag would lead to a long time

@task(tags=["send-tx"])
def task_send_tx(tx_params: dict):
    # might fail
    tx_hash = web3.eth.send_raw_transaction(tx)
    return tx_hash

@task
def wait_for_tx(tx_hash: str):
    return web3.eth.wait_for_transaction_receipt(tx_hash)

@task(retries=3, retry_delay_seconds=10)
def send_tx_and_wait(tx_params: dict):
    tx_hash = task_send_tx.submit(tx_params)
    return wait_for_tx.submit(tx_hash)

@flow
def my_flow():
    # suppose this return 600 items. It would take 600 seconds to run on 1 second delay
    many_params = gather_params()

    all_tasks = [send_tx_and_wait.submit(param) for param in many_params]
    wait(all_tasks)
ideally, if I was free to pass anything as params, I could use semaphores:
Copy code
python
@task
def task_send_tx(tx_params: dict, semaphore: asyncio.Semaphore):
    with semaphore:
        return web3.eth.send_raw_transaction(tx)
but it seems I can't. Or can I? I received an error about not being able to get from context. I would prefer to keep using separate tasks for each to leverage prefect features such as retrials, etc Do you have any suggestions to perform this?