Miguel Moncada
01/10/2023, 8:51 AM
GcsBucket.load classmethod following the reference here.
ValueError: Unable to find block document named tam-2759-bucket for block type gcs-bucket
More details in the thread 🧵
GcsBucket class has further options and it's the preferred way-to-go)
logger = get_run_logger()
logger.info("Starting flow")
bucket_gcs = GCS.load(GCS_BUCKET_BLOB)
logger.info(f"Bucket {bucket_gcs} loaded")
❯ python tam_2759_flow.py
09:44:22.436 | INFO | prefect.engine - Created flow run 'tunneling-horse' for flow 'tam_2759_flow'
09:44:23.376 | INFO | Flow run 'tunneling-horse' - Starting flow
09:44:23.668 | INFO | Flow run 'tunneling-horse' - Bucket GCS(bucket_path='bucket-name', service_account_info=SecretStr('**********'), project='gcp-project-name') loaded
from prefect import task, flow, get_run_logger
from prefect_gcp.cloud_storage import GcsBucket

from utils.tam_2759.utils import check_is_candidate_file_from_uri
from utils.tam_2759.constants import GCS_BUCKET_BLOB, FILE_MAX_OLD_DAYS


@task(
    name="get_gcs_candidate_upload_files",
    retries=0,
    retry_delay_seconds=1,
    timeout_seconds=5,
)
def get_gcs_candidate_upload_files(
    bucket: GcsBucket, days_period: int = FILE_MAX_OLD_DAYS
) -> list:
    """
    Function to get the list of files to process.

    Args:
        bucket (GcsBucket): GCS bucket object
        days_period (int, optional): Days period to check if a file is valid
            to process. Defaults to FILE_MAX_OLD_DAYS.

    Returns:
        list: List of files to process
    """
    logger = get_run_logger()
    blob_list = bucket.list_blobs()
    # Log the bucket name from the loaded block instance, not from the class
    if blob_list:
        logger.info(f"Found {len(blob_list)} files in bucket {bucket.bucket}")
    else:
        logger.warning(f"No files found in bucket {bucket.bucket}")
    return [
        check_is_candidate_file_from_uri(blob.name, days_period=days_period)
        for blob in blob_list
    ]


@flow(name="tam_2759_flow")
def main():
    logger = get_run_logger()
    logger.info("Starting flow")
    # Raises ValueError if no block of type gcs-bucket exists under this name
    bucket = GcsBucket.load(GCS_BUCKET_BLOB)
    logger.info(f"Bucket {bucket} loaded")
    candidate_files = get_gcs_candidate_upload_files(bucket=bucket)
    logger.info(candidate_files)


if __name__ == "__main__":
    main()
❯ pip freeze | grep -i "prefect\|google"
google-api-core==2.11.0
google-api-python-client==2.70.0
google-auth==2.15.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.8.0
google-cloud==0.34.0
google-cloud-bigquery==3.4.1
google-cloud-core==2.3.2
google-cloud-storage==2.7.0
google-crc32c==1.5.0
google-resumable-media==2.4.0
googleapis-common-protos==1.57.0
prefect==2.7.5
prefect-dask==0.2.2
prefect-gcp==0.2.2
alex
01/10/2023, 2:34 PM
GCS block in the UI, but you’re using the GcsBucket class from prefect-gcp. If you want to load the block you created in the UI you’ll need to use the GCS class from prefect.filesystems.
Miguel Moncada
01/10/2023, 2:35 PM
GcsBucket class object using the credentials stored in a GCS block?
alex
01/10/2023, 2:42 PM
prefect-gcp with the command prefect blocks register -m prefect_gcp. The blocks in prefect_gcp will be available in your UI and available for configuration after that.
Miguel Moncada
01/10/2023, 2:48 PM
GCS and GCS bucket blocks defined?
alex
01/11/2023, 12:17 PM
Miguel Moncada
01/11/2023, 12:26 PM
GCS Bucket block offers more functionality than GCS, so I'll try to stick to this one.
If that is the case, and just out of curiosity, are there any plans to keep only one of them in the future?
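To recap the mismatch discussed in this thread: the ValueError suggests that a block is looked up by its block type together with its name, so a block saved as one type is not found when loaded through a class of another type. The snippet below is only a toy, in-memory sketch of that idea and not Prefect's actual implementation. ToyBlockStore and its methods are hypothetical names, the "gcs-bucket" slug is copied from the error message, and the "gcs" slug for the prefect.filesystems GCS block is an assumption.

```python
from typing import Dict, Tuple

# Block type slugs. "gcs-bucket" appears in the error message from the thread;
# "gcs" for the prefect.filesystems GCS block is an assumption.
GCS_SLUG = "gcs"
GCS_BUCKET_SLUG = "gcs-bucket"


class ToyBlockStore:
    """Hypothetical stand-in for a block store, keyed by (type slug, name)."""

    def __init__(self) -> None:
        self._documents: Dict[Tuple[str, str], dict] = {}

    def save(self, type_slug: str, name: str, data: dict) -> None:
        # A document is stored under its block type AND its name.
        self._documents[(type_slug, name)] = data

    def load(self, type_slug: str, name: str) -> dict:
        # The lookup needs both parts of the key to match.
        try:
            return self._documents[(type_slug, name)]
        except KeyError:
            raise ValueError(
                f"Unable to find block document named {name} "
                f"for block type {type_slug}"
            ) from None


store = ToyBlockStore()

# The block was created in the UI as a GCS (filesystem) block.
store.save(GCS_SLUG, "tam-2759-bucket", {"bucket_path": "bucket-name"})

# Loading it through the matching type succeeds.
gcs_doc = store.load(GCS_SLUG, "tam-2759-bucket")

# Loading the same name through the other type fails.
try:
    store.load(GCS_BUCKET_SLUG, "tam-2759-bucket")
    lookup_error = None
except ValueError as exc:
    lookup_error = str(exc)

print(lookup_error)
```

In the real setup, the two ways out mentioned in the thread are to load the existing block with the GCS class from prefect.filesystems, or to run prefect blocks register -m prefect_gcp, create a GCS Bucket block under that name, and keep using GcsBucket.load.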