Jacob (he/him)
07/31/2020, 5:09 PM
@task
def get_file_name(table):
    file_name = f'bpi/{table}/-000'
    return file_name

with Flow("write to s3") as flow:
    exports = ['email_acquisitions', 'new_list_acquisitions', 'revenue', 'segmentation']
    for export in exports:
        file_name = get_file_name(export)
        print(file_name)
I was trying to write a function that does some string manipulation to get an S3 path (this is simplified ^). When I add the @task decorator, file_name = <Task: get_file_name>, when I just want it to return the string produced. Am I doing something in a non-Prefect way?
nicholas
07/31/2020, 5:18 PM
print is evaluated at script runtime, while the flow is still being built, so it's outputting the task object instead of the task's result. If you modify your code like this:
from prefect import task, Flow

@task
def get_file_name(table):
    file_name = f'bpi/{table}/-000'
    return file_name

@task
def print_file_name(file):
    # Runs during flow.run(), so `file` is the string result, not a Task
    print(file)

with Flow("write to s3") as flow:
    exports = ['email_acquisitions', 'new_list_acquisitions', 'revenue', 'segmentation']
    for export in exports:
        file_name = get_file_name(export)
        print_file_name(file_name)

flow.run()
you'll be able to see what you're expecting. Note that print won't show up in your logs in Cloud/Server 🙂
Jacob (he/him)
07/31/2020, 5:31 PM
@task
def read_s3(file, bucket_name, s3):
    obj = s3.Object(bucket_name, file_name)
    body = obj.get()['Body'].read()
yields Invalid type for parameter Key, value: <Task: get_file_name>, type: <class 'prefect.tasks.core.function.FunctionTask'>, valid types: <class 'str'>
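One possible reading of the error above, assuming the snippet is what was actually run: the body of read_s3 uses file_name, but the parameter is called file. Python then falls back to the module-level file_name assigned inside the with Flow(...) block, which is a Task object rather than a string. The sketch below reproduces that name lookup in plain Python. FakeTask, read_key_buggy and read_key_fixed are hypothetical names for illustration only and are not part of Prefect or boto3.

```python
class FakeTask:
    """Hypothetical stand-in for a task object; not Prefect's class."""

    def __repr__(self):
        return "<Task: get_file_name>"


# Module-level name, like the file_name assigned inside the flow block.
file_name = FakeTask()


def read_key_buggy(file):
    # `file_name` is not a parameter of this function, so Python
    # resolves it to the module-level object defined above.
    return file_name


def read_key_fixed(file):
    # Uses the argument that was actually passed in.
    return file


print(repr(read_key_buggy("bpi/revenue/-000")))  # prints <Task: get_file_name>
print(read_key_fixed("bpi/revenue/-000"))        # prints bpi/revenue/-000
```

If this is the cause, using the file parameter inside read_s3 would pass the resolved string to s3.Object at run time.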
nicholas
07/31/2020, 5:34 PM
from prefect import task, Flow

@task
def get_file_name(table):
    file_name = f'bpi/{table}/-000'
    return file_name

@task
def print_file_name(file):
    print(file)

@task
def export_list():
    return ['email_acquisitions', 'new_list_acquisitions', 'revenue', 'segmentation']

with Flow("write to s3") as flow:
    # map runs the task once per element of the upstream list at run time
    file_name = get_file_name.map(export_list())
    print_file_name.map(file_name)

flow.run()
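As a rough mental model, the mapped flow above computes the same values as a plain Python loop over the list, except that each element is handled as its own task run when the flow executes. The sketch below is plain Python with no Prefect involved, so it only illustrates the data flow, not scheduling, retries or logging. The function names mirror the ones in the thread.

```python
def get_file_name(table):
    # Same string manipulation as the task in the thread.
    return f'bpi/{table}/-000'


def export_list():
    return ['email_acquisitions', 'new_list_acquisitions', 'revenue', 'segmentation']


# Rough equivalent of get_file_name.map(...): one call per list element.
file_names = [get_file_name(table) for table in export_list()]

# Rough equivalent of print_file_name.map(file_name).
for name in file_names:
    print(name)
```

Each printed value is already a string here, which is what the mapped downstream task receives at run time.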
Jacob (he/him)
07/31/2020, 5:36 PM