Raffael Campos
10/30/2024, 6:49 PM
```python
from prefect import flow, task
from prefect.futures import wait  # assuming Prefect 3.x

# `web3` (a connected Web3 instance) and `gather_params` are defined elsewhere.


# limiting concurrency on the tag would make the run take a long time
@task(tags=["send-tx"])
def task_send_tx(tx_params: dict):
    # might fail
    # `tx` is the signed raw transaction built from tx_params (not shown here)
    tx_hash = web3.eth.send_raw_transaction(tx)
    return tx_hash


@task
def wait_for_tx(tx_hash: str):
    return web3.eth.wait_for_transaction_receipt(tx_hash)


@task(retries=3, retry_delay_seconds=10)
def send_tx_and_wait(tx_params: dict):
    tx_hash = task_send_tx.submit(tx_params)
    return wait_for_tx.submit(tx_hash)


@flow
def my_flow():
    # suppose this returns 600 items; with a 1-second delay per item,
    # the run would take 600 seconds
    many_params = gather_params()
    all_tasks = [send_tx_and_wait.submit(param) for param in many_params]
    wait(all_tasks)
```
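Ideally, if I were free to pass anything as params, I could use semaphores:
```python
import asyncio


@task
async def task_send_tx(tx_params: dict, semaphore: asyncio.Semaphore):
    # asyncio.Semaphore only supports `async with`, so the task must be async
    async with semaphore:
        return web3.eth.send_raw_transaction(tx)
```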
But it seems I can't. Or can I? I received an error about not being able to get from context.
I would prefer to keep using separate tasks for each step to leverage Prefect features such as retries.
Do you have any suggestions to perform this?
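For reference, one way to approximate the semaphore idea without passing it as a parameter is to keep the semaphore at module level, so every call in the same process shares it. The following is only a minimal sketch in plain Python, not Prefect code: `fake_send_tx`, `run_demo`, and `MAX_IN_FLIGHT` are hypothetical names, the thread pool stands in for concurrent task runs, and the approach assumes all runs execute as threads in a single process.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical cap on simultaneous sends; not taken from the original message.
MAX_IN_FLIGHT = 5

# Module-level semaphore: shared by every call in this process, so it never
# has to be passed in as a parameter.
_send_semaphore = threading.BoundedSemaphore(MAX_IN_FLIGHT)

# Bookkeeping used only to observe peak concurrency in this demo.
_lock = threading.Lock()
_in_flight = 0
_peak = 0


def fake_send_tx(tx_params: dict) -> str:
    """Stand-in for the real send call (hypothetical); returns a fake hash."""
    global _in_flight, _peak
    with _send_semaphore:  # at most MAX_IN_FLIGHT threads are inside at once
        with _lock:
            _in_flight += 1
            _peak = max(_peak, _in_flight)
        time.sleep(0.01)  # simulate network latency
        with _lock:
            _in_flight -= 1
    return f"0x{tx_params['nonce']:064x}"


def run_demo(n_items: int = 50, n_workers: int = 20) -> tuple[list[str], int]:
    """Send n_items fake transactions from n_workers threads.

    Returns the fake hashes (in input order) and the peak number of
    sends that were in flight at the same time.
    """
    params = [{"nonce": i} for i in range(n_items)]
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        hashes = list(pool.map(fake_send_tx, params))
    return hashes, _peak
```

Calling `run_demo()` uses 20 worker threads, but the recorded peak never exceeds `MAX_IN_FLIGHT`. The same pattern could presumably be used inside the body of `task_send_tx`, keeping the tasks separate so retries still apply. A process-local semaphore would not limit runs spread across several processes or machines; that case would need a limit enforced outside the process.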