Jean-Baptiste Six
02/23/2022, 10:58 AM
with Flow("sec daily", state_handlers=[flow_handler], result=GCSResult(BUCKET_RESULTS)) as sec_daily_flow:
    root_dir = init_dirs("sec", archive=False)
    download_dir = download(root_dir, False)
    metadatas = metadata(download_dir)
    upload(metadatas)
    result_locations = split_store_batches(metadatas)
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters=result_locations
    )
With a "subflow":
with Flow("index_flow", result=GCSResult(BUCKET_RESULTS), state_handlers=[flow_handler]) as index_flow:
    # Imports
    metadatas_batch_location = Parameter("metadatas_batch_location", required=True)
    metadatas_batch = get_result(metadatas_batch_location)
    imports = build_import(metadatas_batch)
    index_weaviate(imports)
But I have the following error:
prefect.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'dictionary update sequence element #0 has length 1; 2 is required', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
I'm not sure I understand it, I need some help please!
Same error for both 'batches':
• Task 'create_flow_run[0]': Exception encountered during task execution!
• Task 'create_flow_run[1]': Exception encountered during task execution!
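(The error most likely comes from the shape of parameters: create_flow_run.map passes each mapped element as the parameters dict of one child run, keyed by the child flow's Parameter names. Here each element is a plain string, and trying to coerce a string into a dict raises exactly "dictionary update sequence element #0 has length 1; 2 is required". A minimal sketch of the expected shape, assuming the metadatas_batch_location Parameter from index_flow above; the paths are placeholders:)

# each mapped element must be a dict mapping Parameter names to values
bad = ["2022/02/23/metadata_batches/abc"]                                 # str -> cannot be turned into a parameters dict
good = [{"metadatas_batch_location": "2022/02/23/metadata_batches/abc"}]  # one child run receiving this parameter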
Anna Geller
split_store_batches?
Jean-Baptiste Six
02/23/2022, 11:17 AM
@task(max_retries=1, retry_delay=timedelta(seconds=300), state_handlers=[fail_handler], log_stdout=True)
def split_store_batches(metadatas: List[dict]) -> List[str]:
    metadatas_batches = [{"metadatas_batch": batch_value} for batch_value in list(batch(metadatas, BATCH_SIZE_INDEX))]
    result_locations = []
    with tempfile.TemporaryDirectory() as tmp_dir:
        for metadatas_batch in metadatas_batches:
            today = prefect.context.get("today").split("-")  # type: ignore
            file_id = uuid.uuid4()
            path_file = f"{tmp_dir}/{file_id}"
            result_location = f"{today[0]}/{today[1]}/{today[2]}/metadata_batches/{file_id}"
            with open(path_file, "w") as file:
                json.dump(metadatas_batch, file)
            common.upload_to_bucket(storage_client, result_location, path_file, BUCKET_RESULTS)
            result_locations.append(result_location)
    return result_locations
Anna Geller
@task(max_retries=1, retry_delay=timedelta(seconds=300), state_handlers=[fail_handler], log_stdout=True)
def split_store_batches(metadatas: List[dict]) -> List[Dict[str, str]]:
    metadatas_batches = [{"metadatas_batch": batch_value} for batch_value in list(batch(metadatas, BATCH_SIZE_INDEX))]
    result_locations = []
    with tempfile.TemporaryDirectory() as tmp_dir:
        for metadatas_batch in metadatas_batches:
            today = prefect.context.get("today").split("-")  # type: ignore
            file_id = uuid.uuid4()
            path_file = f"{tmp_dir}/{file_id}"
            result_location = f"{today[0]}/{today[1]}/{today[2]}/metadata_batches/{file_id}"
            with open(path_file, "w") as file:
                json.dump(metadatas_batch, file)
            common.upload_to_bucket(storage_client, result_location, path_file, BUCKET_RESULTS)
            result_locations.append({"metadatas_batch_location": result_location})
    return result_locations
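(For completeness: the get_result task used in index_flow is not shown in the thread. A hypothetical sketch of what it could look like, assuming each batch is read back from the same GCS bucket with the google-cloud-storage client; BUCKET_RESULTS here is a placeholder name:)

import json
from google.cloud import storage
from prefect import task

BUCKET_RESULTS = "my-results-bucket"  # placeholder bucket name

@task
def get_result(metadatas_batch_location: str) -> dict:
    # download the JSON batch that split_store_batches uploaded and parse it
    client = storage.Client()
    blob = client.bucket(BUCKET_RESULTS).blob(metadatas_batch_location)
    return json.loads(blob.download_as_text())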
Jean-Baptiste Six
02/23/2022, 11:27 AM
mapped_flow_run_ids = create_flow_run.map(
    flow_name=unmapped("index_flow"),
    project_name=unmapped("Document Pipeline"),
    parameters=result_locations
)
wait_for_flow_run(mapped_flow_run_ids)
Or maybe I have to do:
wait_for_flow_run.map(mapped_flow_run_ids)
?
Anna Geller
wait_for_flow_run.map(flow_run_id=mapped_flow_run_ids, raise_final_state=unmapped(True), stream_logs=unmapped(True))
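(Putting the pieces together, a minimal sketch of how the parent flow could be wired, assuming Prefect 1.x and the flow/project names from above; the two result locations are placeholders standing in for the output of split_store_batches:)

from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# placeholder output of split_store_batches: one parameters dict per child run
result_locations = [
    {"metadatas_batch_location": "2022/02/23/metadata_batches/abc"},
    {"metadatas_batch_location": "2022/02/23/metadata_batches/def"},
]

with Flow("sec daily") as sec_daily_flow:
    # one "index_flow" run per batch
    flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters=result_locations,
    )
    # one wait task per child run; raise_final_state surfaces child failures in the parent
    wait_for_flow_run.map(
        flow_run_id=flow_run_ids,
        raise_final_state=unmapped(True),
        stream_logs=unmapped(True),
    )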