Apache Spark

Материал из Национальной библиотеки им. Н. Э. Баумана
Последнее изменение этой страницы: 18:19, 30 января 2019.
Apache Spark
Apache Spark logo.svg
Создатели: Матей Захария
Разработчики: Apache Software Foundation
Выпущена: 30 May 2014 года; 5 years ago (2014-05-30)
Постоянный выпуск: 2.4.0 / November 2018; 1 year ago (2018-11)
Состояние разработки: Active
Написана на: Scala, Java, Python и R
Операционная система: Microsoft Windows, Linux, macOS
Платформа: Java virtual machine
Размер дистрибутива: 217 MB
Лицензия: Apache License 2.0
Веб-сайт spark.apache.org

Apache Spark (от англ. spark — искра, вспышка) — фреймворк с открытым исходным кодом для реализации распределённой обработки неструктурированных и слабоструктурированных данных, входящий в экосистему проектов Apache Hadoop. В отличие от классического обработчика из ядра Apache Hadoop, реализующего двухуровневую концепцию MapReduce с дисковым хранилищем, Spark использует специализированные примитивы для рекуррентной обработки в оперативной памяти, благодаря чему позволяет получать значительный выигрыш в скорости работы для некоторых классов задач[5], в частности, возможность многократного доступа к загруженным в память пользовательским данным делает библиотеку привлекательной для алгоритмов машинного обучения.

Проект предоставляет программные интерфейсы для языков Java, Scala, Python, R. Изначально написан на Scala, впоследствии добавлена существенная часть кода на Java для предоставления возможности написания программ непосредственно на Java. Состоит из ядра и нескольких расширений, таких как Spark SQL (позволяет выполнять SQL-запросы над данными), Spark Streaming (надстройка для обработки потоковых данных), Spark MLlib (набор библиотек машинного обучения), GraphX (предназначено для распределённой обработки графов). Может работать как в среде кластера Apache Hadoop под управлением YARN, так и без компонентов ядра Apache Hadoop, поддерживает несколько распределённых систем хранения — HDFS, OpenStack Swift, NoSQL-СУБД Cassandra, Amazon S3.

Ключевой автор — румынско-канадский учёный в области информатики Матей Захария (англ. Matei Zaharia), начал работу над проектом в 2009 году, будучи аспирантом Университета Калифорнии в Беркли. В 2010 году проект опубликован под лицензией BSD, в 2013 году передан фонду Apache и переведён на лицензию Apache 2.0, в 2014 году принят в число проектов верхнего уровня Apache. [Источник 1].

Компоненты Spark

Рисунок 1 – Компоненты Spark Apache

  • SparkSQL

SparkSQL – это компонент Spark, поддерживающий запрашивание данных либо при помощи SQL, либо посредством Hive Query Language. Библиотека возникла как порт Apache Hive для работы поверх Spark (вместо MapReduce), а сейчас уже интегрирована со стеком Spark. Она не только обеспечивает поддержку различных источников данных, но и позволяет переплетать SQL-запросы с трансформациями кода; получается очень мощный инструмент.

  • Spark Streaming

Spark Streaming поддерживает обработку потоковых данных в реальном времени; такими данными могут быть файлы логов рабочего веб-сервера (напр. Apache Flume и HDFS/S3), информация из соцсетей, например, Twitter, а также различные очереди сообщений вроде Kafka. «Под капотом» Spark Streaming получает входные потоки данных и разбивает данные на пакеты. Далее они обрабатываются движком Spark, после чего генерируется конечный поток данных (также в пакетной форме).

  • MLlib

MLlib - это библиотека для машинного обучения, предоставляющая различные алгоритмы, разработанные для горизонтального масштабирования на кластере в целях классификации, регрессии, кластеризации, совместной фильтрации и т.д. Некоторые из этих алгоритмов работают и с потоковыми данными — например, линейная регрессия с использованием обычного метода наименьших квадратов или кластеризация по методу k-средних (список вскоре расширится). Apache Mahout (библиотека машинного обучения для Apache Hadoop) уже ушла от MapReduce, теперь ее разработка ведется совместно с Spark MLlib.

  • GraphX

