# prefect-community
m
Hi. I have a question. I've tried rearranging my flow in a few different ways, but I still cannot get a mapped task to wait for all the preceding mapped tasks to finish. Basically, I have a flow where sometimes I need to decrypt files and sometimes not (based on a config), so I have a `case()` control flow task for the "decryption branch." I start with a dict that has some partition keys and a list of URIs for each. In the "decryption branch" I do two things to this dict: (1) transform the URIs to the new output paths, and (2) make a flat list of all the unique URIs. For (2), I map over this list and run the decryption task in parallel as a mapped task; this part is working just fine. Then I merge the dict from (1) with an unchanged version of the dict for the cases where decryption should be skipped; this part is also working just fine. My issue is that I then pass the resulting dict to a mapped task to parallelize over each key, but that task starts up before the decryption mapped task is done. I need it to wait until all of the workers are done. You can see I have even tried a "dummy" skip task, so that I could merge the decrypting task with something from the false branch and pass that to the final task. This didn't work: `extract_region_transactions` starts running as soon as any of the `decrypt_file_task` worker tasks complete. See code snippet in thread.
j
Hi Marco, your issue is that none of the `extract_region_transactions` tasks depend on the `decrypted_files` task; they only depend on `decrypted_input_paths_by_partition_key` (which only modifies the paths, not the values). One way to fix this would be to pass `upstream_tasks=[decrypted_files]` to the call to `transform_input_paths` in the first case block. This tells Prefect that the task depends on the previous task, but not necessarily on its resulting values (useful for things that pass around data via files rather than in memory).
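The dependency argument above can be sketched in plain Python. This is a toy model only, not Prefect's scheduler: the task names are shortened, hypothetical labels loosely mirroring the ones in this thread, and a "state-only" dependency is modeled as just another edge in the graph. It assumes Python 3.9+ for `graphlib`.

```python
from graphlib import TopologicalSorter


def must_precede(edges, first, later):
    """Return True if `first` is a direct or transitive upstream of `later`.

    `edges` maps each task name to the set of tasks it depends on.
    """
    stack, seen = list(edges.get(later, ())), set()
    while stack:
        node = stack.pop()
        if node == first:
            return True
        if node not in seen:
            seen.add(node)
            stack.extend(edges.get(node, ()))
    return False


# Toy graph WITHOUT the extra edge: "extract" only depends on "transform",
# so nothing forces "decrypt" to finish before "extract" starts.
without_edge = {
    "flatten": {"get_paths"},
    "decrypt": {"flatten"},
    "transform": {"get_paths"},
    "extract": {"transform"},
}

# Toy graph WITH a state-only edge from "decrypt" to "transform"
# (the toy analogue of passing upstream_tasks=[decrypted_files]).
with_edge = dict(without_edge, transform={"get_paths", "decrypt"})

# One valid execution order for the fixed graph.
order = list(TopologicalSorter(with_edge).static_order())
```

In the first graph `must_precede(without_edge, "decrypt", "extract")` is false, so a scheduler is free to start the extract step early; in the second it is true, because the extra edge makes the extract step transitively downstream of decryption even though no decrypted values are passed along.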
m
I did try this too 😞
j
Also, in the future can you only post long code snippets/tracebacks inside the slack thread? We like to keep top-level posts short to make the main page more readable.
👍 1
Hmmm, can you post the code you tried?
m
Moved previous code snippet to thread.
```python
# First get all inputs for each partition key
raw_input_paths_by_partition_key = get_input_paths_by_partition_key(
    data_config, start_date, end_date, run_backfill
)
# Decrypt files, if necessary
with case(task(lambda s: s is not None)(decryption_secret), True):
    # get a flattened set of input files to decrypt
    flat_set_input_files = flatten_input_files(
        data_config, raw_input_paths_by_partition_key
    )
    # decrypt files in parallel
    decrypted_files = decrypt_file_task.map(
        unmapped(data_config), unmapped(decryption_secret), flat_set_input_files
    )
    # transform input_paths_by_partition_key to have the decrypted paths
    decrypted_input_paths_by_partition_key = transform_input_paths(
        data_config, raw_input_paths_by_partition_key
    )
with case(task(lambda s: s is not None)(decryption_secret), False):
    non_decrypted_input_paths_by_partition_key = raw_input_paths_by_partition_key
    skip_file_decryption = skip_decrypt_file_task(data_config, None, None)
# Merge branches of DAG
input_paths_by_partition_key = merge(
    decrypted_input_paths_by_partition_key, non_decrypted_input_paths_by_partition_key,
)
decrypt_or_skip = merge(decrypted_files, skip_file_decryption)
# Extract to BQ -- run in parallel, for each partition.
extract_region_transactions.map(
    unmapped(data_config),
    unmapped(vendor),
    input_paths_by_partition_key,
    unmapped(decrypt_or_skip),
)
```
This is the same as before, just moving it out of the main message ^
@Jim Crist-Harif
```python
# First get all inputs for each partition key
raw_input_paths_by_partition_key = get_input_paths_by_partition_key(
    data_config, start_date, end_date, run_backfill
)
# Decrypt files, if necessary
with case(task(lambda s: s is not None)(decryption_secret), True):
    # get a flattened set of input files to decrypt
    flat_set_input_files = flatten_input_files(
        data_config, run_backfill, raw_input_paths_by_partition_key
    )
    # decrypt files in parallel
    decrypted_files = decrypt_file_task.map(
        unmapped(data_config), unmapped(decryption_secret), flat_set_input_files
    )
    # transform input_paths_by_partition_key to have the decrypted paths
    decrypted_input_paths_by_partition_key = transform_input_paths(
        data_config, raw_input_paths_by_partition_key
    )
    # Extract to BQ -- run in parallel, for each partition.
    extract_region_transactions.map(
        unmapped(data_config),
        unmapped(vendor),
        decrypted_input_paths_by_partition_key,
        upstream_tasks=[decrypted_files],
    )
with case(task(lambda s: s is not None)(decryption_secret), False):
    # Extract to BQ -- run in parallel, for each partition.
    extract_region_transactions.map(
        unmapped(data_config), unmapped(vendor), raw_input_paths_by_partition_key,
    )
```
Here is that code. In this one I don't do any merging; I just keep the branches separate.
The weird thing is that the `decrypted_files` task is caching a lot of the mapper tasks. Any idea why that is? Shouldn't they stay pending until they get picked up?
j
Try:
```python
# First get all inputs for each partition key
raw_input_paths_by_partition_key = get_input_paths_by_partition_key(
    data_config, start_date, end_date, run_backfill
)
# Decrypt files, if necessary
with case(task(lambda s: s is not None)(decryption_secret), True):
    # get a flattened set of input files to decrypt
    flat_set_input_files = flatten_input_files(
        data_config, raw_input_paths_by_partition_key
    )
    # decrypt files in parallel
    decrypted_files = decrypt_file_task.map(
        unmapped(data_config), unmapped(decryption_secret), flat_set_input_files
    )
    # transform input_paths_by_partition_key to have the decrypted paths
    decrypted_input_paths_by_partition_key = transform_input_paths(
        data_config, raw_input_paths_by_partition_key, upstream_tasks=[decrypted_files]
    )
with case(task(lambda s: s is not None)(decryption_secret), False):
    non_decrypted_input_paths_by_partition_key = raw_input_paths_by_partition_key
    skip_file_decryption = skip_decrypt_file_task(data_config, None, None)
# Merge branches of DAG
input_paths_by_partition_key = merge(
    decrypted_input_paths_by_partition_key, non_decrypted_input_paths_by_partition_key,
)
decrypt_or_skip = merge(decrypted_files, skip_file_decryption)
# Extract to BQ -- run in parallel, for each partition.
extract_region_transactions.map(
    unmapped(data_config),
    unmapped(vendor),
    input_paths_by_partition_key,
    unmapped(decrypt_or_skip),
)
```
> the weird thing is the `decrypted_files` task is caching a lot of the mapper tasks, any idea why that is? shouldn't they stay pending until they get picked up?
I'm not sure I follow this sentence, can you rephrase it to clarify?
m
e.g.:
INFO  Task 'decrypt_file_task[410]': Finished task run for task with final state: 'Cached'
@Jim Crist-Harif ^ Any idea why many tasks are moving to state = `Cached`? Is that expected?
j
It sounds like the caching settings for that task mean that repeated runs can use previous cached versions. See https://docs.prefect.io/core/concepts/persistence.html for more information on this.
If you're running the same tasks on the same data with output caching enabled, subsequent task runs can use the previous run results instead of re-running.
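As a rough illustration of why a task can finish as `Cached` without doing any work, here is a toy sketch in plain Python. It is not Prefect's caching mechanism (see the persistence docs linked above for that); the `CachingRunner` class, the state strings, and the file name are hypothetical and exist only for this example.

```python
class CachingRunner:
    """Toy runner that reuses a previous result when it sees the same input."""

    def __init__(self, fn):
        self.fn = fn
        self.cache = {}   # input -> previously computed result
        self.states = []  # final state recorded for each run, in order

    def run(self, key):
        if key in self.cache:
            # Same input seen before: skip the work and reuse the old result.
            self.states.append("Cached")
            return self.cache[key]
        result = self.fn(key)
        self.cache[key] = result
        self.states.append("Success")
        return result


# Hypothetical stand-in for a decryption task: it only renames the path.
runner = CachingRunner(lambda path: path + ".decrypted")
first = runner.run("a.gpg")   # does the work
second = runner.run("a.gpg")  # reuses the stored result
```

With caching like this enabled, a rerun over mostly unchanged inputs finishes almost immediately for every input it has seen before, which would look like a small number of real runs followed by a large number of `Cached` states.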
m
Like for 1000 tasks, it will do 5 of them, and cache 955 of them, then move on to the downstream tasks... I wonder if this was the issue all along
Got it.. let me look into this
j
> I wonder if this was the issue all along
In previous versions you didn't have a dependence on the decrypting task actually completing, so I doubt this is the case. The change to set `upstream_tasks` should still be necessary; the caching you're seeing here is likely a separate issue.
m
Thanks @Jim Crist-Harif - I fixed the caching issue. Seems like it's working properly so far
🚀 1
j
Glad you figured it out! Let us know if you have any other questions.