# prefect-community
t
When I try to run the code below, I get this error:
RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
from prefect import task, flow
import contextlib
from pyspark import SparkConf
from pyspark.sql import SparkSession

@contextlib.contextmanager
def SparkCluster(conf2: SparkConf = SparkConf()):
    ssobj = SparkSession.builder.config(conf=conf2).getOrCreate()
    try:
        yield ssobj
    finally:
        ssobj.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])

@task()
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()


@flow(name="spark_flow")
def my_flow():
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    my_flow()
Basically, I was trying to convert the code below from Prefect 1.0 to 2.0:
from prefect import task, Flow, resource_manager

from pyspark import SparkConf
from pyspark.sql import SparkSession

@resource_manager
class SparkCluster:
    def __init__(self, conf: SparkConf = SparkConf()):
        self.conf = conf

    def setup(self) -> SparkSession:
        return SparkSession.builder.config(conf=self.conf).getOrCreate()

    def cleanup(self, spark: SparkSession):
        spark.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])

@task(log_stdout=True)
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()


with Flow("spark_flow") as flow:
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    flow.run()
k
Did you get the same error when running the code with 1.0?
c
+1 to Khuyen’s question. I would be surprised if this worked in either Prefect 1.0 or 2.0: passing the DataFrame between tasks looks like the problem and is likely to hit a pickle serialization issue. A Spark DataFrame only exists in the context of a SparkContext. Passing it between tasks is odd because results are saved for each task, and I don’t think a SparkContext can be pickled. It is also odd from a task perspective: Spark operations are lazily evaluated, so it isn’t clear how a retry would work if one were needed.
My suggestion: refactor the code to instantiate the SparkContext inside the task, and design each task to go from load -> action on the DataFrame. You can then persist your data to S3/GCS and download it in another task to perform additional operations.
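To make the pickling concern concrete, here is a small stdlib-only sketch. `FakeSparkContext` and `FakeDataFrame` are hypothetical stand-ins, not pyspark classes. As far as I know, the real `SparkContext` raises the RuntimeError quoted above from its `__getnewargs__` hook, which `pickle` calls when serializing an object; the sketch assumes a DataFrame-like object keeps a reference to its context, so pickling it fails the same way.

```python
import pickle


class FakeSparkContext:
    """Hypothetical stand-in for a SparkContext (not the real pyspark class)."""

    def __getnewargs__(self):
        # pickle (protocol 2 and later) calls this hook while serializing the
        # object, so raising here makes every pickling attempt fail.
        raise RuntimeError("SparkContext can only be used on the driver")


class FakeDataFrame:
    """Hypothetical stand-in: assumed to hold a reference to its context."""

    def __init__(self, ctx):
        self.ctx = ctx


def try_pickle(obj):
    """Return "ok" if obj can be pickled, otherwise the error text."""
    try:
        pickle.dumps(obj)
        return "ok"
    except RuntimeError as exc:
        return f"failed: {exc}"


if __name__ == "__main__":
    print(try_pickle({"word": "spark"}))                   # plain data pickles fine
    print(try_pickle(FakeSparkContext()))                  # the context itself fails
    print(try_pickle(FakeDataFrame(FakeSparkContext())))   # so does anything holding it
```

Anything that has to serialize or copy task inputs and results would hit the same wall, which is one plausible reason the session and DataFrame cannot be handed from task to task.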
Alternatively, you could use Prefect to launch a job against an existing Spark cluster, or to provision one, which is the more common design pattern with an orchestrator such as Prefect or Airflow, e.g. https://docs-v1.prefect.io/api/0.15.13/tasks/databricks.html#databrickssubmitrun
t
@Khuyen Tran yes, it works successfully in v1