Austin Weisgrau
02/27/2023, 9:34 PM
Zanie
02/27/2023, 9:47 PM
Austin Weisgrau
02/27/2023, 11:04 PM
Zanie
02/28/2023, 1:35 AM
Austin Weisgrau
02/28/2023, 1:38 AM
from concurrent.futures import ThreadPoolExecutor

values = fetch_values()
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(make_api_call, values)
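For reference, a self-contained version of the ThreadPoolExecutor pattern above. `fetch_values` and `make_api_call` are hypothetical stand-ins (the real ones aren't shown), and the stub records how many calls overlap so you can confirm the pool never runs more than `max_workers` at once. Note that `executor.map` returns a lazy iterator: an exception raised inside a call only surfaces when that result is retrieved, so this sketch consumes it with `list(...)`.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

_lock = threading.Lock()
_active = 0  # calls currently in progress
peak = 0     # highest number of overlapping calls observed


def fetch_values():
    # Hypothetical stand-in for the real data source.
    return list(range(20))


def make_api_call(value):
    # Hypothetical stand-in for the real API call; tracks how many calls overlap.
    global _active, peak
    with _lock:
        _active += 1
        peak = max(peak, _active)
    time.sleep(0.05)  # simulate network latency
    with _lock:
        _active -= 1
    return value * 2


values = fetch_values()
with ThreadPoolExecutor(max_workers=5) as executor:
    # list() retrieves every result, so any exception from a call is raised here.
    results = list(executor.map(make_api_call, values))

print(results[:3], peak)
```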
In Prefect, I'm working with the default concurrent task runner:
from prefect import flow

@flow
def run_pipeline():
    values = fetch_values()
    make_api_call.map(values)
requests package
Zanie
02/28/2023, 2:13 AM
Austin Weisgrau
02/28/2023, 2:25 AM
import time

from prefect import flow, task


class Semaphore:
    slots: int = 2


@task
def concurrent_task(value, semaphore):
    while True:
        if semaphore.slots:
            semaphore.slots -= 1
            time.sleep(1)  # Do whatever I actually want to do here (e.g. make an API call)
            semaphore.slots += 1
            return value
        else:
            time.sleep(0.1)  # no free slot: poll again in 100 ms


@flow
def my_flow():
    values = range(30)
    semaphore = Semaphore()
    concurrent_task.map(values, semaphore)


if __name__ == "__main__":
    my_flow()
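In the polling version above, the `if semaphore.slots:` check and the decrement are separate steps, so two threads can both see a free slot before either takes it, and every waiting thread keeps waking up every 0.1 s. A minimal sketch of the same slot-counting idea made thread-safe with `threading.Condition`, so waiting threads block until a slot is released. `SlotCounter` is a hypothetical name, and a plain `ThreadPoolExecutor` stands in for Prefect's concurrent task runner; the standard library's `threading.Semaphore` already packages this behavior.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class SlotCounter:
    """Counting slots guarded by a Condition: waiters block instead of polling."""

    def __init__(self, slots: int):
        self._slots = slots
        self._cond = threading.Condition()

    def acquire(self):
        with self._cond:
            # wait_for re-checks the predicate whenever the thread is woken,
            # and the check and decrement both happen while holding the lock.
            self._cond.wait_for(lambda: self._slots > 0)
            self._slots -= 1

    def release(self):
        with self._cond:
            self._slots += 1
            self._cond.notify()  # wake one blocked waiter


slots = SlotCounter(2)
lock = threading.Lock()
running = 0
peak = 0


def concurrent_task(value):
    global running, peak
    slots.acquire()
    try:
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.02)  # the real work (e.g. an API call) would go here
        with lock:
            running -= 1
        return value
    finally:
        slots.release()  # always give the slot back, even on error


with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(concurrent_task, range(30)))

print(peak)
```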
Will Raphaelson
03/02/2023, 10:34 PM
Zanie
03/02/2023, 10:35 PM
threading.Semaphore too
Austin Weisgrau
03/02/2023, 10:35 PM
Zanie
03/02/2023, 10:38 PM
Austin Weisgrau
03/02/2023, 10:38 PM
import time
from functools import wraps
from threading import Semaphore
from typing import Callable

from prefect import flow, task


def limit_concurrency(
    max_workers: int,
) -> Callable[[Callable], Callable]:
    semaphore = Semaphore(max_workers)

    def pseudo_decorator(func: Callable):
        @wraps(func)
        def limited_concurrent_func(*args, **kwargs):
            # Using the semaphore as a context manager releases the slot
            # even if func raises.
            with semaphore:
                return func(*args, **kwargs)

        return limited_concurrent_func

    return pseudo_decorator


@task
@limit_concurrency(max_workers=2)
def concurrent_task(value):
    print(f"starting {value}")
    time.sleep(1)
    print(f"ending {value}")


@flow
def my_flow():
    values = range(30)
    concurrent_task.map(values)


if __name__ == "__main__":
    my_flow()
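To check the decorator without Prefect, here is a sketch that applies the same `limit_concurrency` idea to a plain function and calls it from a 20-thread pool (standing in for the concurrent task runner), recording the peak number of overlapping calls. The `running`/`peak` counters are illustrative additions. Because the limit is a `threading.Semaphore`, it only holds across threads within one process; work spread across processes or machines would not share it.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from typing import Callable


def limit_concurrency(max_workers: int) -> Callable[[Callable], Callable]:
    semaphore = threading.Semaphore(max_workers)  # shared by every call of the wrapped function

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def limited(*args, **kwargs):
            # The with-block releases the slot even if func raises.
            with semaphore:
                return func(*args, **kwargs)

        return limited

    return decorator


lock = threading.Lock()
running = 0
peak = 0


@limit_concurrency(max_workers=2)
def concurrent_task(value):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.02)  # simulated work
    with lock:
        running -= 1
    return value


# Twenty threads compete for two slots, mimicking many mapped task runs.
with ThreadPoolExecutor(max_workers=20) as pool:
    results = list(pool.map(concurrent_task, range(30)))
```

Here `peak` should never exceed 2, even though up to 20 threads are waiting at once.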
Zanie
03/02/2023, 10:41 PM
Austin Weisgrau
03/02/2023, 10:41 PM
Zanie
03/02/2023, 10:41 PM
Austin Weisgrau
03/02/2023, 10:44 PM
Zanie
03/02/2023, 10:46 PM
Austin Weisgrau
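03/20/2023, 5:56 PM
import time

import requests


def wait_for_success(some_url):  # hypothetical wrapper name for the polling loop
    while True:
        response = requests.get(some_url)
        if response.json()['success']:  # a Response object isn't subscriptable; parse the JSON body
            return
        else:
            time.sleep(30)
It's barely using any memory, but the CPU is maxing out. Any idea why that would happen?
Zanie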
03/20/2023, 6:11 PM
Austin Weisgrau
03/20/2023, 6:13 PM
Zanie
03/20/2023, 6:14 PM
strace might help
Cody Frisby
03/23/2023, 4:48 PM
values object. My expectation is that this would create as many workers and threads as specified by the max_workers variable. For my test case, I have tens of thousands of elements I need to operate on, and when using this solution, Prefect creates tens of thousands of tasks before working on any of them. What am I missing? What am I not understanding about Prefect? Thanks!
Austin Weisgrau
03/23/2023, 4:49 PM
Zanie
03/23/2023, 4:50 PM
Austin Weisgrau
03/23/2023, 4:53 PM
Cody Frisby
03/23/2023, 4:57 PM
Zanie
03/23/2023, 4:58 PM
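Cody's question is about `.map` creating a task run for every one of tens of thousands of elements before any of them start. One possible workaround, assuming it's acceptable to process the data in chunks, is to split `values` into fixed-size batches and only hand one batch at a time to the executor (in a flow, the analogous step would be mapping over one batch and waiting for those runs to finish before mapping the next). A minimal Prefect-free sketch; `batched` and `process` are hypothetical helpers, and Python 3.12's `itertools.batched` behaves similarly but yields tuples.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items."""
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch


def process(value: int) -> int:
    # Hypothetical per-element work (e.g. one API call).
    return value + 1


values = range(10_000)
results: List[int] = []
with ThreadPoolExecutor(max_workers=5) as pool:
    for batch in batched(values, 100):
        # At most 100 work items are submitted at a time; the next batch
        # is only created once every result from this one has been collected.
        results.extend(pool.map(process, batch))
```

The batch size trades off scheduling overhead against how much work is queued at once; the thread count still caps how many calls actually run concurrently.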