Chris Jordan
01/19/2021, 10:13 PM
if I pass a `tmpfile` object between tasks, it'll work as expected, as in
@task(name="save records to a tmp file")
def save_some_data(result=PrefectResult()):
    logger = prefect.context.get("logger")
    the_file = tempfile.NamedTemporaryFile()
    for i in range(5000000):
        the_file.write(b"lorem ipsum\n")
    the_file.seek(0)
    logger.info("wrote the file")
    return the_file

@task(name="reread that data")
def read_some_data(the_file, result=PrefectResult()):
    logger = prefect.context.get("logger")
    output = the_file.read()
    logger.info(f"read the file")
    logger.info(f"length of file is {len(output)}")

with Flow("save_retrieve_file_flow") as flow:
    f = save_some_data()
    g = read_some_data(f)
but passing the name of the file won't work - the file will not be found
@task(name="save records to a tmp file")
def save_some_data(result=PrefectResult()):
    logger = prefect.context.get("logger")
    the_file = tempfile.NamedTemporaryFile()
    for i in range(500000):
        the_file.write(b"lorem ipsum\n")
    the_file.seek(0)
    logger.info(f"wrote the file to {the_file.name}")
    return the_file.name

@task(name="reread that data")
def read_some_data(the_file_name, result=PrefectResult()):
    logger = prefect.context.get("logger")
    logger.info(f"trying to open {the_file_name}")
    with open(the_file_name, 'r') as the_file:
        output = the_file.read()
    logger.info(f"read the file")
    logger.info(f"length of file is {len(output)}")

with Flow("save_retrieve_file_flow2",
    state_handlers=[cloud_only_slack_handler]
) as flow:
    f = save_some_data()
    g = read_some_data(f)
what's going on here? is the file system being reset between tasks? does prefect clean up temporary files in the flow if they're not in memory directly? something else?
Zanie
Use `delete=False` when you create the file and clean it up manually when you’re done with it
Chris Jordan
01/19/2021, 10:24 PM
Zanie
It works when you pass the `tmpfile` between the tasks because there is still a reference to the open file. It looks like it passes from task to task fine, although you will run into problems in a distributed setting where it is serialized.
Chris Jordan
01/19/2021, 10:25 PM
Zanie
When you return `tmpfile.name` you drop the reference to `tmpfile` and the open file handle, and it’s cleaned up 🙂
Chris Jordan
01/19/2021, 10:26 PM
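
The behaviour discussed in this thread can be reproduced without Prefect. The sketch below is a minimal illustration using only the standard library, not the code from the flow above. The function names (`write_and_return_handle`, `write_and_return_name`, `write_and_keep`, `read_and_clean_up`) are hypothetical stand-ins for the two tasks. It assumes CPython, where a `NamedTemporaryFile` is removed as soon as the last reference to the file object goes away; the `gc.collect()` call is only there as a nudge for other interpreters.

```python
import gc
import os
import tempfile


def write_and_return_handle():
    """Return the open file object, so a live reference keeps the file on disk."""
    handle = tempfile.NamedTemporaryFile()
    handle.write(b"lorem ipsum\n")
    handle.seek(0)
    return handle


def write_and_return_name():
    """Return only the path; the file object is dropped when the function exits."""
    handle = tempfile.NamedTemporaryFile()
    handle.write(b"lorem ipsum\n")
    handle.flush()
    return handle.name


def write_and_keep():
    """Create the file with delete=False, so closing it does not remove it."""
    with tempfile.NamedTemporaryFile(delete=False) as handle:
        handle.write(b"lorem ipsum\n")
    return handle.name


def read_and_clean_up(path):
    """Read the file back, then remove it manually."""
    try:
        with open(path, "rb") as f:
            return f.read()
    finally:
        os.remove(path)


# Case 1: the file object itself is passed along, so the file still exists.
handle = write_and_return_handle()
handle_path = handle.name
exists_while_referenced = os.path.exists(handle_path)
data_from_handle = handle.read()
handle.close()  # closing the handle deletes the file

# Case 2: only the name is passed along, so the file is already gone.
dropped_path = write_and_return_name()
gc.collect()
exists_after_drop = os.path.exists(dropped_path)

# Case 3: delete=False keeps the file until it is removed explicitly.
kept_path = write_and_keep()
exists_with_delete_false = os.path.exists(kept_path)
data_from_kept = read_and_clean_up(kept_path)
exists_after_cleanup = os.path.exists(kept_path)

print("exists while handle is referenced:", exists_while_referenced)
print("exists after only the name is returned:", exists_after_drop)
print("exists with delete=False:", exists_with_delete_false)
print("exists after manual cleanup:", exists_after_cleanup)
```

In a flow, the `delete=False` variant maps onto the suggestion above: the first task returns the path, and the task that consumes the file is responsible for removing it. This only helps when both tasks run on the same filesystem.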