Tejal Singh
08/23/2022, 6:33 PM
RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
from prefect import task, flow
import contextlib
from pyspark import SparkConf
from pyspark.sql import SparkSession
@contextlib.contextmanager
def SparkCluster(conf2: "SparkConf | None" = None):
    """Yield a SparkSession and stop it when the ``with`` block exits.

    Args:
        conf2: Spark configuration passed to ``SparkSession.builder``. When
            omitted, a fresh ``SparkConf()`` is created for this call.

    Yields:
        The session returned by ``getOrCreate()`` for that configuration.
    """
    # Build the default per call: a ``SparkConf()`` written in the signature
    # is evaluated once at definition time and shared by every caller.
    if conf2 is None:
        conf2 = SparkConf()
    ssobj = SparkSession.builder.config(conf=conf2).getOrCreate()
    try:
        yield ssobj
    finally:
        # Runs on normal exit and on error, so the session is always stopped.
        ssobj.stop()
@task
def get_data(spark: SparkSession):
    """Build a single-column ``word`` DataFrame of six sample rows on *spark*."""
    sample_words = ('look', 'spark', 'tutorial', 'spark', 'look', 'python')
    # createDataFrame is given one 1-tuple per row.
    rows = [(word,) for word in sample_words]
    return spark.createDataFrame(rows, ['word'])
@task()
def analyze(df):
    """Group *df* by its ``word`` column, count each group and show the result."""
    df.groupBy('word').count().show()
@flow(name="spark_flow")
def my_flow():
    """Run the word-count example against a local Spark master."""
    local_conf = SparkConf().setMaster('local[*]')
    # NOTE(review): the SPARK-5063 RuntimeError reported for this flow is
    # presumably triggered by passing the live session into a task - confirm.
    with SparkCluster(local_conf) as session:
        analyze(get_data(session))
# Run the flow only when this file is executed as a script.
if __name__ == "__main__":
    my_flow()
Basically, I was trying to convert the code below from Prefect 1.0 to Prefect 2.0:
from prefect import task, Flow, resource_manager
from pyspark import SparkConf
from pyspark.sql import SparkSession
@resource_manager
class SparkCluster:
    """Resource manager that provides a SparkSession and stops it afterwards.

    ``setup`` creates (or reuses) the session and ``cleanup`` receives a
    session and stops it.
    """

    def __init__(self, conf: "SparkConf | None" = None):
        """Store the Spark configuration used by ``setup``.

        Args:
            conf: Configuration for the session. When omitted, a fresh
                ``SparkConf()`` is created for this instance.
        """
        # Build the default per instance: a ``SparkConf()`` written in the
        # signature is evaluated once at definition time and then shared.
        self.conf = SparkConf() if conf is None else conf

    def setup(self) -> SparkSession:
        """Return the session obtained from the builder for ``self.conf``."""
        return SparkSession.builder.config(conf=self.conf).getOrCreate()

    def cleanup(self, spark: SparkSession):
        """Stop the given session."""
        spark.stop()
@task
def get_data(spark: SparkSession):
    """Return a one-column ``word`` DataFrame holding six sample rows."""
    column_names = ['word']
    sample_rows = [
        ('look',),
        ('spark',),
        ('tutorial',),
        ('spark',),
        ('look',),
        ('python',),
    ]
    return spark.createDataFrame(sample_rows, column_names)
@task(log_stdout=True)
def analyze(df):
    """Count the rows of *df* per ``word`` value and show the counts."""
    grouped_by_word = df.groupBy('word')
    grouped_by_word.count().show()
# The configuration is plain Python, so it is built before the flow context.
conf = SparkConf().setMaster('local[*]')

# Declare the flow: the data-loading and analysis tasks are both called
# inside the SparkCluster resource block.
with Flow("spark_flow") as flow, SparkCluster(conf) as spark:
    df = get_data(spark)
    analyze(df)

# Run the flow only when this file is executed as a script.
if __name__ == "__main__":
    flow.run()
Khuyen Tran
08/23/2022, 6:34 PM
Cole Murray
08/24/2022, 3:01 AM
Tejal Singh
08/24/2022, 6:46 AM