# prefect-community
c
Hey, I've got a quick question about persisting temporary files between tasks. It looks like if I pass a tmpfile object between tasks, it'll work as expected, as in
@task(name="save records to a tmp file")
def save_some_data(result=PrefectResult()):
    logger = prefect.context.get("logger")
    the_file = tempfile.NamedTemporaryFile()
    for i in range(5000000):
        the_file.write(b"lorem ipsum\n")
    the_file.seek(0)
    logger.info("wrote the file")
    return the_file

@task(name="reread that data")
def read_some_data(the_file, result=PrefectResult()):
    logger = prefect.context.get("logger")
    output = the_file.read()
    logger.info("read the file")
    logger.info(f"length of file is {len(output)}")


with Flow("save_retrieve_file_flow") as flow:
    f = save_some_data()
    g = read_some_data(f)
but passing the name of the file won't work - the file will not be found
@task(name="save records to a tmp file")
def save_some_data(result=PrefectResult()):
    logger = prefect.context.get("logger")
    the_file = tempfile.NamedTemporaryFile()
    for i in range(500000):
        the_file.write(b"lorem ipsum\n")
    the_file.seek(0)
    logger.info(f"wrote the file to {the_file.name}")
    return the_file.name

@task(name="reread that data")
def read_some_data(the_file_name, result=PrefectResult()):
    logger = prefect.context.get("logger")
    logger.info(f"trying to open {the_file_name}")
    
    with open(the_file_name, 'r') as the_file:
        output = the_file.read()
        logger.info("read the file")
        logger.info(f"length of file is {len(output)}")


with Flow("save_retrieve_file_flow2", 
        state_handlers=[cloud_only_slack_handler]
       ) as flow:
    f = save_some_data()
    g = read_some_data(f)
what's going on here? is the file system being reset between tasks? does prefect clean up temporary files in the flow if they're not in memory directly? something else?
z
Temporary files are deleted when they are closed, which Python does for you when there are no longer references to the open handle. When they are passed from task to task they are serialized, which I presume closes the file, because you can't serialize an open file handle.
Pass delete=False when you create the file and clean it up manually when you're done with it
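A minimal sketch of the delete=False suggestion above. It uses plain functions instead of Prefect tasks so that it runs without Prefect installed; the function names mirror the ones in the question, and the smaller loop count is just to keep the example quick.

```python
import os
import tempfile


def save_some_data():
    # delete=False keeps the file on disk after the handle is closed,
    # so only the path needs to be handed to the next step.
    with tempfile.NamedTemporaryFile(delete=False) as the_file:
        for _ in range(1000):
            the_file.write(b"lorem ipsum\n")
    return the_file.name


def read_some_data(the_file_name):
    try:
        with open(the_file_name, "rb") as the_file:
            return len(the_file.read())
    finally:
        # Nothing deletes the file automatically any more, so clean up here.
        os.remove(the_file_name)


path = save_some_data()
length = read_some_data(path)
print(f"length of file is {length}")
```

The trade-off is that cleanup is now the flow's responsibility: if the reading step never runs, the file stays on disk.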
c
that's interesting. one sec lemme try that
yeah, that does the thing
z
Sorry, my explanation was a bit flawed. It works when you pass tmpfile between the tasks because there is still a reference to the open file. It looks like it passes from task to task fine, although you will run into problems in a distributed setting where it is serialized.
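To illustrate the serialization concern, here is a small sketch using the standard library's pickle. That is an assumption for illustration only: the serializer a distributed Prefect executor actually uses may differ. The point is just that an open file handle does not survive this kind of round trip, while a plain path string does.

```python
import pickle
import tempfile

with tempfile.NamedTemporaryFile() as the_file:
    the_file.write(b"lorem ipsum\n")
    try:
        pickle.dumps(the_file)
        handle_pickled = True
    except (TypeError, pickle.PicklingError) as exc:
        # The open handle wraps an OS-level file object that pickle rejects.
        handle_pickled = False
        print(f"could not pickle the open handle: {exc}")

    # The file name is an ordinary string and pickles without trouble.
    name_bytes = pickle.dumps(the_file.name)

restored_name = pickle.loads(name_bytes)
print(f"handle pickled: {handle_pickled}, restored name: {restored_name}")
```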
c
appreciate the quick response
z
When you pass it as tmpfile.name you drop the reference to tmpfile and the open file handle, and it's cleaned up 🙂
No problem!
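The reference-dropping behaviour described above can be reproduced without Prefect. This is a rough sketch with hypothetical helper names, and the immediate cleanup relies on CPython's reference counting; other interpreters may delete the file later.

```python
import gc
import os
import tempfile


def write_and_return_name():
    tmp = tempfile.NamedTemporaryFile()
    tmp.write(b"lorem ipsum\n")
    tmp.flush()
    # Only the path leaves this function; the last reference to tmp is
    # dropped on return, the handle is closed, and the file is deleted.
    return tmp.name


def write_and_return_handle():
    tmp = tempfile.NamedTemporaryFile()
    tmp.write(b"lorem ipsum\n")
    tmp.flush()
    # Returning the object keeps the handle, and therefore the file, alive.
    return tmp


name_only = write_and_return_name()
gc.collect()  # safeguard in case cleanup is not immediate
name_survived = os.path.exists(name_only)

handle = write_and_return_handle()
handle_survived = os.path.exists(handle.name)
handle.close()  # closing the handle deletes the file
closed_survived = os.path.exists(handle.name)

print(name_survived, handle_survived, closed_survived)
```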
c
yeah that makes sense
I thought it was something prefect was doing to the fs at first, but python's garbage collector makes a lot more sense as the culprit