Andrea Haessly
02/07/2022, 6:23 PMAndrea Haessly
02/07/2022, 6:26 PMfrom typing import Union, Tuple
from prefect import task, Flow
from prefect.tasks.gcp import GCSCopy, GCSBlobExists
# from google.cloud import storage
from prefect.tasks.gcp.storage import GCSBaseTask
from prefect.utilities.tasks import defaults_from_attrs
from prefect.utilities.gcp import get_storage_client
source_bucket = "bucket1"
source_blob = "file1"
dest_bucket = "file2"
dest_blob = "bucket2"
class GCSDeleteFile(GCSBaseTask):
def __init__(
self,
bucket: str = None,
blob: str = None,
project: str = None,
request_timeout: Union[float, Tuple[float, float]] = 60,
**kwargs,
):
super().__init__(bucket, blob, project, request_timeout, kwargs)
@defaults_from_attrs(
"bucket",
"blob",
"project",
"request_timeout",
)
def run(
self,
bucket: str = None,
blob: str = None,
project: str = None,
credentials: dict = None,
request_timeout: Union[float, Tuple[float, float]] = 60,
) -> None:
# create client
client = get_storage_client(project=project, credentials=credentials)
# get source bucket and blob
bucket_obj = client.get_bucket(source_bucket)
blob_obj = bucket_obj.blob(source_blob)
# delete
blob_obj.delete()
def main():
flow = Flow("gcpImperativeExample")
copy_file_task = GCSCopy(source_bucket=source_bucket, source_blob=source_blob, dest_bucket=dest_bucket, dest_blob=dest_blob)
flow.add_task(copy_file_task)
file_exists_task = GCSBlobExists(bucket_name=dest_bucket, blob=dest_blob) ## these values are not used when task runs
flow.set_dependencies(task=file_exists_task, upstream_tasks=[copy_file_task])
# flow.set_dependencies(task=file_exists_task, upstream_tasks=[copy_file_task], keyword_tasks=dict(bucket_name=dest_bucket, blob=dest_blob))
delete_file_task = GCSDeleteFile(bucket=source_bucket, blob=source_blob) ## these values do get used when task runs
flow.set_dependencies(task=delete_file_task, upstream_tasks=[file_exists_task])
# flow.set_dependencies(task=delete_file_task, upstream_tasks=[file_exists_task], keyword_tasks=dict(bucket=source_bucket, blob=source_blob))
flow.run()
if __name__ == "__main__":
main()
Andrea Haessly
02/07/2022, 6:27 PM[2022-02-07 13:19:52-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'gcpImperativeExample'
[2022-02-07 13:19:52-0500] INFO - prefect.TaskRunner | Task 'GCSCopy': Starting task run...
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSCopy': Finished task run for task with final state: 'Success'
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Starting task run...
[2022-02-07 13:19:53-0500] ERROR - prefect.TaskRunner | Task 'GCSBlobExists': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.9/site-packages/prefect/tasks/gcp/storage.py", line 525, in run
raise ValueError("Missing bucket_name or blob")
ValueError: Missing bucket_name or blob
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Finished task run for task with final state: 'Failed'
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSDeleteFile': Starting task run...
[2022-02-07 13:19:53-0500] INFO - prefect.TaskRunner | Task 'GCSDeleteFile': Finished task run for task with final state: 'TriggerFailed'
[2022-02-07 13:19:53-0500] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Andrea Haessly
02/07/2022, 6:29 PMKevin Kho
Kevin Kho
def main():
with Flow("gcExample") as flow:
copy_file_task = GCSCopy(source_bucket=source_bucket, source_blob=source_blob, dest_bucket=dest_bucket, dest_blob=dest_blob)()
file_exists_task = GCSBlobExists(bucket_name=dest_bucket, blob=dest_blob)(upstream_tasks=[copy_file_task])
delete_file_task = GCSDeleteFile(bucket=source_bucket, blob=source_blob)(upstream_tasks=[file_exists_task])
flow.run()
Andrea Haessly
02/07/2022, 6:33 PMKevin Kho
with Flow("example") as flow:
a = task_a()
task_b(upstream_tasks=[a])
with flow:
task_c()
flow.run()
but I haven’t seen anyone really use this heavily. If you want your logic to be modular, you can also use the flow of flows patternAndrea Haessly
02/07/2022, 6:42 PMGCSBlobExists(bucket_name=dest_bucket, blob=dest_blob)
but how do I know what the options are for the second invocation (upstream_tasks=[copy_file_task])
? I've tried looking at the params documented on the Task class, but upsteam_tasks
is not one of them.Kevin Kho
Task.run()
documentation here because the Flow is responsible for calling the .run()
. The second invocation is just the run.Kevin Kho
with Flow("example") as flow:
copy_file_task = GCSCopy()(source_bucket=source_bucket, source_blob=source_blob, dest_bucket=dest_bucket, dest_blob=dest_blob)
file_exists_task = GCSBlobExists()(bucket_name=dest_bucket, blob=dest_blob, upstream_tasks=[copy_file_task])
delete_file_task = GCSDeleteFile()(bucket=source_bucket, blob=source_blob, upstream_tasks=[file_exists_task])
Note that the first ()
is the init and is empty here. This is specifically because those tasks support passing those arguments in the run methodBring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by