Chris Jordan
    1 year ago
    Hey, I've got a quick question about persisting temporary files between tasks. It looks like if I pass a tmpfile object between tasks, it'll work as expected, as in
    import tempfile

    import prefect
    from prefect import Flow, task
    from prefect.engine.results import PrefectResult

    @task(name="save records to a tmp file")
    def save_some_data(result=PrefectResult()):
        logger = prefect.context.get("logger")
        the_file = tempfile.NamedTemporaryFile()
        for i in range(5000000):
            the_file.write(b"lorem ipsum\n")
        the_file.seek(0)
        logger.info("wrote the file")
        return the_file
    
    @task(name="reread that data")
    def read_some_data(the_file, result=PrefectResult()):
        logger = prefect.context.get("logger")
        output = the_file.read()
        logger.info("read the file")
        logger.info(f"length of file is {len(output)}")
    
    
    with Flow("save_retrieve_file_flow") as flow:
        f = save_some_data()
        g = read_some_data(f)
    but passing the name of the file won't work - the file will not be found
    @task(name="save records to a tmp file")
    def save_some_data(result=PrefectResult()):
        logger = prefect.context.get("logger")
        the_file = tempfile.NamedTemporaryFile()
        for i in range(500000):
            the_file.write(b"lorem ipsum\n")
        the_file.seek(0)
        logger.info(f"wrote the file to {the_file.name}")
        return the_file.name
    
    @task(name="reread that data")
    def read_some_data(the_file_name, result=PrefectResult()):
        logger = prefect.context.get("logger")
        logger.info(f"trying to open {the_file_name}")

        with open(the_file_name, 'r') as the_file:
            output = the_file.read()
            logger.info("read the file")
            logger.info(f"length of file is {len(output)}")
    
    
    # cloud_only_slack_handler is the poster's own state handler (not shown)
    with Flow(
        "save_retrieve_file_flow2",
        state_handlers=[cloud_only_slack_handler],
    ) as flow:
        f = save_some_data()
        g = read_some_data(f)
    what's going on here? is the file system being reset between tasks? does prefect clean up temporary files in the flow if they're not in memory directly? something else?
    Michael Adkins
    1 year ago
    Temporary files are deleted when they are closed, which Python does for you when there are no longer any references to the open handle. When they are passed from task to task they are serialized, which I presume is closing the file, because you can't serialize an open file handle.
    Pass delete=False when you create the file and clean it up manually when you're done with it.
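A minimal sketch of that approach in plain Python, outside Prefect. The two functions stand in for the two tasks; the names write_records and read_records are hypothetical. It assumes both steps run on the same machine (for example with a local executor), since a path is only meaningful on the filesystem that holds the file.

```python
import os
import tempfile


def write_records(n_lines: int = 1000) -> str:
    """Hypothetical stand-in for the first task: write a temp file, return its path."""
    # delete=False keeps the file on disk after the handle is closed
    with tempfile.NamedTemporaryFile(delete=False) as the_file:
        for _ in range(n_lines):
            the_file.write(b"lorem ipsum\n")
        return the_file.name  # only the path (a str) leaves the function


def read_records(path: str) -> int:
    """Hypothetical stand-in for the second task: read the file, then delete it."""
    try:
        with open(path, "r") as the_file:
            output = the_file.read()
        return len(output)
    finally:
        # manual cleanup: with delete=False the file no longer removes itself
        os.remove(path)


path = write_records()
print(read_records(path))    # 12000 (1000 lines of "lorem ipsum\n")
print(os.path.exists(path))  # False, removed by read_records
```

Putting the os.remove call in a finally clause means the file is cleaned up even if reading fails partway through.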
    Chris Jordan
    1 year ago
    that's interesting. one sec lemme try that
    yeah, that does the thing
    Michael Adkins
    1 year ago
    Sorry, my explanation was a bit flawed: it works when you pass tmpfile between the tasks because there is still a reference to the open file. It looks like it passes from task to task fine, although you will run into problems in a distributed setting where it is serialized.
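A small check of why the distributed case differs, in plain Python: an open NamedTemporaryFile cannot be pickled, while its name (a plain str) can. Using pickle here is an assumption, a stand-in for whatever serializer a distributed executor (for example Dask) applies; the exact exception type varies between Python versions, so the helper catches the usual candidates.

```python
import pickle
import tempfile


def can_pickle(obj) -> bool:
    """Return True if obj survives a pickle round trip, False if serialization fails."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        # open file objects raise TypeError; other unpicklable objects
        # may raise AttributeError or PicklingError depending on the Python version
        return False


with tempfile.NamedTemporaryFile() as handle:
    print(can_pickle(handle))       # False: the open file handle can't be serialized
    print(can_pickle(handle.name))  # True: the path is just a str
```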
    Chris Jordan
    1 year ago
    appreciate the quick response
    Michael Adkins
    1 year ago
    When you pass it as tmpfile.name, you drop the reference to tmpfile and its open file handle, so it's cleaned up 🙂
    No problem!
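The same effect can be reproduced without Prefect. In this sketch the helper name write_and_return_name is hypothetical and mirrors save_some_data from the second example: it returns only the path, so the last reference to the NamedTemporaryFile disappears when the function returns. On CPython, reference counting then closes the handle right away and the default delete=True removes the file; other Python implementations may collect the object later, so the exact timing is an assumption tied to CPython.

```python
import os
import tempfile


def write_and_return_name() -> str:
    """Hypothetical helper: create a temp file (delete=True by default), return only its name."""
    the_file = tempfile.NamedTemporaryFile()
    the_file.write(b"lorem ipsum\n")
    # the file exists on disk here, while the_file still holds the open handle
    return the_file.name  # the local handle is dropped once the function returns


name = write_and_return_name()
print(os.path.exists(name))  # False on CPython: the handle was closed and the file deleted
```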
    Chris Jordan
    1 year ago
    yeah that makes sense
    I thought it was something prefect was doing to the fs at first, but python's garbage collector makes a lot more sense as the culprit