Apache Spark — local setup for efficient development and debugging

Sairam Krish
4 min readMar 10, 2021

Having a good local setup is always handy to deploy spark apps, develop, debug and learn the spark ecosystem.

Local standalone spark cluster

./sbin/start-master.sh
./sbin/start-worker.sh spark://<system_name>:7077
# Once done, we could stop all of them
./sbin/stop-all.sh
  • Having a master and worker based local setup is handy to simulate different needs & life cycle of spark application.
  • We can fine tune how spark application is submitted to the cluster, different parameters and configuration passed.

Persisting application events for debugging

While creating spark session, it would be useful to enable event log. This helps in viewing Jobs, stages and DAG after the application is completed.

spark = (
SparkSession.builder.master("spark://system_name:7077")
.appName("testing")
.config('spark.eventLog.enabled','true')
.getOrCreate()
)

Now we can view the historic information and drill down details later with history server

./sbin/start-history-server.sh

Unit test sandbox

import unittest
from pyspark.sql import SparkSession
from pandas.testing import assert_frame_equal
from numpy.testing import assert_array_almost_equal
spark = (
SparkSession.builder.master("spark://system_name:7077")
.appName("testing")
.config('spark.eventLog.enabled','true')
.getOrCreate()
)
class TestSparkDataProcessing(unittest.TestCase):
def setUp(self):
self.df = spark.read.csv('my_big_dataset.csv',header=True).limit(10000)
def test_my_processing(self):
result = do_spark_based_processing()
result.write.save('result.csv', format='csv', mode='append', header=True)
if __name__ == '__main__':
unittest.main()

Docker based jupyter notebook with Spark

version: "3.7"services:

jupyter-spark:
image: jupyter/pyspark-notebook:latest
networks:
- default_net
ports:
- "8888:8888"
- "4040-4080:4040-4080"
environment:
- JUPYTER_ENABLE_LAB=yes
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
volumes:
- ./notebooks:/home/jovyan/work/notebooks/
- ./data:/home/jovyan/work/data/
networks:
default_net:
attachable: true

jupyter/pyspark-notebook — is an excellent place to start. It has jupyter notebook. It also has a spark installed.

If you want scala or other variants — please check here

Observations

  • Notice that we have enabledJUPYTER_ENABLE_LAB
  • Apache spark ports 4040 to 4080are binded. Every new spark context that is created is put onto an incrementing port (ie. 4040, 4041, 4042, etc.)
  • AWS credentials are passed as environment variable to docker. This is helpful in connecting with S3 buckets etc. I tried to mount them as volume to .aws/credentials inside docker, but it didn’t work as expected.
  • More details on this image can be found in this link

AWS dependencies for accessing S3

In Jupyter notebook, we need :

from os import environ
spark_dependencies = [
'org.apache.hadoop:hadoop-aws:3.2.0'
]
spark_jars_packages = ','.join(list(spark_dependencies))
environ['PYSPARK_SUBMIT_ARGS'] = f'--packages "{spark_jars_packages}" pyspark-shell'
  • hadoop-aws — internally depends on com.amazonaws#aws-java-sdk-bundle;1.11.xxx We don’t need to explicitly call this out. Actually if we add that manually and if there are multiple versions of aws-java-sdk that raises java exception.

Alternate ways to pass AWS Credentials

  • Easiest way is to pass it from docker-compose as shown above.
  • We could also pass it in our notebook like this
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", 'YOUR_ACCESS')
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", 'YOUR_SECRET')

Found good explanation on this is in spark-redshift document

Spark Standalone mode vs local

I used to get confused between these two initially.

Spark local mode is useful for experimentation on small data when you do not have a Spark cluster available. For example, we could do following for small data and experiment with Spark.

from pyspark.sql import SparkSession

# Spark session & context
spark = SparkSession.builder.master('local').getOrCreate()

Standalone mode is running a spark cluster manually. In addition to running on the Mesos or YARN cluster managers, Spark also provides a simple standalone deploy mode. You can launch a standalone cluster either manually, by starting a master and workers by hand, or by using provided launch scripts. It is also possible to run these daemons on a single machine for testing.

Installing python dependencies on Jupyter notebook docker

At times, we may need to install python dependencies. We could extend the base image and custom the docker image. Or to have a quick flow, we could install with following command

# Install a pip package in the current Jupyter kernel
import sys
!{sys.executable} -m pip install boto3

Iterate through AWS S3 bucket — folders

Some times we may need to iterate through the S3 bucket and process all the csv in that bucket, for instance.

paginator = s3.get_paginator('list_objects_v2')
result = paginator.paginate(Bucket='bucket-name',Prefix ='some-folder/input')
for page in result:
if "Contents" in page:
for key in page[ "Contents" ]:
keyString = key[ "Key" ]
print(keyString)

Understanding the pyspark API and getting things done

I personally found pyspark documentation less useful. I felt viewing the source code was much easier to understand the flow

# This will help download minimal source code
# This will download only the branch that we are interested
git clone --branch branch-3.0 --single-branch --depth 1 --no-tags https://github.com/apache/spark.git

From python perspective, two directories are super useful to traverse :

  • spark/examples/src/main/python
  • spark/python

Developer references

Good follow up reads

--

--