Spark SQL — различия между версиями

Материал из Национальной библиотеки им. Н. Э. Баумана
Последнее изменение этой страницы: 01:26, 23 мая 2020.
(Начало работы со Spark SQL)
 
Строка 4: Строка 4:
 
| released              = {{Start date and age|2012|10|15|df=yes}}
 
| released              = {{Start date and age|2012|10|15|df=yes}}
 
| latest release version = v2.3.2
 
| latest release version = v2.3.2
| latest release date    = {{Start date and age|2018|09|24|df=yes}}  
+
| latest release date    = {{Start date and age|2020|02|08|df=yes}}  
 
| language count        = <!-- Number only -->
 
| language count        = <!-- Number only -->
 
| license                = Apache License, Version 2.0
 
| license                = Apache License, Version 2.0

Текущая версия на 01:26, 23 мая 2020

Spark SQL
Spark logo.png
Выпущена: 15 October 2012 года; 7 years ago (2012-10-15)
Постоянный выпуск: v2.3.2 / 8 February 2020 года; 3 months ago (2020-02-08)
Лицензия: Apache License, Version 2.0
Веб-сайт spark.apache.org/sql/
As of 2 October 2018 года; 19 months ago (2018-10-02)

Spark SQL – расширение Apache Spark для работы со структурированными данными.

В отличие от функционального API Spark RDD, Spark SQL позволяет использовать преимущества реляционной обработки данных (например, декларативные запросы и оптимизированное хранилище). Взаимодействие со Spark SQL происходит несколькими способами, в том числе с использованием SQL или Dataset API. Внутренний механизм выполнения вычислений не зависит от API/языка программирования. Таким образом, разработчики могут выбирать интерфейс, наиболее подходящий для естественного выражения преобразования. При запуске SQL из другого языка программирования результаты будут возвращены в виде Dataset/DataFrame. [Источник 1]

Основные понятия

Dataset

Dataset – это распределенный набор данных. Интерфейс Dataset добавлен в Spark 1.6 для обеспечения преимуществ RDD (таких как строгая типизация, возможность использовать лямбда-функции) совместно с преимуществами механизма выполнения Spark SQL. Dataset может быть построен из JVM-объектов и изменен с помощью функциональных преобразований (map, flatMap, filter и т. д.). Dataset API доступен в Scala и Java. В Python нет поддержки Dataset API, но из-за динамического харакатера Python, многие возможности Dataset API уже доступны (например, обращение к полю в строке по имени). Аналогично в случае языка R.

DataFrame

DataFrame – это Dataset, представленный именованными столбцами. DataFrame – это аналог таблицы в реляционной базе данных или DataFrame'а в Python/R, но с улучшенной оптимизаций для распределенных вычислений. DataFrame может быть создан из большого количества источников: csv-файлов, таблиц в Hive, внешних баз данных или существующих RDD. DataFrame доступен из Scala, Java, Python и R. В Scala API и Java API DataFrame представлен как Dataset[Row] и Dataset<Row> соответственно.

Архитектура

Spark SQL состоит из трех основных элементов:

Программный API. Spark поддерживает Scala, Java, Python и R. Spark SQL поддерживает синтаксис HiveQL, включая Hive SerDes и UDF, предоставляяя доступ к хранилищу Hive.[Источник 2]

SchemaRDD. Поскольку Spark SQL работает со схемами, таблицами и записями, можно использовать SchemaRDD в качестве временной таблицы.

Источники данных. Spark SQL в качестве источников данных использует JSON, Parquet, HIVE-таблицы или Cassandra. [Источник 3]

Ключевые особенности

Интеграция

Spark SQL позволяет бесшовно запрашивать структурированные данные из Spark-программ, используя SQL или обычный DataFrame API.

Универсальный доступ к данным

Работа со всеми истониками данных происходит единообразно. DataFrames и SQL предоставляют общий способ доступа к различным источникам данных (например Hive, Avro, Parquet, ORC, JSON и JDBC/ODBC. Можно смешивать данные, полученные из разных источников.

Работа со Spark SQL

Точка входа

Точкой входа для работы со Spark является класс SparkSession:

Scala

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

Java

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

Python

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

R

sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))

Источники данных

