Danielle Dalton
04/29/2021, 5:56 PM
We have a class, api_data_to_s3, that sends JSON data to S3. In other flows, we import this class and pass it two arguments: the vendor name and the api_endpoint (e.g. api_data_to_s3(vendor="google", api_endpoint="keywords")). The issue is that some vendors have multiple endpoints, and when we try to parameterize the api_endpoint we get some errors. I'll post some code in the thread below, but I'd love to know the recommended way to solve this.

Danielle Dalton
04/29/2021, 5:56 PM
import uuid
import gzip
import os
import json
from datetime import datetime
from typing import List, Dict

from prefect import Task
from prefect.utilities.logging import get_logger as get_prefect_logger
from prefect.utilities.aws import get_boto_client

LOGGER = get_prefect_logger()

# The S3 bucket
API_INGEST_BUCKET = "rtr-api-ingest"

# Where data will be written to in the container
TEMP_OUTPUT_FOLDER = "/app/api_data"


class APIDataToS3(Task):
    #######################################
    # Prefect.Task class method overrides #
    #######################################
    def __init__(self, vendor: str, api_endpoint: str, environment: str, **kwargs):
        if vendor is None:
            raise ValueError("You must specify a vendor")
        if api_endpoint is None:
            raise ValueError("You must specify an api_endpoint")

        # Instance attributes
        self.vendor = vendor
        self.api_endpoint = api_endpoint
        self.environment = environment

        super().__init__(**kwargs)

    def run(self, data: str) -> None:
        """Takes a List object, dumps it to a JSON file, and publishes it to S3.

        Whatever "vendor" and "endpoint" were passed to the class init method
        will determine the S3 object key's prefix.

        e.g. If vendor="sailthru", endpoint="blast", the resulting S3 object key
        will look like:
        /sailthru/blast/2020/07/10/09/b23bf59f-3936-46da-acb7-b4352fd82348.json.gz

        Returns: None
        """
        api_endpoint_value = self.api_endpoint

        # S3 Settings
        self.s3_client = get_boto_client("s3")

        # 1) Write data to local file
        filepath = self._data_to_json_file(data=data, api_endpoint=api_endpoint_value)

        # 2) Publish local file to S3
        self._publish_file_to_s3(filepath, api_endpoint=api_endpoint_value)

        # 3) Remove local file
        os.remove(filepath)

        return None
The _data_to_json_file and _publish_file_to_s3 helpers aren't included here, but if helpful let me know and I can add them.
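(For illustration only, since those helpers aren't shown in the thread: a hypothetical sketch of roughly what two such methods on the class above could do, based solely on the run() docstring's gzipped-JSON key format. None of this is the actual implementation.)

    # Hypothetical sketch, not the real helpers: assumes gzipped JSON files and the
    # <vendor>/<endpoint>/YYYY/MM/DD/HH/<uuid>.json.gz key format from the docstring.
    def _data_to_json_file(self, data, api_endpoint: str) -> str:
        """Dump the payload to a gzipped JSON file and return its local path."""
        filepath = os.path.join(TEMP_OUTPUT_FOLDER, f"{uuid.uuid4()}.json.gz")
        with gzip.open(filepath, "wt") as f:
            json.dump(data, f)
        return filepath

    def _publish_file_to_s3(self, filepath: str, api_endpoint: str) -> None:
        """Upload the local file to the ingest bucket under the vendor/endpoint prefix."""
        now = datetime.utcnow()
        key = f"{self.vendor}/{api_endpoint}/{now:%Y/%m/%d/%H}/{os.path.basename(filepath)}"
        self.s3_client.upload_file(filepath, API_INGEST_BUCKET, key)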
In another flow, we have:

api_endpoint = Parameter("api_endpoint", default="keywords")
send_data_to_s3 = APIDataToS3(vendor="google", api_endpoint=api_endpoint, environment=env)  # this is where we initialize the class
...
sent_data = send_data_to_s3.map(data=data)  # this is where we call the task
We currently get the following error though:
flow.py:314: UserWarning: A Task was passed as an argument to APIDataToS3, you likely want to first initialize APIDataToS3 with any static (non-Task) arguments, then call the initialized task with any dynamic (Task) arguments instead. For example:
my_task = APIDataToS3(...) # static (non-Task) args go here
res = my_task(...) # dynamic (Task) args go here
I thought that by adding api_endpoint_value = self.api_endpoint to the run function in the class, it would resolve the variable and allow it to be used. Previously, I didn't have that and I was getting something like <Parameter: api_endpoint> instead of the actual parameter value. Is the recommended way to do this to just pass the parameter in when the function is called and make api_endpoint an argument of run?

Kevin Kho
The init method runs during build time while the run method runs during runtime. You can pass the parameters to the run method instead. Parameters only have a value at runtime, so they need deferred execution. It seems you have other methods that accept api_endpoint, defined from the api_endpoint = self.api_endpoint assignment. I would try passing api_endpoint directly to run. Does that make sense?
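(A minimal sketch of that change, not code from the thread; it only reuses the names shown above. api_endpoint moves out of __init__ and becomes a run argument, and the Parameter is passed at call time. With .map, wrapping the single-valued Parameter in Prefect's unmapped keeps it from being iterated over.)

from prefect import Parameter, unmapped

class APIDataToS3(Task):
    def __init__(self, vendor: str, environment: str, **kwargs):
        # static, build-time configuration only
        if vendor is None:
            raise ValueError("You must specify a vendor")
        self.vendor = vendor
        self.environment = environment
        super().__init__(**kwargs)

    def run(self, data: str, api_endpoint: str) -> None:
        # api_endpoint is resolved at runtime here, so a Parameter's value comes through
        self.s3_client = get_boto_client("s3")
        filepath = self._data_to_json_file(data=data, api_endpoint=api_endpoint)
        self._publish_file_to_s3(filepath, api_endpoint=api_endpoint)
        os.remove(filepath)

# In the flow:
api_endpoint = Parameter("api_endpoint", default="keywords")
send_data_to_s3 = APIDataToS3(vendor="google", environment=env)  # static (non-Task) args at init
sent_data = send_data_to_s3.map(
    data=data,
    api_endpoint=unmapped(api_endpoint),  # Parameter passed at call time; unmapped keeps it scalar
)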
Danielle Dalton
04/29/2021, 6:26 PM