
Jean-Baptiste Six

02/23/2022, 10:58 AM
Hi! I have the following flow:
Copy code
with Flow("sec daily", state_handlers=[flow_handler], result=GCSResult(BUCKET_RESULTS)) as sec_daily_flow:
    root_dir = init_dirs("sec", archive=False)
    download_dir = download(root_dir, False)
    metadatas = metadata(download_dir)
    upload(metadatas)
    result_locations = split_store_batches(metadatas)
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters=result_locations
    )
With a "subflow":
Copy code
with Flow("index_flow", result=GCSResult(BUCKET_RESULTS), state_handlers=[flow_handler]) as index_flow:
    # Imports
    metadatas_batch_location = Parameter("metadatas_batch_location", required=True)
    metadatas_batch = get_result(metadatas_batch_location)
    imports = build_import(metadatas_batch)
    index_weaviate(imports)
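(The get_result task isn't shown in the thread; a minimal sketch of what it could look like, assuming it simply reads back the JSON batch that split_store_batches writes to the BUCKET_RESULTS bucket. The direct google-cloud-storage calls here are an assumption, not the author's actual helper.)
Copy code
import json
from google.cloud import storage
from prefect import task

@task(log_stdout=True)
def get_result(metadatas_batch_location: str) -> dict:
    # Hypothetical sketch: download the JSON written by split_store_batches
    # from the same GCS bucket (BUCKET_RESULTS is the constant used above).
    client = storage.Client()
    blob = client.bucket(BUCKET_RESULTS).blob(metadatas_batch_location)
    return json.loads(blob.download_as_text())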
But I get the following error:
Copy code
prefect.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'dictionary update sequence element #0 has length 1; 2 is required', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
I'm not sure I understand it šŸ˜• I need some help please šŸ™ It's the same error for both 'batches':
• Task 'create_flow_run[0]': Exception encountered during task execution!
• Task 'create_flow_run[1]': Exception encountered during task execution!

Anna Geller

02/23/2022, 11:15 AM
Can you share the definition of split_store_batches?

Jean-Baptiste Six

02/23/2022, 11:17 AM
Sure šŸ™‚
Copy code
@task(max_retries=1, retry_delay=timedelta(seconds=300), state_handlers=[fail_handler], log_stdout=True)
def split_store_batches(metadatas: List[dict]) -> List[str]:
    metadatas_batches = [{"metadatas_batch": batch_value} for batch_value in list(batch(metadatas, BATCH_SIZE_INDEX))]
    result_locations = []
    with tempfile.TemporaryDirectory() as tmp_dir:
        for metadatas_batch in metadatas_batches:
            today = prefect.context.get("today").split("-") # type: ignore
            file_id = uuid.uuid4()
            path_file = f"{tmp_dir}/{file_id}"
            result_location = f"{today[0]}/{today[1]}/{today[2]}/metadata_batches/{file_id}"
            with open(path_file, "w") as file:
                json.dump(metadatas_batch, file)
            common.upload_to_bucket(storage_client, result_location, path_file, BUCKET_RESULTS)
            result_locations.append(result_location)
    return result_locations

Anna Geller

02/23/2022, 11:22 AM
The problem here is that the parameters argument on the create_flow_run task expects a dictionary in the format {"param_name": param_value}, but you are currently passing just a string with the parameter value. This should fix it:
Copy code
@task(max_retries=1, retry_delay=timedelta(seconds=300), state_handlers=[fail_handler], log_stdout=True)
def split_store_batches(metadatas: List[dict]) -> List[Dict[str, str]]:
    metadatas_batches = [{"metadatas_batch": batch_value} for batch_value in list(batch(metadatas, BATCH_SIZE_INDEX))]
    result_locations = []
    with tempfile.TemporaryDirectory() as tmp_dir:
        for metadatas_batch in metadatas_batches:
            today = prefect.context.get("today").split("-") # type: ignore
            file_id = uuid.uuid4()
            path_file = f"{tmp_dir}/{file_id}"
            result_location = f"{today[0]}/{today[1]}/{today[2]}/metadata_batches/{file_id}"
            with open(path_file, "w") as file:
                json.dump(metadatas_batch, file)
            common.upload_to_bucket(storage_client, result_location, path_file, BUCKET_RESULTS)
            result_locations.append({"metadatas_batch_location": result_location})
    return result_locations
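(As a side note on the error message itself: it's the standard Python error raised when a plain string is coerced into a dict, which is presumably what happens downstream when a bare location string is passed where create_flow_run expects a parameters dictionary. A quick illustration, with a made-up location string:)
Copy code
>>> dict("2022/02/23/metadata_batches/abc")   # a bare result location string
ValueError: dictionary update sequence element #0 has length 1; 2 is required
>>> {"metadatas_batch_location": "2022/02/23/metadata_batches/abc"}   # the shape the fix produces
{'metadatas_batch_location': '2022/02/23/metadata_batches/abc'}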

Jean-Baptiste Six

02/23/2022, 11:27 AM
Ah yes! You already told me that, I just forgot it after making some changes! Thx šŸ™
šŸ‘ 1
It's working šŸ™‚ Should I use 'wait_for_flow_run'? If yes, how do I use it with the map? Something like this?
Copy code
mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters=result_locations
    )
wait_for_flow_run(mapped_flow_run_ids)
Or maybe I have to do:
Copy code
wait_for_flow_run.map(mapped_flow_run_ids)
?

Anna Geller

02/23/2022, 2:30 PM
Nice work! The latter should work, and you can also pass extra arguments which are quite useful for seeing the logs in the UI:
Copy code
wait_for_flow_run.map(flow_run_id=mapped_flow_run_ids, raise_final_state=unmapped(True), stream_logs=unmapped(True))
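(For reference, wiring both mapped calls into the parent flow would look roughly like this; a sketch assembled from the snippets in this thread, not a verbatim copy of the full flow:)
Copy code
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("sec daily", state_handlers=[flow_handler], result=GCSResult(BUCKET_RESULTS)) as sec_daily_flow:
    root_dir = init_dirs("sec", archive=False)
    download_dir = download(root_dir, False)
    metadatas = metadata(download_dir)
    upload(metadatas)
    result_locations = split_store_batches(metadatas)
    # One child flow run per parameters dict returned by split_store_batches.
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters=result_locations,
    )
    # One wait_for_flow_run per child run; raise_final_state surfaces child
    # failures in the parent, stream_logs mirrors child logs into the parent's logs.
    wait_for_flow_run.map(
        flow_run_id=mapped_flow_run_ids,
        raise_final_state=unmapped(True),
        stream_logs=unmapped(True),
    )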