Giuliano Mega
11/04/2022, 4:22 PM
import time
from prefect import flow, task
from lib.prefect import get_run_logger

@task
def get_inputs():
    return range(1, 600)

@task(tags=['sequential-scraper'])
def process_input(item):
    logger = get_run_logger()
    logger.info(f'Processing item {item}')
    time.sleep(1)
    logger.info(f'Processed item {item}')

@flow(description='Example of a very simple, API scraping-like Pipeline. '
                  'Feel free to run it as it does not do anything harmful.')
def sample_pipeline():
    new_tokens = get_inputs()
    process_input.map(new_tokens)
What I see is my task runner (Job) running for a very long time without executing a single task until it eventually gets killed, while the UI (Prefect Cloud) leaves the flow run stuck in the "Running" state indefinitely.
With debug logging enabled, the output is full of lines like:
16:18:26.175 | DEBUG | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
16:18:26.868 | DEBUG | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
16:18:27.375 | DEBUG | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
16:18:28.344 | DEBUG | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
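The debug lines above come from task runs asking for a slot under the `sequential-scraper` tag limit and being told to retry later. A rough stdlib model of that acquire/wait/retry loop is sketched here as an illustration under assumptions, not Prefect's implementation: `TagLimit`, `run_task`, the limit of 1, and the short retry delay (standing in for the 30 s in the logs) are all hypothetical.

```python
import asyncio


class TagLimit:
    """Hypothetical stand-in for a tag concurrency limit (not Prefect code)."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.active = 0

    def try_acquire(self) -> bool:
        # Grant a slot if one is free; otherwise the caller must wait and retry.
        if self.active < self.limit:
            self.active += 1
            return True
        return False

    def release(self) -> None:
        self.active -= 1


async def run_task(item: int, tag_limit: TagLimit, retry_delay: float, log: list) -> int:
    # Poll until a slot is granted, loosely mirroring "Received wait instruction".
    while not tag_limit.try_acquire():
        log.append(f"item {item}: waiting {retry_delay}s")
        await asyncio.sleep(retry_delay)  # yields, so the slot holder can finish
    try:
        await asyncio.sleep(0.01)  # stand-in for the real work
        return item
    finally:
        tag_limit.release()


async def main(n_items: int = 5, limit: int = 1) -> tuple[list, list]:
    tag_limit = TagLimit(limit)
    log: list = []
    results = await asyncio.gather(
        *(run_task(i, tag_limit, 0.005, log) for i in range(n_items))
    )
    return results, log


if __name__ == "__main__":
    results, log = asyncio.run(main())
    print(results)
    print(len(log), "wait instructions")
```

Every item eventually runs because each waiter sleeps cooperatively between retries; the slot holder is never starved of the chance to finish and release.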
Am I misusing this in any way?
Zanie
11/04/2022, 4:25 PM
Giuliano Mega
11/04/2022, 4:26 PM
Zanie
11/04/2022, 4:27 PM
async?
Giuliano Mega
11/04/2022, 4:28 PM
Zanie
11/04/2022, 4:29 PM
Giuliano Mega
11/04/2022, 5:22 PM
import asyncio
from prefect import flow, task
from lib.prefect import get_run_logger

@task
def get_inputs():
    return range(1, 600)

@task(tags=['sequential-scraper'])
async def process_input(item):
    logger = get_run_logger()
    logger.info(f'Processing item {item}')
    await asyncio.sleep(1)
    logger.info(f'Processed item {item}')

@flow(description='Example of a very simple, API scraping-like Pipeline. '
                  'Feel free to run it as it does not do anything harmful.')
async def sample_pipeline():
    await asyncio.gather(*[process_input(token) for token in get_inputs()])
17:24:37.724 | DEBUG | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/27f7ecb6-229f-4946-a25b-655a8a8fe9be/workspaces/2465f0dd-d6e4-49f6-bc80-d363ccc434b0/
17:24:40.365 | DEBUG | Flow run 'transparent-coati' - Loading flow for deployment 'sample pipeline'...
17:24:40.590 | DEBUG | Flow run 'transparent-coati' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
17:24:40.592 | DEBUG | prefect.task_runner.concurrent - Starting task runner...
17:24:40.765 | DEBUG | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/27f7ecb6-229f-4946-a25b-655a8a8fe9be/workspaces/2465f0dd-d6e4-49f6-bc80-d363ccc434b0/
17:24:41.300 | DEBUG | Flow run 'transparent-coati' - Executing flow 'sample-pipeline' for flow run 'transparent-coati'...
17:24:41.300 | DEBUG | Flow run 'transparent-coati' - Beginning execution...
17:24:42.378 | DEBUG | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/27f7ecb6-229f-4946-a25b-655a8a8fe9be/workspaces/2465f0dd-d6e4-49f6-bc80-d363ccc434b0/
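If the only goal is to cap how many items a single flow run processes at once, plain asyncio can do that in-process. This is a minimal sketch, not a Prefect feature and not something the thread settled on: unlike a tag concurrency limit, which the API enforces across all task runs carrying that tag, a semaphore only bounds coroutines inside one process. `bounded_process`, `bounded_pipeline`, `InFlight`, and `MAX_IN_FLIGHT` are hypothetical names.

```python
import asyncio

# Hypothetical cap on in-flight items; tune to what the scraped API tolerates.
MAX_IN_FLIGHT = 5


class InFlight:
    """Tracks how many items are running now and the highest count seen."""

    def __init__(self) -> None:
        self.now = 0
        self.peak = 0


async def bounded_process(item: int, sem: asyncio.Semaphore, stats: InFlight) -> int:
    async with sem:  # waits here (yielding the loop) until a permit is free
        stats.now += 1
        stats.peak = max(stats.peak, stats.now)
        try:
            await asyncio.sleep(0.01)  # stand-in for the real request
            return item
        finally:
            stats.now -= 1


async def bounded_pipeline(items) -> tuple[list, int]:
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)
    stats = InFlight()
    # gather still creates one coroutine per item, but the semaphore lets
    # only MAX_IN_FLIGHT of them past `async with sem` at any moment.
    results = await asyncio.gather(*(bounded_process(i, sem, stats) for i in items))
    return results, stats.peak


if __name__ == "__main__":
    results, peak = asyncio.run(bounded_pipeline(range(1, 600)))
    print(len(results), "items processed, peak concurrency", peak)
```

The waiting happens inside `async with sem`, which suspends the coroutine rather than blocking the thread, so items that are not yet allowed to run cost almost nothing while they wait.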
Kelvin DeCosta
12/19/2022, 4:09 PM
"I believe you may be running into an issue I'm actively working on where tasks are not yielding control of the thread when waiting. This can cause a deadlock once you move to scale."
I'm running into the same problem with a lot of mapped tasks.
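The quoted explanation, a task that waits without yielding its thread, can be illustrated with plain asyncio. This minimal sketch says nothing about Prefect's internals; `blocking_wait`, `yielding_wait`, and `timed_gather` are hypothetical helpers. It times four coroutines that each wait 0.05 s, once with `time.sleep` (which holds the event loop's only thread) and once with `await asyncio.sleep` (which hands it back).

```python
import asyncio
import time


async def blocking_wait(delay: float) -> None:
    # time.sleep never returns control to the event loop,
    # so no other coroutine can make progress while this one waits.
    time.sleep(delay)


async def yielding_wait(delay: float) -> None:
    # asyncio.sleep suspends this coroutine and lets the loop run others.
    await asyncio.sleep(delay)


async def timed_gather(wait, n: int, delay: float) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(wait(delay) for _ in range(n)))
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    blocked = await timed_gather(blocking_wait, n=4, delay=0.05)
    yielded = await timed_gather(yielding_wait, n=4, delay=0.05)
    return blocked, yielded


if __name__ == "__main__":
    blocked, yielded = asyncio.run(main())
    print(f"blocking: {blocked:.2f}s, yielding: {yielded:.2f}s")
```

With blocking waits the four waits run back to back (at least 0.2 s in total); with yielding waits they overlap and the total stays close to 0.05 s. In a real scheduler, a waiter that holds its thread while something else needs that thread to make progress is how such a slowdown can turn into a deadlock.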
Zanie
12/19/2022, 4:22 PM
Kelvin DeCosta
12/19/2022, 4:33 PM