Thread
#prefect-community
    Andrea Haessly

    7 months ago
    I'm still trying to understand when I can set task attributes in the task constructor (vs. when a copy of the task is made and the values are not carried over). Will post a code example in 🧵
    from typing import Union, Tuple
    
    from prefect import task, Flow
    from prefect.tasks.gcp import GCSCopy, GCSBlobExists
    
    # from google.cloud import storage
    from prefect.tasks.gcp.storage import GCSBaseTask
    from prefect.utilities.tasks import defaults_from_attrs
    from prefect.utilities.gcp import get_storage_client
    
    
    source_bucket = "bucket1"
    source_blob = "file1"
    dest_bucket = "bucket2"
    dest_blob = "file2"
    
    class GCSDeleteFile(GCSBaseTask):
        def __init__(
                self,
                bucket: str = None,
                blob: str = None,
                project: str = None,
                request_timeout: Union[float, Tuple[float, float]] = 60,
                **kwargs,
        ):
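            # pass by keyword so each value lands on the intended GCSBaseTask
            # parameter, and unpack kwargs with ** instead of passing the dict
            super().__init__(
                bucket=bucket,
                blob=blob,
                project=project,
                request_timeout=request_timeout,
                **kwargs,
            )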
    
        @defaults_from_attrs(
            "bucket",
            "blob",
            "project",
            "request_timeout",
        )
        def run(
            self,
            bucket: str = None,
            blob: str = None,
            project: str = None,
            credentials: dict = None,
            request_timeout: Union[float, Tuple[float, float]] = 60,
        ) -> None:
            # create client
            client = get_storage_client(project=project, credentials=credentials)
    
            # use the bucket/blob passed to run() (or filled in from __init__
            # by defaults_from_attrs), not the module-level globals
            bucket_obj = client.get_bucket(bucket)
            blob_obj = bucket_obj.blob(blob)
    
            # delete
            blob_obj.delete()
    
    
    def main():
        flow = Flow("gcpImperativeExample")
        copy_file_task = GCSCopy(source_bucket=source_bucket, source_blob=source_blob, dest_bucket=dest_bucket, dest_blob=dest_blob)
        flow.add_task(copy_file_task)
    
        file_exists_task = GCSBlobExists(bucket_name=dest_bucket, blob=dest_blob)  ## these values are not used when task runs
        flow.set_dependencies(task=file_exists_task, upstream_tasks=[copy_file_task])
        # flow.set_dependencies(task=file_exists_task, upstream_tasks=[copy_file_task], keyword_tasks=dict(bucket_name=dest_bucket, blob=dest_blob))
    
        delete_file_task = GCSDeleteFile(bucket=source_bucket, blob=source_blob)  ## these values do get used when task runs 
        flow.set_dependencies(task=delete_file_task, upstream_tasks=[file_exists_task])
        # flow.set_dependencies(task=delete_file_task, upstream_tasks=[file_exists_task], keyword_tasks=dict(bucket=source_bucket, blob=source_blob))
    
        flow.run()
    
    
    if __name__ == "__main__":
        main()
    When I run this Python code, I get an exception when the GCSBlobExists task tries to run:
    [2022-02-07 13:19:52-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'gcpImperativeExample'
    [2022-02-07 13:19:52-0500] INFO - prefect.TaskRunner | Task 'GCSCopy': Starting task run...
    [2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSCopy': Finished task run for task with final state: 'Success'
    [2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Starting task run...
    [2022-02-07 13:19:53-0500] ERROR - prefect.TaskRunner | Task 'GCSBlobExists': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
        return run_method(self, *args, **kwargs)
      File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/tasks/gcp/storage.py", line 525, in run
        raise ValueError("Missing bucket_name or blob")
    ValueError: Missing bucket_name or blob
    [2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Finished task run for task with final state: 'Failed'
    [2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSDeleteFile': Starting task run...
    [2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSDeleteFile': Finished task run for task with final state: 'TriggerFailed'
    [2022-02-07 13:19:53-0500] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    I have figured out that I can set the values in the flow.set_dependencies call (the commented-out keyword_tasks lines). But I'm really confused: why doesn't this work for GCSBlobExists, when setting the values in the task constructor does work for both the GCSCopy task and the GCSDeleteFile task?
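The behaviour in the question can be reproduced without GCP. The sketch below uses a hypothetical, simplified stand-in for `prefect.utilities.tasks.defaults_from_attrs` (not Prefect's actual implementation): any listed argument that `run()` receives as `None` is filled from the attribute of the same name stored in `__init__`, but only when `run()` is wrapped by the decorator. One possible explanation for the GCSBlobExists failure, which is an assumption not confirmed in this thread, is that its `run()` in the installed Prefect version does not read the constructor values this way; reading `prefect/tasks/gcp/storage.py` in your environment would confirm or rule that out.

```python
import functools


def defaults_from_attrs_sketch(*attr_names):
    """Hypothetical, simplified stand-in for Prefect's defaults_from_attrs.

    For each listed argument that run() receives as None (or not at all),
    substitute the attribute of the same name set in __init__.
    Handles keyword arguments only, to keep the sketch short.
    """
    def decorator(run_method):
        @functools.wraps(run_method)
        def wrapper(self, **kwargs):
            for name in attr_names:
                if kwargs.get(name) is None:
                    kwargs[name] = getattr(self, name)
            return run_method(self, **kwargs)
        return wrapper
    return decorator


class BlobCheckWithDefaults:
    """Toy task whose run() falls back to the constructor values."""

    def __init__(self, bucket_name=None, blob=None):
        self.bucket_name = bucket_name
        self.blob = blob

    @defaults_from_attrs_sketch("bucket_name", "blob")
    def run(self, bucket_name=None, blob=None):
        if None in [bucket_name, blob]:
            raise ValueError("Missing bucket_name or blob")
        return f"checking gs://{bucket_name}/{blob}"


class BlobCheckWithoutDefaults(BlobCheckWithDefaults):
    """Same constructor, but run() never consults self.bucket_name / self.blob."""

    def run(self, bucket_name=None, blob=None):
        if None in [bucket_name, blob]:
            raise ValueError("Missing bucket_name or blob")
        return f"checking gs://{bucket_name}/{blob}"


if __name__ == "__main__":
    print(BlobCheckWithDefaults("bucket2", "file2").run())
    # checking gs://bucket2/file2
    print(BlobCheckWithDefaults("bucket2", "file2").run(blob="other"))
    # checking gs://bucket2/other (a run-time value overrides the init value)
    try:
        BlobCheckWithoutDefaults("bucket2", "file2").run()
    except ValueError as exc:
        print(exc)  # Missing bucket_name or blob
```

In this model the constructor values are always stored; what differs is whether `run()` is written to fall back on them.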
    Kevin Kho

    7 months ago
    Are you set on using the imperative API? The functional API is by far easier to work with.
    For example:
    def main():
        with Flow("gcExample") as flow:
            copy_file_task = GCSCopy(source_bucket=source_bucket, source_blob=source_blob, dest_bucket=dest_bucket, dest_blob=dest_blob)()
            file_exists_task = GCSBlobExists(bucket_name=dest_bucket, blob=dest_blob)(upstream_tasks=[copy_file_task])
            delete_file_task = GCSDeleteFile(bucket=source_bucket, blob=source_blob)(upstream_tasks=[file_exists_task])
    
        flow.run()
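In the functional style each task is called twice: the first call is the constructor, and the second adds the task to the flow being built. In Prefect 1.x, calling a task instance is described as creating a copy of the task and binding the passed arguments to the copy's `run` method. The toy model below (hypothetical `ToyFlow` and `ToyTask` classes, not Prefect code) mimics that with `copy.copy`, which keeps every attribute set in `__init__`; whether Prefect's own copy preserves every attribute in your version is an assumption worth checking.

```python
import copy


class ToyFlow:
    """Hypothetical stand-in for a Flow: records tasks and dependency edges."""

    _active = None  # flow whose `with` block is currently open

    def __init__(self, name):
        self.name = name
        self.tasks = []
        self.edges = []  # (upstream, downstream) pairs

    def __enter__(self):
        ToyFlow._active = self
        return self

    def __exit__(self, *exc_info):
        ToyFlow._active = None
        return False  # do not swallow exceptions


class ToyTask:
    """Hypothetical task: first call configures it, second call adds it to a flow."""

    def __init__(self, name, **init_attrs):
        self.name = name
        self.__dict__.update(init_attrs)  # e.g. bucket_name, blob
        self.bound_kwargs = {}

    def __call__(self, upstream_tasks=None, **run_kwargs):
        flow = ToyFlow._active
        if flow is None:
            raise RuntimeError("ToyTask must be called inside a `with ToyFlow(...)` block")
        new = copy.copy(self)  # the copy keeps every attribute set in __init__
        new.bound_kwargs = dict(run_kwargs)  # values to hand to run() later
        flow.tasks.append(new)
        for upstream in upstream_tasks or []:
            flow.edges.append((upstream, new))
        return new


if __name__ == "__main__":
    with ToyFlow("gcExample") as flow:
        copy_file = ToyTask("GCSCopy", source_bucket="bucket1")()
        exists = ToyTask("GCSBlobExists", bucket_name="bucket2", blob="file2")(
            upstream_tasks=[copy_file]
        )
    print(exists.bucket_name)  # bucket2
    print([(up.name, down.name) for up, down in flow.edges])
    # [('GCSCopy', 'GCSBlobExists')]
```

The design point the toy makes: `upstream_tasks` only shapes the graph, while any other keyword on the second call is stored for `run()`.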
    Andrea Haessly

    7 months ago
    No, I'm not set on it, but I am trying to evaluate the tool and would like to understand the options. If you don't recommend using the imperative API, that's fine. But from the docs it sounds like the imperative API gives you more ability to customize your flows, and we will have some pretty complicated workflows to implement.
    Kevin Kho

    7 months ago
    Ah, ok. To be honest, I haven’t seen a case that is only achievable with the imperative API. For example, if you want functions that add tasks to an existing Flow, you can also re-enter the flow's context:
    with Flow("example") as flow:
        a = task_a()
        task_b(upstream_tasks=[a])
    
    with flow:
        task_c()
    
    flow.run()
    But I haven’t seen anyone really use this heavily. If you want your logic to be modular, you can also use the flow-of-flows pattern.
    Andrea Haessly

    7 months ago
    Ok, thanks. I'll focus on the functional API. But one more question: how do I know what parameters can be passed to the task run? In your example you passed the task-specific parameters in the instantiation of the task, `GCSBlobExists(bucket_name=dest_bucket, blob=dest_blob)`, but how do I know what the options are for the second invocation, `(upstream_tasks=[copy_file_task])`? I've tried looking at the params documented on the Task class, but `upstream_tasks` is not one of them.
    Kevin Kho

    7 months ago
    There are only three special keyword arguments. You may find them scattered around the doc pages, but the comprehensive place to look is the `Task.run()` documentation here, because the Flow is responsible for calling `.run()`. The second invocation is just the run: its arguments are what the Flow later passes to `.run()`.
    On top of those, you can pass whatever inputs your Task's run method takes, like:
    with Flow("example") as flow:
        copy_file_task = GCSCopy()(source_bucket=source_bucket, source_blob=source_blob, dest_bucket=dest_bucket, dest_blob=dest_blob)
        file_exists_task = GCSBlobExists()(bucket_name=dest_bucket, blob=dest_blob, upstream_tasks=[copy_file_task])
        delete_file_task = GCSDeleteFile()(bucket=source_bucket, blob=source_blob, upstream_tasks=[file_exists_task])
    Note that the first `()` is the init and is empty here. This works specifically because those tasks support passing those arguments to the run method.
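The split described above, where a few special keywords are consumed while building the flow and everything else is bound to `run()`, can be sketched as a small helper. This is an illustration, not Prefect's code: `split_call_kwargs`, `blob_exists_run` and the exact contents of `RESERVED_CALL_KWARGS` are assumptions, so check the `Task.__call__` documentation for your Prefect version. The helper also shows how a misspelled keyword would surface, by checking names against the run signature with `inspect.Signature.bind_partial`.

```python
import inspect

# Assumed to mirror the extra keywords Prefect 1.x accepts when a task is
# called inside a flow; verify against the docs for your Prefect version.
RESERVED_CALL_KWARGS = {"upstream_tasks", "mapped", "task_args"}


def split_call_kwargs(run_method, **call_kwargs):
    """Split keywords from the second call into flow-building and run() arguments.

    Raises TypeError for a keyword that is neither reserved nor a parameter of
    run_method, e.g. a misspelled `upsteam_tasks`.
    """
    reserved = {k: v for k, v in call_kwargs.items() if k in RESERVED_CALL_KWARGS}
    run_kwargs = {k: v for k, v in call_kwargs.items() if k not in RESERVED_CALL_KWARGS}
    # bind_partial checks names against the signature without requiring every argument
    inspect.signature(run_method).bind_partial(**run_kwargs)
    return reserved, run_kwargs


def blob_exists_run(bucket_name=None, blob=None, project=None):
    """Hypothetical run() signature modelled on the GCSBlobExists call above."""
    return bucket_name, blob


if __name__ == "__main__":
    reserved, run_kwargs = split_call_kwargs(
        blob_exists_run, bucket_name="bucket2", blob="file2", upstream_tasks=["copy"]
    )
    print(reserved)    # {'upstream_tasks': ['copy']}
    print(run_kwargs)  # {'bucket_name': 'bucket2', 'blob': 'file2'}
```

Because the run-time keywords come straight from the run signature, the same arguments can be supplied either to the constructor or to the second call for tasks whose run method falls back on constructor values.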