Hi everyone, I am trying to run a simple task aga...
# prefect-community
a
Hi everyone, I am trying to run a simple task against a spark cluster using Prefect 1.2 and i get this error:
TypeError: cannot pickle '_thread.RLock' object
I can see the application being registered inside the spark cluster. Thanks!
Full error:
Copy code
Unexpected error: TypeError("cannot pickle '_thread.RLock' object") Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner new_state = method(self, state, *args, **kwargs) File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state result = self.result.write(value, **formatting_kwargs) File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 75, in write binary_data = new.serializer.serialize(new.value) File "/usr/local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize return cloudpickle.dumps(value) File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps cp.dump(obj) File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump return Pickler.dump(self, obj) TypeError: cannot pickle '_thread.RLock' object
k
What are your tasks inputs and outputs?
a
Copy code
@task(name='Get data', log_stdout=True, max_retries=2, retry_delay=timedelta(seconds=10), state_handlers=[post_to_slack_on_failure])
def get_data():
    spark = SparkSession.builder.appName("Bobita-app").master("<spark://spark-master:7077>").getOrCreate()
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])

with Flow(
            name=flow_name,
            schedule=CronSchedule(cron=cron),
            storage=GCS(bucket="prefect-bucket"),
            run_config=KubernetesRun(
                labels=["dev"], 
                job_template=register_flow_job_template,
                service_account_name="prefect-sa",
                # env={"PYSPARK_SUBMIT_ARGS": "--master <spark://spark-master-alerts:7077> --deploy-mode cluster pyspark-shell"},
                image="prefect-base:v0.1",
                image_pull_policy="Always",
                )
        ) as flow:

             df = get_data()
@Kyle McChesney I've added above the task and flow snippets Logs from spark master proving the app is registered.
Copy code
22/09/20 15:16:25 INFO Master: Registered app Bobita-app with ID app-20220920151625-0000  
22/09/20 15:16:25 INFO Master: Launching executor app-20220920151625-0000/0 on worker worker-20220920151514-10.42.163.29-36489  
22/09/20 15:16:25 INFO Master: Launching executor app-20220920151625-0000/1 on worker worker-20220920151515-10.42.87.199-43455
k
thanks for sharing. Nothing sticks out as immediately wrong here. I think the error is related to trying to pickle the result from your task (the spark dataframe). I dont have a whole lot of spark experience, but is it possible that the spark dataframe is not pickle-able (ie, it might contain a reference to the session or connection which includes an RLock object)?
As a quick test maybe throw
.toPandas()
on the end of the return
It might also just be the connection object. This seems related: https://github.com/PrefectHQ/prefect/discussions/3374
a
Thanks a lot, @Kyle McChesney. I'll have a look
Made some progress (i've added .toPandas() like you said), but now i get another error, this time inside the spark workers:
Caused by: java.io.IOException: Failed to connect to prefect-job-b3fe1959-274t6:42699
They are trying to connect to the Kubernetes job that started the flow. Maybe to return the state of the task? I am running the entire infra behind a corporate proxy and trying to figure out if this is causing the error.