# prefect-community
I want to convert the code below from Prefect 1 to Prefect 2:
```python
from prefect import task, Flow, resource_manager

from pyspark import SparkConf
from pyspark.sql import SparkSession

@resource_manager
class SparkCluster:
    def __init__(self, conf: SparkConf = SparkConf()):
        self.conf = conf

    def setup(self) -> SparkSession:
        return SparkSession.builder.config(conf=self.conf).getOrCreate()

    def cleanup(self, spark: SparkSession):
        spark.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])

@task(log_stdout=True)
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()


with Flow("spark_flow") as flow:
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    flow.run()
```
I have converted a few obvious things, but I still don't know how to remove this `resource_manager` decorator or what context manager to use in its place. Now my code looks like this:
```python
from prefect import task, flow, resource_manager

from pyspark import SparkConf
from pyspark.sql import SparkSession

@resource_manager
class SparkCluster:
    def __init__(self, conf: SparkConf = SparkConf()):
        self.conf = conf

    def setup(self) -> SparkSession:
        return SparkSession.builder.config(conf=self.conf).getOrCreate()

    def cleanup(self, spark: SparkSession):
        spark.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])

@task(log_stdout=True)
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()


@flow(name="spark_flow")
def my_flow():
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    my_flow()
```
Can anyone please help me with this? cc @Anna Geller
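Because a Prefect 2 flow is an ordinary Python function, an ordinary `with` statement can take the place of `resource_manager`. One option, sketched here as an assumption rather than an official Prefect API, is to drop the `@resource_manager` decorator, keep the existing `setup()`/`cleanup()` methods, and drive them from a standard-library `contextlib.contextmanager`. The helper name `managed` is hypothetical.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def managed(manager_cls: type, *args: Any, **kwargs: Any) -> Iterator[Any]:
    """Hypothetical helper: run a class that defines setup()/cleanup() as a context manager.

    setup() runs when the `with` block is entered and its return value is bound by `as`;
    cleanup(resource) runs when the block exits, even if the block raised an exception.
    """
    manager = manager_cls(*args, **kwargs)
    resource = manager.setup()
    try:
        yield resource
    finally:
        manager.cleanup(resource)
```

With the decorator removed from `SparkCluster`, the flow body would read `with managed(SparkCluster, conf) as spark:`. The `try`/`finally` ensures `cleanup()` (here `spark.stop()`) runs even if a task inside the block raises.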
Can you please take a look, or tag someone appropriate? @Kevin Kho @Cole Murray
Tejal, please don't tag anyone, as this violates our Code of Conduct. We'll get back to you as soon as we can. If you need SLA-based support, reach out to our paid support channel: cs@prefect.io. Thanks a lot in advance.
Hi Tejal, it looks like this is happening because the return values of Prefect 2 tasks need to be serializable. If you try to serialize a PySpark DataFrame, you get the error message you are seeing here:
```
RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
```
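A quick way to check whether a value is a reasonable task return value is to try a pickle round trip on it. This is a minimal sketch: it uses the standard-library `pickle` as an approximation, while Prefect 2's default result serializer is, as far as I know, pickle-based via `cloudpickle`, which accepts somewhat more objects (lambdas, for example). The helper name `is_picklable` is hypothetical. A PySpark DataFrame fails this check because pickling it reaches the `SparkContext`, which raises the error above.

```python
import pickle
import threading
from typing import Any


def is_picklable(value: Any) -> bool:
    """Return True if `value` survives a pickle round trip (hypothetical helper)."""
    try:
        pickle.loads(pickle.dumps(value))
    except Exception:
        # e.g. TypeError for thread locks, RuntimeError (SPARK-5063) for Spark objects
        return False
    return True


print(is_picklable({"word": "spark", "count": 2}))  # True
print(is_picklable(threading.Lock()))  # False: thread locks cannot be pickled
```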
A few potential solutions come to mind:
• Create the DataFrame in the flow and pass it into the task function(s) that need to use it.
• Create the DataFrame and perform operations like `groupBy` and `count` in the same task.
• If your dataset isn't too large, convert the PySpark DataFrame to a pandas DataFrame before returning it from `get_data`.
Could you please move your code to this thread @Tejal Singh to help keep the main channel tidy?