Spark SQL поддерживает работу с различными источниками данных посредством интерфейса DataFrame. DataFrame используют при реляционных преобразованиях, а также для создания временного представления. Регистрация DataFrame'а как временного представления позволяет применять к таким данным SQL-запросы.

По умолчанию источником данных является Parquet. [Источник 4]

Пример с JSON

Scala
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
Python
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
R
df <- read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame
showDF(df)
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Операции над данными

DataFrame предоставляет предметно-ориентированный язык для рабооты со структурированными данными.

Scala

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()

// Select only the "name" column
df.select("name").show()

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()


// Select people older than 21
df.filter($"age" > 21).show()


// Count people by age
df.groupBy("age").count().show()

Java

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();


// Select only the "name" column
df.select("name").show();

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();

// Select people older than 21
df.filter(col("age").gt(21)).show();

// Count people by age
df.groupBy("age").count().show();

Python

# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()

# Select only the "name" column
df.select("name").show()

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()

# Select people older than 21
df.filter(df['age'] > 21).show()

# Count people by age
df.groupBy("age").count().show()

R

# Create the DataFrame
df <- read.json("examples/src/main/resources/people.json")


# Print the schema in a tree format
printSchema(df)

# Select only the "name" column
showDF(select(df, "name"))

# Select everybody, but increment the age by 1
showDF(select(df, df$name, df$age + 1))

# Select people older than 21
showDF(where(df, df$age > 21))

# Count people by age
showDF(count(groupBy(df, "age")))
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

SQL-запросы

Функция sql позволяет выполнять SQL-запросы из приложения. Существует возможность применять SQL-запросы напрямую к файлам.

Scala

// This import is needed to use the $-notation
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();

Python

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

R

df <- sql("SELECT * FROM table")

showDF(df)
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Начало работы со Spark SQL

Пакет Spark SQL в последних версиях Apache Spark входит в ядро, поэтому его установка сводится к установке Apache Spark. Установка будет производиться на чистую систему Ubuntu 18.04 LTS. Листинг команд используемых в видео доступен ниже. Пример установки:

Установка может быть проведена с помощью install.sh, содержащего следующие команды:

  • Hadoop требует наличия Java, добавляем её репозиторий
$  sudo add-apt-repository ppa:webupd8team/java
  • Подгружаем список доступных пакетов
$  sudo apt-get update
  • Устанавливаем Java
$  sudo apt-get install oracle-java8-installer
  • Прописываем путь до директории содержащей бинарный файл bin/java
$  export JAVA_HOME=/usr
  • Скачиваем последнюю версию пакета spark, содержащую модуль Spark SQL с официального репозитория разработчика
$  wget http://apache-mirror.rbc.ru/pub/apache/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
  • Распаковываем архив
$  tar -xvzf spark-2.4.0-bin-hadoop2.7.tgz
  • Для проверки работоспособности устанавливаем удобный ЯП
$  sudo apt-get install python python3
  • Запускаем локальный интерактивный шелл python (по умолчанию запускается временный локальный кластер hadoop в один поток)
$  ./bin/pyspark
  • Пишем файл примера для загрузки в БД
import json
json_file = open('json.json', 'w')
json_file.write(json.dumps({"test_string": "hello", "test_int": 5, "test_float": 5.12345}))
json_file.close()
  • Выгружаем файл во временную таблицу temp_json_table
spark.read.json("json.json").registerTempTable("temp_json_table")
  • Проверяем, создалась ли таблица с помощью обычного SQL синтаксиса
spark.sql("SHOW TABLES").show()
  • Проверяем заполнение
spark.sql("SELECT * FROM temp_json_table").show()

Источники

  1. Spark SQL: Relational Data Processing in Spark. URL: https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf (дата обращения: 02.10.2018).
  2. Apache Spark [Электронный ресурс]: Spark SQL & DataFrames. URL: https://spark.apache.org/sql/ (дата обращения: 02.10.2018).
  3. Intellipaat [Электронный ресурс]: What is Spark SQL – Introduction to Spark SQL Architecture. URL: https://intellipaat.com/blog/what-is-spark-sql/ (дата обращения: 16.01.2019).
  4. Spark 2.3.2 Documentation [Электронный ресурс]: Spark SQL & DataFrames. URL: https://spark.apache.org/docs/latest/sql-programming-guide.html (дата обращения: 02.10.2018).