GraphX - это библиотека для манипуляций над графами и выполнения с ними параллельных операций. Библиотека предоставляет универсальный инструмент для ETL, исследовательского анализа и итерационных вычислений на основе графов. Кроме встроенных операций для манипуляций над графами здесь также предоставляется библиотека обычных алгоритмов для работы с графами, например, PageRank.


Начало работы со Spark

Лучше всего устанавливать дистрибутив на Unix подобную систему,однако здесь будет продемонстрирована установка на Windows 10.

В приведенном выше видео рассказывается об установке Spark на Windows, следуйте приведенным инструкциям ниже.

  • Скачать и установить Gnu на Windows (GOW) можно по следующей ссылке . По сути, GOW позволяет вам использовать команды Linux на Windows. В этой установке нам понадобятся curl, gzip, tar, которые предоставляет GOW.
  • Загрузите и установите Anaconda для Windows.
  • Откройте новую командную строку (CMD).
  • Зайдите на сайт Apache Spark.
    • Выберите релиз Spark.
    • Выберите тип упаковки.
    • Выберите тип загрузки: (Прямая загрузка).
    • Скачать Spark. Помните, что если вы загружаете более новую версию, вам нужно будет изменить оставшиеся команды для загруженного файла.
  • Переместите файл туда, куда вы хотите распаковать его.
  • Распакуйте файл.Используйте жирные команды ниже
$  gzip -d spark-2.4.0-bin-hadoop2.7.tgz
$  tar xvf spark-2.4.0-bin-hadoop2.7.tar
  • Загрузите файл winutils.exe в вашу папку spark-2.4.0-bin-hadoop2.7\bin.После, пропишите следующую команду в CMD
$  curl -k -L -o winutils.exe 
  • Убедитесь, что на вашем компьютере установлена Java 7+.
  • Далее мы отредактируем наши переменные окружения, чтобы мы могли открыть блокнот spark в любом каталоге.
$  setx SPARK_HOME C: \ opt \ spark \ spark-2.4.0-bin-hadoop2.7
$  setx HADOOP_HOME C: \ opt \ spark \ spark-2.4.0-bin-hadoop2.7
$  setx PYSPARK_DRIVER_PYTHON jupyter
$  setx PYSPARK_DRIVER_PYTHON_OPTS notebook
  • Закройте свой терминал и откройте новый. Введите команду ниже.
$  pyspark --master local[2]

Параметр --master указывает главный URL-адрес для распределенного кластера,для локального запуска с одним потоком или [N] для локального запуска с N потоками. Вы должны начать с использования локального с одним потоком для тестирования.

  • Если вас после ввода данной команды перекидывает в браузер,то установка прошла успешно,значит пора переходить к работе с Apache Spark.

Для этого просто введите в командной строке:

$  spark-shell

В ответ вы должны получить:

 Launch...                ########################################### [100%]
 welcome to spark version 2.4.0

Версия у вас может быть другая,в зависимости от выбранного релиза Spark на 4-м шаге.

Работа с данными в Spark

Для демонстрации работы с данными будет использоваться набор данных The Apache Software Foundation с сайта https://github.com (скачать можно по следующей ссылке).[Источник 2].

Введение в структуры данных Spark

В Spark существуют следующие абстракции для работы с данными:

  • RDD (Resilient Distributed Dataset) - основная абстракция для работы с данными, является низкоуровневой.
  • DataFrame - обертка над RDD, используется в Python и R.
  • DataSet - еще одна обертка над RDD. Очень схожа с DataFrame, отличие только в том, что DataSet является строго-типизированной оберткой - типы проверяются при компиляции. Используется в Java и в Scala.

Основное отличие RDD от DataFrame (и DataSet) заключается в том, что если вы применяете цепочку преобразований к RDD (состоящую из преобразований, типа filter, map, reduce), то Spark выполняет эти преобразования ровно в той последовательности, в которой они были указаны в коде. DataFrame и DataSet работают иначе - до применения преобразований срабатывает оптимизатор, который расcчитывает наиболее оптимальный способ получения результата. Поэтому предпочтительнее использовать DataFrame (или DataSet).[Источник 3].

