Python API and PySpark
https://spark.apache.org/docs/2.4.8/api/python/index.html
Command | Client Mode | Cluster Mode
---|---|---
pyspark | useful for interactive sessions or debugging small applications | N/A
spark-submit | small applications with stdout | long running batch applications
Interactive Jobs
Interactive sessions are primarily meant for prototyping and debugging applications. While this provides the familiar Python interface, it is not a very efficient use of resources.
module load python/booth/3.9/3.9.6
# specify IPython interface
export PYSPARK_DRIVER_PYTHON=ipython3
# uncomment below to use Jupyter interface instead
#export PYSPARK_DRIVER_PYTHON=jupyter
#export PYSPARK_DRIVER_PYTHON_OPTS=notebook
# launch interactive pyspark session
pyspark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/
Using Python version 3.9.6 (default, Aug 2 2021 09:23:49)
SparkSession available as 'spark'.
In [1]:
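Once the interactive session starts, the spark object shown above can be used directly at the prompt. The snippet below is a minimal sketch of an interactive command; the column names and values are invented for illustration.
# create a small DataFrame from in-memory data and inspect it
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
df.show()
df.printSchema()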
Batch Jobs
The simplest way to submit a batch job is by using the spark-submit command with no additional command line options.
By default, this launches Spark in client mode.
Client mode is useful for prototyping applications and debugging.
Additional options can be included when submitting your Spark application.
The example below associates a name with the Spark app for help tracking applications.
# load python module
module load python/booth/3.9/3.9.6
# client mode enables stdout
spark-submit --name myFancyApp script.py
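The script.py above stands for whatever PySpark application you want to run. As a hypothetical sketch (the input path below is a placeholder, not a real dataset), a minimal script could look like this:
import pyspark

# create (or reuse) a spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# placeholder input path for illustration only
df = spark.read.parquet("/path/to/some_data.pq")
df.show(5)

spark.stop()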
The next example demonstrates a Spark application submitted in cluster mode.
Cluster mode is the recommended way to submit mature applications or long-running applications.
In cluster mode, the Spark driver is launched on a physically separate machine (i.e., a worker node).
For that reason, you must let Spark know if any additional files will be needed at runtime.
The --files option is used to upload a local data file so that it is available to the executors at runtime.
# load python module
module load python/booth/3.9/3.9.6
# cluster mode disables stdout but allows long-running jobs to continue after logging off
spark-submit --deploy-mode cluster --files myfile.dat script.py
Because the Spark driver runs on a worker node, stdout is disabled in cluster mode. Any print statements or other output can be viewed in the logs by navigating to the web UI http://hdpmn01.chicagobooth.edu:8088. Once on the web UI, click on your application to find the logs.
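If the yarn command line client is available on the login node (an assumption, not something covered by this guide), aggregated logs can also be retrieved from the terminal after the application finishes; the application ID below is a placeholder.
# fetch aggregated logs for a finished application (placeholder ID)
yarn logs -applicationId application_1234567890123_0001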
Examples
Example 1
Apache Spark ships with a few example scripts that serve as useful demos.
You can find the examples at the following path: ${SPARK_HOME}/examples/src/main/python/.
# load python module
module load python/booth/3.9/3.9.6
# run the example script to estimate the value of pi
spark-submit ${SPARK_HOME}/examples/src/main/python/pi.py
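In the copies of pi.py shipped with recent Spark releases, the script also accepts an optional argument giving the number of partitions to use for the estimate, so the work can be spread across more tasks:
# run the pi example with 100 partitions instead of the default
spark-submit ${SPARK_HOME}/examples/src/main/python/pi.py 100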
Example 2
The example Python script below reads a TransUnion Parquet file and performs a few simple operations.
import pyspark

infile = "/dataset/TransUnion_Data/2000/dgebooth_Collection*.pq"

# initialize spark session
with pyspark.sql.SparkSession.builder.getOrCreate() as spark:
    # read the parquet file(s) into a Spark dataframe
    df = spark.read.parquet(infile)
    # count the rows for each SubjectKey and display the result
    g = df.groupBy("SubjectKey")
    g.count().sort("SubjectKey").show()
    # display the schema and the total number of rows
    df.printSchema()
    print(f"Number of rows: {df.count()}")
Configuration Options
Default Options
Every Spark job uses executors to perform tasks. On Vulcan, executors are allocated dynamically by default, based on the number of tasks required. For that reason, users should not specify the number of executors for their application.
Vulcan has 384 CPU cores and 1.88 TB of memory reserved exclusively for Spark jobs. By default, each executor will use 4 CPU cores and 18 GB of memory. At these default values, Vulcan can accommodate a maximum of 96 executors distributed among all running applications.
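To confirm which values an application actually received, the resolved configuration can be inspected from within a PySpark session. This is a minimal sketch; the keys queried are standard Spark configuration properties.
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

# print the configuration values the running application resolved to
conf = spark.sparkContext.getConf()
print(conf.get("spark.executor.cores", "not set"))
print(conf.get("spark.executor.memory", "not set"))
print(conf.get("spark.dynamicAllocation.enabled", "not set"))

spark.stop()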
Additional Options
The basic syntax for submitting Spark jobs is: spark-submit [options] python-file [app arguments].
Options entered on the command line take precedence over any default values.
Below are a few commonly used options.
- --name: you can enter any name to help identify your jobs in the web interface
- --deploy-mode: the default is client; the other option is cluster
- --files: any files needed during program execution need to be specified with this option
- --driver-memory: the default value is 4.5 GB
- --executor-memory: the default value is 18 GB and should normally not be set by users
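As an illustration of combining several of these options in one command, the example below submits a hypothetical script in cluster mode with a custom name, an extra data file, and a larger driver memory allocation; the file names are placeholders.
spark-submit --name myFancyApp \
    --deploy-mode cluster \
    --files myfile.dat \
    --driver-memory 8g \
    script.py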
Additional Tips
Logging Level
For troubleshooting or debugging purposes, it can be useful to turn on verbose logging. The log level can be adjusted inside the Python script once the Spark context has been initialized.
import pyspark

# initialize spark session
with pyspark.sql.SparkSession.builder.getOrCreate() as spark:
    # increase the logging verbosity (the default is "WARN")
    spark.sparkContext.setLogLevel("INFO")
Pandas API
Users familiar with Python Pandas may be interested in the recently released Pandas API for Apache Spark 3.2+. The following example demonstrates loading a dataframe using the Pandas API on Spark.
import pyspark
import pyspark.pandas as ps

infile = "/dataset/TransUnion_Data/2000/dgebooth_Collection*.pq"

with pyspark.sql.SparkSession.builder.getOrCreate() as spark:
    spark_df = spark.read.parquet(infile)
    # convert to pandas-on-spark dataframe
    pandas_pdf_1 = spark_df.to_pandas_on_spark()
    # OR read parquet file directly using pyspark.pandas
    pandas_pdf_2 = ps.read_parquet(infile)
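    # The pandas-on-Spark dataframe supports many familiar pandas operations.
    # A brief sketch, still inside the session context; "SubjectKey" is the
    # column used in the earlier TransUnion example.
    print(pandas_pdf_2.head())
    print(pandas_pdf_2.groupby("SubjectKey").size().head())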