Danny Vilela
12/16/2021, 9:17 PM
I've written a custom HdfsSparkDataFrameResult implementation and I'm now looking to test it. I've been successful in writing unit tests (similar to those in the prefect test suite), but I'm now looking to test whether the result works in a Flow. Namely, I want to be sure that if some task is decorated with @task(result=HdfsSparkDataFrameResult(…)), the Flow doesn't re-run the task if the result already exists. Has anyone done this for their own custom Result (sub-)type?
I'm trying to follow the test suite examples under TestOutputCaching, but I'm not sure I'm testing the right thing. I'll add some more code in a thread here, but any advice/experience would be very welcome!
Kevin Kho
Danny Vilela
12/16/2021, 9:21 PM
Results like this beyond LocalResult and whatnot would be great to have in prefect (since they're already in Luigi)! Details coming in just a minute.
12/16/2021, 9:25 PM
1. I've implemented an HdfsSparkDataFrameResult that appears to work as expected when testing its individual write, read, and exists methods in unit tests.
2. I'm now trying to test that Prefect is properly using that result when re-running that task.
3. To do that, I've decorated select_one with result=HdfsSparkDataFrameResult(...), which – if the result works as expected – I imagine would first check whether the result exists and, if it does, read the DataFrame from HDFS.
4. I'm not sure what assertions to make – either on the flow, or the flow state fs, or on the task result result, or on the re-run flow state fs2 – to assert that Prefect runs the task just once (on the first flow.run()) and reads from cache on the second flow.run().
from internalpackagename.hdfs import HDFSPath
from loguru import logger
from prefect.core.flow import Flow
from prefect.engine.flow_runner import FlowRunner
from prefect.engine.state import State
from prefect.utilities.tasks import task
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.session import SparkSession

from prefectx.results import HdfsSparkDataFrameResult


class TestHdfsSparkDataFrameResult:
    def test_result_on_task(self, tmp_hdfs_dir: HDFSPath, spark: SparkSession) -> None:
        # Create a `prefect` task that outputs a PySpark DataFrame.
        @task(
            result=HdfsSparkDataFrameResult(
                location=tmp_hdfs_dir,
                write_kwargs=dict(mode="overwrite"),
            )
        )
        def select_one() -> DataFrame:
            """Create and return a placeholder DataFrame."""
            df: DataFrame = spark.sql(sqlQuery="SELECT 1")
            return df

        # Run our flow once, knowing that the DataFrame won't be cached.
        with Flow(name="test") as flow:
            result = select_one()
        fr: FlowRunner = FlowRunner(flow=flow)
        fs: State = fr.run()

        # First, make sure the task/flow was successful.
        assert fs.is_successful()

        # Maybe re-run the flow so that the task gets cached?
        fs2: State = fr.run()
        assert fs2.is_successful()

        # Maybe test that the flow was cached? No, right? The task should be cached?
        assert fs2.is_cached()

Danny Vilela
12/16/2021, 9:26 PM
My guess is that the select_one task's state on fs2 would be cached (instead of successful), but I'm not 100% sure.
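A minimal sketch of that assertion against the task state, assuming Prefect 1.x's FlowRunner API, where the flow state's result attribute is a dict mapping each task to its final state (and Cached is a subclass of Success, so is_successful() also holds):

# Sketch: assert on the *task* state from the second run, not the flow state.
# Assumes Prefect 1.x, where `fs2.result` maps tasks to their final states
# and `result` is the task reference bound inside the `with Flow(...)` block.
task_state = fs2.result[result]
assert task_state.is_cached()    # task was restored from the Result, not re-run
assert fs2.is_successful()       # the flow itself still finishes in Success

Kevin Kho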
Danny Vilela
12/16/2021, 9:28 PM

Danny Vilela
12/16/2021, 10:16 PM
Update: I think the missing piece was passing checkpoint=True to the @task decorator. In my testing environment I already have PREFECT__FLOWS__CHECKPOINTING=true, so that's good 👍
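For reference, a minimal sketch of the decorator with that flag, reusing the HdfsSparkDataFrameResult from the test above; note that checkpoint=True only takes effect when checkpointing is enabled globally (e.g. via PREFECT__FLOWS__CHECKPOINTING=true, as mentioned):

# Sketch (Prefect 1.x): checkpoint=True asks Prefect to persist the task's
# return value through the configured Result on each successful run.
@task(
    checkpoint=True,
    result=HdfsSparkDataFrameResult(
        location=tmp_hdfs_dir,
        write_kwargs=dict(mode="overwrite"),
    ),
)
def select_one() -> DataFrame:
    """Create and return a placeholder DataFrame."""
    return spark.sql(sqlQuery="SELECT 1")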
Kevin Kho
Danny Vilela
12/17/2021, 8:21 PM
Ah, I see. So if the Flow itself is responsible for generating some DataFrame in HDFS, we'd use with Flow(…, result=HdfsSparkDataFrameResult(...)) as flow and checkpointing would apply. But with a Task within a flow, we'd want to use caching. That explanation makes sense, but it's a bit confusing since the official Task class docstring uses checkpoint for what we're describing as "caching"? Unless I'm (still) misunderstanding, shouldn't it be cache=True?
Regardless, I'll check out the tests you've linked and see if I can't get mine working!
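For readers following along: the "skip the task if its output already exists" behavior discussed here is what the Prefect 1.x docs call output caching based on a file target, configured with the task's target parameter rather than a cache=True flag. A minimal sketch under that assumption; the target string is hypothetical, and the exact semantics depend on how the custom Result resolves locations:

# Sketch (Prefect 1.x): if the formatted `target` location already exists,
# the task enters a Cached state and its value is read back through the
# Result instead of re-running the task body.
@task(
    checkpoint=True,
    target="select_one",  # hypothetical location template for the output
    result=HdfsSparkDataFrameResult(location=tmp_hdfs_dir),
)
def select_one() -> DataFrame:
    return spark.sql(sqlQuery="SELECT 1")

Kevin Kho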