# prefect-server
в
Hi! If I understand correctly, Prefect doesn't provide any Redshift helpers for uploading data? So we need to implement the upload logic ourselves (even though I do see tasks for BigQuery and Snowflake). It's not a problem, but I want to be sure, because I didn't find any information about Redshift in the docs, GitHub, or Slack.
a
Check out awswrangler, you can literally load data to Redshift in a single line in a Prefect task:
• https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/008%20-%20Redshift%20-%20Copy%20%26%20Unload.html
• https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/009%20-%20Redshift%20-%20Append%2C%20Overwrite%2C%20Upsert.html
If you still have questions after reading this, LMK and I'll try to help more
в
Ok, I will try it, thanks!
@Anna Geller Actually, I have one question. Before uploading to Redshift, I have a task that writes data to S3. I need to get the path of the written file from that task so I can pass it to awswrangler. I'm trying to do this via task_to_s3.result.location, but the location is shown as '{task_name}.csv', since I used templates. Can I somehow get the real path to the file in S3?
a
I wouldn't rely on Results to load data to source systems. You can think of Results as something that helps you recover from pipeline failures (e.g. to restart from failure) rather than something to use in your applications. But there's another one-liner from awswrangler you can use to load data to S3 🙂 more info here and an example:
wr.s3.to_parquet(df1, path1)
в
I mean that in Luigi, for example, the output of the current task is the input of the next one, i.e. we don't have to explicitly write out the paths to the files saved by previous tasks. The whole problem is that Prefect renders template names during execution, so {task_name}.csv turns into a proper name. But if we refer to this target explicitly to get the path to the written file, we just get the raw string "{task_name}.csv".
a
You can always return the same path name (rendered at runtime from context values) in one task and pass it to the next:
import prefect
from prefect import task

@task
def do_sth():
    return f"{prefect.context.get('task_name')}.csv"
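To illustrate why the template shows up as a literal string outside of a run, here is a minimal plain-Python sketch. It assumes location templates are filled `str.format`-style from context values at run time. The `render_location` helper and the `fake_context` dict are hypothetical stand-ins for illustration, not Prefect APIs.

```python
# Plain-Python illustration; no Prefect required.
# fake_context is a hypothetical stand-in for the values available in
# prefect.context while a task is running.
TEMPLATE = "{task_name}.csv"


def render_location(template: str, context: dict) -> str:
    """Fill a location template from a context mapping, str.format-style."""
    return template.format(**context)


fake_context = {"task_name": "task_to_s3", "today": "2021-01-01"}

# Before a run there is nothing to fill the template with, so inspecting it
# only ever shows the raw string.
raw = TEMPLATE

# During a run the context values are known, so the name can be rendered.
rendered = render_location(TEMPLATE, fake_context)

print(raw)
print(rendered)
```

Returning the rendered string from the task, as in the snippet above, gives the downstream task the same name that was used for the write.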
k
I think both paradigms are possible in Prefect. You can handle persisting the file explicitly yourself, like this:
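import pandas as pd
from prefect import task, Flow

@task
def create_df():
    df = ...  # build the DataFrame here
    location = ...  # path to write the CSV to
    # to_csv returns None when given a path, so keep the path in its own variable
    df.to_csv(location)
    return location

@task
def load_df(location):
    df = pd.read_csv(location)
    # more stuff
    return

with Flow(...) as flow:
    loc = create_df()
    load_df(loc)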
Or you can have Prefect handle it for you with the Result interface; in that case the data is both persisted and passed on to the next task.
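from prefect import task, Flow
from prefect.engine.results import S3Result
from prefect.engine.serializers import PandasSerializer

# S3Result also needs a bucket name; it is left elided here
@task(result=S3Result(bucket=..., location="{task_name}.csv", serializer=PandasSerializer(file_type="csv")))
def create_df():
    df = ...  # build the DataFrame here
    return df

@task
def load_df(df):
    # more stuff
    return

with Flow(...) as flow:
    df = create_df()
    load_df(df)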
If you don’t like persisting the file, you can turn off checkpointing too.
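Putting the suggestions from this thread together, a rough sketch of the explicit approach could look like the following: build the S3 path once, write the data with awswrangler, return the path, and hand it to the Redshift load step. The function names, bucket, table, schema, Glue connection name, and IAM role parameters are all hypothetical. The awswrangler calls (`wr.s3.to_parquet`, `wr.redshift.connect`, `wr.redshift.copy_from_files`) need AWS credentials and a reachable cluster, so they are imported lazily and not executed here.

```python
import datetime


def build_s3_path(bucket: str, task_name: str, run_date: datetime.date) -> str:
    """Build the object path once so every step sees the same rendered string."""
    return f"s3://{bucket}/{run_date:%Y-%m-%d}/{task_name}.parquet"


def write_to_s3(df, path: str) -> str:
    """Write the DataFrame to S3 as Parquet and return the path for the next step."""
    import awswrangler as wr  # lazy import; needs AWS credentials when called

    wr.s3.to_parquet(df=df, path=path)
    return path


def load_to_redshift(path: str, table: str, schema: str,
                     glue_connection: str, iam_role: str) -> None:
    """COPY the Parquet file at `path` into a Redshift table."""
    import awswrangler as wr  # lazy import; needs AWS credentials when called

    # Assumes a Glue Catalog connection with this (hypothetical) name exists.
    con = wr.redshift.connect(connection=glue_connection)
    try:
        wr.redshift.copy_from_files(
            path=path, con=con, table=table, schema=schema, iam_role=iam_role
        )
    finally:
        con.close()


# Only the pure path-building step is run here.
example_path = build_s3_path("my-bucket", "task_to_s3", datetime.date(2021, 1, 1))
print(example_path)
```

In a flow, each function would be wrapped with `@task`, and the return value of `write_to_s3` passed to `load_to_redshift`, the same way `loc` is passed in the explicit example above.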