# ask-community
d
Hi all! I’m getting close to a new `HdfsSparkDataFrameResult` implementation and I’m now looking to test it. I’ve been successful in writing unit tests (similar to those in the `prefect` test suite), but I’m now looking to test whether the result works in a Flow. Namely, I’d want to be sure that if some task is decorated with `@task(result=HdfsSparkDataFrameResult(…))`, the Flow doesn’t re-run the task if the result already exists. Has anyone done this for their own custom `Result` (sub-)type? I’m trying to follow the test suite examples under `TestOutputCaching`, but I’m not sure I’m testing the right thing. I’ll add some more code in a thread here, but any advice/experience would be very welcome!
k
Maybe you can open a work-in-progress PR and we can have the conversation over the pull request?
You’re also welcome to leave details here
d
I’d be happy to try and contribute the result once it works! 😅 It’s also slightly overfit to some of our internal implementations/HDFS accessors, but I’d be happy to refactor it. I think HDFS analogues for the `LocalResult` and whatnot would be great to have in prefect (since they’re already in Luigi)! Details coming in just a minute.
Here’s the gist of it:
1. I have a `HdfsSparkDataFrameResult` that appears to work as expected when testing its individual `write`, `read`, and `exists` methods in unit tests.
2. I’m now trying to test that Prefect is properly using that result when re-running that task.
3. To do that, I’ve decorated `select_one` with `result=HdfsSparkDataFrameResult(...)`, which (if the result works as expected) I imagine would first check whether the result exists and, if it does, read the DataFrame from HDFS.
4. I’m not sure what assertions to make (on the `flow`, the flow state `fs`, the task result `result`, or the re-run flow state `fs2`) to assert that Prefect runs the task only once (on the first `flow.run()`) and reads from cache on the second `flow.run()`.
```python
from internalpackagename.hdfs import HDFSPath
from loguru import logger
from prefect.core.flow import Flow
from prefect.engine.flow_runner import FlowRunner
from prefect.engine.state import State
from prefect.utilities.tasks import task
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.session import SparkSession

from prefectx.results import HdfsSparkDataFrameResult


class TestHdfsSparkDataFrameResult:

    def test_result_on_task(self, tmp_hdfs_dir: HDFSPath, spark: SparkSession) -> None:

        # Create a `prefect` task that outputs a PySpark DataFrame.
        @task(
            result=HdfsSparkDataFrameResult(
                location=tmp_hdfs_dir,
                write_kwargs=dict(mode="overwrite")
            )
        )
        def select_one() -> DataFrame:
            """Create and return a placeholder DataFrame."""
            df: DataFrame = spark.sql(sqlQuery="SELECT 1")
            return df

        # Run our flow once, knowing that the DataFrame won't be cached.
        with Flow(name="test") as flow:
            result = select_one()
        fr: FlowRunner = FlowRunner(flow=flow)
        fs: State = fr.run()

        # First, make sure the task/flow was successful.
        assert fs.is_successful()

        # Maybe re-run the flow so that the task gets cached?
        fs2: State = fr.run()
        assert fs2.is_successful()
        # Maybe test that the flow was cached? No, right? The task should be cached?
        assert fs2.is_cached()
```
I think I’d want to test that the `select_one` task’s state on `fs2` would be cached (instead of successful), but I’m not 100% sure.
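One way to make the "ran only once" expectation concrete, independent of which state object ends up carrying the information, is to count executions of the task body. The sketch below is a toy model in plain Python, not Prefect's engine: `InMemoryResult`, `run_with_result`, and the `calls` counter are hypothetical names invented for illustration, and the dict-backed store merely stands in for HDFS.

```python
from typing import Any, Callable, Dict


class InMemoryResult:
    """Toy stand-in for a result backend with write/read/exists (hypothetical)."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}

    def exists(self, location: str) -> bool:
        return location in self._store

    def read(self, location: str) -> Any:
        return self._store[location]

    def write(self, value: Any, location: str) -> None:
        self._store[location] = value


def run_with_result(fn: Callable[[], Any], result: InMemoryResult, location: str) -> Any:
    """Return the stored value if one exists; otherwise compute and store it."""
    if result.exists(location):
        return result.read(location)
    value = fn()
    result.write(value, location)
    return value


# Side-effect counter: incremented every time the task body actually executes.
calls = {"select_one": 0}


def select_one() -> list:
    """Placeholder task body that records each execution."""
    calls["select_one"] += 1
    return [1]


store = InMemoryResult()
first = run_with_result(select_one, store, "tmp/select_one")   # computes and writes
second = run_with_result(select_one, store, "tmp/select_one")  # reads the stored value
```

In a real test the same counter idea could live inside the decorated task, with the assertion that it equals 1 after both flow runs. That check does not depend on whether the second run's task state is reported as cached or successful.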
k
I will likely get back tomorrow for this one
d
No worries! Thanks @Kevin Kho. I’ll keep trying to hack away at it in the meantime 🙂
Quick edit: to be safe, I think we need to pass `checkpoint=True` to the `@task` decorator. In my testing environment I already have `PREFECT__FLOWS__CHECKPOINTING=true`, so that’s good 👍
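For readers unfamiliar with the naming convention, `PREFECT__FLOWS__CHECKPOINTING` corresponds to the `checkpointing` key in the `flows` section of Prefect's configuration: the double underscores separate nesting levels. The sketch below only illustrates that naming scheme; `parse_prefect_style_env` is a hypothetical helper, not Prefect's actual config loader.

```python
from typing import Any, Dict, Mapping


def parse_prefect_style_env(
    environ: Mapping[str, str], prefix: str = "PREFECT__"
) -> Dict[str, Any]:
    """Turn PREFIX__SECTION__KEY=value variables into a nested dict (toy parser)."""
    config: Dict[str, Any] = {}
    for name, raw in environ.items():
        if not name.startswith(prefix):
            continue  # ignore unrelated environment variables
        keys = name[len(prefix):].lower().split("__")
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        # Interpret "true"/"false" as booleans; leave everything else as a string.
        lowered = raw.strip().lower()
        node[keys[-1]] = {"true": True, "false": False}.get(lowered, raw)
    return config


settings = parse_prefect_style_env(
    {"PREFECT__FLOWS__CHECKPOINTING": "true", "HOME": "/root"}
)
```

Here `settings` ends up with a `flows` section containing `checkpointing`, which is the setting the variable above is meant to switch on.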
k
Ok, I looked at this, and I don’t think you are looking at the right test. The test you pointed to is about caching, which is different from checkpointing. Caching is applied across Flow runs. Checkpointing is done at the task level. I think these tests are more relevant for you.
Checkpointing is about Flow restarts and continuing from where work left off, while caching is described like “don’t run this task for flow runs in the next 24 hours”.
I think this test is clear and might give you an idea for what you want to do https://github.com/PrefectHQ/prefect/blob/master/tests/engine/test_task_runner.py#L1217-L1230
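The “next 24 hours” description of caching boils down to a time-window check on when a value was last produced. A minimal sketch of that idea in plain Python follows; `cache_is_valid` and the timestamps are made up for illustration and are not taken from Prefect's code.

```python
from datetime import datetime, timedelta


def cache_is_valid(cached_at: datetime, cache_for: timedelta, now: datetime) -> bool:
    """Return True while `now` still falls inside the validity window."""
    return now - cached_at < cache_for


cached_at = datetime(2021, 1, 1, 9, 0)
window = timedelta(hours=24)

# A flow run later the same day could reuse the cached value...
same_day = cache_is_valid(cached_at, window, datetime(2021, 1, 1, 18, 0))
# ...while a run two days later would have to recompute it.
two_days_later = cache_is_valid(cached_at, window, datetime(2021, 1, 3, 9, 0))
```

Checkpointing, by contrast, has no time window in this picture: it is about persisting a task's output so that a restarted flow can pick up from where it left off.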
d
Hey @Kevin Kho thanks for clarifying! So if a `Flow` itself is responsible for generating some DataFrame in HDFS we’d use `with Flow(…, result=HdfsSparkDataFrameResult(...)) as flow` and checkpointing would apply. But with a `Task` within a flow we’d want to use caching. That explanation makes sense, but it’s a bit confusing since the official `Task` class docstring uses `checkpoint` for what we’re describing as “caching”? Unless I’m (still) misunderstanding, shouldn’t it be `cache=True`? Regardless, I’ll check out the tests you’ve linked and see if I can’t get mine working!
k
^ Resolved through DM