Python API and PySpark
======================

https://spark.apache.org/docs/2.4.8/api/python/index.html

.. list-table:: Spark Deploy Modes
   :widths: 20 40 40
   :header-rows: 1

   * - Command
     - Client Mode
     - Cluster Mode
   * - pyspark
     - | useful for interactive sessions or
       | debugging small applications
     - N/A
   * - spark-submit
     - small applications with stdout
     - long running batch applications

Interactive Jobs
----------------

Interactive sessions are primarily meant for prototyping and debugging applications. While this provides the familiar Python interface, it is not a very efficient use of resources.

.. code-block:: bash

    module load python/booth/3.9/3.9.6

    # specify IPython interface
    export PYSPARK_DRIVER_PYTHON=ipython3

    # uncomment below to use Jupyter interface instead
    #export PYSPARK_DRIVER_PYTHON=jupyter
    #export PYSPARK_DRIVER_PYTHON_OPTS=notebook

    # launch interactive pyspark session
    pyspark

.. code-block:: console

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.1.1
          /_/

    Using Python version 3.9.6 (default, Aug 2 2021 09:23:49)
    SparkSession available as 'spark'.

    In [1]:

Batch Jobs
----------

The simplest way to submit a batch job is with the :code:`spark-submit` command and no additional command line options. By default, this launches Spark in `client` mode. Client mode is useful for prototyping and debugging applications.

Additional options can be included when submitting your Spark application. The example below associates a name with the Spark application to make it easier to track.

.. code-block:: bash

    # load python module
    module load python/booth/3.9/3.9.6

    # client mode enables stdout
    spark-submit --name myFancyApp script.py

The next example demonstrates a Spark application submitted in `cluster` mode. Cluster mode is the recommended way to submit mature or long-running applications. In cluster mode, the Spark driver is launched on a physically separate machine (i.e., a worker node). For that reason, you must let Spark know about any additional files that will be needed at runtime. The :code:`--files` option is used to upload a local data file so that it is available to the executors at runtime.

.. code-block:: bash

    # load python module
    module load python/booth/3.9/3.9.6

    # cluster mode disables stdout but allows long-running jobs to continue after logging off
    spark-submit --deploy-mode cluster --files myfile.dat script.py

Because the Spark driver runs on a worker node, stdout is disabled in cluster mode. Any print statements or other output can be viewed in the logs by navigating to the web UI at http://hdpmn01.chicagobooth.edu:8088. Once on the web UI, click on your application to find the logs.

Examples
--------

Example 1
!!!!!!!!!

Apache Spark ships with a few example scripts that serve as useful demos. You can find the examples at the following path: ``${SPARK_HOME}/examples/src/main/python/``.

.. code-block:: bash

    # load python module
    module load python/booth/3.9/3.9.6

    # run the example script to estimate the value of pi
    spark-submit ${SPARK_HOME}/examples/src/main/python/pi.py
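The ``pi.py`` script above is a complete, standalone PySpark application. For reference, a minimal batch script of your own (for example, the ``script.py`` referenced in the Batch Jobs section) might follow the hypothetical sketch below; the application name and the toy computation are illustrative only.

.. code-block:: python

    import pyspark

    # build (or reuse) a SparkSession; the name mirrors the --name example above
    # and could instead be supplied on the spark-submit command line
    with pyspark.sql.SparkSession.builder.appName("myFancyApp").getOrCreate() as spark:

        # create a small DataFrame and run a trivial aggregation
        df = spark.range(1000)
        total = df.groupBy().sum("id").collect()[0][0]
        print(f"Sum of 0..999: {total}")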
Example 2
!!!!!!!!!

The example Python script below reads a TransUnion parquet file and performs a few simple operations.

.. code-block:: python

    import pyspark

    infile = "/dataset/TransUnion_Data/2000/dgebooth_Collection*.pq"

    with pyspark.sql.SparkSession.builder.getOrCreate() as spark:
        df = spark.read.parquet(infile)

        g = df.groupBy("SubjectKey")
        g.count().sort("SubjectKey").show()

        df.printSchema()
        print(f"Number of rows: {df.count()}")

Configuration Options
---------------------

Default Options
!!!!!!!!!!!!!!!

Every Spark job uses executors to perform tasks. On Vulcan, the default behavior is that executors are allocated dynamically based on the number of tasks required. For that reason, users should not specify the number of executors required for their application.

Vulcan has 384 CPU cores and 1.88 TB of memory reserved exclusively for Spark jobs. By default, each executor uses 4 CPU cores and 18 GB of memory. At these default values, Vulcan can accommodate a maximum of 96 executors distributed among all running applications.

Additional Options
!!!!!!!!!!!!!!!!!!

The basic syntax for submitting Spark jobs is :code:`spark-submit [options] python-file [app arguments]`. Options entered on the command line take precedence over any default values. Below are a few commonly used options.

* :code:`--name` any name to help identify your jobs in the web interface
* :code:`--deploy-mode` the default is `client`; the other option is `cluster`
* :code:`--files` any files needed during program execution must be specified with this option
* :code:`--driver-memory` the default value is 4.5 GB
* :code:`--executor-memory` the default value is 18 GB and should normally not be set by users

Additional Tips
---------------

Logging Level
!!!!!!!!!!!!!

For troubleshooting or debugging purposes, it can be useful to turn on verbose logging. The log level can be adjusted inside the Python script once the Spark context has been initialized.

.. code-block:: python

    import pyspark

    # initialize spark session
    with pyspark.sql.SparkSession.builder.getOrCreate() as spark:
        # increase the log level (the default is "WARN")
        spark.sparkContext.setLogLevel("INFO")

Pandas API
!!!!!!!!!!

Users familiar with Python Pandas may be interested in the recently released `Pandas API`_ for Apache Spark 3.2+. The following example demonstrates loading a dataframe using the Pandas API on Spark.

.. _Pandas API: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html

.. code-block:: python

    import pyspark
    import pyspark.pandas as ps

    infile = "/dataset/TransUnion_Data/2000/dgebooth_Collection*.pq"

    with pyspark.sql.SparkSession.builder.getOrCreate() as spark:
        spark_df = spark.read.parquet(infile)

        # convert to pandas-on-spark dataframe
        pandas_pdf_1 = spark_df.to_pandas_on_spark()

        # OR read parquet file directly using pyspark.pandas
        pandas_pdf_2 = ps.read_parquet(infile)
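Once loaded, a pandas-on-Spark dataframe supports much of the familiar pandas syntax. The short sketch below continues the example above and is illustrative only; ``SubjectKey`` is the column used in Example 2, and the operations shown are just a small sample of the API.

.. code-block:: python

    import pyspark.pandas as ps

    infile = "/dataset/TransUnion_Data/2000/dgebooth_Collection*.pq"

    # read the parquet file directly into a pandas-on-spark dataframe
    pdf = ps.read_parquet(infile)

    # familiar pandas-style operations, executed by Spark under the hood
    print(pdf.head())
    print(pdf.groupby("SubjectKey").size().sort_index())

    # convert back to a regular Spark DataFrame when needed
    sdf = pdf.to_spark()
    sdf.printSchema()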