Andrea Haessly

02/07/2022, 6:23 PM
I'm still trying to understand when I can set task attributes during the task constructor (vs when a copy of the task is made and the values are not carried over). Will post code example in 🧵
Copy code
from typing import Union, Tuple

from prefect import task, Flow
from prefect.tasks.gcp import GCSCopy, GCSBlobExists

# from import storage
from import GCSBaseTask
from prefect.utilities.tasks import defaults_from_attrs
from prefect.utilities.gcp import get_storage_client

source_bucket = "bucket1"
source_blob = "file1"
dest_bucket = "file2"
dest_blob = "bucket2"

class GCSDeleteFile(GCSBaseTask):
    def __init__(
            bucket: str = None,
            blob: str = None,
            project: str = None,
            request_timeout: Union[float, Tuple[float, float]] = 60,
        super().__init__(bucket, blob, project, request_timeout, kwargs)

    def run(
        bucket: str = None,
        blob: str = None,
        project: str = None,
        credentials: dict = None,
        request_timeout: Union[float, Tuple[float, float]] = 60,
    ) -> None:
        # create client
        client = get_storage_client(project=project, credentials=credentials)

        # get source bucket and blob
        bucket_obj = client.get_bucket(source_bucket)
        blob_obj = bucket_obj.blob(source_blob)

        # delete

def main():
    flow = Flow("gcpImperativeExample")
    copy_file_task = GCSCopy(source_bucket=source_bucket, source_blob=source_blob, dest_bucket=dest_bucket, dest_blob=dest_blob)

    file_exists_task = GCSBlobExists(bucket_name=dest_bucket, blob=dest_blob)  ## these values are not used when task runs
    flow.set_dependencies(task=file_exists_task, upstream_tasks=[copy_file_task])
    # flow.set_dependencies(task=file_exists_task, upstream_tasks=[copy_file_task], keyword_tasks=dict(bucket_name=dest_bucket, blob=dest_blob))

    delete_file_task = GCSDeleteFile(bucket=source_bucket, blob=source_blob)  ## these values do get used when task runs 
    flow.set_dependencies(task=delete_file_task, upstream_tasks=[file_exists_task])
    # flow.set_dependencies(task=delete_file_task, upstream_tasks=[file_exists_task], keyword_tasks=dict(bucket=source_bucket, blob=source_blob))

if __name__ == "__main__":
when I run this python code, I get an exception when the GCSBlobExists task tries to run
Copy code
[2022-02-07 13:19:52-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'gcpImperativeExample'
[2022-02-07 13:19:52-0500] INFO - prefect.TaskRunner | Task 'GCSCopy': Starting task run...
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSCopy': Finished task run for task with final state: 'Success'
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Starting task run...
[2022-02-07 13:19:53-0500] ERROR - prefect.TaskRunner | Task 'GCSBlobExists': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/engine/", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/utilities/", line 467, in run_task_with_timeout
    return*args, **kwargs)  # type: ignore
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/utilities/", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/tasks/gcp/", line 525, in run
    raise ValueError("Missing bucket_name or blob")
ValueError: Missing bucket_name or blob
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Finished task run for task with final state: 'Failed'
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSDeleteFile': Starting task run...
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSDeleteFile': Finished task run for task with final state: 'TriggerFailed'
[2022-02-07 13:19:53-0500] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
I have figured out that I can set the values in the flow.set_dependencies call. But I'm really confused why it doesn't work for GCSBlobExists but setting the values in the task constructor does work for both the GCSCopy task and the GCSDeleteFile task?

Kevin Kho

02/07/2022, 6:29 PM
Are you set on using the imperative API? Because the Functional API is easier to work with by far
upvote 1
Copy code
def main():
    with Flow("gcExample") as flow:
        copy_file_task = GCSCopy(source_bucket=source_bucket, source_blob=source_blob, dest_bucket=dest_bucket, dest_blob=dest_blob)()
        file_exists_task = GCSBlobExists(bucket_name=dest_bucket, blob=dest_blob)(upstream_tasks=[copy_file_task])
        delete_file_task = GCSDeleteFile(bucket=source_bucket, blob=source_blob)(upstream_tasks=[file_exists_task])

Andrea Haessly

02/07/2022, 6:33 PM
No, I'm not set on it, but I am trying to evaluate the tool and would like to understand the options for use. If you don't recommend using the imperative API that's fine. But from the docs it sounds like the imperative API gives you more ability to customize your flows and we will have some pretty complicated workflows that we need to implement.

Kevin Kho

02/07/2022, 6:37 PM
Ah ok. To be honest, I haven’t seen a case only achievable with the imperative API. Because if you want to use functions to add tasks to an existing Flow, you can also do:
Copy code
with Flow("example") as flow:
    a = task_a()

with flow:
but I haven’t seen anyone really use this heavily. If you want your logic to be modular, you can also use the flow of flows pattern

Andrea Haessly

02/07/2022, 6:42 PM
Ok thanks. I'll focus on the functional api. But one more question - how do I know what parameters can be passed to the the task run? In your example you passed the task specific parameters in the instantiation of the task
GCSBlobExists(bucket_name=dest_bucket, blob=dest_blob)
but how do I know what the options are for the second invocation
? I've tried looking at the params documented on the Task class, but
is not one of them.

Kevin Kho

02/07/2022, 6:48 PM
There are only three, you may find them scattered around the doc pages, but the comprehensive thing is to look at the
documentation here because the Flow is responsible for calling the .
. The second invocation is just the run.
And then of course, whatever inputs your Task also takes like:
Copy code
with Flow("example") as flow:
        copy_file_task = GCSCopy()(source_bucket=source_bucket, source_blob=source_blob, dest_bucket=dest_bucket, dest_blob=dest_blob)
        file_exists_task = GCSBlobExists()(bucket_name=dest_bucket, blob=dest_blob, upstream_tasks=[copy_file_task])
        delete_file_task = GCSDeleteFile()(bucket=source_bucket, blob=source_blob, upstream_tasks=[file_exists_task])
Note that the first
is the init and is empty here. This is specifically because those tasks support passing those arguments in the run method
upvote 1