Andrea Haessly
02/07/2022, 6:23 PMfrom typing import Union, Tuple
from prefect import task, Flow
from prefect.tasks.gcp import GCSCopy, GCSBlobExists
# from google.cloud import storage
from prefect.tasks.gcp.storage import GCSBaseTask
from prefect.utilities.tasks import defaults_from_attrs
from prefect.utilities.gcp import get_storage_client
source_bucket = "bucket1"
source_blob = "file1"
dest_bucket = "file2"
dest_blob = "bucket2"
class GCSDeleteFile(GCSBaseTask):
def __init__(
self,
bucket: str = None,
blob: str = None,
project: str = None,
request_timeout: Union[float, Tuple[float, float]] = 60,
**kwargs,
):
super().__init__(bucket, blob, project, request_timeout, kwargs)
@defaults_from_attrs(
"bucket",
"blob",
"project",
"request_timeout",
)
def run(
self,
bucket: str = None,
blob: str = None,
project: str = None,
credentials: dict = None,
request_timeout: Union[float, Tuple[float, float]] = 60,
) -> None:
# create client
client = get_storage_client(project=project, credentials=credentials)
# get source bucket and blob
bucket_obj = client.get_bucket(source_bucket)
blob_obj = bucket_obj.blob(source_blob)
# delete
blob_obj.delete()
def main():
flow = Flow("gcpImperativeExample")
copy_file_task = GCSCopy(source_bucket=source_bucket, source_blob=source_blob, dest_bucket=dest_bucket, dest_blob=dest_blob)
flow.add_task(copy_file_task)
file_exists_task = GCSBlobExists(bucket_name=dest_bucket, blob=dest_blob) ## these values are not used when task runs
flow.set_dependencies(task=file_exists_task, upstream_tasks=[copy_file_task])
# flow.set_dependencies(task=file_exists_task, upstream_tasks=[copy_file_task], keyword_tasks=dict(bucket_name=dest_bucket, blob=dest_blob))
delete_file_task = GCSDeleteFile(bucket=source_bucket, blob=source_blob) ## these values do get used when task runs
flow.set_dependencies(task=delete_file_task, upstream_tasks=[file_exists_task])
# flow.set_dependencies(task=delete_file_task, upstream_tasks=[file_exists_task], keyword_tasks=dict(bucket=source_bucket, blob=source_blob))
flow.run()
if __name__ == "__main__":
main()
[2022-02-07 13:19:52-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'gcpImperativeExample'
[2022-02-07 13:19:52-0500] INFO - prefect.TaskRunner | Task 'GCSCopy': Starting task run...
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSCopy': Finished task run for task with final state: 'Success'
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Starting task run...
[2022-02-07 13:19:53-0500] ERROR - prefect.TaskRunner | Task 'GCSBlobExists': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/tasks/gcp/storage.py", line 525, in run
raise ValueError("Missing bucket_name or blob")
ValueError: Missing bucket_name or blob
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Finished task run for task with final state: 'Failed'
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSDeleteFile': Starting task run...
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSDeleteFile': Finished task run for task with final state: 'TriggerFailed'
[2022-02-07 13:19:53-0500] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Kevin Kho
def main():
with Flow("gcExample") as flow:
copy_file_task = GCSCopy(source_bucket=source_bucket, source_blob=source_blob, dest_bucket=dest_bucket, dest_blob=dest_blob)()
file_exists_task = GCSBlobExists(bucket_name=dest_bucket, blob=dest_blob)(upstream_tasks=[copy_file_task])
delete_file_task = GCSDeleteFile(bucket=source_bucket, blob=source_blob)(upstream_tasks=[file_exists_task])
flow.run()
Andrea Haessly
02/07/2022, 6:33 PMKevin Kho
with Flow("example") as flow:
a = task_a()
task_b(upstream_tasks=[a])
with flow:
task_c()
flow.run()
but I haven’t seen anyone really use this heavily. If you want your logic to be modular, you can also use the flow of flows patternAndrea Haessly
02/07/2022, 6:42 PMGCSBlobExists(bucket_name=dest_bucket, blob=dest_blob)
but how do I know what the options are for the second invocation (upstream_tasks=[copy_file_task])
? I've tried looking at the params documented on the Task class, but upsteam_tasks
is not one of them.Kevin Kho
Task.run()
documentation here because the Flow is responsible for calling the .run()
. The second invocation is just the run.with Flow("example") as flow:
copy_file_task = GCSCopy()(source_bucket=source_bucket, source_blob=source_blob, dest_bucket=dest_bucket, dest_blob=dest_blob)
file_exists_task = GCSBlobExists()(bucket_name=dest_bucket, blob=dest_blob, upstream_tasks=[copy_file_task])
delete_file_task = GCSDeleteFile()(bucket=source_bucket, blob=source_blob, upstream_tasks=[file_exists_task])
Note that the first ()
is the init and is empty here. This is specifically because those tasks support passing those arguments in the run method