spark

- submit application (a SparkConf object cannot be changed after SparkContext creation)

  • method 1
bin/spark-submit \
--class com.example.MyApp \
--master local[4] \
--name "My Spark App" \
--conf spark.ui.port=36000 \
myApp.jar

If the submit fails with "Invalid signature file digest for Manifest main attributes", strip the signature files from the jar:

zip -d <jar file name>.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF 

  • method 2
bin/spark-submit \
--class com.example.MyApp \
--properties-file my-config.conf \
myApp.jar
## --properties-file defaults to <spark dir>/conf/spark-defaults.conf ##
## Contents of my-config.conf ##
spark.master local[4]
spark.app.name "My Spark App"
spark.ui.port 36000
  • configuration priority

SparkConf.set() > flags passed to spark-submit > values in the properties file > default values

SPARK_LOCAL_DIRS cannot be set in SparkConf; it is set in conf/spark-env.sh. It points to the local directories used for shuffle data and spilled RDD data.
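
A minimal sketch of the precedence rule (the port value here is illustrative): a value set explicitly on the SparkConf wins over the same key passed as a spark-submit flag or a properties-file entry.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("My Spark App")
  .set("spark.ui.port", "36001") // explicit set() beats --conf spark.ui.port=36000 and spark-defaults.conf
val sc = new SparkContext(conf)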

- Execution, job, stage and task

  • transformations on an RDD build up its lineage; rdd.toDebugString prints that lineage metadata, with one level of indentation per stage (see the sketch below)
  • an action on an RDD triggers execution:

Each action launches a job. A job consists of a set of stages, and a stage contains one task per partition. Stages are not necessarily 1:1 with RDDs, because RDDs that can be computed from their parents without moving data are pipelined into a single stage.

If an RDD is cached/persisted, the stage graph is truncated, since its data can be read directly from memory or disk. Existing shuffle output can also let Spark skip stages, since a downstream RDD can read the data straight from the shuffle files.
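
A minimal sketch of inspecting the lineage, using a hypothetical word-count pipeline (the input path is illustrative): the flatMap/map are pipelined into the same stage as the read, while reduceByKey introduces a shuffle boundary, i.e. a new indentation level in toDebugString.

val input  = sc.textFile("hdfs:///tmp/input.txt")                 // hypothetical path
val pairs  = input.flatMap(_.split(" ")).map(word => (word, 1))   // pipelined: same stage as the read
val counts = pairs.reduceByKey(_ + _)                             // shuffle boundary: new stage
println(counts.toDebugString)                                     // prints the lineage, indented per stage
counts.count()                                                    // action: launches one job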


- Web UI and logs

In standalone mode, use Spark's web UI to view the logs.

On YARN, use yarn logs -applicationId <app id> for a completed job. For a job that is still running, drill down from the ResourceManager UI to the specific worker node and container to get the log info.

Logging levels are controlled by conf/log4j.properties.
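
A minimal example of what goes in that file (modeled on Spark's conf/log4j.properties.template), quieting console output from INFO to WARN:

## Log everything to the console at WARN instead of INFO ##
log4j.rootCategory=WARN, console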


- Key performance considerations

  • Level of parallelism 

Usually one task per partition, e.g. one partition per HDFS block when reading from HDFS.

The parallelism of an RDD derived from a shuffle is set based on its parent RDDs.

To tune it, pass a degree of parallelism to the shuffle operation, or repartition/coalesce an existing RDD (see the sketch below).
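
A minimal sketch, reusing pairs and counts from the lineage sketch above (the partition counts are illustrative):

val counts100 = pairs.reduceByKey(_ + _, 100)   // request 100 partitions for the shuffle output
val fewer     = counts.coalesce(10)             // merge down to 10 partitions, avoiding a full shuffle
val wider     = counts.repartition(200)         // full shuffle into 200 partitions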

  • Serialization format (KryoSerializer as an alternative to the default Java serializer)
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Be strict about class registration
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))

To debug serialization issues, add "-Dsun.io.serialization.extendedDebugInfo=true" to the JVM options. You can enable this option using the --driver-java-options and --executor-java-options flags.
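
One way to pass this is through the properties file, following the my-config.conf style above (a sketch; spark.driver.extraJavaOptions and spark.executor.extraJavaOptions are the underlying config keys):

spark.driver.extraJavaOptions   -Dsun.io.serialization.extendedDebugInfo=true
spark.executor.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true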

  • Memory Management

spark.storage.memoryFraction: space for RDD storage when persisting or caching (default 60%)

Persist with MEMORY_AND_DISK so that old cached partitions are dropped to disk and restored from there, avoiding re-computation. For a cache with a large number of objects, use MEMORY_AND_DISK_SER instead: it serializes many objects into one large buffer, which reduces the number of objects on the heap and therefore helps GC (GC cost scales with the number of heap objects).

spark.shuffle.memoryFraction: space for shuffle data and aggregation buffers (default 20%)

The remaining memory is for user code, e.g. object allocation (20%).
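
A minimal sketch, reusing conf and counts from the sketches above (the fraction values simply restate the defaults):

import org.apache.spark.storage.StorageLevel

conf.set("spark.storage.memoryFraction", "0.6")    // RDD cache space (default)
conf.set("spark.shuffle.memoryFraction", "0.2")    // shuffle/aggregation space (default)
counts.persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized cache; old partitions spill to disk instead of being recomputed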

- Hardware provisioning

--executor-memory

--executor-cores

--num-executors

Check the Storage page of Spark's web UI to see what fraction of your cached data is in memory. A common approach is to cache a subset of the data on a small cluster and then extrapolate the total memory needed to fit the full dataset in memory.

One caveat of "more is better": very large executor heaps can lead to long GC pauses.
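
The same knobs expressed as config keys rather than spark-submit flags (a sketch; the values are illustrative, and spark.executor.instances applies when running on YARN):

conf.set("spark.executor.memory", "8g")      // --executor-memory
conf.set("spark.executor.cores", "4")        // --executor-cores
conf.set("spark.executor.instances", "10")   // --num-executors (YARN)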
