Lucas Hosoya
09/14/2021, 6:56 PM
Exception: Java gateway process exited before sending its port number
Kevin Kho
Lucas Hosoya
09/14/2021, 6:59 PM
import os
import sys
import prefect
from prefect import task, Flow
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

@task(log_stdout=True)
def test():
    # conf was undefined in the original snippet; a minimal local config is assumed here
    conf = SparkConf().setMaster("local[2]").setAppName("test-pyspark")
    sc = SparkContext.getOrCreate(conf=conf)
    spark = SparkSession.builder.getOrCreate()
    dept = [("Finance", 10),
            ("Marketing", 20),
            ("Sales", 30),
            ("IT", 40)]
    deptColumns = ["dept_name", "dept_id"]
    df = spark.createDataFrame(data=dept, schema=deptColumns)
    df.write.csv('a', mode='overwrite')

with Flow("test pyspark") as flow:
    test()

flow.register(project_name='DCT')
Lucas Hosoya
09/14/2021, 7:00 PM
flow.run_config = LocalRun(env={"PYSPARK_SUBMIT_ARGS": "--master local[2] pyspark-shell",
                                "JAVA_HOME": r"/usr/lib/jvm/java-1.8.0-openjdk-amd64",
                                "SPARK_HOME": r"/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark",
                                "PYSPARK_PYTHON": r"/home/comp/anaconda3/envs/prefect/bin/python3.8",
                                "HADOOP_HOME": r"/home/comp/hadoop"})
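A quick stdlib-only way to sanity-check that path-valued variables like the ones above actually exist on the machine the agent runs on; the check_env_paths helper is an editor's illustration, not something from the thread:

```python
import os

def check_env_paths(env):
    """Return the path-valued entries that do not exist on this machine."""
    return {name: value for name, value in env.items()
            if value.startswith("/") and not os.path.exists(value)}

# Values copied from the LocalRun snippet above; adjust for your machine.
env = {
    "JAVA_HOME": "/usr/lib/jvm/java-1.8.0-openjdk-amd64",
    "SPARK_HOME": "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark",
    "PYSPARK_PYTHON": "/home/comp/anaconda3/envs/prefect/bin/python3.8",
}
print(check_env_paths(env))  # any entry printed here would break the Spark launch
```

Running this inside the flow (with log_stdout=True) shows whether the agent's subprocess actually sees valid paths.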
Kevin Kho
Lucas Hosoya
09/14/2021, 7:02 PM
Lucas Hosoya
09/14/2021, 7:03 PM
Kevin Kho
With flow.run() it doesn't even use the RunConfig, and considering this is a LocalRun, you should have the same env variables set up. Just confirming: the local agent is on the same machine that you tested flow.run() on, right?
Lucas Hosoya
09/14/2021, 7:08 PM
Kevin Kho
Lucas Hosoya
09/14/2021, 7:11 PM
Kevin Kho
How is the agent started, compared to where flow.run() is used?
Lucas Hosoya
09/14/2021, 7:14 PM
/home/comp/anaconda3/envs/prefect/bin/prefect agent local start
Kevin Kho
Can you remove flow.run() and flow.register() and use the CLI instead to do prefect run -p filename.py?
Lucas Hosoya
09/14/2021, 7:18 PM
Lucas Hosoya
09/14/2021, 7:19 PM
Lucas Hosoya
09/14/2021, 7:20 PM
Kevin Kho
prefect run runs against the backend (just without an agent). os.getenv() needs an env variable name supplied. Maybe you want print(os.environ)?
Lucas Hosoya
09/14/2021, 7:24 PM
Lucas Hosoya
09/14/2021, 7:27 PM
Kevin Kho
Kevin Kho
Lucas Hosoya
09/14/2021, 7:33 PM
Zach Angell
So when you run prefect run -p filename.py it works fine. Does /home/comp/anaconda3/envs/prefect/bin/prefect run -p filename.py also succeed?
Also, is there any chance you're executing prefect run from the pyspark shell? (e.g. running the command ./bin/pyspark)?
Zach Angell
• Could you print import sys; sys.executable? This will confirm we're pointing at the right python path.
• Do you set PYSPARK_DRIVER_PYTHON anywhere in your environment? (either on the run config or your local env)
Lucas Hosoya
09/14/2021, 8:01 PM
/home/comp/anaconda3/envs/prefect/bin/prefect run -p filename.py
Lucas Hosoya
09/14/2021, 8:05 PM
• import sys; sys.executable prints this: /home/comp/anaconda3/envs/prefect/bin/python3
• For my PYSPARK_DRIVER_PYTHON I'm using the same as PYSPARK_PYTHON, located in ~/anaconda3/envs/prefect/bin/python3.8
Lucas Hosoya
09/14/2021, 8:05 PM
Zach Angell
Is python3 or python3.8 the correct python executable? You might be able to tell by executing /home/comp/anaconda3/envs/prefect/bin/python3 flow_file.py, if flow_file.py calls flow.run()
Lucas Hosoya
09/14/2021, 8:15 PM
Zach Angell
try:
    ...  # spark stuff
except Exception as exc:
    print(exc)
    print(exc.__traceback__)
should do it
Zach Angell
Lucas Hosoya
09/14/2021, 9:00 PM
Lucas Hosoya
09/14/2021, 9:09 PM
Traceback (most recent call last):
  File "prefect_spark.py", line 26, in hello_task
    spark = SparkSession.builder.getOrCreate()
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/sql/session.py", line 228, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 384, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 144, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 331, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/java_gateway.py", line 108, in launch_gateway
    raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number
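For context, "Java gateway process exited before sending its port number" generally means PySpark could not start a working java process at all. A rough stdlib-only check of where that java would come from; the java_executable helper is an illustration, not part of the thread:

```python
import os
import shutil

def java_executable():
    """Return the java binary PySpark would likely launch, or None if none is found."""
    java_home = os.environ.get("JAVA_HOME")
    if java_home:
        candidate = os.path.join(java_home, "bin", "java")
        if os.path.isfile(candidate):
            return candidate
    # Otherwise fall back to whatever `java` is on PATH.
    return shutil.which("java")

print(java_executable())  # None here means the py4j gateway cannot start
```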
Zanie
What happens if you run prefect run -n <flow-name> --execute instead of submitting it to an agent?
Lucas Hosoya
09/14/2021, 10:37 PM
ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Asked to run locally with 0 threads
at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2893)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:557)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
21/09/14 22:36:36 WARN MetricsSystem: Stopping a MetricsSystem that is not running
└── 22:36:36 | INFO | <class 'Exception'>
└── 22:36:36 | INFO | Traceback (most recent call last):
File "prefect_spark.py", line 26, in hello_task
spark = SparkSession.builder.master("local[*]").getOrCreate()
File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/sql/session.py", line 228, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 384, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 146, in __init__
self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 209, in _do_init
self._jsc = jsc or self._initialize_context(self._conf._jconf)
File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/pyspark/context.py", line 321, in _initialize_context
return self._jvm.JavaSparkContext(jconf)
File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/py4j/java_gateway.py", line 1573, in __call__
return_value = get_return_value(
File "/home/comp/anaconda3/envs/prefect/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Asked to run locally with 0 threads
at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2893)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:557)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
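The "Asked to run locally with 0 threads" error above is Spark rejecting a local master URL that resolves to zero threads (e.g. local[0], or a master lost from a malformed PYSPARK_SUBMIT_ARGS). A simplified sketch of that resolution logic, not Spark's actual code:

```python
import os
import re

def local_thread_count(master):
    """Simplified mirror of how Spark maps a local[...] master URL to a thread count."""
    if master == "local":
        return 1
    m = re.fullmatch(r"local\[(\d+|\*)\]", master)
    if m is None:
        return None  # not a local[...] URL
    return os.cpu_count() if m.group(1) == "*" else int(m.group(1))

print(local_thread_count("local[2]"))  # 2
print(local_thread_count("local[0]"))  # 0 -> Spark raises the exception above
```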
Lucas Hosoya
09/14/2021, 10:41 PM
I've tried .master("local") or even .master("local[*]") but still the same.
Zanie
With --execute it's copying your entire environment into a subprocess, so it seems likely that it's missing some sort of environment variables when run with the agent.
Zanie
Lucas Hosoya
09/14/2021, 10:56 PM
It doesn't seem to be .master or PYSPARK_SUBMIT_ARGS; will look for more environment variable details though. Maybe it has something to do with spark-submit or spark-shell variables.
Lucas Hosoya
09/15/2021, 12:10 PM
It was anaconda and paths (tl;dr variables).
So what solved it?
• Whenever I run `prefect run -n <flow-name> --execute` (which runs agentless locally) it does as Michael said: it copies all environment variables, but it missed a few important variables for both Anaconda and PySpark/Spark (same for Hadoop, but I don't think that comes into play in this case).
• For the Quick Run (which runs using an agent, a local agent in my case), it doesn't copy any important variable, so I had to set run_config with a lot of variables (copied all of the important vars for a pyspark-shell/submit environment; still going to test which of the variables I really need).
• TL;DR: After doing so many tests and 60+ flow versions, I've managed to run successfully using the agent. THE FIX: make the subprocess the agent spawns look almost the same as my Linux/Windows env.
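The fix above boils down to closing the gap between the interactive shell's environment and the agent subprocess's environment. A hypothetical helper for spotting that gap (env_diff and the sample values are the editor's illustrations):

```python
def env_diff(shell_env, agent_env):
    """Variables the shell has that the agent's subprocess is missing."""
    return {k: v for k, v in shell_env.items() if k not in agent_env}

# Illustrative values; in practice, compare your shell's os.environ against
# a print(os.environ) logged from inside the flow run.
shell = {"JAVA_HOME": "/usr/lib/jvm/java-8", "SPARK_HOME": "/opt/spark", "PATH": "/usr/bin"}
agent = {"PATH": "/usr/bin"}
print(env_diff(shell, agent))  # candidates to add to the RunConfig env
```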
Thanks for all the help! Great support!
Zach Angell
Kevin Kho