# ask-community
m
Hi, all. I am new to Prefect and trying to create a workflow with the following need:
1. Get a list of files from S3.
2. Iterate over the files, transpose the values in each file, and dump them to a db.
I have the following Python functions:
• function 1 -> get S3 object
• function 2 -> read the S3 file content and transpose the values
• function 3 -> write the transposed values to the db
What I am looking for is how to link them in a flow so that functions 1, 2, and 3 are executed for all the files I have on S3. How would I do it using Prefect? Thanks, Maz
k
Hey @maz, 1. Wrap them as tasks with the @task decorator. 2. Use Prefect's map operator:
@task
def func1():
    ...

with Flow("s3-to-db") as flow:
    files = func1()                 # get files
    transformed = func2.map(files)  # call .map on the task itself, not on its result
    write(transformed)              # receives the full list of mapped results
Something like this
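To make the fan-out explicit, here is a plain-Python sketch of what the mapped flow above expresses, with no Prefect dependency. The function names (get_files, transpose, write_row) and the fake file list are illustrative stand-ins, not part of the Prefect API: task 1 runs once, and tasks 2 and 3 run once per element of its result.

```python
def get_files():
    # stand-in for task 1: list the S3 objects
    return ["a.csv", "b.csv", "c.csv"]

def transpose(path):
    # stand-in for task 2: read one file and transpose its values
    return f"transposed:{path}"

def write_row(value, sink):
    # stand-in for task 3: write one transposed result to the db
    sink.append(value)

def run_flow():
    sink = []
    files = get_files()                          # runs once
    transformed = [transpose(f) for f in files]  # "mapped" over files
    for t in transformed:                        # "mapped" over results
        write_row(t, sink)
    return sink
```

Prefect's map does the same element-wise application, but builds one task run per element so they can be retried and parallelized independently.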
m
Many thanks for the help. I implemented it, and below are the signatures of my tasks and flow:

@task
def get_s3_objects(bucket: str, credentials: dict = None) -> dict: ...

@task
def get_db_obj(db_name: str, user: str, password: str, port: str): ...

@task
def load_data_to_db(s3_object, db_obj, skip_first_row=True): ...

with Flow(FLOW_NAME) as flow:
    s3_objects = get_s3_objects(bucket, credentials)
    psi_db_obj = get_db_obj(aws_db_name, user_name, password, port)
    load_data_to_db(psi_db_obj).map(s3_object=s3_objects)

I am having a problem with the load_data_to_db task when it is added to the flow. Since the task needs other arguments besides the mapped one, how should I write it so that it maps over the s3_objects?
k
I think you want

load_data_to_db.map(s3_object=s3_objects, db_obj=unmapped(psi_db_obj), skip_first_row=unmapped(True))

where unmapped comes from

from prefect import unmapped

And then you might get an error where psi_db_obj is not pickle-able, but just come back when that happens.
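For intuition, here is a minimal plain-Python emulation of the map/unmapped semantics (illustrative only, not the Prefect implementation): mapped keyword arguments are iterated element-wise, while arguments wrapped in unmapped are passed whole to every call.

```python
class unmapped:
    """Marker that a value should be passed as-is to every mapped call."""
    def __init__(self, value):
        self.value = value

def map_task(fn, **kwargs):
    # split keyword arguments into mapped (iterated) and unmapped (constant)
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, unmapped)}
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, unmapped)}
    length = len(next(iter(mapped.values())))
    # one call per element of the mapped arguments
    return [
        fn(**{k: v[i] for k, v in mapped.items()}, **constant)
        for i in range(length)
    ]

def load_data_to_db(s3_object, db_obj, skip_first_row=True):
    return (s3_object, db_obj, skip_first_row)

# db_obj and skip_first_row are reused for every s3_object
results = map_task(
    load_data_to_db,
    s3_object=["file1", "file2"],
    db_obj=unmapped("db-conn"),
    skip_first_row=unmapped(True),
)
```

This mirrors the call above: Prefect creates one load_data_to_db task run per S3 object, and every run receives the same db_obj and skip_first_row.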