Thread
#prefect-community
    Jean-Baptiste Six

    7 months ago
    Hi! I have the following flow:
    with Flow("sec daily", state_handlers=[flow_handler], result=GCSResult(BUCKET_RESULTS)) as sec_daily_flow:
        root_dir = init_dirs("sec", archive=False)
        download_dir = download(root_dir, False)
        metadatas = metadata(download_dir)
        upload(metadatas)
        result_locations = split_store_batches(metadatas)
        mapped_flow_run_ids = create_flow_run.map(
            flow_name=unmapped("index_flow"),
            project_name=unmapped("Document Pipeline"),
            parameters=result_locations
        )
    With a "subflow":
    with Flow("index_flow", result=GCSResult(BUCKET_RESULTS), state_handlers=[flow_handler]) as index_flow:
        # Imports
        metadatas_batch_location = Parameter("metadatas_batch_location", required=True)
        metadatas_batch = get_result(metadatas_batch_location)
        imports = build_import(metadatas_batch)
        index_weaviate(imports)
    But I get the following error:
    prefect.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'dictionary update sequence element #0 has length 1; 2 is required', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    I'm not sure I understand it šŸ˜• I need some help please šŸ™ The same error appears for both 'batches':
    ā€¢ Task 'create_flow_run[0]': Exception encountered during task execution!
    ā€¢ Task 'create_flow_run[1]': Exception encountered during task execution!
    Anna Geller

    7 months ago
    Can you share the definition of split_store_batches?
    Jean-Baptiste Six

    7 months ago
    Sure šŸ™‚
    @task(max_retries=1, retry_delay=timedelta(seconds=300), state_handlers=[fail_handler], log_stdout=True)
    def split_store_batches(metadatas: List[dict]) -> List[str]:
        metadatas_batches = [{"metadatas_batch": batch_value} for batch_value in list(batch(metadatas, BATCH_SIZE_INDEX))]
        result_locations = []
        with tempfile.TemporaryDirectory() as tmp_dir:
            for metadatas_batch in metadatas_batches:
                today = prefect.context.get("today").split("-") # type: ignore
                file_id = uuid.uuid4()
                path_file = f"{tmp_dir}/{file_id}"
                result_location = f"{today[0]}/{today[1]}/{today[2]}/metadata_batches/{file_id}"
                with open(path_file, "w") as file:
                    json.dump(metadatas_batch, file)
                common.upload_to_bucket(storage_client, result_location, path_file, BUCKET_RESULTS)
                result_locations.append(result_location)
        return result_locations
    Anna Geller

    7 months ago
    The problem here is that the parameters argument on the create_flow_run task expects a dictionary in the format {"param_name": param_value}, and you are currently passing just a string with the parameter value. This should fix it:
    @task(max_retries=1, retry_delay=timedelta(seconds=300), state_handlers=[fail_handler], log_stdout=True)
    def split_store_batches(metadatas: List[dict]) -> List[Dict[str, str]]:
        metadatas_batches = [{"metadatas_batch": batch_value} for batch_value in list(batch(metadatas, BATCH_SIZE_INDEX))]
        result_locations = []
        with tempfile.TemporaryDirectory() as tmp_dir:
            for metadatas_batch in metadatas_batches:
                today = prefect.context.get("today").split("-") # type: ignore
                file_id = uuid.uuid4()
                path_file = f"{tmp_dir}/{file_id}"
                result_location = f"{today[0]}/{today[1]}/{today[2]}/metadata_batches/{file_id}"
                with open(path_file, "w") as file:
                    json.dump(metadatas_batch, file)
                common.upload_to_bucket(storage_client, result_location, path_file, BUCKET_RESULTS)
                result_locations.append({"metadatas_batch_location": result_location})
        return result_locations
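    To see why the original version failed: the backend effectively needs to build a dict of parameter names to values out of whatever you pass as parameters, and doing that with a bare location string produces exactly the error you saw. A quick illustration in plain Python (the location string here is just a made-up example):
    >>> dict("2022/01/01/metadata_batches/some-id")  # a bare string, like before
    ValueError: dictionary update sequence element #0 has length 1; 2 is required
    >>> dict({"metadatas_batch_location": "2022/01/01/metadata_batches/some-id"})  # what the task expects
    {'metadatas_batch_location': '2022/01/01/metadata_batches/some-id'}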
    Jean-Baptiste Six

    7 months ago
    Ah yes! You already told me that, I just forgot after making some changes! Thx šŸ™
    It's working šŸ™‚ Should I use 'wait_for_flow_run'? If yes, how do I use it with the map? Something like this?
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters=result_locations
    )
    wait_for_flow_run(mapped_flow_run_ids)
    Or maybe I have to do:
    wait_for_flow_run.map(mapped_flow_run_ids)
    ?
    Anna Geller

    7 months ago
    Nice work! The latter should work, and you can also pass extra arguments which are quite useful for seeing the logs in the UI:
    wait_for_flow_run.map(flow_run_id=mapped_flow_run_ids, raise_final_state=unmapped(True), stream_logs=unmapped(True))
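    Putting the two together, the end of the parent flow could look roughly like this (a sketch reusing the names from your snippets above — flow_handler, BUCKET_RESULTS and the upstream tasks are yours):
    from prefect import Flow, unmapped
    from prefect.engine.results import GCSResult
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("sec daily", state_handlers=[flow_handler], result=GCSResult(BUCKET_RESULTS)) as sec_daily_flow:
        root_dir = init_dirs("sec", archive=False)
        download_dir = download(root_dir, False)
        metadatas = metadata(download_dir)
        upload(metadatas)
        result_locations = split_store_batches(metadatas)
        # one child flow run per parameters dict returned by split_store_batches
        mapped_flow_run_ids = create_flow_run.map(
            flow_name=unmapped("index_flow"),
            project_name=unmapped("Document Pipeline"),
            parameters=result_locations,
        )
        # one wait task per child run: stream its logs into the parent run and
        # raise if the child finishes in a failed state
        wait_for_flow_run.map(
            flow_run_id=mapped_flow_run_ids,
            raise_final_state=unmapped(True),
            stream_logs=unmapped(True),
        )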