Skip to content

mtumilowicz/big-data-scala-spark-batch-workshop

Folders and files

NameName
Last commit message
Last commit date

Latest commit

89a87c4 · May 27, 2024

History

69 Commits
Feb 10, 2021
Dec 28, 2020
Jan 1, 2021
Dec 31, 2020
Dec 31, 2020
Dec 31, 2020
Dec 31, 2020
Dec 28, 2020
Jan 1, 2021
Jan 1, 2021
May 27, 2024
Dec 28, 2020

Repository files navigation

Build Status License: GPL v3

big-data-scala-spark-batch-workshop

preface

  • goals of this workshop:
    • introduction to spark and its architecture of spark
    • peek under the hood: data representation and optimizations
    • practicing rudimentary use-cases

spark

  • unified engine designed for large-scale distributed data processing
  • provides in-memory storage for intermediate computations
    • faster than Hadoop MapReduce
  • libraries
    • machine learning (MLlib)
    • SQL for interactive queries (Spark SQL)
    • stream processing (Structured Streaming) based on micro-batch processing engine
    • graph processing (GraphX)
  • typical Spark scenario
    1. Ingestion of raw data
    2. DQ: data quality
      • example: ensure that all birth dates are in the past
      • example: obfuscate Social Security numbers (SSNs)
    3. Transformation
      • example: join with other datasets, perform aggregations
    4. Publication
      • example: load the data into a data warehouse, save in a file on S3

components overview

  • components and architecture alt text
    • SparkSession
      • a single unified entry point to all of Spark’s functionality
      • use cases
        • defining DataFrames and Datasets
        • reading from data sources
        • writing to data lakes
        • accessing catalog metadata
        • issuing Spark SQL queries
      • there is a unique SparkSession for your application, no matter how many nodes it runs
    • Spark driver
      • process running the main() function of the application
      • instantiates SparkSession
      • communicates with the cluster manager
      • requests resources (CPU, memory, etc.) from the cluster manager for Spark’s executors (JVMs)
      • plans and coordinates the execution: transforms operations into directed acyclic graph (DAG) and schedules them
        • keeps track of available resources to execute tasks
        • schedules tasks to run "close" to the data where possible (data locality)
      • distributes operations execution across the Spark executors
      • once the resources are allocated, it communicates directly with the executors
      • resides on master node
    • Spark master & Cluster manager
      • Cluster Manager can be separate from the Master process
      • Spark master requests resources and makes resources available to the Driver
        • monitors the status and health of resources
        • is not involved in the execution of the application and the coordination of its tasks and stages
      • Cluster manager is a process responsible for monitoring the Worker nodes and reserving resources on these nodes upon request by the Master
        • Master then makes these cluster resources available to the Driver in the form of Executors
    • Worker node
      • any node that can run application code in the cluster
      • may not share filesystems with one another
    • Spark executor
      • is a process that executes tasks on the workers often in parallel
      • communicates with the driver program
      • workers hold many executors, for many application
    • Job
      • is a sequence of Stages, triggered by an action (ex. .count())
    • Stage
      • a sequence of Tasks that can all be run in parallel without a shuffle
      • example: .read.map.filter
    • Task
      • unit of work that will be sent to one executor
      • is a single operation (ex. .map or .filter) applied to a single Partition
      • is executed as a single thread in an Executor
      • example: 2 Partitions, operation: filter() => 2 Tasks, one for each Partition
    • Shuffle
      • operation where data is re-partitioned across a cluster
        • by having all the data needed to calculate on a single node, we reduce the overhead on the shuffle (the need for serialization and network traffic)
      • costly operation - a lot of data travels via the network
      • example: join
    • Partition
      • enable parallelization: data is split into Partitions so that each Executor can operate on a single part
        • once the user has submitted his job into the cluster, each partition is sent to a specific executor for further processing
        • the more partitions the more work is distributed to executors, with a smaller number of partitions the work will be done in larger pieces
      • every machine in a spark cluster contains one or more partitions
        • single partition do not span multiple machines
      • good starting point: number of partitions equal to the number of cores
        • example
          • 4 cores and 5 partitions
          • processing of each partition takes 5 minutes
          • total time: 10 minutes (4 in parallel in 5 minutes, then 1 in 5 minutes)
    • deployment on Kubernetes
      • Spark driver: Runs in a Kubernetes pod
      • Spark executor: Each worker runs within its own pod
      • Cluster manager: Kubernetes Master