Работа с DataFrame (DataSet) происходит преимущественно с помощью SQL-интерфейса (ограниченный набор операций типа select, where, join и пр.), а в работе с RDD мы никак не ограничены - можем писать пользовательские функции для преобразований типа map и reduce. Могут возникнуть ситуации, когда работать с RDD будет предпочтительнее, чем с DataFrame (DataSet), например, если требуемое преобразование тяжело описать на языке SQL.

Инициализация Spark

Работа со Spark в языке Java проводится через объект класса SparkSession.

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("russian-troll-tweets") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

При инициализации сессии Spark указываем следующие опции:

  • .master("local[*]") - указываем, что Spark поднимается на локальной машине и задействует все доступные ядра (.master("local[N]") задействовал бы N ядер);
  • .appName("russian-troll-tweets") - указываем имя приложения - при поднятии локального Spark не является необходимым, но в случае, когда Spark развернут на кластере и имеется много пользователей, поле appName необходимо, чтобы различать приложения, которые используют Spark.
  • .config("some.config.param", "value") - позволяет выставить дополнительные опции (см. все опции в документации), используя метод .config несколько раз подряд.

Пример 1: чтение и вывод данных

Считаем .json - файл в датафрейм.

# Считываем .json-файл в датафрейм 
# и выводим информацию из этого файла

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

//вывод информации на экран пользователя:
df.show();

Output:

 +----+-------+
 | age|   name|
 +----+-------+
 |null|Michael|
 |  30|   Andy|
 |  19| Justin|
 +----+-------+

Теперь поэкспериментируем с выводом разных значений столбцов

import static org.apache.spark.sql.functions.col;
//выбираем только имена из списка
df.select("name").show();

Output:

 +-------+
 |   name|
 +-------+
 |Michael|
 |   Andy|
 | Justin|
 +-------+
import static org.apache.spark.sql.functions.col;
//выбираем людей,которые старше 20
df.filter(col("age").gt(21)).show();

Output:

 +---+----+
 |age|name|
 +---+----+
 | 30|Andy|
 +---+----+

Код ниже у нас же наоборот отвечает за создание полей и заполнение их значениями.

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Создаем экземпляр класса человек.
Person person = new Person();
person.setName("Andy");
person.setAge(32);

Пример 2: JOIN двух датафреймов

Пусть стоит задача: определить пользователей с наибольшим количеством твитов (вывести их данные и количество твитов). Из датафрейма df_tweets мы можем узнать только user_id пользователей с наибольшим количеством твитов:

# Ищем user_id пользователей с наибольшим количеством твитов
# Для этого группируем все записи в датафрейме df_tweets по полю user_id и считаем сколько записец попало в каждую группу
# (и дополнительно сортируем по убыванию количества записей в группе)
df_most_tweets = df_tweets.groupby([df_tweets.user_id]).count().sort("count", ascending=False)
df_most_tweets.show(10)

Output:

+----------+-----+
|   user_id|count|
+----------+-----+
|      null| 7410|
|2882013788| 5424|
|1679279490| 5024|
|1671234620| 4343|
|2671070290| 2943|
|1868496344| 2886|
|1658202894| 2886|
|1768259989| 2870|
|1684524144| 2840|
|1727482238| 2802|
+----------+-----+
only showing top 10 rows

В реальных данных часто бывает так, что часть данных может отсутствовать. В df_tweets для некоторых записей пропущено поле user_id, таких записей 7410, как видно из таблицы выше. Эти 7410 твитов наверняка принадлежат не одному пользователю, а нескольким, поэтому исключим из рассмотрения записи с user_id = null:

# Исключаем результат с user_id = null
df_most_tweets = df_most_tweets.filter(df_tweets.user_id.isNotNull())
df_most_tweets.show(10)

Output:

