Scott Moreland

11/23/2020, 12:58 PM
Hi all! I'm looking at using Prefect to manage ETL processes that are executed by PySpark on a CDSW/Hadoop environment. I'd like to subclass the Result class, much like what has already been done for Google Cloud and AWS, so that each task can read/write to Hive storage by toggling Prefect's caching settings. I.e., loading a cached task would simply read the Spark DataFrame created by that task via sql_context.read.table. Are there any references for this?
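For anyone landing here with the same question, a rough sketch of what such a subclass might look like, modeled loosely on S3Result's read/write/exists trio. The HiveResult name, the tableNames-based existence check, and the overwrite save mode are illustrative assumptions, not something confirmed in this thread or the Prefect docs.

from prefect.engine.result import Result


class HiveResult(Result):
    """Persist Spark DataFrames as Hive tables (illustrative sketch)."""

    def __init__(self, sql_context, **kwargs):
        self.sql_context = sql_context
        super().__init__(**kwargs)

    def exists(self, location: str, **kwargs) -> bool:
        # Treat the location as "database.table" (or just "table").
        database, _, table = location.rpartition(".")
        return table in self.sql_context.tableNames(database or None)

    def read(self, location: str) -> "HiveResult":
        new = self.copy()
        new.location = location
        new.value = self.sql_context.read.table(location)
        return new

    def write(self, value_, **kwargs) -> "HiveResult":
        new = self.format(**kwargs)  # renders any templated location
        new.value = value_
        value_.write.mode("overwrite").saveAsTable(new.location)
        return new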

Kyle Moon-Wright

11/23/2020, 4:47 PM
Hey @Scott Moreland, what sort of references are you after? I don't think there are references to this specifically in the documentation - the best resources for you may be the source code for the Result class and the use cases found here in the community.

Scott Moreland

11/23/2020, 5:01 PM
So I mimicked the S3Result class, and it correctly reads the Spark DataFrame from Hive when the target exists. However, when the DataFrame doesn't exist and has to be built, it doesn't seem to call the Result's write function at all. I'm looking at the source code and trying to figure out what conditions need to be met to trigger the Result write.

Kyle Moon-Wright

11/23/2020, 6:12 PM
Gotcha, I may have misunderstood a bit. I think it depends on how you're defining your Result, but it should write if provided a location, as you can see here. You can certainly change those conditions with a subclass, but typically the result will write when the task run finishes in a successful state.
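For illustration, a minimal sketch of that behavior with the built-in LocalResult (the directory, filename, and flow name below are made up): with checkpointing enabled, Prefect calls the result's write for you once the task run succeeds, and you never call write by hand.

from prefect import Flow, task
from prefect.engine.results import LocalResult


# With checkpointing on, Prefect writes the return value to
# /tmp/results/total.prefect after the task run succeeds.
@task(result=LocalResult(dir="/tmp/results", location="total.prefect"))
def total():
    return sum(range(10))


with Flow("result-demo") as flow:
    total()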

Scott Moreland

11/23/2020, 6:22 PM
What's the relation between the target field and the location field? I want to read a persisted table if it exists (from target), and create the table if it does not (to location). So it sounds like target and location should assume the same value. Is there a way to do this cleanly?
Thanks in advance!

Kyle Moon-Wright

11/23/2020, 6:52 PM
Hmm, I think it's best to think about target and location as both being part of the write side of the Result, not the read (target only checks for existence). You may need to customize a task/logic to check for your table's existence before doing your Result write - I can't think of a way to do this cleanly otherwise, but I will continue to think about it.
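One hypothetical way to do that check by hand inside the task itself; table_exists and build_if_missing are made-up names, the existence test leans on SQLContext.tableNames, and the table names mirror the snippet Scott posts next.

import pyspark.sql.functions as sf
from prefect import task


def table_exists(sql_context, name: str) -> bool:
    # Treat the name as "database.table" (or just "table").
    database, _, table = name.rpartition('.')
    return table in sql_context.tableNames(database or None)


@task
def build_if_missing(sql_context, name='database.task_output_table_name'):
    # Read the persisted table if it's there; otherwise build and save it.
    if table_exists(sql_context, name):
        return sql_context.read.table(name)
    sdf = sql_context.read.table('database.src_table_name')
    sdf = sdf.groupby('col1').agg(sf.sum('col2').alias('sum'))
    sdf.write.saveAsTable(name)
    return sdf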

Scott Moreland

11/23/2020, 7:06 PM
from prefect import task
import pyspark.sql.functions as sf

sql_context = create_sql_context()  # user-defined helper that builds the Hive SQL context
db_result = HiveResult(sql_context, location='task_output_table_name')

@task(target='task_output_table_name', result=db_result)
def create_table(sql_context):
    """Transform a table."""
    sdf = sql_context.read.table('database.src_table_name')
    sdf = sdf.groupby('col1').agg(sf.sum('col2').alias('sum'))

    # Explicit write -- shouldn't this happen automatically via the Result?
    db_result.write(sdf)

    return sdf
Dealing with something like this and trying to avoid duplicating the persistent table name in both location and target. Also wondering if I really need to manually specify db_result.write(sdf), as it sounds like this should happen automatically when the target doesn't exist and needs to be rebuilt.
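One possible way to cut the duplication, just a sketch reusing the hypothetical HiveResult and the create_sql_context helper from the snippet above: bind the table name once and hand it to both target and the result's location. With checkpointing enabled (see the end of the thread), the explicit db_result.write call can also go away.

from prefect import task
import pyspark.sql.functions as sf

TABLE_NAME = 'task_output_table_name'

sql_context = create_sql_context()
db_result = HiveResult(sql_context, location=TABLE_NAME)


@task(target=TABLE_NAME, result=db_result)
def create_table(sql_context):
    """Transform a table; HiveResult.write persists the return value."""
    sdf = sql_context.read.table('database.src_table_name')
    return sdf.groupby('col1').agg(sf.sum('col2').alias('sum'))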

Kyle Moon-Wright

11/23/2020, 7:18 PM
This is looking good, I believe. Haha, yeah, avoiding that duplication is our main concern here. You are correct that if the target doesn't exist, it will write automatically, so I think you are wright (😉) that we don't need to manually call the write method on your HiveResult/db_result.

Scott Moreland

11/23/2020, 7:20 PM
So I do understand that's how it's supposed to work, but unfortunately the write never gets triggered if I remove that call, even when the target doesn't exist.
Anyhow, thanks so much for thinking about this. Much appreciated. I'll chew on it for a bit and see if I can get to the bottom of it.

Kyle Moon-Wright

11/23/2020, 7:28 PM
Hey np, this is deeper into Results than I’ve been in a while.
Not verified: I think it may exist because our HiveResult checks/creates that location when it instantiates, then the create_table task runs and sees that the target exists?
Feel free to work on it and hit me back!

Scott Moreland

11/23/2020, 9:35 PM
Found the issue (finally). I was not properly setting prefect.config.checkpointing to True via the associated environment variable. Thanks again for the help!
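For anyone hitting the same wall: in Prefect 1.x the usual pattern is the PREFECT__FLOWS__CHECKPOINTING environment variable. The exact variable name and config path below follow that convention and are my assumption, not a line from this thread; the variable has to be set before prefect is imported, because the config is loaded at import time.

import os

# Must be set before `import prefect`; Prefect loads its config on import.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

import prefect

assert prefect.config.flows.checkpointing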

Kyle Moon-Wright

11/23/2020, 9:40 PM
Absolutely, nicely done! Thanks for including your solution, hopefully this will help others in the future.