Danny Vilela
12/16/2021, 9:17 PM
I've written an `HdfsSparkDataFrameResult` implementation and I'm now looking to test it. I've been successful in writing unit tests (similar to those in the prefect test suite), but I'm now looking to test whether the result works in a Flow. Namely, I'd want to be sure that if some task is decorated with `@task(result=HdfsSparkDataFrameResult(…))`, the Flow doesn't re-run the task if the result already exists. Has anyone done this for their own custom `Result` (sub-)type?
I'm trying to follow the test suite examples under `TestOutputCaching`, but I'm not sure I'm testing the right thing. I'll add some more code in a thread here, but any advice/experience would be very welcome!
Kevin Kho
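For context, a custom result in Prefect 1.x subclasses `prefect.engine.result.Result` and overrides `read`, `write`, and `exists`. The actual `HdfsSparkDataFrameResult` isn't shown in the thread, so the following is only a minimal sketch of the shape such a class might take; the `spark_session`/`write_kwargs` attributes and the parquet-based existence probe are assumptions, not the author's implementation.

```python
from typing import Any

from prefect.engine.result import Result


class HdfsSparkDataFrameResult(Result):
    """Sketch of a Result that persists a PySpark DataFrame to HDFS."""

    def __init__(self, spark_session=None, write_kwargs=None, **kwargs: Any) -> None:
        self.spark_session = spark_session      # assumed attribute
        self.write_kwargs = write_kwargs or {}  # assumed attribute
        super().__init__(**kwargs)

    def write(self, value_: Any, **kwargs: Any) -> Result:
        # Render the location template, write the DataFrame to HDFS, and
        # return a new Result pointing at the written location.
        new = self.format(**kwargs)
        new.value = value_
        value_.write.parquet(new.location, **self.write_kwargs)
        return new

    def read(self, location: str) -> Result:
        # Return a copy of this result whose `value` is the DataFrame
        # read back from HDFS.
        new = self.copy()
        new.location = location
        new.value = self.spark_session.read.parquet(location)
        return new

    def exists(self, location: str, **kwargs: Any) -> bool:
        # Crude existence probe: try to open the parquet at the location.
        # A real implementation would likely ask the Hadoop FileSystem API.
        try:
            self.spark_session.read.parquet(location.format(**kwargs))
            return True
        except Exception:
            return False
```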
Danny Vilela
12/16/2021, 9:21 PM
…`LocalResult` and whatnot would be great to have in prefect (since they're already in Luigi)! Details coming in just a minute.
Danny Vilela
12/16/2021, 9:25 PM
1. I've written an `HdfsSparkDataFrameResult` that appears to work as expected when testing its individual `write`, `read`, and `exists` methods in unit tests.
2. I'm now trying to test that Prefect is properly using that result when re-running the task.
3. To do that, I've decorated `select_one` with `result=HdfsSparkDataFrameResult(...)`, which – if the result works as expected – I imagine would first check whether the result exists and, if it does, read the DataFrame from HDFS.
4. I'm not sure what assertions to make – on the `flow`, the flow state `fs`, the task result `result`, or the re-run flow state `fs2` – to assert that Prefect runs the task just once (on the first `flow.run()`) and reads from cache on the second `flow.run()`.
```python
from internalpackagename.hdfs import HDFSPath
from loguru import logger
from prefect.core.flow import Flow
from prefect.engine.flow_runner import FlowRunner
from prefect.engine.state import State
from prefect.utilities.tasks import task
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.session import SparkSession

from prefectx.results import HdfsSparkDataFrameResult


class TestHdfsSparkDataFrameResult:
    def test_result_on_task(self, tmp_hdfs_dir: HDFSPath, spark: SparkSession) -> None:
        # Create a `prefect` task that outputs a PySpark DataFrame.
        @task(
            result=HdfsSparkDataFrameResult(
                location=tmp_hdfs_dir,
                write_kwargs=dict(mode="overwrite"),
            )
        )
        def select_one() -> DataFrame:
            """Create and return a placeholder DataFrame."""
            df: DataFrame = spark.sql(sqlQuery="SELECT 1")
            return df

        # Run our flow once, knowing that the DataFrame won't be cached.
        with Flow(name="test") as flow:
            result = select_one()

        fr: FlowRunner = FlowRunner(flow=flow)
        fs: State = fr.run()

        # First, make sure the task/flow was successful.
        assert fs.is_successful()

        # Maybe re-run the flow so that the task gets cached?
        fs2: State = fr.run()
        assert fs2.is_successful()

        # Maybe test that the flow was cached? No, right? The task should be cached?
        assert fs2.is_cached()
```
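One way to make that last assertion concrete, sketched under the assumption that this is Prefect 1.x, where a flow-run `State` maps each `Task` object to its terminal state via `state.result`:

```python
# The flow-run states map each Task to that task's terminal State, so we
# can assert on `select_one` itself rather than on the overall flow state.
task_obj = flow.get_tasks(name="select_one")[0]

assert fs.result[task_obj].is_successful()
assert not fs.result[task_obj].is_cached()  # first run actually executed

# Whether this holds depends on how checkpointing/caching is configured,
# which is where the rest of the thread ends up:
assert fs2.result[task_obj].is_cached()
```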
Danny Vilela
12/16/2021, 9:26 PM
My hunch is that the `select_one` task's state on `fs2` would be cached (instead of successful), but I'm not 100% sure.
Kevin Kho
Danny Vilela
12/16/2021, 9:28 PM
Danny Vilela
12/16/2021, 10:16 PM
Ah, looks like I need to pass `checkpoint=True` to the `@task` decorator. In my testing environment I already have `PREFECT__FLOWS__CHECKPOINTING=true`, so that's good 👍
Kevin Kho
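In a test, that environment variable can also be scoped per-test rather than set globally; a sketch using `set_temporary_config`, the same helper Prefect 1.x's own test suite uses for this:

```python
from prefect.utilities.configuration import set_temporary_config

# Equivalent to exporting PREFECT__FLOWS__CHECKPOINTING=true, but scoped
# to this block so it can't leak into other tests.
with set_temporary_config({"flows.checkpointing": True}):
    fs = FlowRunner(flow=flow).run()
```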
Danny Vilela
12/17/2021, 8:21 PM
Got it – so if the `Flow` itself is responsible for generating some DataFrame in HDFS, we'd use `with Flow(…, result=HdfsSparkDataFrameResult(...)) as flow` and checkpointing would apply. But with a `Task` within a flow, we'd want to use caching. That explanation makes sense, but it's a bit confusing since the official `Task` class docstring uses `checkpoint` for what we're describing as "caching"? Unless I'm (still) misunderstanding, shouldn't it be `cache=True`?
Regardless, I'll check out the tests you've linked and see if I can't get mine working!
Kevin Kho