+----------+-----+
|   user_id|count|
+----------+-----+
|2882013788| 5424|
|1679279490| 5024|
|1671234620| 4343|
|2671070290| 2943|
|1868496344| 2886|
|1658202894| 2886|
|1768259989| 2870|
|1684524144| 2840|
|1727482238| 2802|
|1680366068| 2799|
+----------+-----+
only showing top 10 rows
# Производим JOIN датафреймов df_users и df_most_tweets по полю user_id.
# В получившемся датафрейме много колонок (так как в df_users их много) => выведем не все колонки.
# Последняя колонка (count) - количество твитов от конкретного пользователя в df_tweets.
df_users_with_most_tweets = df_users.join(df_most_tweets, df_users.id == df_most_tweets.user_id)
df_users_with_most_tweets.select(
    df_users_with_most_tweets.id,
    df_users_with_most_tweets.name,
    df_users_with_most_tweets.location,
    df_users_with_most_tweets.followers_count,
    df_users_with_most_tweets.friends_count,
    "count"
).show(5)

Output:

+----------+--------------+--------------+---------------+-------------+-----+
|        id|          name|      location|followers_count|friends_count|count|
+----------+--------------+--------------+---------------+-------------+-----+
|2882013788| Giselle Evans|Pittsburgh, US|          24344|        21953| 5424|
|1679279490|Amelie Baldwin|           USA|           2464|         1930| 5024|
|1671234620|         Susan|           USA|           2225|         2159| 4343|
|1658202894|  Laura Baeley| United States|           1073|         1160| 2886|
|1868496344|Briana Ragland|           USA|            768|          415| 2886|
+----------+--------------+--------------+---------------+-------------+-----+

Пример 3: MapReduce с помощью RDD

Пусть теперь стоит следующая задача: найти количество вхождений определенного слова во всем множестве твитов. Сначала преобразуем наш датафрейм с твитами в RDD:

# Преобразовываем датафрейм с твитами в RDD
# Данные остаются неизменными - меняется представление
rdd_tweets = df_tweets.rdd
print("Amount of tweets:", rdd_tweets.count())

Output:

Amount of tweets: 156412

Теперь напишем функцию, которая считает количество вхождений входного слова по всему множеству твитов (и входное слово, и текст твита приводятся к нижнему регистру). К RDD с твитами последовательно применяем два преобразования:

  1. Сначала каждый твит обрабатываем пользовательской функцией - приводим к нижнему регистру и считаем количество вхождений в него входного слова. Операция map вернет новый RDD, в котором будут содержаться посчитанные количества вхождения слова в каждый отдельный твит.
  2. Далее к новому RDD, содержащему числа, применяется операция reduce - все полученные числа аггрегируются с помощью пользовательской функции (обычное сложение).

Надо заметить, что с помощью SQL-интерфейса, который предоставляет DataFrame, данная задача решалась бы не так очевидно.

def count_word_occurance(word):
    word_in_lowercase = word.lower()
    count = rdd_tweets\
    .map(lambda x: x["text"].lower().count(word_in_lowercase) if x["text"] else 0)\
    .reduce(lambda x,y: x + y)
    print("%s occured %d times in all tweets." % (word, count))
    return count

count_word_occurance("Trump")
count_word_occurance("have")
count_word_occurance("Möglicherweise")

Output:

Trump occured 27996 times in all tweets.
have occured 7509 times in all tweets.
Möglicherweise occured 1 times in all tweets.

Работа с IPython-ноутбуком и демонстрация приведенных выше примеров приведена в следующем ролике:

Источники

  1. Introduction // Wikipedia. [2001-2019]. URL: https://ru.wikipedia.org/wiki/Apache_Spark (дата обращения: 10.12.2018)
  2. The Apache Software Foundation // Github. [2008-2019]. URL:https://github.com/apache/spark (дата обращения: 10.12.2018)
  3. A comparison between RDD, DataFrame and Dataset // Zenika. [2011-2019]. URL: https://medium.zenika.com/a-comparison-between-rdd-dataframe-and-dataset-in-spark-from-a-developers-point-of-view-a539b5acf734 (дата обращения: 10.12.2018)

Ссылки