# prefect-community
g
Hey folks, me again. 🙂 I'm trying to get a grip on how Prefect behaves when you have somewhat fine-grained tasks and try to enforce a concurrency limit on them. I'm running it on a GKE cluster with Autopilot. My understanding is that tag-based concurrency limits are enforced cluster-wide, meaning there must be some central mechanism for coordination; is that assumption correct? Then I'm trying the following flow, which is the kind of thing I'll have to run when doing external API calls to enrich thousands of data items:
Copy code
import time

from prefect import flow, task

from lib.prefect import get_run_logger


@task
def get_inputs():
    return range(1, 600)


@task(tags=['sequential-scraper'])
def process_input(item):
    logger = get_run_logger()
    logger.info(f'Processing item {item}')
    time.sleep(1)
    logger.info(f'Processed item {item}')


@flow(description='Example of a very simple, API scraping-like Pipeline. ' 
                  'Feel free to run it as it does not do anything harmful.')
def sample_pipeline():
    new_tokens = get_inputs()
    process_input.map(new_tokens)
And what I see, essentially, is my task runner (Job) taking a very long time and then getting wiped out without running any tasks at all, and the UI (Prefect Cloud) leaves the flow in an eternal "running" state. Enabling debug logging shows a lot of:
Copy code
16:18:26.175 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
16:18:26.868 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
16:18:27.375 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
16:18:28.344 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
Am I misusing this in any way?
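(For context on where the limit in those logs comes from: it is created on the API side, not in the flow code. Below is a minimal sketch of setting a limit of 1 on the sequential-scraper tag with the Prefect 2 client; the get_client import path and the create_concurrency_limit method are assumptions based on Prefect 2.x and may differ in your version. The CLI equivalent is prefect concurrency-limit create sequential-scraper 1.)
```python
import asyncio

# Assumed Prefect 2.x import path; newer releases expose get_client
# from prefect.client.orchestration instead.
from prefect.client import get_client


async def set_scraper_limit():
    async with get_client() as client:
        # Allow one concurrent task run at a time for anything tagged
        # 'sequential-scraper' (the tag used by process_input above).
        await client.create_concurrency_limit(
            tag='sequential-scraper',
            concurrency_limit=1,
        )


if __name__ == '__main__':
    asyncio.run(set_scraper_limit())
```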
z
Hi! The WAIT instructions are indeed coming from the API, which is the central coordinator of the concurrency limits here.
Are all of the tasks stuck or are they being worked through slowly?
g
They look stuck, then eventually the Job dies. I'll see if it leaves any useful messages behind.
z
I believe you may be running into an issue I’m actively working on where tasks are not yielding control of the thread when waiting. This can cause a deadlock once you move to scale
Can you try changing your task/flow to async?
g
Ah I see, does it make it any better if I use async?
ahah right, my thought exactly
lemme try
z
The concurrent task runner puts your sync tasks in threads to make things concurrent 😄 If you use async tasks they’ll all just run on the event loop in a single thread
g
so, I changed my flow to:
Copy code
import asyncio

from prefect import flow, task

from lib.prefect import get_run_logger


@task
def get_inputs():
    return range(1, 600)


@task(tags=['sequential-scraper'])
async def process_input(item):
    logger = get_run_logger()
    logger.info(f'Processing item {item}')
    await asyncio.sleep(1)
    logger.info(f'Processed item {item}')


@flow(description='Example of a very simple, API scraping-like Pipeline. '
                  'Feel free to run it as it does not do anything harmful.')
async def sample_pipeline():
    await asyncio.gather(*[process_input(token) for token in get_inputs()])
and now it appears to be stuck 😕 all I see in the task runner logs is:
Copy code
17:24:37.724 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/27f7ecb6-229f-4946-a25b-655a8a8fe9be/workspaces/2465f0dd-d6e4-49f6-bc80-d363ccc434b0/
17:24:40.365 | DEBUG   | Flow run 'transparent-coati' - Loading flow for deployment 'sample pipeline'...
17:24:40.590 | DEBUG   | Flow run 'transparent-coati' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
17:24:40.592 | DEBUG   | prefect.task_runner.concurrent - Starting task runner...
17:24:40.765 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/27f7ecb6-229f-4946-a25b-655a8a8fe9be/workspaces/2465f0dd-d6e4-49f6-bc80-d363ccc434b0/
17:24:41.300 | DEBUG   | Flow run 'transparent-coati' - Executing flow 'sample-pipeline' for flow run 'transparent-coati'...
17:24:41.300 | DEBUG   | Flow run 'transparent-coati' - Beginning execution...
17:24:42.378 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/27f7ecb6-229f-4946-a25b-655a8a8fe9be/workspaces/2465f0dd-d6e4-49f6-bc80-d363ccc434b0/
Nothing about my tasks 😕 I tried redeploying the agent to no avail. In this particular case I guess I could switch to the sequential task runner with synchronous tasks and all would be OK
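(For reference, the sequential fallback mentioned here is a one-line change on the flow decorator. A minimal sketch, assuming Prefect 2's SequentialTaskRunner from prefect.task_runners; with it, mapped tasks run one at a time in-process, so the tag-based limit is never contended:)
```python
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner


@task
def get_inputs():
    return range(1, 600)


@task(tags=['sequential-scraper'])
def process_input(item):
    print(f'Processing item {item}')


# SequentialTaskRunner runs each submitted task to completion before
# starting the next, so only one 'sequential-scraper' task is ever
# in flight from this flow run.
@flow(task_runner=SequentialTaskRunner())
def sample_pipeline():
    new_tokens = get_inputs()
    process_input.map(new_tokens)
```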
k
Hey @Zanie, how are you doing? I hope I'm not bothering you here but I'd like to know if the issue is resolved:
> I believe you may be running into an issue I’m actively working on where tasks are not yielding control of the thread when waiting. This can cause a deadlock once you move to scale
I'm running into the same problem with a lot of mapped tasks
z
Not yet! I'm working on overhauling our async runtime (which is a slow / large task), hopefully done soon!
k
Thanks for the quick reply!