Deepanshu Aggarwal
11/22/2022, 8:22 AM
redsquare
11/22/2022, 8:33 AM
Deepanshu Aggarwal
11/22/2022, 8:41 AM
07:00:48.366 | ERROR | prefect.agent - <ConnectionTerminated error_code:ErrorCodes.NO_ERROR, last_stream_id:19999, additional_data:None>
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/h2/connection.py", line 224, in process_input
    func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.SEND_HEADERS: 0>)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http2.py", line 108, in handle_async_request
    await self._send_request_headers(request=request, stream_id=stream_id)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http2.py", line 205, in _send_request_headers
    self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
  File "/usr/local/lib/python3.9/site-packages/h2/connection.py", line 766, in send_headers
    self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
  File "/usr/local/lib/python3.9/site-packages/h2/connection.py", line 228, in process_input
    raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http2.py", line 141, in handle_async_request
    raise RemoteProtocolError(self._connection_error_event)
httpcore.RemoteProtocolError: <ConnectionTerminated error_code:ErrorCodes.NO_ERROR, last_stream_id:19999, additional_data:None>
redsquare
11/22/2022, 8:44 AM
Deepanshu Aggarwal
11/22/2022, 8:45 AM
redsquare
11/22/2022, 8:46 AM
Christopher Boyd
11/22/2022, 4:13 PM
Deepanshu Aggarwal
11/22/2022, 4:14 PM
Taylor Curran
11/23/2022, 2:03 PM
from datetime import date, timedelta
from typing import Optional

from prefect import flow, task
from prefect.deployments import run_deployment
from prefect_dask import DaskTaskRunner
@task
def ingest_raw_customers(dataset_size):
    """Placeholder task for ingesting raw customer records.

    Args:
        dataset_size: Accepted for interface parity with the other ingest
            tasks; not used yet.

    Returns:
        None. The body is not implemented.
    """
@task
def ingest_raw_payments(dataset_size):
    """Placeholder task for ingesting raw payment records.

    Args:
        dataset_size: Accepted for interface parity with the other ingest
            tasks; not used yet.

    Returns:
        None. The body is not implemented.
    """
@task
def ingest_raw_orders(dataset_size, start_date, end_date):
    """Placeholder task for ingesting raw orders within one date window.

    Args:
        dataset_size: Not used yet.
        start_date: Start of the window this run should cover; not used yet.
        end_date: End of the window this run should cover; not used yet.

    Returns:
        None. The body is not implemented.
    """
# Parallel: each date partition is submitted as its own task run on Dask.
# Renamed from ``partition_ingestion_async``: the map-based flow defined after
# this one reused that name and silently shadowed this definition.
@flow(task_runner=DaskTaskRunner)
def partition_ingestion_parallel(dataset_size, start_date, end_date, n_partitions):
    """Ingest orders from ``start_date`` to ``end_date`` in ``n_partitions`` slices.

    Each slice is submitted as a separate ``ingest_raw_orders`` task run on a
    ``DaskTaskRunner``, so slices may execute in parallel.

    Args:
        dataset_size: Forwarded unchanged to every ``ingest_raw_orders`` call.
        start_date: Start of the overall range (first slice starts here).
        end_date: End of the overall range (last slice ends exactly here).
        n_partitions: Number of slices; must be at least 1.

    Returns:
        The string ``'Done'`` after every slice has been submitted.

    Raises:
        ValueError: If ``n_partitions`` is less than 1.
    """
    if n_partitions < 1:
        raise ValueError(f"n_partitions must be >= 1, got {n_partitions}")
    partition_period = (end_date - start_date) / n_partitions
    for i in range(n_partitions):
        # Offset from the original start (instead of chaining from the previous
        # slice) so the fractional-day truncation of ``date + timedelta`` does
        # not accumulate across slices.
        partition_start = start_date + partition_period * i
        if i == n_partitions - 1:
            # Pin the last slice to end_date so truncated remainders are kept.
            partition_end = end_date
        else:
            partition_end = start_date + partition_period * (i + 1)
        # NOTE(review): adjacent slices share a boundary date; confirm whether
        # ingest_raw_orders treats end_date as exclusive.
        ingest_raw_orders.submit(
            dataset_size=dataset_size,
            start_date=partition_start,
            end_date=partition_end,
        )
    return 'Done'
