https://prefect.io logo
#prefect-community
Title
# prefect-community
m

Mathijs Carlu

11/04/2022, 9:25 AM
Hi all, I'm currently running Prefect 2.6.5 on Kubernetes, running all flows in Kubernetes Jobs with flows baked in docker images. I seem to run into task run crashes which cause my flow run to crash, while I have retrying on these specific tasks. The task that reports the crash is a mapped task, over 12 inputs. The task is executed in a loop, with sleeping in between loop iterations. It mostly manages to execute a certain amount of iterations before crashing (like 30ish). Does anyone have an idea on how I could solve this, or what exactly is going wrong? Stack trace in thread
Crash detected! Execution was cancelled by the runtime environment.
09:50:37 AM
machine_data-7699a41e-398
DEBUG
Crash details:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1334, in report_task_run_crashes
yield
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1070, in begin_task_run
connect_error = await client.api_healthcheck()
File "/usr/local/lib/python3.8/site-packages/prefect/client/orion.py", line 204, in api_healthcheck
await self._client.get("/health")
File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1751, in get
return await self.request(
File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/usr/local/lib/python3.8/site-packages/prefect/client/base.py", line 159, in send
await super().send(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1614, in send
response = await self._send_handling_auth(
File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
response = await self._send_handling_redirects(
File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
response = await self._send_single_request(request)
File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1716, in _send_single_request
response = await transport.handle_async_request(request)
File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 218, in handle_async_request
async with self._pool_lock:
File "/usr/local/lib/python3.8/site-packages/httpcore/_synchronization.py", line 15, in __aenter__
await self._lock.acquire()
File "/usr/local/lib/python3.8/site-packages/anyio/_core/_synchronization.py", line 130, in acquire
await event.wait()
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 1842, in wait
if await self._event.wait():
File "/usr/local/lib/python3.8/asyncio/locks.py", line 309, in wait
await fut
asyncio.exceptions.CancelledError
09:50:37 AM
machine_data-7699a41e-398
ERROR
Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 910, in write
n = self._sock.send(data)
BrokenPipeError: [Errno 32] Broken pipe
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
yield
File "/usr/local/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 33, in read
return await self._stream.receive(max_bytes=max_bytes)
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 1274, in receive
raise self._protocol.exception
anyio.BrokenResourceError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
yield
File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
raise exc
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
return await self._connection.handle_async_request(request)
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 105, in handle_async_request
raise exc
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 84, in handle_async_request
) = await self._receive_response_headers(**kwargs)
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 148, in _receive_response_headers
event = await self._receive_event(timeout=timeout)
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 177, in _receive_event
data = await self._network_stream.read(
File "/usr/local/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 35, in read
return b""
File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.8/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
raise to_exc(exc)
httpcore.ReadError
The above exception was the direct cause of the following exception:
httpx.ReadError
s

Sebastian Pabst

11/04/2022, 1:29 PM
I have the exact same issue. I try to retrieve a bunch of blob files from azure via a map in prefect and some of them crash (don't fail) with a broken Pipe error. I also noticed that crashed tasks are not retried, while failed ones do (tried to add a non existing file in the loop). I am running version 2.6.5 too
f

Faheem Khan

11/05/2022, 9:10 AM
Got the same issue but didn't see anything crashing, prefect 2.6.6
b

Benny Warlick

11/05/2022, 6:48 PM
I am getting similar httpx errors and BrokenPipeError, and then a crashed flow. My VM also died (Google Compute Engine) that is running the Prefect agent and running flows locally in the VM. Not sure if the VM died first and caused the flow to crash or the other way around.
f

Faheem Khan

11/07/2022, 11:16 PM
did anyone find any solution?
m

Mathijs Carlu

11/08/2022, 7:22 AM
Nope, I'll file a github issue since my question was kind of skipped by the team
a

Andreas Nigg

11/08/2022, 7:34 AM
Mathijs do you have an example flow which can easily reproduce this issue? I also had this problem once but could not really reproduce it in a minimal example - and the prefect team had a hard time to do something, as also they were not really able to reproduce as well. So if you could add an example flow to reproduce to the github issue - this would be awesome I guess. Would you mind to link the github issue here? I'd like to follow the issue as well.
m

Mathijs Carlu

11/08/2022, 7:46 AM
Copy code
@flow(persist_result=True, result_storage='azure/default')
def flow(input_date: Union[datetime, None]=None):
    logger = get_run_logger()

    user = ''
    pwd = ''
    logger_name = ''
    logger_pwd = '' 

    machine_speed = {'Martin': 'diSpeed', 'Transline': 'diSpeed', 'Compact': 'diSpeed', 'Ward': 'diSpeed',
                     'Geopfert': 'diSpeed', 'Straplijn': 'rSpeed', 'Emba': 'diSpeed', 'Magnus': 'diSpeed',
                     'Mark2': 'diSpeed', 'Rapidline': 'diSpeed', 'Mark1': 'diSpeed', 'Copar': 'Speed'}

    timezone = pytz.timezone('Europe/Brussels')

    # Calculate datetimes for shifts
    context = get_run_context()
    date = context.flow_run.expected_start_time.astimezone(pytz.utc).date() if input_date is None else input_date.date()
    shift_duration = timedelta(hours=5, minutes=30) if date.isoweekday() == 5 else timedelta(hours=8)
    start_first_shift = timezone.localize(datetime.combine(date, time(hour=5, minute=0, second=0)))
    end_first_shift = start_first_shift + shift_duration
    start_first_shift_utc = start_first_shift.astimezone(pytz.utc)
    end_first_shift_utc = end_first_shift.astimezone(pytz.utc)

    start_second_shift = end_first_shift
    end_second_shift = start_second_shift + shift_duration
    start_second_shift_utc = start_second_shift.astimezone(pytz.utc)
    end_second_shift_utc = end_second_shift.astimezone(pytz.utc)


    current_start = start_first_shift_utc
    current_end = current_start + timedelta(seconds=INTERVAL)
    df = pd.DataFrame.from_records(
        [{'Tags.Machine': machine, 'Tags.Shift': int(1), 'Tags.DowntimeIndex': int(0), f"Fields.Speed": 0.0,
          "Timestamp": start_first_shift_utc, 'Fields.Duration': 0.0} for machine in machine_speed.keys()])

    insert(get_token(logger_name, logger_pwd), df)

    while current_end <= end_first_shift_utc:
        data = machine_data.map(get_token(user, pwd), machine_speed.keys(), machine_speed.values(), current_start, current_end)
        df = concatenate_dfs(df, data)
        df = process_data(df, start_first_shift_utc, end_first_shift_utc, 1)

        insert(get_token(logger_name, logger_pwd), df[df['Timestamp'] > current_start])

        df = df[df['Timestamp'] >= current_end]

        current_start += timedelta(seconds=INTERVAL)
        current_end += timedelta(seconds=INTERVAL)

        sec_until_next_loop = (current_end - datetime.now(tz=pytz.utc)).total_seconds() + OFFSET
        if sec_until_next_loop > 0:
            logger.warning("Sleeping...")
            sleep(sec_until_next_loop)

    while current_end <= end_second_shift_utc:
        data = machine_data.map(get_token(user, pwd), machine_speed.keys(), machine_speed.values(), current_start, current_end)
        df = concatenate_dfs(df, data)
        df = process_data(df, start_second_shift_utc, end_second_shift_utc, 2)

        insert(get_token(logger_name, logger_pwd), df[df['Timestamp'] > current_start])

        df = df[df['Timestamp'] >= current_end]

        current_start += timedelta(seconds=INTERVAL)
        current_end += timedelta(seconds=INTERVAL)

        sec_until_next_loop = (current_end - datetime.now(tz=pytz.utc)).total_seconds() + OFFSET
        if sec_until_next_loop > 0:
            logger.warning("Sleeping...")
            sleep(sec_until_next_loop)
It probably could be coded more efficiently, but it's a temporary solution. What it does, every 5 minutes: 1. Requests data from our data API (mapped, 12 queries -> 12 tasks) 2. Concatenate with previous 'left-over' data 3. Do some processing on the data 4. Send this data back to our data API The normal duration of this flow is 16 hours max. It already managed to do so as well. The discrepancy between times until failure is quite big as well: sometimes it fails after 10 minutes, sometimes only after 4 hours. I originally just used local storage, which worked. Since I'm having issues, I tried to persist results to Azure Blob Storage, but this hasn't really made any improvements...
@Andreas Nigg I also have not been able to produce a minimal working example... There seems to be already an issue created for this issue though: https://github.com/PrefectHQ/prefect/issues/7472
z

Zanie

11/10/2022, 4:36 AM
Hi everyone! We really need a MRE to resolve this efficiently. Please leave a comment in the issue if you manage to reproduce this with a simple example.
It’d also be very helpful to get an idea of what versions are affected and if you are using Prefect Cloud or a local server.
3 Views