Raffael Campos
10/30/2024, 6:49 PM
```python
from prefect import flow, task
from prefect.futures import wait


# Limiting concurrency on the tag would make the whole run take a long time
@task(tags=["send-tx"])
def task_send_tx(tx_params: dict):
    # might fail
    # tx: the signed raw transaction derived from tx_params (construction not shown)
    tx_hash = web3.eth.send_raw_transaction(tx)
    return tx_hash


@task
def wait_for_tx(tx_hash: str):
    return web3.eth.wait_for_transaction_receipt(tx_hash)


@task(retries=3, retry_delay_seconds=10)
def send_tx_and_wait(tx_params: dict):
    tx_hash = task_send_tx.submit(tx_params)
    return wait_for_tx.submit(tx_hash)


@flow
def my_flow():
    # Suppose this returns 600 items. With a 1-second delay per item,
    # the run would take 600 seconds.
    many_params = gather_params()
    all_tasks = [send_tx_and_wait.submit(param) for param in many_params]
    wait(all_tasks)
```
Ideally, if I were free to pass anything as params, I could use semaphores:
```python
@task
def task_send_tx(tx_params: dict, semaphore: asyncio.Semaphore):
    with semaphore:
        return web3.eth.send_raw_transaction(tx)
```
But it seems I can't. Or can I? I received an error about not being able to get it from the context.
I would prefer to keep using a separate task for each step to leverage Prefect features such as retries, etc.
Do you have any suggestions for how to do this?
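For reference, here is a minimal sketch of the semaphore idea using only the standard library, with no Prefect involved. It assumes the work runs as threads inside a single process, so the semaphore can live at module level instead of being passed as a parameter; a `threading` semaphore does not coordinate separate processes or machines. `MAX_CONCURRENT_SENDS`, `fake_send_raw_transaction`, `send_tx`, and `run_all` are hypothetical names for illustration, and the fake sender stands in for `web3.eth.send_raw_transaction`.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical limit on simultaneous sends; pick whatever the node tolerates.
MAX_CONCURRENT_SENDS = 5

# Module-level semaphore: shared by every thread in this process,
# so it never has to be passed to the worker function as an argument.
_send_slots = threading.BoundedSemaphore(MAX_CONCURRENT_SENDS)

# Bookkeeping used only to observe how many sends overlap.
_lock = threading.Lock()
_in_flight = 0
peak_in_flight = 0


def fake_send_raw_transaction(tx_params: dict) -> str:
    """Hypothetical stand-in for web3.eth.send_raw_transaction."""
    global _in_flight, peak_in_flight
    with _lock:
        _in_flight += 1
        peak_in_flight = max(peak_in_flight, _in_flight)
    time.sleep(0.01)  # simulate network latency
    with _lock:
        _in_flight -= 1
    return f"0x{tx_params['nonce']:064x}"


def send_tx(tx_params: dict) -> str:
    # Only the send step holds a slot; the slot is released on exit,
    # including when the send raises.
    with _send_slots:
        return fake_send_raw_transaction(tx_params)


def run_all(many_params: list[dict]) -> list[str]:
    # More worker threads than slots, so the semaphore is what bounds the sends.
    with ThreadPoolExecutor(max_workers=20) as pool:
        return list(pool.map(send_tx, many_params))


tx_hashes = run_all([{"nonce": n} for n in range(50)])
```

With a limit of k slots and roughly one second per send, 600 items would take on the order of 600 / k seconds instead of 600, which is the trade-off being asked about.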