
Deepanshu Aggarwal

11/22/2022, 8:22 AM
Hi everyone! I'm facing a weird issue with flow runs. I'm running Prefect on AWS EKS (a self-hosted Orion instance, with Kubernetes jobs in different namespaces).
1. I have 9 flows in a Running state that I can see in the UI, but I can't find the corresponding jobs on my cluster.
2. When I check the concurrency limit for the tags these flows use, I can see 14 task runs holding slots (these flows run tasks in parallel, so some of them have more than one task stuck in Running).
So basically the UI shows 9 running flows with 14 running tasks, but I can't find the jobs on the cluster. My doubt: how did these flow runs get terminated on the cluster, and even if they did, why was the state not updated in the UI, or rather in the metadata DB? Attaching screenshots for the same.
9 running flows
14 concurrent running tasks
no jobs found
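(A minimal sketch, assuming the Prefect 2.x client API, of reading the occupied slots for a tag programmatically, which is what the second screenshot shows; the tag name is a placeholder, not from this thread:)

import asyncio
from prefect.client import get_client  # Prefect 2.x; the import path may differ by version

async def inspect_limit(tag: str):
    async with get_client() as client:
        limit = await client.read_concurrency_limit_by_tag(tag=tag)
        # active_slots lists the task run ids currently holding a slot for this tag
        print(limit.concurrency_limit, len(limit.active_slots))

asyncio.run(inspect_limit("my-tag"))  # placeholder tag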

redsquare

11/22/2022, 8:33 AM
What do the agent logs say?

Deepanshu Aggarwal

11/22/2022, 8:41 AM
07:00:48.366 | ERROR   | prefect.agent - <ConnectionTerminated error_code:ErrorCodes.NO_ERROR, last_stream_id:19999, additional_data:None>
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/h2/connection.py", line 224, in process_input
    func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.SEND_HEADERS: 0>)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http2.py", line 108, in handle_async_request
    await self._send_request_headers(request=request, stream_id=stream_id)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http2.py", line 205, in _send_request_headers
    self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
  File "/usr/local/lib/python3.9/site-packages/h2/connection.py", line 766, in send_headers
    self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
  File "/usr/local/lib/python3.9/site-packages/h2/connection.py", line 228, in process_input
    raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http2.py", line 141, in handle_async_request
    raise RemoteProtocolError(self._connection_error_event)
httpcore.RemoteProtocolError: <ConnectionTerminated error_code:ErrorCodes.NO_ERROR, last_stream_id:19999, additional_data:None>

redsquare

11/22/2022, 8:44 AM
Possibly it's erroring when trying to talk to the k8s API to create the jobs

Deepanshu Aggarwal

11/22/2022, 8:45 AM
But the jobs did get created, right? Because roughly 98% of the tasks ran. They must have terminated somewhere in the middle without the state being updated.

redsquare

11/22/2022, 8:46 AM
The jobs should still be there
if they got created

Christopher Boyd

11/22/2022, 4:13 PM
Are they being created in that namespace, or another?
What happens if you run kubectl get pods -A?
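(A rough Python equivalent of that check using the official kubernetes client, in case kubectl access is awkward; the namespace below is a placeholder:)

from kubernetes import client, config

config.load_kube_config()  # use config.load_incluster_config() when running inside the cluster
batch = client.BatchV1Api()
for job in batch.list_namespaced_job(namespace="prefect-jobs").items:  # placeholder namespace
    print(job.metadata.name, job.status.active, job.status.succeeded, job.status.failed)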

Deepanshu Aggarwal

11/22/2022, 4:14 PM
In the selected namespace
I mean the job has definitely terminated, because it's not something that would take more than 30 minutes at most, but it has been in a Running state for about 24 hours now.
I think the issue is that somehow the errors the tasks run into (which should mark the task as Failed) are not getting updated in the metadata DB,
and neither are they showing up in the logs in the UI.
See the last line in the logs. But this is not reflected in the UI.
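(A rough sketch, assuming the Prefect 2.x client API, of listing the flow runs the API still considers Running so they can be compared against what is actually on the cluster:)

import asyncio
from prefect.client import get_client  # Prefect 2.x

async def list_running():
    async with get_client() as client:
        for run in await client.read_flow_runs():
            if run.state and run.state.is_running():
                print(run.id, run.name, run.start_time)

asyncio.run(list_running())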

Taylor Curran

11/23/2022, 2:03 PM
@Deepanshu Aggarwal code here:
from prefect import flow, task, unmapped
from datetime import date, timedelta
from prefect_dask import DaskTaskRunner
from prefect.deployments import run_deployment


@task
def ingest_raw_customers(dataset_size):
    pass

@task
def ingest_raw_payments(dataset_size):
    pass

@task
def ingest_raw_orders(dataset_size, start_date, end_date):
    pass

# Parallel: fan partitions out across a Dask task runner
@flow(task_runner=DaskTaskRunner())
def partition_ingestion_parallel(dataset_size, start_date, end_date, n_partitions):

    date_delta = end_date - start_date

    partition_period = date_delta / n_partitions

    for i in range(n_partitions):
        partition_end = start_date + partition_period

        ingest_raw_orders.submit(dataset_size, start_date, partition_end)

        start_date = partition_end

    return 'Done'
 
# Async
@flow
def partition_ingestion_async(dataset_size, start_date, end_date, n_partitions):

    date_delta = end_date - start_date

    partition_period = date_delta / n_partitions

    start_dates = []
    end_dates = []
    for i in range(n_partitions):
        
        start_dates.append(start_date) 
        end_dates.append(start_date + partition_period)
        start_date = start_date+partition_period

    ingest_raw_orders.map(unmapped(dataset_size), start_dates, end_dates)

    return 'Done'

# k8s or separate container: run each partition as its own flow run via run_deployment

@flow
def sub_flow():
    print('subflow')


@flow
def partition_ingestion_unique_pods(dataset_size, start_date, end_date, n_partitions):

    date_delta = end_date - start_date

    partition_period = date_delta / n_partitions

    for i in range(n_partitions):
        partition_end = start_date + partition_period

        run_deployment(
            name='ingest_raw_orders/k8s_deployment',
            parameters={
                'start_date': start_date,
                'end_date': partition_end,
                'dataset_size': dataset_size}
        )

        start_date = partition_end



@flow()  # retries=3, retry_delay_seconds=30
def raw_data_jaffle_shop(
        start_date: date = date(2022, 2, 1),
        end_date: date = date.today(),
        dataset_size: int = 10_000,  # parametrized for backfills # ?
        n_partitions: int = 3
):

    ingest_raw_customers.submit(dataset_size)
    ingest_raw_payments.submit(dataset_size)

    partition_ingestion_async(dataset_size, start_date, end_date, n_partitions)


if __name__ == "__main__":
    raw_data_jaffle_shop(date(2022, 2, 1), date.today(), 1000, 4)
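(For context, a rough sketch, not from the thread, of how the 'ingest_raw_orders/k8s_deployment' referenced by run_deployment might be registered, assuming Prefect 2.x's Deployment.build_from_flow and the KubernetesJob infrastructure block; the wrapper flow, namespace, and work queue names are placeholders:)

from prefect import flow
from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob

@flow(name="ingest_raw_orders")  # name must match the prefix used in the run_deployment string
def ingest_raw_orders_flow(dataset_size, start_date, end_date):
    # call the ingest_raw_orders task from the snippet above
    ingest_raw_orders(dataset_size, start_date, end_date)

deployment = Deployment.build_from_flow(
    flow=ingest_raw_orders_flow,
    name="k8s_deployment",
    infrastructure=KubernetesJob(namespace="prefect-jobs"),  # placeholder namespace
    work_queue_name="kubernetes",  # placeholder work queue
)

if __name__ == "__main__":
    deployment.apply()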