Many long introspection warnings in the following flow: ```@flow(task_runner=ConcurrentTaskRunner(),...
Vlad:
Many long introspection warnings in the following flow:
```python
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner
from prefect.utilities.annotations import allow_failure, quote

# The tasks used below (get_file_list, download_file, ...) are defined elsewhere.


@flow(task_runner=ConcurrentTaskRunner(), log_prints=True)
def process_docs(payload):
    # Get paths to large compressed archives
    archive_payloads = get_file_list(payload)
    for arch_payload in archive_payloads:
        # Download each file to local disk
        file_payload = download_file(allow_failure(arch_payload))
        # Extract each archive and return paths
        batch_paths = get_targz_members(quote(file_payload))
        # Batch paths by 5000 in each batch
        path_tuples = make_batches(batch_paths)
        for batch in path_tuples:
            # Read each path, process the data, write intermediate
            # parquet files, and return their paths
            combined_result = combine_batches(
                parse_paragraphs_batch.submit(batch),
                parse_citations_batch.submit(batch),
            )
            # Read parquet and reshape data
            reshape_data.submit(allow_failure(combined_result))
            # Remove intermediate results from disk
            remove_batch_inputs.submit(batch, wait_for=combined_result)
        # Remove downloaded file archives from disk
        remove_file_archive.submit(payload, wait_for=batch_paths)
```
Task run 'combine_batches-10' - Task parameter introspection took 257.256 seconds, exceeding `PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD` of 10.0. Try wrapping large task parameters with `prefect.utilities.annotations.quote` for increased performance, e.g. `my_task(quote(param))`. To disable this message set `PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD=0`.
The workflow runs and completes, but it takes much longer than anticipated and emits many introspection warnings, with introspection taking about 250 seconds on average. Is there a way to optimize this flow?
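For intuition on why the warning scales with parameter size, here is a minimal stdlib-only sketch. It is not Prefect's implementation: `Quoted` and `count_visited` are hypothetical stand-ins and the file paths are made up. The assumption is that introspection walks a task's parameters recursively, so a 5000-element batch costs one visit per element, while an opaque wrapper lets the walk stop at the top.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Quoted:
    """Hypothetical stand-in for a quote-style wrapper: marks a value as opaque."""

    value: Any


def count_visited(obj: Any) -> int:
    """Count how many objects a naive recursive walk over `obj` touches."""
    if isinstance(obj, Quoted):
        return 1  # opaque wrapper: do not descend into the wrapped value
    if isinstance(obj, dict):
        return 1 + sum(count_visited(k) + count_visited(v) for k, v in obj.items())
    if isinstance(obj, (list, tuple, set)):
        return 1 + sum(count_visited(item) for item in obj)
    return 1  # leaf value such as a string path


# A batch shaped like the ones in the flow above: 5000 (made-up) paths.
batch = [f"/data/extracted/doc_{i}.xml" for i in range(5000)]

visited_plain = count_visited(batch)           # the list plus every path in it
visited_quoted = count_visited(Quoted(batch))  # only the wrapper itself
print(visited_plain, visited_quoted)
```

Even when every element is "just a path", the walk still has to look at each one, which may be why a flow that only passes paths can still trigger the warning.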
Nate:
hi @Vlad - as suggested by the warning message, you can do `my_task(quote(param))`, but note that when you do, you'll have to call `.result()` on whatever futures are within that quoted result, otherwise your task will receive an actual Future or a collection containing one.
Alternatively, you can redesign your tasks so that you pass a reference to large data to your task and then load the actual data inside the task.
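Both options can be sketched without Prefect. The example below is an analogy only: it uses `concurrent.futures` from the standard library in place of Prefect futures, and `parse_batch`, `combine`, `count_paths_from_manifest`, and the manifest file are hypothetical names invented for illustration. The first part shows a function that receives a container passed through untouched and therefore has to call `.result()` itself; the second shows passing a small reference (a path to a manifest file) and loading the large list inside the function.

```python
import json
import tempfile
from concurrent.futures import Future, ThreadPoolExecutor
from pathlib import Path


def parse_batch(paths):
    """Hypothetical worker: pretend to parse a batch and return its size."""
    return len(paths)


def combine(results):
    """Option 1 analogue: the container arrived unresolved, so any futures
    inside it must be resolved explicitly with .result()."""
    resolved = [r.result() if isinstance(r, Future) else r for r in results]
    return sum(resolved)


def count_paths_from_manifest(manifest_path: str) -> int:
    """Option 2: receive a small reference and load the large data inside."""
    with open(manifest_path) as fh:
        return len(json.load(fh))


# Option 1: futures handed over as-is, resolved inside the consumer.
with ThreadPoolExecutor() as pool:
    futures = [
        pool.submit(parse_batch, ["a.xml", "b.xml"]),
        pool.submit(parse_batch, ["c.xml"]),
    ]
    total = combine(futures)

# Option 2: write the 5000-path batch to a manifest and pass only its path.
with tempfile.TemporaryDirectory() as tmp:
    manifest = Path(tmp) / "batch_0.json"
    manifest.write_text(json.dumps([f"doc_{i}.xml" for i in range(5000)]))
    n_paths = count_paths_from_manifest(str(manifest))

print(total, n_paths)
```

With the second pattern the parameter is a single short string regardless of how many paths the batch contains, so there is nothing large to walk.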
Vlad:
Thanks @Nate. I do pass a reference (path) to each input and generate paths as output in this workflow, so was surprised to see this warning