Hi, I'm trying to run a Pyspark script inside my F...
# ask-community
Lucas Hosoya
Hi, I'm trying to run a PySpark script inside my Flow, but it gives me this error. I'm using a Local Agent and have already tried changing environment paths, but it still gives me the same error after I register -> "Quick Run". Does anybody know how to deal with PySpark scripts (not Databricks) in Prefect?
Exception: Java gateway process exited before sending its port number
Kevin Kho
Hey @Lucas Hosoya, could you show me your script? I’ve done it before with Databricks
Lucas Hosoya
Sure, I've used a sample createDataFrame script:
from prefect import task, Flow
from pyspark.sql import SparkSession

@task(log_stdout=True)
def test():
    # sc = SparkContext(conf=conf)  # leftover line, not needed (see below)
    spark = SparkSession.builder.getOrCreate()

    dept = [("Finance", 10),
            ("Marketing", 20),
            ("Sales", 30),
            ("IT", 40)]

    deptColumns = ["dept_name", "dept_id"]
    df = spark.createDataFrame(data=dept, schema=deptColumns)
    df.write.csv('a', mode='overwrite')


with Flow("test pyspark") as flow:
    test()

flow.register(project_name='DCT')
I also tried this config in both `prefect agent local start` and flow.run_config:
from prefect.run_configs import LocalRun

flow.run_config = LocalRun(env={
    "PYSPARK_SUBMIT_ARGS": "--master local[2] pyspark-shell",
    "JAVA_HOME": "/usr/lib/jvm/java-1.8.0-openjdk-amd64",
    "SPARK_HOME": "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark",
    "PYSPARK_PYTHON": "/home/comp/anaconda3/envs/prefect/bin/python3.8",
    "HADOOP_HOME": "/home/comp/hadoop",
})
Kevin Kho
Does it work fine when not wrapped in a task?
Lucas Hosoya
It only works when I use flow.run(); otherwise I can register it to the Agent, but it gives me the error.
Ignore the sc = SparkContext(conf=conf), I forgot to delete it haha
Kevin Kho
This looks good. If it works in flow.run(), it doesn't even use the RunConfig, and since this is a LocalRun you should have the same env variables set up. Just confirming: the local agent is on the same machine where you tested flow.run(), right?
Lucas Hosoya
Yup, it works fine in flow.run()
Kevin Kho
Do you get the same error with an empty run config?
Lucas Hosoya
Yes, using only `spark = SparkSession.builder.getOrCreate()` and `spark.createDataFrame(...)` gives me the same error
Kevin Kho
Is the agent running in the same Python environment as the one flow.run() used?
Lucas Hosoya
Hm, I'm pretty sure it is; I'm using this command in my .service file:
/home/comp/anaconda3/envs/prefect/bin/prefect agent local start
Kevin Kho
What happens when you remove flow.run() and flow.register() and use the CLI instead to do `prefect run -p filename.py`?
Lucas Hosoya
It does work fine.
I'm trying some os.getenv() calls before my flow.register(); when I run through the Cloud UI, they return None, which they shouldn't, right?
Kevin Kho
prefect run runs against the backend (just without an agent). os.getenv() needs an env variable name supplied; maybe you want print(os.environ)?
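(A quick illustration of that difference; SPARK_HOME here is just an example name:)
import os

# os.getenv needs a variable name and returns None when it's unset
print(os.getenv("SPARK_HOME"))

# os.environ is the full mapping the process inherited; printing it
# shows everything the flow-run subprocess actually received
print(os.environ)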
Lucas Hosoya
Will try.
It did work, but is it ok that it shows prefect.engine.cloud.CloudTaskRunner?
Kevin Kho
Yeah, those are environment variables Prefect sets to run your flow.
I'll ask the team for more info. This looks like it should work, unless there is some bizarre thing that interferes with Spark.
Lucas Hosoya
Sure! I'll be trying some more configs to see if I can get it to run.
Zach Angell
Hey @Lucas Hosoya - a few questions to help track down where the disconnect is here. If I understand correctly, using `prefect run -p filename.py` works fine. Does `/home/comp/anaconda3/envs/prefect/bin/prefect run -p filename.py` also succeed? Also, is there any chance you're executing prefect run from the pyspark shell? (e.g. running the command ./bin/pyspark)
A few more items that might be helpful:
• In the Flow, can you log the value of `import sys; sys.executable`? This will confirm we're pointing at the right python path.
• Do you set PYSPARK_DRIVER_PYTHON anywhere in your environment? (either on the run config or your local env)
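(A minimal sketch of logging both of those from inside a task, using the same log_stdout pattern as the flow above; the task and flow names here are made up:)
import os
import sys
from prefect import task, Flow

@task(log_stdout=True)
def report_interpreter():
    # Which Python binary is this flow run actually using?
    print("sys.executable:", sys.executable)
    # Is the Spark driver pinned to a different interpreter?
    print("PYSPARK_DRIVER_PYTHON:", os.environ.get("PYSPARK_DRIVER_PYTHON"))

with Flow("which-python") as debug_flow:
    report_interpreter()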
Lucas Hosoya
Hey @Zach Angell It does work with `/home/comp/anaconda3/envs/prefect/bin/prefect run -p filename.py`
For both of your items:
• `import sys; sys.executable` prints this: /home/comp/anaconda3/envs/prefect/bin/python3
• For my PYSPARK_DRIVER_PYTHON, I'm using the same value as PYSPARK_PYTHON, located at ~/anaconda3/envs/prefect/bin/python3.8
Should PYSPARK_PYTHON point at python3?
Zach Angell
Hmmm, is python3 or python3.8 the correct python executable? You might be able to tell by executing `/home/comp/anaconda3/envs/prefect/bin/python3 flow_file.py` if flow_file.py calls flow.run()
Lucas Hosoya
Well, both seem to be working
Zach Angell
🤔 Hmm okay, honestly I don't know pyspark/conda well enough to give a good answer there. Back to trying to get more info: is there any way we can log a full stacktrace of the exception you're hitting when running the flow? In the task, something like
import traceback

try:
    ...  # spark stuff
except Exception as exc:
    print(exc)
    # exceptions have no .traceback attribute; format the real one instead
    print(traceback.format_exc())
should do it
Thanks for all your help debugging here btw! I wish I knew pyspark a bit better.
Lucas Hosoya
Sure, let me run it
This is the full error:
Traceback (most recent call last):
  File "prefect_spark.py", line 26, in hello_task
    spark = SparkSession.builder.getOrCreate()
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/sql/session.py", line 228, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 384, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 144, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 331, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/java_gateway.py", line 108, in launch_gateway
    raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number
Zach Angell
What happens if you use `prefect run -n <flow-name> --execute` instead of submitting it to an agent?
Lucas Hosoya
Oh, now it does show a different error:
ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Asked to run locally with 0 threads
        at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2893)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:557)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
21/09/14 22:36:36 WARN MetricsSystem: Stopping a MetricsSystem that is not running
└── 22:36:36 | INFO    | <class 'Exception'>
└── 22:36:36 | INFO    | Traceback (most recent call last):
  File "prefect_spark.py", line 26, in hello_task
    spark = SparkSession.builder.master("local[*]").getOrCreate()
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/sql/session.py", line 228, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 384, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 146, in __init__
    self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 209, in _do_init
    self._jsc = jsc or self._initialize_context(self._conf._jconf)
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 321, in _initialize_context
    return self._jvm.JavaSparkContext(jconf)
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/py4j/java_gateway.py", line 1573, in __call__
    return_value = get_return_value(
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Asked to run locally with 0 threads
        at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2893)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:557)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
I've tried with .master("local") or even .master("local[*]"), but it's still the same.
Zach Angell
With --execute, it's copying your entire environment into a subprocess, so it seems likely that some environment variables are missing when it runs with the agent.
Not sure how to help you with those new errors, though.
Lucas Hosoya
Yeah, I'm looking into pyspark + subprocesses, but all I find is about adding the .master or PYSPARK_SUBMIT_ARGS; I'll look for more environment variable details, though. Maybe it has something to do with spark-submit or spark-shell variables.
@Zanie @Kevin Kho @Zach Angell Hello! Sorry for @'ing you and keeping this thread giant! I've solved the problem, and it turned out to be a little problem with anaconda and paths (tl;dr: variables). So what solved it?
• Whenever I run `prefect run -n <flow-name> --execute` (which runs agentless locally), it does as Michael said: it copies all environment variables, but it missed a few important variables for both Anaconda and PySpark/Spark (same for Hadoop, but I don't think it comes into play in this case).
• For the Quick Run (which runs using an Agent, a local Agent in my case), it doesn't copy any important variable, so I had to pass a lot of variables through the run_config (I copied all of the important vars needed for a pyspark-shell/submit environment; I'm going to test which of the variables I really need).
• TL;DR: After doing so many tests and 60+ flow versions, I've managed to run successfully using the Agent. THE FIX: I had to make the subprocess the Agent spawns look almost the same as my linux/win env.
Thanks for all the help! Great support!
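(For anyone finding this thread later: a sketch of what that final run config could look like. The paths are the ones posted earlier in this thread; prepending the conda env's bin directory to PATH is an assumption, and which variables are strictly required was still being narrowed down.)
import os
from prefect.run_configs import LocalRun

flow.run_config = LocalRun(env={
    # Assumption: make the agent's subprocess PATH resemble the interactive shell,
    # so the conda env and spark-submit/spark-shell are resolvable
    "PATH": "/home/comp/anaconda3/envs/prefect/bin:" + os.environ.get("PATH", ""),
    "JAVA_HOME": "/usr/lib/jvm/java-1.8.0-openjdk-amd64",
    "SPARK_HOME": "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark",
    "PYSPARK_PYTHON": "/home/comp/anaconda3/envs/prefect/bin/python3.8",
    "PYSPARK_DRIVER_PYTHON": "/home/comp/anaconda3/envs/prefect/bin/python3.8",
    "PYSPARK_SUBMIT_ARGS": "--master local[2] pyspark-shell",
    "HADOOP_HOME": "/home/comp/hadoop",
})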
Zach Angell
Nice work!
Kevin Kho
Glad you got that figured out Lucas!