# Mapped: one ``ingest_raw_orders`` task run per date partition, executed by
# the flow's default task runner.
@flow
def partition_ingestion_async(dataset_size, start_date, end_date, n_partitions):
    """Ingest orders from ``start_date`` to ``end_date`` in ``n_partitions`` slices.

    The slice boundaries are computed up front and ``ingest_raw_orders`` is
    mapped over them, producing one task run per slice.

    Args:
        dataset_size: Forwarded unchanged to every mapped task run.
        start_date: Start of the overall range (first slice starts here).
        end_date: End of the overall range (last slice ends exactly here).
        n_partitions: Number of slices; must be at least 1.

    Returns:
        The string ``'Done'`` after the map has been submitted.

    Raises:
        ValueError: If ``n_partitions`` is less than 1.
    """
    if n_partitions < 1:
        raise ValueError(f"n_partitions must be >= 1, got {n_partitions}")
    partition_period = (end_date - start_date) / n_partitions
    # Offsets from the original start avoid accumulating the fractional-day
    # truncation that ``date + timedelta`` applies.
    start_dates = [start_date + partition_period * i for i in range(n_partitions)]
    # Each slice ends where the next begins; the last ends exactly at end_date.
    end_dates = start_dates[1:] + [end_date]
    # Keywords keep the mapped lists aligned with the task's parameter order
    # (dataset_size comes first in ingest_raw_orders). NOTE(review): the scalar
    # dataset_size relies on Prefect treating non-iterables as static; wrap it
    # in ``prefect.unmapped`` if the installed version requires it.
    ingest_raw_orders.map(
        dataset_size=dataset_size,
        start_date=start_dates,
        end_date=end_dates,
    )
    return 'Done'
# k8s or separate-container variants.
@flow
def sub_flow():
    """Placeholder subflow that prints a single marker line."""
    marker = 'subflow'
    print(marker)
@flow
def partition_ingestion_unique_pods(dataset_size, start_date, end_date, n_partitions):
    """Trigger one run of the orders deployment per date partition.

    The range from ``start_date`` to ``end_date`` is split into
    ``n_partitions`` contiguous slices, and ``run_deployment`` is called once
    per slice with that slice's dates.

    Args:
        dataset_size: Forwarded unchanged in every deployment's parameters.
        start_date: Start of the overall range (first slice starts here).
        end_date: End of the overall range (last slice ends exactly here).
        n_partitions: Number of slices; must be at least 1.

    Raises:
        ValueError: If ``n_partitions`` is less than 1.
    """
    if n_partitions < 1:
        raise ValueError(f"n_partitions must be >= 1, got {n_partitions}")
    partition_period = (end_date - start_date) / n_partitions
    for i in range(n_partitions):
        # Compute both bounds before using them; the old code advanced
        # start_date first, which made every slice zero-length.
        partition_start = start_date + partition_period * i
        if i == n_partitions - 1:
            # Pin the last slice to end_date so truncated remainders are kept.
            partition_end = end_date
        else:
            partition_end = start_date + partition_period * (i + 1)
        # NOTE(review): run_deployment waits for the triggered flow run to
        # finish unless a timeout is given, so slices may run one after another.
        # Confirm whether timeout=0 (return immediately) is intended. Also
        # confirm a *flow* named "ingest_raw_orders" with deployment
        # "k8s_deployment" exists; here ingest_raw_orders is a task.
        run_deployment(
            deployment='ingest_raw_orders/k8s_deployment',
            parameters={
                'start_date': partition_start,
                'end_date': partition_end,
                'dataset_size': dataset_size,
            },
        )
@flow
def raw_data_jaffle_shop(
    start_date: date = date(2022, 2, 1),
    end_date: Optional[date] = None,
    dataset_size: int = 10_000,  # overridable so backfills can pick a size
    n_partitions: int = 3,
):
    """Ingest raw jaffle-shop customers, payments and orders.

    Customers and payments are submitted as tasks; orders are ingested by the
    ``partition_ingestion_async`` subflow, split into ``n_partitions`` date
    slices.

    Args:
        start_date: Start of the orders date range.
        end_date: End of the orders date range. ``None`` (the default) means
            today's date, resolved when the flow is called.
        dataset_size: Forwarded to every ingest step.
        n_partitions: Number of date slices for the orders ingestion.
    """
    # A ``date.today()`` default would be evaluated once at import time and go
    # stale in a long-lived process; resolve "today" per call instead.
    if end_date is None:
        end_date = date.today()
    ingest_raw_customers.submit(dataset_size)
    ingest_raw_payments.submit(dataset_size)
    partition_ingestion_async(
        dataset_size=dataset_size,
        start_date=start_date,
        end_date=end_date,
        n_partitions=n_partitions,
    )
if __name__ == "__main__":
    # Local run from Feb 1, 2022 through today, with a small dataset size.
    raw_data_jaffle_shop(
        start_date=date(2022, 2, 1),
        end_date=date.today(),
        dataset_size=1000,
        n_partitions=4,
    )