data representation

  • RDD (Resilient Distributed Datasets)
    • fundamental data structure of Spark
    • immutable distributed collection of data
      • data itself is in partitions
  • Dataset and DataFrame
    • take on two characteristics: DataFrame - untyped and Dataset - typed
    • Dataset is a collection of strongly typed JVM objects
      • has also an untyped view called a DataFrame, which is a Dataset of Row
        • Row is a generic untyped JVM object that may hold different types of fields
          • get first column of given row: val name = row.getString(0)
    • DataFrames are similar to distributed in-memory tables with named columns with types (integer, string, array, map, etc), schemas
      • no primary or foreign keys or indexes in Spark
      • data can be nested, as in a JSON or XML document
        • it's often useful to flattening JSON to facilitate using sql syntax
          • flattening JSON = converting the structures into fields and exploding the arrays into distinct rows alt text alt text
          • example: perform aggregates (group by)
    • conversion: DataFrame -> Dataset
      val bloggersDS = spark
        .read
        .json("path")
        .load()
        .as[TargetClass]
      
  • schemas
    • defines the column names and associated data types for a DataFrame
    • inferring a schema (plus inferring data types)
      • requires separate job to read a large portion of file to infer the schema
      • can be expensive and time-consuming
      • no errors detection if data doesn’t match the schema
    • defining a schema
      • programmatically: val schema = StructType(Array(StructField("author", StringType, false)
      • DDL: val schema = "author STRING, title STRING, pages INT"

data import / export

  • typical use case: read from on-premise database and push to cloud storage (Amazon S3)
  • data source types
    • file (CSV, JSON, XML, Avro, Parquet, and ORC, etc)
    • relational and nonrelational database
    • other data provider: (REST) service, etc
  • DataFrameReader
    • core construct for reading data into a DataFrame
    • supports many formats such as JSON, CSV, Parquet, Text, Avro, ORC, etc.
  • DataFrameWriter
    • it saves data to a data source
    • after the files have been successfully exported, Spark will add a _SUCCESS file to the directory

file formats

  • problem with traditional file formats
    • big data files need to be splittable
      • JSON and XML are not easy to split
    • CSV cannot store hierarchical information (like JSON or XML)
    • none designed to incorporate metadata
    • quite heavy in size (especially JSON and XML)
  • big data formats: Avro, ORC, or Parquet
    • Avro
      • schema-based serialization format (binary data)
      • supports dynamic modification of the schema
      • row-based, so easier to split
    • ORC
      • columnar storage format
      • supports compression
    • Parquet
      • columnar storage format
      • supports compression
      • add columns at the end of the schema
      • Parquet metadata usually contains the schema
        • if the DataFrame is written as Parquet, the schema is preserved as part of the Parquet metadata
          • subsequent reads do not require you to supply a schema
      • default and preferred data source for Spark
      • files are stored in a directory structure: data files, metadata, a number of compressed files, and some status files

data transformation

  • operations can be classified into two types
    • actions
      • example: count(), save()
      • triggers evaluation
    • transformations
      • DataFrame -> DataFrame

      • example: select(), filter()

      • evaluated lazily

        • results are recorded as a lineage
          • allows to optimize queries (rearrange certain transformations, coalesce them, etc.)
          • provides fault tolerance: reproduce its state by replaying the lineage
      • two types

        alt text

aggregations

  • DataFrame API
    Dataset<Row> apiDf = df
        .groupBy(col("firstName"), col("lastName"), col("state"))
        .agg(
            sum("quantity"),
            sum("revenue"),
            avg("revenue"));
    
    is equivalent to
    df.createOrReplaceTempView("orders");
    
    String sqlStatement = "SELECT " +
        " firstName, " +
        " lastName, " +
        " state, " +
        " SUM(quantity), " +
        " SUM(revenue), " +
        " AVG(revenue) " +
        " FROM orders " +
        " GROUP BY firstName, lastName, state";
    
    Dataset<Row> sqlDf = spark.sql(sqlStatement);
    

joins

  • Broadcast Hash Join (map-side-only join)
    • used when joining small with large
      • small = fitting in the driver’s and executor’s memory
    • smaller data set is broadcasted by the driver to all Spark executors
    • by default if the smaller data set is less than 10 MB
    • does not involve any shuffle of the data set
  • Shuffle Sort Merge Join
    • used when joining two large data sets
    • default join algorithm
    • pre-requirement: partitions have to be co-located
      • all rows having the same value for the join key should be stored in the same partition
      • otherwise, there will be shuffle operations to co-locate the data
    • has two phases
      • sort phase
        • sorts each data set by its desired join key
      • merge phase
        • iterates over each key in the row from each data set and merges the rows if the two keys match

sql

  • tables
    ids.write
        .option("path", "/tmp/five_ids")
        .saveAsTable("five_ids")
    
    • each table is associated with its relevant metadata (the schema, partitions, physical location where the actual data resides, etc.)
      • metadata is stored in a central metastore
        • by default: Apache Hive metastore
          • Catalog is the interface for managing a metastore
            • spark.catalog.listDatabases()
            • spark.catalog.listTables()
            • spark.catalog.listColumns("us_delay_flights_tbl")
          • location: /user/hive/warehouse
    • two types of tables
      • managed
        • Spark manages metadata and the data
        • example: local filesystem, HDFS, Amazon S3
        • note that SQL command such as DROP TABLE deletes both the metadata and the data
          • with an unmanaged table, the same command will delete only the metadata
      • unmanaged
        • Spark only manages the metadata
        • example: Cassandra
  • views
    • vs table: views don’t actually hold the data
      • tables persist after application terminates, but views disappear
    • to enable a table-like SQL usage in Spark - create a view
      df.createOrReplaceTempView("geodata");
      
      Dataset<Row> smallCountries = spark.sql("SELECT * FROM ...");
      

optimizations

  • at the core of the Spark SQL engine are the Catalyst optimizer and Project Tungsten
  • note that a lot of the issues can come from key skewing: the data is so fragmented among partitions that a join operation becomes very long

tungsten

  • focuses on enhancing three key areas
    • memory management and binary processing
      • manage memory explicitly
        • off-heap binary memory management
      • eliminate the overhead of JVM object model and garbage collection
        • Java objects have large overheads — header info, hashcode, Unicode info, etc.
        • instead use binary in-memory data representation aka Tungsten row format
    • cache-aware computation
      • algorithms and data structures to exploit memory hierarchy (L1, L2, L3)
      • cache-aware computations with cache-aware layout for high cache hit rates
    • code generation
      • exploit modern compilers and CPUs
      • generates JVM bytecode to access Tungsten-managed memory structures that gives a very fast access
      • uses the Janino compiler - super-small, super-fast Java compiler

catalyst

  • similar concept to RDBMS query optimizer
  • converts computational query and converts it into an execution plan alt text
  • Phase 1: Analysis
    • Spark SQL engine generates AST tree for the SQL or DataFrame query
  • Phase 2: Logical optimization
    • catalyst constructs a set of multiple plans and using its cost-based optimizer assign costs to each plan
  • Phase 3: Physical planning
    • Spark SQL generates an optimal physical plan for the selected logical plan
  • Phase 4: Code generation
    • generating efficient Java bytecode to run on each machine

caching

  • Spark maintains a history of all transformations you may apply to a DataFrame or RDD
    • this enables Spark to be fault tolerant
    • however Spark programs take a huge performance hit when fault occurs as the entire set of transformations to RDD have to be recomputed
  • if you reuse a dataframe for different analyses, it is a good idea to cache it
    • steps are executed each time you run an analytics pipeline
    • example: DataFrames commonly used during iterative machine learning training
  • caching vs persistence
    • persist() provides more control over how and where data is stored
      • DISK_ONLY, OFF_HEAP, etc.
    • both are lazy transformations
      • immediately after calling the function nothing happens with the data but the query plan is updated by the Cache Manager by adding a new operator — InMemoryRelation
  • caching vs checkpointing
    • whatever is the case of failure, re-calculating the lost partitions is the most expensive operation
      • best strategy is to start from some checkpoint in case of failure
    • checkpoint() method will truncate the logical plan (DAG) and save the content of the dataframe to disk
      • re-computations need not be done all the way from beginning, instead the checkpoint is used as the beginning of re-calculation
    • cache will be cleaned when the session ends
      • checkpoints will stay on disk
    • example where checkpointing would be preferred over caching
      • DataFrame of taxes for a previous year - they are unlikely to change once calculated so it would be much better to checkpoint and save them forever so that they can be consistently reused in the future

user-defined functions

val cubed = (s: Long) => { s * s * s } // define function

spark.udf.register("cubed", cubed) // register UDF

spark.sql("SELECT id, cubed(id) AS id_cubed FROM ...") // use
  • excellent choice for performing data quality rules
  • UDF’s internals are not visible to Catalyst
    • UDF is treated as a black box for the optimizer
    • Spark won’t be able to analyze the context where the UDF is called
      • if you make dataframe API calls before or after, Catalyst can’t optimize the full transformation
      • should be at the beginning or the end of transformations