#prefect-community

flavienbwk

09/25/2022, 3:10 PM
Hi, I have been using Prefect 1.x for a while now and am trying to migrate to 2.x. I feel the documentation is missing a lot of examples, which makes migrating to 2.x pretty complicated, although I appreciate the effort put into publishing a lot of (too?) basic getting-started articles. For example, I can't find an example anywhere of adding parameters to my flow. I'm trying to register a flow in S3 storage with parameters pre-defined at flow registration. The parameters include S3 credentials used to create a bucket in the flow.
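import os
import uuid

from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import RemoteFileSystem

# create_bucket, get_city_coordinates and get_weather are tasks defined elsewhere (not shown)
@flow(name="get_paris_weather")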
def get_paris_weather(
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_use_ssl: bool,
    bucket_name: str,
):
    create_bucket(
        minio_endpoint,
        minio_access_key,
        minio_secret_key,
        minio_use_ssl,
        bucket_name,
    )
    city_coordinates = get_city_coordinates("Paris")
    return get_weather(city_coordinates[0], city_coordinates[1])


# --- Deployment definition

if __name__ == "__main__":
    bucket_name = os.environ.get("MINIO_PREFECT_FLOWS_BUCKET_NAME")
    minio_endpoint = os.environ.get("MINIO_ENDPOINT")
    minio_use_ssl = os.environ.get("MINIO_USE_SSL") == "true"
    minio_scheme = "https" if minio_use_ssl else "http"
    minio_access_key = os.environ.get("MINIO_ACCESS_KEY")
    minio_secret_key = os.environ.get("MINIO_SECRET_KEY")

    flow_identifier = uuid.uuid4()
    block_storage = RemoteFileSystem(
        basepath=f"s3://{bucket_name}/{flow_identifier}",
        key_type="hash",
        settings=dict(
            use_ssl=minio_use_ssl,
            key=minio_access_key,
            secret=minio_secret_key,
            client_kwargs=dict(endpoint_url=f"{minio_scheme}://{minio_endpoint}"),
        ),
    )
    block_storage.save("s3-storage", overwrite=True)

    deployment = Deployment.build_from_flow(
        name="get_weather_s3_example",
        flow=get_paris_weather,
        storage=RemoteFileSystem.load("s3-storage"),
        work_queue_name="flows-example-queue",
        parameters={
            minio_endpoint: minio_endpoint,
            minio_access_key: minio_access_key,
            minio_secret_key: minio_secret_key,
            minio_use_ssl: minio_use_ssl,
            bucket_name: bucket_name,
        },
    )
    deployment.apply()
But the error I get is:
prefect.exceptions.SignatureMismatchError: Function expects parameters ['minio_endpoint', 'minio_access_key', 'minio_secret_key', 'minio_use_ssl', 'bucket_name'] but was provided with parameters ['False', 'minio', 'minio123', 'prefect-flows', '172.17.0.1:9000']
Could you explain how I can pass parameters to my flow?

Ryan Peden

09/25/2022, 4:06 PM
I think you are seeing this because your parameters dict uses your variables as keys, so the keys end up being the variables' values rather than the parameter names. Instead, perhaps try string keys:
parameters={
    "minio_endpoint": minio_endpoint,
    "minio_access_key": minio_access_key,
    "minio_secret_key": minio_secret_key,
    "minio_use_ssl": minio_use_ssl,
    "bucket_name": bucket_name,
}
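To see why the unquoted version fails, here is a minimal sketch in plain Python, with no Prefect involved. The flow body is replaced by a stand-in function, the values are the ones visible in the error message above, and `matches_signature` is a hypothetical helper that only approximates the kind of check behind `SignatureMismatchError`; it is not Prefect's actual implementation.

```python
import inspect


def get_paris_weather(
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_use_ssl: bool,
    bucket_name: str,
):
    """Stand-in for the flow; only its signature matters here."""


# Values taken from the error message in the thread.
minio_endpoint = "172.17.0.1:9000"
minio_access_key = "minio"
minio_secret_key = "minio123"
minio_use_ssl = False
bucket_name = "prefect-flows"

# Unquoted keys are evaluated as expressions, so each key is the
# variable's *value*, not the parameter name.
unquoted = {
    minio_endpoint: minio_endpoint,
    minio_access_key: minio_access_key,
    minio_secret_key: minio_secret_key,
    minio_use_ssl: minio_use_ssl,
    bucket_name: bucket_name,
}

# Quoted keys are string literals that match the parameter names.
quoted = {
    "minio_endpoint": minio_endpoint,
    "minio_access_key": minio_access_key,
    "minio_secret_key": minio_secret_key,
    "minio_use_ssl": minio_use_ssl,
    "bucket_name": bucket_name,
}

expected = set(inspect.signature(get_paris_weather).parameters)


def matches_signature(params: dict) -> bool:
    """Hypothetical check: do the dict keys equal the function's parameter names?"""
    return set(params) == expected


print(matches_signature(unquoted))  # keys are the values, e.g. "minio123"
print(matches_signature(quoted))    # keys are the parameter names
```

Writing the dict as `dict(minio_endpoint=minio_endpoint, ...)` would also avoid the problem, since keyword names are always treated as strings.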

flavienbwk

09/25/2022, 4:25 PM
Well, it works with quotes... Thanks!