Thread
#prefect-server
    Владислав Богучаров

    7 months ago
    Hi! If I understand correctly, Prefect doesn't provide any Redshift helpers for uploading data? So we need to implement the upload logic ourselves (at the same time, I do find tasks related to BigQuery and Snowflake). It's not a problem, but I want to be sure, because I didn't find any information about Redshift in the docs/GitHub/Slack.
    Anna Geller

    7 months ago
    Check out awswrangler, you can literally load data to Redshift in a single line in a Prefect task:
    • https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/008%20-%20Redshift%20-%20Copy%20%26%20Unload.html
    • https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/009%20-%20Redshift%20-%20Append%2C%20Overwrite%2C%20Upsert.html
    If you still have questions after reading this, LMK and I'll try to help more
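    (Editor's note: a sketch of what that one-liner looks like in practice. The connection name, table, and schema below are placeholders, and awswrangler's call names have shifted between releases, so check the linked tutorials against your installed version rather than treating this as a drop-in implementation.)

    ```python
    def copy_df_to_redshift(df, staging_path, table, schema):
        """Stage a DataFrame on S3 and COPY it into Redshift via awswrangler (sketch)."""
        import awswrangler as wr  # deferred import so the sketch loads without AWS deps

        # "my-glue-connection" is a placeholder Glue Catalog connection name
        con = wr.redshift.connect("my-glue-connection")
        try:
            wr.redshift.copy(
                df=df,
                path=staging_path,  # S3 prefix used to stage the data before COPY
                con=con,
                table=table,
                schema=schema,
                mode="append",      # or "overwrite" / "upsert" (see tutorial 009)
            )
        finally:
            con.close()
    ```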
    Владислав Богучаров

    7 months ago
    Ok, I will try it, thanks!
    @Anna Geller Actually, there is one question. Before uploading to Redshift, I had a task that wrote data to S3. I need to get the path to the written file from the previous task so I can pass it to awswrangler. I'm trying to do this via task_to_s3.result.location, but the location is displayed as '{task_name}.csv', since I used templates. Can I somehow get the real path to the file in S3?
    Anna Geller

    7 months ago
    I wouldn't rely on results to load data into source systems. Think of Results more as something that helps you recover from data pipeline failures (e.g. to restart from failure) rather than something to use in your applications. But there's another one-liner from awswrangler you can use to load data to S3 🙂 more info here and an example:
    wr.s3.to_parquet(df1, path1)
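    (Editor's note: a sketch of how that one-liner could sit inside a task that hands the written path to the next task — which also addresses the path question above. The bucket name and the key-building convention are made up for illustration.)

    ```python
    def build_s3_path(bucket, task_name, suffix="parquet"):
        """Build the S3 URI a task writes to -- a hypothetical naming convention."""
        return f"s3://{bucket}/{task_name}.{suffix}"

    def upload_df(df, task_name, bucket="my-bucket"):
        """Write df to S3 with awswrangler and return the path for downstream tasks."""
        import awswrangler as wr  # deferred import so the sketch loads without AWS deps

        path = build_s3_path(bucket, task_name)
        wr.s3.to_parquet(df=df, path=path)
        return path
    ```

    Because `upload_df` returns the concrete path it wrote to, a downstream Redshift-load task can take that path as a plain argument instead of digging it out of a Result.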
    Владислав Богучаров

    7 months ago
    I mean, in Luigi, for example, the output of the current task is the input for the next one, i.e. we don't have to explicitly write out the paths to files saved by previous tasks. The whole problem is that Prefect renders the templated names during execution, and {task_name}.csv turns into a beautiful name. But if we try to refer to this target name explicitly to get the path to the written files, we just get the literal string "task_name.csv"
    Anna Geller

    7 months ago
    you can always return the same path name (rendered at runtime from context arguments) in one task and pass it to the next:
    import prefect
    from prefect import task

    @task
    def do_sth():
        # prefect.context is populated at runtime, so this renders to a real name
        return f"{prefect.context.get('task_name')}.csv"
    Kevin Kho

    7 months ago
    I think both paradigms are possible in Prefect. You can handle the file persisting explicitly yourself like this:
    @task
    def create_df():
        df = ...  # build the DataFrame
        location = "s3://my-bucket/create_df.csv"  # choose the path explicitly
        df.to_csv(location)
        return location
    
    @task
    def load_df(location):
        df = pd.read_csv(location)
        # more stuff
        return
    
    with Flow(..) as flow:
        loc = create_df()
        load_df(loc)
    Or you can have Prefect handle it for you with the Result interface, in which case it is both persisted and passed on to the next task.
    @task(result=S3Result(bucket="my-bucket", location="{task_name}.csv", serializer=PandasSerializer("csv")))
    def create_df():
        return df
    
    @task
    def load_df(df):
        # more stuff
        return
    
    with Flow(..) as flow:
        df = create_df()
        load_df(df)
    If you don’t like persisting the file, you can turn off checkpointing too.
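    (Editor's note: for reference, a Prefect 1.x sketch of that last point — disabling checkpointing on a task so its return value is passed downstream but never persisted. The flow wiring mirrors Kevin's explicit example; Prefect 1.x is assumed.)

    ```python
    def build_flow():
        """Build a Prefect 1.x flow whose first task is not checkpointed (sketch)."""
        from prefect import Flow, task  # deferred import: Prefect 1.x assumed

        @task(checkpoint=False)  # result is passed in memory, never written out
        def create_df():
            return [1, 2, 3]  # stand-in for a real DataFrame

        @task
        def load_df(df):
            return len(df)

        with Flow("no-checkpoint") as flow:
            load_df(create_df())
        return flow
    ```

    Note that when running locally, Prefect 1.x also gates checkpointing on the PREFECT__FLOWS__CHECKPOINTING environment variable.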