Hi All, We have a handy little Task class called `...
# ask-community
d
Hi All, We have a handy little Task class called 
api_data_to_s3
 that sends json data to S3. In other flows, we import this class and pass it two arguments, the vendor name and the api_endpoint (ex. 
api_data_to_s3(vendor="google", api_endpoint="keywords")
 . The issue is that some vendors have multiple endpoints and when we try to parameterize the api_endpoint we get some errors. I'll post some code in the thread below but I'd love to know the recommended way to solve.
Here's the code:
Copy code
import uuid
import gzip
import os
import json
from datetime import datetime
from typing import List, Dict

from prefect import Task
from prefect.utilities.logging import get_logger as get_prefect_logger
from prefect.utilities.aws import get_boto_client

LOGGER = get_prefect_logger()
# The S3 bucket
API_INGEST_BUCKET = "rtr-api-ingest"
# Where data will be written to in the container
TEMP_OUTPUT_FOLDER = "/app/api_data"



class APIDataToS3(Task):

    #######################################
    # Prefect.Task class method overrides #
    #######################################
    def __init__(self, vendor: str, api_endpoint: str, environment: str, **kwargs):

        if vendor is None:
            raise ValueError("You must specify a vendor")
        if api_endpoint is None:
            raise ValueError("You must specify an api_endpoint")

        # Instance attributes
        self.vendor = vendor
        self.api_endpoint = api_endpoint
        self.environment = environment

        super().__init__(**kwargs)

    def run(self, data: str) -> None:
        """Takes a List object, dumps it to a JSON file, and publishes it to S3.
        Whatever "vendor" and "endpoint" were passed to the class init method
        will determine the S3 object key's prefix.
        e.g. If vendor="sailthru", endpoint="blast", the resulting S3 object key
        will look like:
        /sailthru/blast/2020/07/10/09/b23bf59f-3936-46da-acb7-b4352fd82348.json.gz
        Returns: None
        """

        api_endpoint_value = self.api_endpoint
        # S3 Settings
        self.s3_client = get_boto_client("s3")
        # 1) Write data to local file
        filepath = self._data_to_json_file(data=data, api_endpoint=api_endpoint_value)
        # 2) Publish local file to S3
        self._publish_file_to_s3(filepath, api_endpoint=api_endpoint_value)
        # 3) Remove local file
        os.remove(filepath)

        return None
The 
_data_to_json_file
 and 
_publish_file_to_s3
 aren't included here, but if helpful let me know and I can add. In another flow, we have:
Copy code
api_endpoint = Parameter("api_endpoint", default="keywords")
send_data_to_s3 = APIDataToS3(vendor="google", api_endpoint=api_endpoint, environment=env) #this is where we initiate the class
...
sent_data = send_data_to_s3.map(data=data) #this is where we call the class
We currently get the following error though:
Copy code
flow.py:314: UserWarning: A Task was passed as an argument to APIDataToS3, you likely want to first initialize APIDataToS3 with any static (non-Task) arguments, then call the initialized task with any dynamic (Task) arguments instead. For example:

  my_task = APIDataToS3(...)  # static (non-Task) args go here
  res = my_task(...)  # dynamic (Task) args go here
I thought by adding 
api_endpoint_value = self.api_endpoint
 to the 
run
 function in the class that it would substantiate the variable and allow it to be used. Previously, I didn't have that and I was getting something like 
<Parameter: api_endpoint>
 instead of the actual parameter value. Is the recommended way to do this to just pass in the parameter when the function is called and make 
api_endpoint
 an argument in run?
k
Hi @Danielle Dalton, I’m looking at this
👍 1
I think the issue here is that the
init
method is ran during build time while the
run
is ran during runtime. You can pass the parameters to the
run
method instead.
Parameters
only have a value in runtime so they need deferred execution. It seems that you have other methods that accept api_endpoint defined from on the
api_endpoint = self.api_endpoint
. I would try passing
api_endpoint
directly to the
run
. Does that make sense?
d
Yes it does - thank you! I will take that approach!