Python API and PySpark

https://spark.apache.org/docs/2.4.8/api/python/index.html

Spark Deploy Modes

Command

Client Mode

Cluster Mode

pyspark

useful for interactive sessions or
debugging small applications

N/A

spark-submit

small applications with stdout

long running batch applications

Interactive Jobs

Interactive sessions are primarily meant for prototyping and debugging applications. While this provides the familiar Python interface, is it not a very efficient use of resources.

module load python/booth/3.9/3.9.6

# specify IPython interface
export PYSPARK_DRIVER_PYTHON=ipython3

# uncomment below to use Jupyter interface instead
#export PYSPARK_DRIVER_PYTHON=jupyter
#export PYSPARK_DRIVER_PYTHON_OPTS=notebook

# launch interactive pyspark session
pyspark
Welcome to
   ____              __
  / __/__  ___ _____/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/__ / .__/\_,_/_/ /_/\_\   version 3.1.1
   /_/

   Using Python version 3.9.6 (default, Aug 2 2021 09:23:49)
SparkSession available as 'spark'.

In [1]:

Batch Jobs

The simplest way to submit a batch job is by using the spark-submit command with no additional command line options. By default, this launches spark in client mode. Client mode is useful for prototyping applications and debugging. Additional options can be included when submitting your Spark application. The example below associates a name with the Spark app for help tracking applications.

# load python module
module load python/booth/3.9/3.9.6

# client mode enables stdout
spark-submit --name myFancyApp script.py

The next example demonstrates a Spark application submitted in cluster mode. Cluster mode is the recommended way to submit mature applications or long-running applications. In cluster mode, the Spark driver is launched on a physically separate machine, (i.e. a worker node). For that reason, you must let Spark know if any additional files will be needed at runtime. The --files options is used to upload a local data file so that it is available to the executors during runtime.

# load python module
module load python/booth/3.9/3.9.6

# cluster mode disables stdout but allows long-running jobs to continue after logging off
spark-submit --deploy-mode cluster --files myfile.dat script.py

Because the Spark driver runs on a worker node, stdout is disabled in cluster mode. Any print statements or other output can be viewed in the logs by navigating to the web UI http://hdpmn01.chicagobooth.edu:8088. Once on the web UI, click on your application to find the logs.

Examples

Example 1

Apache Spark ships with a few example scripts that serve as useful demos. You can find the examples at the following path: ${SPARK_HOME}/examples/src/main/python/.

# load python module
module load python/booth/3.9/3.9.6

# run the example script to estimate the value of pi
spark-submit ${SPARK_HOME}/examples/src/main/python/pi.py

Example 2

The example python script below read a TransUnion parquet file and performs a few simple operations.

import pyspark

infile = "/dataset/TransUnion_Data/2000/dgebooth_Collection*.pq"

with pyspark.sql.SparkSession.builder.getOrCreate() as spark:

    df = spark.read.parquet(infile)

    g = df.groupBy("SubjectKey")
    g.count().sort("SubjectKey").show()

    df.printSchema()
    print(f"Number of rows: {df.count()}")

Configuration Options

Default Options

Every Spark job uses executors to perform tasks. In Vulcan, the default behavior is that executors are allocated dynamically based on the number of tasks required. For that reason, users should not specify the number of executors required for their application.

Vulcan has 384 CPU cores and 1.88 TB of memory reserved exclusively for Spark jobs. By default, each executor will use 4 CPU cores and 18 GB of memory. At these default values, Vulcan can accommodate a maximum of 96 executors distributed among all running applications.

Additional Options

The basic syntax for submitting spark jobs is: spark-submit [options] python-file [app arguments]. Options entered on the command line take precedence over any default values. Below are a few commonly used options.

  • --name you can enter any name to help identify your jobs in the web interface

  • --deploy-mode the default is client; the other option is cluster

  • --files any files needed during program execution need to be specified with this option

  • --driver-memory the default value is 4.5 GB

  • --executor-memory the default value is 18 GB and should normally not be set by users

Additional Tips

Logging Level

For troubleshooting or debugging purposes, it can be useful to turn on verbose logging. The loglevel can be adjusted inside the python script once the spark context has been initialized.

import pyspark

# initialize spark session
with pyspark.sql.SparkSession.builder.getOrCreate() as spark:

    # increase loglevel (the default is "WARN")
    spark.sparkContext.setLogLevel("INFO")

Pandas API

Users familiar with Python Pandas may be interested in the recently released Pandas API for Apache Spark 3.2+. The following example demonstrates loading a dataframe using the Pandas API on Spark.

import pyspark
import pyspark.pandas as ps

infile = "/dataset/TransUnion_Data/2000/dgebooth_Collection*.pq"

with pyspark.sql.SparkSession.builder.getOrCreate() as spark:

    spark_df = spark.read.parquet(infile)

    # convert to pandas-on-spark dataframe
    pandas_pdf_1 = spark_df.to_pandas_on_spark()

    # OR read parquet file directly using pyspark.pandas
    pandas_pdf_2 = ps.read_parquet(infile)