Marco Palmeri
11/18/2020, 6:10 PM
`extract_region_transactions` starts running as soon as any of the `decrypt_file_task` worker tasks complete. See code snippet in thread.
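The symptom described above is what you would expect when nothing in the graph ties the extract step to the decryption tasks, which is what Jim points out in his reply. A toy pure-Python model of task readiness makes that concrete. It is not Prefect's scheduler: the graph, the three collapsed `decrypt_file_task[i]` children, and the helpers `build_graph`, `ready`, and `can_finish_while_pending` are hypothetical, and the optional extra edge only plays the role that `upstream_tasks=[decrypted_files]` plays in the real flow.

```python
from typing import Dict, Set

# Hypothetical toy model: each task maps to the set of tasks it waits on.
# The mapped decrypt task is collapsed into three named children.
DECRYPT_CHILDREN = {f"decrypt_file_task[{i}]" for i in range(3)}


def build_graph(with_upstream_edge: bool) -> Dict[str, Set[str]]:
    graph: Dict[str, Set[str]] = {
        "get_input_paths": set(),
        "flatten_input_files": {"get_input_paths"},
        # Data edge only: transform_input_paths consumes the raw paths, not the
        # decrypted files, so nothing ties it to decryption by default.
        "transform_input_paths": {"get_input_paths"},
        "extract_region_transactions": {"transform_input_paths"},
    }
    for child in DECRYPT_CHILDREN:
        graph[child] = {"flatten_input_files"}
    if with_upstream_edge:
        # State-only edge, analogous to upstream_tasks=[decrypted_files].
        graph["transform_input_paths"] = graph["transform_input_paths"] | DECRYPT_CHILDREN
    return graph


def ready(graph: Dict[str, Set[str]], finished: Set[str]) -> Set[str]:
    """Unfinished tasks whose upstream tasks have all finished."""
    return {t for t, ups in graph.items() if t not in finished and ups <= finished}


def can_finish_while_pending(graph: Dict[str, Set[str]], task: str, pending: Set[str]) -> bool:
    """True if `task` can complete while every task in `pending` is still running."""
    finished: Set[str] = set()
    while True:
        runnable = ready(graph, finished) - pending
        if not runnable:
            return task in finished
        finished |= runnable


# Only the first decrypt worker has finished; the other two are still running.
still_decrypting = DECRYPT_CHILDREN - {"decrypt_file_task[0]"}
print(can_finish_while_pending(build_graph(False), "extract_region_transactions", still_decrypting))  # True
print(can_finish_while_pending(build_graph(True), "extract_region_transactions", still_decrypting))  # False
```

Without the extra edge, the extract step can complete while two of the three decrypt workers are still running; with it, the extract step is blocked until every decrypt child has finished.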

Jim Crist-Harif
11/18/2020, 6:26 PM
The `extract_region_transactions` tasks don't depend on the `decrypted_files` task; they only depend on `decrypted_input_paths_by_partition_key` (which only modifies the paths, not the values). One way to fix this would be to pass `upstream_tasks=[decrypted_files]` to the call to `transform_input_paths` in the first `case` block. This tells Prefect that the task depends on the previous task completing, but not necessarily on its result values (useful for things that pass data around via files rather than in memory).

Marco Palmeri
11/18/2020, 6:26 PM

Jim Crist-Harif
11/18/2020, 6:27 PM

Marco Palmeri
11/18/2020, 6:29 PM
```python
# First get all inputs for each partition key
raw_input_paths_by_partition_key = get_input_paths_by_partition_key(
    data_config, start_date, end_date, run_backfill
)
# Decrypt files, if necessary
with case(task(lambda s: s is not None)(decryption_secret), True):
    # get a flattened set of input files to decrypt
    flat_set_input_files = flatten_input_files(
        data_config, raw_input_paths_by_partition_key
    )
    # decrypt files in parallel
    decrypted_files = decrypt_file_task.map(
        unmapped(data_config), unmapped(decryption_secret), flat_set_input_files
    )
    # transform input_paths_by_partition_key to have the decrypted paths
    decrypted_input_paths_by_partition_key = transform_input_paths(
        data_config, raw_input_paths_by_partition_key
    )
with case(task(lambda s: s is not None)(decryption_secret), False):
    non_decrypted_input_paths_by_partition_key = raw_input_paths_by_partition_key
    skip_file_decryption = skip_decrypt_file_task(data_config, None, None)
# Merge branches of DAG
input_paths_by_partition_key = merge(
    decrypted_input_paths_by_partition_key, non_decrypted_input_paths_by_partition_key,
)
decrypt_or_skip = merge(decrypted_files, skip_file_decryption)
# Extract to BQ -- run in parallel, for each partition.
extract_region_transactions.map(
    unmapped(data_config),
    unmapped(vendor),
    input_paths_by_partition_key,
    unmapped(decrypt_or_skip),
)
```
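The snippet above leans on `case` skipping whichever branch doesn't match and `merge` picking the branch that actually ran. Below is a toy pure-Python model of that pattern, assuming (as Prefect 0.x's `merge` is documented to do) that a skipped branch contributes `None` and the first real, non-`None` result wins. `run_case`, `merge`, `choose_input_paths`, and the `.dec` suffix are hypothetical stand-ins, not Prefect's implementation.

```python
from typing import Any, Callable, Dict, List, Optional

# Toy model only. Assumption: like Prefect 0.x's merge, a skipped branch
# contributes None and merge returns the first non-None ("real") result.


def run_case(condition: bool, branch: bool, fn: Callable[[], Any]) -> Optional[Any]:
    """Run fn only if the condition matches this branch; otherwise 'skip' (None)."""
    return fn() if condition == branch else None


def merge(*branch_results: Optional[Any]) -> Optional[Any]:
    """Return the first branch result that is not None, or None if all skipped."""
    return next((r for r in branch_results if r is not None), None)


def choose_input_paths(
    decryption_secret: Optional[str], raw_paths: Dict[str, List[str]]
) -> Optional[Dict[str, List[str]]]:
    has_secret = decryption_secret is not None
    # True branch: pretend decryption wrote a ".dec" copy of every input file.
    decrypted = run_case(
        has_secret,
        True,
        lambda: {key: [p + ".dec" for p in paths] for key, paths in raw_paths.items()},
    )
    # False branch: pass the raw paths straight through.
    non_decrypted = run_case(has_secret, False, lambda: raw_paths)
    return merge(decrypted, non_decrypted)


raw = {"2020-11-18": ["a.csv.gpg", "b.csv.gpg"]}
print(choose_input_paths("s3cr3t", raw))  # {'2020-11-18': ['a.csv.gpg.dec', 'b.csv.gpg.dec']}
print(choose_input_paths(None, raw))  # {'2020-11-18': ['a.csv.gpg', 'b.csv.gpg']}
```

In this model a branch that legitimately returns `None` is indistinguishable from a skipped one, so each branch should produce a non-`None` value.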
```python
# First get all inputs for each partition key
raw_input_paths_by_partition_key = get_input_paths_by_partition_key(
    data_config, start_date, end_date, run_backfill
)
# Decrypt files, if necessary
with case(task(lambda s: s is not None)(decryption_secret), True):
    # get a flattened set of input files to decrypt
    flat_set_input_files = flatten_input_files(
        data_config, run_backfill, raw_input_paths_by_partition_key
    )
    # decrypt files in parallel
    decrypted_files = decrypt_file_task.map(
        unmapped(data_config), unmapped(decryption_secret), flat_set_input_files
    )
    # transform input_paths_by_partition_key to have the decrypted paths
    decrypted_input_paths_by_partition_key = transform_input_paths(
        data_config, raw_input_paths_by_partition_key
    )
    # Extract to BQ -- run in parallel, for each partition.
    extract_region_transactions.map(
        unmapped(data_config),
        unmapped(vendor),
        decrypted_input_paths_by_partition_key,
        upstream_tasks=[decrypted_files],
    )
with case(task(lambda s: s is not None)(decryption_secret), False):
    # Extract to BQ -- run in parallel, for each partition.
    extract_region_transactions.map(
        unmapped(data_config), unmapped(vendor), raw_input_paths_by_partition_key,
    )
```
The weird thing is the `decrypted_files` task is caching a lot of the mapper tasks. Any idea why that is? Shouldn't they stay pending until they get picked up?

Jim Crist-Harif
11/18/2020, 7:10 PM
```python
# First get all inputs for each partition key
raw_input_paths_by_partition_key = get_input_paths_by_partition_key(
    data_config, start_date, end_date, run_backfill
)
# Decrypt files, if necessary
with case(task(lambda s: s is not None)(decryption_secret), True):
    # get a flattened set of input files to decrypt
    flat_set_input_files = flatten_input_files(
        data_config, raw_input_paths_by_partition_key
    )
    # decrypt files in parallel
    decrypted_files = decrypt_file_task.map(
        unmapped(data_config), unmapped(decryption_secret), flat_set_input_files
    )
    # transform input_paths_by_partition_key to have the decrypted paths
    decrypted_input_paths_by_partition_key = transform_input_paths(
        data_config, raw_input_paths_by_partition_key, upstream_tasks=[decrypted_files]
    )
with case(task(lambda s: s is not None)(decryption_secret), False):
    non_decrypted_input_paths_by_partition_key = raw_input_paths_by_partition_key
    skip_file_decryption = skip_decrypt_file_task(data_config, None, None)
# Merge branches of DAG
input_paths_by_partition_key = merge(
    decrypted_input_paths_by_partition_key, non_decrypted_input_paths_by_partition_key,
)
decrypt_or_skip = merge(decrypted_files, skip_file_decryption)
# Extract to BQ -- run in parallel, for each partition.
extract_region_transactions.map(
    unmapped(data_config),
    unmapped(vendor),
    input_paths_by_partition_key,
    unmapped(decrypt_or_skip),
)
```
> The weird thing is the `decrypted_files` task is caching a lot of the mapper tasks. Any idea why that is? Shouldn't they stay pending until they get picked up?

I'm not sure I follow this sentence, can you rephrase it to clarify?
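Jim's fix gives `transform_input_paths` a state-only dependency on `decrypted_files`: it waits for every decryption to finish without consuming any of their return values, because the decrypted data travels through files. Roughly the same idea outside Prefect, as a plain-Python sketch with `concurrent.futures`; the `decrypt`/`extract`/`run` functions, the `.enc`/`.dec` file names, and the upper-casing "decryption" are hypothetical stand-ins.

```python
import tempfile
from concurrent.futures import ThreadPoolExecutor, wait
from pathlib import Path
from typing import List

# Hypothetical stand-ins for decrypt_file_task / extract_region_transactions.
# Decryption hands its output over through files, never through return values.


def decrypt(src: Path) -> None:
    """'Decrypt' src by writing an upper-cased copy with a .dec suffix."""
    src.with_suffix(".dec").write_text(src.read_text().upper())


def extract(inputs: List[Path]) -> List[str]:
    """Read the decrypted files; raises FileNotFoundError if one is missing."""
    return [p.with_suffix(".dec").read_text() for p in inputs]


def run(workdir: Path, n_files: int = 4) -> List[str]:
    inputs = []
    for i in range(n_files):
        path = workdir / f"part{i}.enc"
        path.write_text(f"row {i}")
        inputs.append(path)

    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(decrypt, p) for p in inputs]
        # Wait for every decrypt call to finish, ignoring their (None) results:
        # roughly the role upstream_tasks=[decrypted_files] plays in the flow.
        done, _ = wait(futures)
        for fut in done:
            fut.result()  # re-raise a decryption error instead of hiding it
    return extract(inputs)


with tempfile.TemporaryDirectory() as tmp:
    print(run(Path(tmp)))  # ['ROW 0', 'ROW 1', 'ROW 2', 'ROW 3']
```

If `extract` ran before `wait` returned, it could hit a `.dec` file that hasn't been written yet, which is the file-based analogue of the extract tasks starting early in this thread.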
Marco Palmeri
11/18/2020, 7:30 PM
```
INFO Task 'decrypt_file_task[410]': Finished task run for task with final state: 'Cached'
```
Cached. Is that expected?

Jim Crist-Harif
11/18/2020, 8:07 PM

Marco Palmeri
11/18/2020, 8:08 PM

Jim Crist-Harif
11/18/2020, 8:09 PM
> I wonder if this was the issue all along

In previous versions you didn't have a dependence on the decrypting task actually completing, so I doubt this is the case. The change to set `upstream_tasks` should be necessary; the caching thing you're seeing here is likely a separate issue.

Marco Palmeri
11/18/2020, 8:36 PM

Jim Crist-Harif
11/18/2020, 8:37 PM