Hi, I am trying to replace a make based work flow ...
# prefect-community
y
Hi, I am trying to replace a make-based workflow with prefect. I was wondering how I can implement a file-centric workflow. If an upstream task doesn't have a return value but generates a file instead, and the downstream task takes that file as input and processes it, what is the best way to specify such a dependency? Here's a simple mock-up for my question:
from prefect import Flow, Task

class GenerateFile( Task ):
  def run( self ):
    with open( 'result.txt', 'w' ) as f:
      f.write( f'This file is generated by {self.name}.' )

class ProcessFile( Task ):
  def run( self ):
    with open( 'result.txt', 'r' ) as f:
      print( f.read() )

gen_task   = GenerateFile()
print_task = ProcessFile()

with Flow( 'test caching' ) as flow:
  gen_result   = gen_task()
  print_result = print_task( upstream_tasks=[ gen_result ] )
Is there a better way to do it other than manually setting upstream_tasks? Another question is how can I specify the generated file as a target so that I get the same caching behavior as make? I tried
gen_task   = GenerateFile( target='result.txt', checkpoint=True, result=LocalResult( dir='.' ) )
but it does not seem to work.
e
I would return the filename or filepath from GenerateFile, and use the upstream value as filename in ProcessFile, rather than hardcoding in ProcessFile. I am not experienced with make, or prefect checkpoints, but your caching may not be working because GenerateFile has no return value, i.e. nothing to cache.
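A minimal sketch of that suggestion, written as plain Python functions so that it runs without prefect installed. In a flow, the two function bodies would presumably go into the run methods of GenerateFile and ProcessFile, and passing the returned path as an argument would express the dependency without upstream_tasks. The names generate_file and process_file and the temporary directory are illustrative assumptions, not part of the original mock-up.

```python
import tempfile
from pathlib import Path


def generate_file(out_dir: str, name: str = "GenerateFile") -> str:
    """Write the file and return its path instead of returning nothing."""
    path = Path(out_dir) / "result.txt"
    path.write_text(f"This file is generated by {name}.")
    return str(path)


def process_file(path: str) -> str:
    """Read whatever path the upstream step produced; nothing is hardcoded."""
    return Path(path).read_text()


# Hypothetical driver: a temporary directory stands in for the working directory.
with tempfile.TemporaryDirectory() as tmp:
    result_path = generate_file(tmp)       # upstream step returns the path
    contents = process_file(result_path)   # downstream step consumes the path
    print(contents)
```

Because the downstream step only sees the path it is handed, moving the output location later means changing one function rather than two.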
y
Right - returning the filepath should be a way to work around it. But then I think prefect will cache the filepath rather than the file itself?
e
Yeah, I usually persist files in storage external to the flow's runtime, such as AWS S3. Then my file-reading tasks handle how to read from S3, much like how your ProcessFile handles how to read from local storage.
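For the make-style caching asked about earlier, one option that does not rely on prefect's own result caching is to check the target file directly before doing any work. The sketch below is an assumption about what "the same caching behavior as make" means here: a step is skipped when its target exists and is at least as new as its sources. The helpers is_up_to_date and build are hypothetical names, and the file content is a placeholder.

```python
import os
import tempfile
from pathlib import Path
from typing import Iterable


def is_up_to_date(target: str, sources: Iterable[str] = ()) -> bool:
    """Return True if target exists and is at least as new as every source."""
    if not os.path.exists(target):
        return False
    target_mtime = os.path.getmtime(target)
    return all(os.path.getmtime(src) <= target_mtime for src in sources)


def build(target: str, sources: Iterable[str] = ()) -> bool:
    """Regenerate target only when it is stale; return True if work was done."""
    sources = list(sources)
    if is_up_to_date(target, sources):
        return False
    Path(target).write_text("generated")  # placeholder for the real work
    return True


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "result.txt")
    first_run = build(target)    # file is missing, so it is generated
    second_run = build(target)   # file is present, so the step is skipped
    print(first_run, second_run)
```

A check like this could be called at the top of a task's run method so the task still returns the filepath either way, which keeps the downstream dependency intact whether or not the file was rebuilt.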