Vlad
06/06/2024, 12:02 PM

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner
from prefect.utilities.annotations import allow_failure, quote

@flow(task_runner=ConcurrentTaskRunner(), log_prints=True)
def process_docs(payload):
    # Get paths to large compressed archives
    archive_payloads = get_file_list(payload)
    for arch_payload in archive_payloads:
        # Download each file to local disk
        file_payload = download_file(allow_failure(arch_payload))
        # Extract each archive and return member paths
        batch_paths = get_targz_members(quote(file_payload))
        # Group paths into batches of 5000
        path_tuples = make_batches(batch_paths)
        for batch in path_tuples:
            # Read each path, process the data, and write intermediate
            # parquet files, returning their paths
            combined_result = combine_batches(
                parse_paragraphs_batch.submit(batch),
                parse_citations_batch.submit(batch),
            )
            # Read the parquet files and reshape the data
            reshape_data.submit(allow_failure(combined_result))
            # Remove intermediate results from disk
            remove_batch_inputs.submit(batch, wait_for=combined_result)
        # Remove the downloaded file archive from disk
        remove_file_archive.submit(payload, wait_for=batch_paths)
Task run 'combine_batches-10' - Task parameter introspection took 257.256 seconds, exceeding PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD of 10.0. Try wrapping large task parameters with prefect.utilities.annotations.quote for increased performance, e.g. my_task(quote(param)). To disable this message set PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD=0.
The workflow runs and completes, but it takes much longer than anticipated, emitting many introspection warnings that average around 250 seconds each. Is there a way to optimize this flow?

Nate
06/07/2024, 6:51 PM
You can wrap the large parameters with my_task(quote(param)), but note that when you do, you'll have to call .result() on whatever futures are within that quoted result; otherwise your task will receive an actual Future, or a collection containing one.
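For example, a minimal sketch of how that could look in the flow above (an illustration under assumptions, not a confirmed fix; it assumes combine_batches is itself a task and that quote is imported from prefect.utilities.annotations):

# Resolve the futures explicitly with .result(), then wrap the large
# resolved values in quote() so Prefect skips parameter introspection.
paragraphs_future = parse_paragraphs_batch.submit(batch)
citations_future = parse_citations_batch.submit(batch)
combined_result = combine_batches(
    quote(paragraphs_future.result()),  # .result() blocks until the task finishes
    quote(citations_future.result()),
)

Note the trade-off: both tasks are submitted before either result is requested, so they still run concurrently with each other, but the flow itself waits on .result() instead of spending that time on introspection.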
Alternatively, you can redesign your tasks so that you pass a reference to the large data to your task and then load the actual data inside the task.
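For example, a minimal sketch of that redesign (hypothetical: the open-and-read parsing, pandas usage, and output path here are illustrative assumptions, not taken from the original flow):

import pandas as pd
from prefect import task

@task
def parse_paragraphs_batch(batch_paths: list[str]) -> str:
    # The parameter is just a list of small path strings, so introspection
    # is cheap; the actual (large) file contents are loaded inside the task.
    records = []
    for path in batch_paths:
        with open(path) as f:
            records.append({"path": path, "text": f.read()})
    out_path = "paragraphs_batch.parquet"  # illustrative output location
    pd.DataFrame(records).to_parquet(out_path)
    return out_path  # hand back a reference, not the data itself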