Miguel Moncada
01/10/2023, 8:51 AM
GcsBucket.load classmethod following the reference here.
ValueError: Unable to find block document named tam-2759-bucket for block type gcs-bucket
More details in the thread 🧵
Miguel Moncada
01/10/2023, 8:52 AM
Miguel Moncada
01/10/2023, 8:53 AM
GcsBucket class has further options and it's the preferred way-to-go)
logger = get_run_logger()
logger.info("Starting flow")
bucket_gcs = GCS.load(GCS_BUCKET_BLOB)
logger.info(f"Bucket {bucket_gcs} loaded")
❯ python tam_2759_flow.py
09:44:22.436 | INFO | prefect.engine - Created flow run 'tunneling-horse' for flow 'tam_2759_flow'
09:44:23.376 | INFO | Flow run 'tunneling-horse' - Starting flow
09:44:23.668 | INFO | Flow run 'tunneling-horse' - Bucket GCS(bucket_path='bucket-name', service_account_info=SecretStr('**********'), project='gcp-project-name') loaded
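Editor's note: the error message and the successful GCS.load run above are consistent with block documents being looked up by block type as well as by name. The sketch below is a toy stdlib model of that lookup, not Prefect's implementation. The names save_block, load_block and _documents are hypothetical, and the "gcs" slug for the prefect.filesystems block is an assumption (the "gcs-bucket" slug comes from the error message).

```python
from typing import Dict, Tuple

# Toy stand-in for a block store: documents are keyed by (block type slug, name).
_documents: Dict[Tuple[str, str], dict] = {}


def save_block(block_type_slug: str, name: str, data: dict) -> None:
    """Store a document under its block type slug and name."""
    _documents[(block_type_slug, name)] = data


def load_block(block_type_slug: str, name: str) -> dict:
    """Return the document for this exact (type, name) pair or raise ValueError."""
    try:
        return _documents[(block_type_slug, name)]
    except KeyError:
        raise ValueError(
            f"Unable to find block document named {name} "
            f"for block type {block_type_slug}"
        ) from None


# A block created as type "gcs" (slug assumed) with the name used in the thread.
save_block("gcs", "tam-2759-bucket", {"bucket_path": "bucket-name"})

# Loading with the matching type succeeds.
found = load_block("gcs", "tam-2759-bucket")

# Loading the same name under a different type fails, as in the traceback.
try:
    load_block("gcs-bucket", "tam-2759-bucket")
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

In this model the name alone is never enough: the same name under another block type is simply a different key.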
Miguel Moncada
01/10/2023, 8:55 AM
from prefect import task, flow, get_run_logger
from prefect_gcp.cloud_storage import GcsBucket

from utils.tam_2759.utils import check_is_candidate_file_from_uri
from utils.tam_2759.constants import GCS_BUCKET_BLOB, FILE_MAX_OLD_DAYS


@task(
    name="get_gcs_candidate_upload_files",
    retries=0,
    retry_delay_seconds=1,
    timeout_seconds=5,
)
def get_gcs_candidate_upload_files(
    bucket: GcsBucket, days_period: int = FILE_MAX_OLD_DAYS
) -> list:
    """
    Function to get the list of files to process.

    Args:
        bucket (GcsBucket): GCS bucket object
        days_period (int, optional): Days period to check if a file is valid
            to process. Defaults to FILE_MAX_OLD_DAYS.

    Returns:
        list: List of files to process
    """
    logger = get_run_logger()
    blob_list = bucket.list_blobs()
    # Log the bucket name from the loaded block instance, not the GcsBucket class.
    if blob_list:
        logger.info(f"Found {len(blob_list)} files in bucket {bucket.bucket}")
    else:
        logger.warning(f"No files found in bucket {bucket.bucket}")
    return [
        check_is_candidate_file_from_uri(blob.name, days_period=days_period)
        for blob in blob_list
    ]


@flow(name="tam_2759_flow")
def main():
    logger = get_run_logger()
    logger.info("Starting flow")
    bucket = GcsBucket.load(GCS_BUCKET_BLOB)
    logger.info(f"Bucket {bucket} loaded")
    candidate_files = get_gcs_candidate_upload_files(bucket=bucket)
    logger.info(candidate_files)


if __name__ == "__main__":
    main()
Miguel Moncada
01/10/2023, 8:58 AM
❯ pip freeze | grep -i "prefect\|google"
google-api-core==2.11.0
google-api-python-client==2.70.0
google-auth==2.15.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.8.0
google-cloud==0.34.0
google-cloud-bigquery==3.4.1
google-cloud-core==2.3.2
google-cloud-storage==2.7.0
google-crc32c==1.5.0
google-resumable-media==2.4.0
googleapis-common-protos==1.57.0
prefect==2.7.5
prefect-dask==0.2.2
prefect-gcp==0.2.2
alex
01/10/2023, 2:34 PM
GCS block in the UI, but you’re using the GcsBucket class from prefect-gcp. If you want to load the block you created in the UI you’ll need to use the GCS class from prefect.filesystems.
Miguel Moncada
01/10/2023, 2:35 PM
GcsBucket class object using the credentials stored in a GCS block?
alex
01/10/2023, 2:42 PM
prefect-gcp with the command prefect blocks register -m prefect_gcp. The blocks in prefect_gcp will be available in your UI and available for configuration after that.
Miguel Moncada
01/10/2023, 2:48 PM
GCS and GCS bucket blocks defined?
alex
01/11/2023, 12:17 PM
Miguel Moncada
01/11/2023, 12:26 PM
GCS Bucket block offers more functionality than GCS, so I'll try to stick to this one.
If that was the case, and just out of curiosity, are there any plans on keeping only one of them in the future?
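Editor's note: the 2:35 PM question about building a GcsBucket from the credentials held in an existing GCS block could be approached roughly as below. This is a sketch under assumptions, not a method confirmed in the thread. The helper names split_bucket_path and gcs_block_to_gcs_bucket are hypothetical. The GCS attributes bucket_path, service_account_info and project are taken from the log output earlier in the thread. The GcpCredentials and GcsBucket field names (service_account_info, project, bucket, gcp_credentials, bucket_folder) are as I understand them for prefect-gcp 0.2.x and should be checked against the installed version.

```python
from typing import Optional, Tuple


def split_bucket_path(bucket_path: str) -> Tuple[str, str]:
    """Split 'bucket-name/some/folder' into ('bucket-name', 'some/folder')."""
    bucket, _, folder = bucket_path.strip("/").partition("/")
    return bucket, folder


def gcs_block_to_gcs_bucket(gcs_block_name: str, save_as: Optional[str] = None):
    """Build a prefect-gcp GcsBucket from a saved prefect.filesystems GCS block."""
    # Imports are deferred so split_bucket_path works without Prefect installed.
    from prefect.filesystems import GCS
    from prefect_gcp import GcpCredentials
    from prefect_gcp.cloud_storage import GcsBucket

    gcs = GCS.load(gcs_block_name)
    bucket_name, folder = split_bucket_path(gcs.bucket_path)

    # service_account_info is a SecretStr holding the key JSON; it may be unset.
    info = gcs.service_account_info
    credentials = GcpCredentials(
        # Assumption: the field accepts the raw JSON string of the key.
        service_account_info=info.get_secret_value() if info else None,
        project=gcs.project,
    )
    bucket = GcsBucket(
        bucket=bucket_name,
        gcp_credentials=credentials,
        bucket_folder=folder,  # assumed field name for the path prefix
    )
    if save_as:
        # Persist under the gcs-bucket block type so GcsBucket.load(save_as) works.
        bucket.save(save_as, overwrite=True)
    return bucket
```

Saving the result under a new name would only work once the prefect_gcp block types are registered, as described in the 2:42 PM reply.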