Apache Mahout

Материал из Национальной библиотеки им. Н. Э. Баумана
Последнее изменение этой страницы: 14:45, 26 февраля 2019.
Apache Mahout
Sdfsdfsf.png
Разработчики: Apache Software Foundation
Выпущена: 2013; 8 years ago (2013)
Постоянный выпуск: 0.13.0[Источник 1] / April 2017; 3 years ago (2017-04)
Состояние разработки: Активный
Операционная система: cross-platform
Тип ПО: Machine learning
Лицензия: Apache License 2.0
Веб-сайт mahout.apache.org

Apache Mahout (TM) - это распределенная структура линейной алгебры и математически выразительный Scala DSL, предназначенная для того, чтобы математики, статистики и ученые данных могли быстро реализовать свои собственные алгоритмы. Apache Spark - это рекомендованный распределенный серверный модуль, который можно распространить на другие распределенные серверные части.

  • Математически выразительная Scala DSL
  • Поддержка нескольких распределенных бэкэндов (включая Apache Spark)
  • Модульные собственные решения для ускорения CPU / GPU / CUDA[Источник 2]

Основные алгоритмы Mahout рассчитаны на кластерный анализ, классификация и совместная фильтрация на основе пакетов реализованы поверх Apache Hadoop с использованием парадигмы map / reduce. Также приветствуются вклады, которые выполняются на одном узле или в кластере, отличном от Hadoop.

Начиная с версии 0.10.0, проект переключает свое внимание на создание независимой от программного обеспечения среды программирования под названием «Самсара». Среда состоит из алгебраического, независимого от программного обеспечения оптимизатора и алгебраического Scala DSL, объединяющего находящихся в памяти и распределенных алгебраических операторов. Ранее также поддерживались алгебраические платформы Apache Spark, H2O и Apache Flink. Поддержка алгоритмов MapReduce постепенно прекращается.[Источник 3]

Особенности

Алгоритмы Mahout написаны поверх Hadoop, поэтому он хорошо работает в распределенной среде. Использует библиотеку Apache Hadoop, Mahout способен эффективно выполнять масштабирование в облаке, а также предлагает инженеру готовую к использованию инфраструктуру для выполнения задач интеллектуального анализа данных на больших объемах данных. Mahout позволяет приложениям быстро и эффективно анализировать большие наборы данных и включает несколько реализаций кластеризации с поддержкой MapReduce, таких как k-средства, нечеткие k-средства, Canopy, Dirichlet и Mean-Shift. Поставляется с возможностями распределенной функциональности для эволюционного программирования, а также включает матричные и векторные библиотеки. Mahout имеет ряд «алгоритмов» распределенной линейной алгебры, которые в сочетании с математически выразительным DSL R-Like Scala позволяют пользователям быстро «раскручивать свои» распределенные алгоритмы. Архитектура системы представлена на рисунке 1:

Рисунок 1 - Трёхуровневая архитектура Mahout

Клиенты

Mahout используют такие компании, как Adobe, Facebook, LinkedIn, Foursquare, Twitter и Yahoo.

  • Foursquare использует механизм рекомендаций Mahout для определения адресов.
  • Twitter использует Mahout для моделирования интересов пользователей.
  • Yahoo! использует Mahout для разработки шаблонов.

QR-разложение

Mahout имеет распределенную реализацию QR-разложения для высоких тонких матриц. Метод scala dqrThin (...) можно легко вызвать в любом приложении Spark или H2O, созданном с помощью библиотеки math-scala и соответствующего модуля Spark или H2O, следующим образом:

import org.apache.mahout.math._
import decompositions._
import drm._

val(drmQ, inCoreR) = dqrThin(drma)

Stochastic PCA

Mahout имеет распределенную реализацию Stochastic PCA. Этот алгоритм вычисляет точный эквивалент Mahout's dssvd (\ (\ mathbf {A-1 \ mu ^ \ top} \)), модифицируя алгоритм dssvd, чтобы избежать формирования \ (\ mathbf {A-1 \ mu ^ \ top } \), что увеличило бы разреженный ввод. Таким образом, он подходит для работы как с плотными, так и с разреженными входами. Метод scala dspca (...) можно легко вызвать в любом приложении Spark, Flink или H2O, созданном с помощью библиотеки math-scala и соответствующего модуля Spark, Flink или H2O, следующим образом:

import org.apache.mahout.math._
import decompositions._
import drm._

val (drmU, drmV, s) = dspca(drmA, k=200, q=1)

Обратите внимание, что параметр является необязательным, а его значение по умолчанию равно нулю.

Stochastic Singular Value Decomposition

Mahout имеет распределенную реализацию стохастического разложения сингулярных значений с использованием стратегии распараллеливания, всесторонне определенной в диссертации Натана Халко «Рандомизированные методы вычисления аппроксимаций матриц низкого ранга».Метод scala dssvd (...) можно легко вызвать в любом приложении Spark или H2O, созданном с помощью библиотеки math-scala и соответствующего модуля Spark или H2O, следующим образом:

import org.apache.mahout.math._
import decompositions._
import drm._

val(drmU, drmV, s) = dssvd(drma, k = 40, q = 1)

Препроцессоры

Препроцессор AsFactor используется для преобразования целочисленных значений столбцов в разреженные векторы, где значение равно 1 в индексе, который соответствует «категории» этого столбца. Это также известно как «One Hot Encoding» во многих других пакетах.

//Пример 
import org.apache.mahout.math.algorithms.preprocessing.AsFactor

val A = drmParallelize(dense(
      (3, 2, 1, 2),
      (0, 0, 0, 0),
      (1, 1, 1, 1)), numPartitions = 2)

// 0 -> 2, 3 -> 5, 6 -> 9
val factorizer: AsFactorModel = new AsFactor().fit(A)

val factoredA = factorizer.transform(A)

MeanCenter центрирует значения относительно среднего значения столбца.

//Пример 
import org.apache.mahout.math.algorithms.preprocessing.MeanCenter

val A = drmParallelize(dense(
      (1, 1, -2),
      (2, 5, 2),
      (3, 9, 0)), numPartitions = 2)

val scaler: MeanCenterModel = new MeanCenter().fit(A)

val centeredA = scaler.transform(A)

StandardScaler центрирует значения каждого столбца по их среднему значению и масштабирует их до единичной дисперсии.

//Пример 
import org.apache.mahout.math.algorithms.preprocessing.StandardScaler

val A = drmParallelize(dense(
      (1, 1, 5),
      (2, 5, -15),
      (3, 9, -2)), numPartitions = 2)

val scaler: StandardScalerModel = new StandardScaler().fit(A)

val scaledA = scaler.transform(A)

Алгоритмы регрессий

Последовательная корреляция слагаемых ошибок может привести к смещенным оценкам параметров регрессии, предусмотрены следующие корректирующие процедуры:

Процедура Кокрейна-Оркутта используется в экономике для корректировки линейной модели для последовательной корреляции в терминах ошибки.

//Пример 
val alsmBlaisdellCo = drmParallelize( dense(
      (20.96,  127.3),
      (21.40,  130.0),
      (21.96,  132.7),
      (21.52,  129.4),
      (22.39,  135.0),
      (22.76,  137.1),
      (23.48,  141.2),
      (23.66,  142.8),
      (24.10,  145.5),
      (24.01,  145.3),
      (24.54,  148.3),
      (24.30,  146.4),
      (25.00,  150.2),
      (25.64,  153.1),
      (26.36,  157.3),
      (26.98,  160.7),
      (27.52,  164.2),
      (27.78,  165.6),
      (28.24,  168.7),
      (28.78,  171.7) ))

val drmY = alsmBlaisdellCo(::, 0 until 1)
val drmX = alsmBlaisdellCo(::, 1 until 2)

var coModel = new CochraneOrcutt[Int]().fit(drmX, drmY , ('iterations -> 2))

println(coModel.rhos)
println(coModel.summary)

Тест Дурбина-Ватсона - это тест на последовательную корреляцию в терминах ошибок. Тестовая статистика Durbin Watson d может принимать значения от 0 до 4, и в целом

  • d <1,5

подразумевает положительную автокорреляцию

  • d> 2,5

подразумевает отрицательную автокорреляцию

  • 1,5 <d <2,5

подразумевает автокорреляцию.

//Пример 
// Тест Дурбина-Уотсона должен быть выполнен на модели. Модель не имеет значения.
val drmX = drmParallelize( dense((0 until 50).toArray.map( t => Math.pow(-1.0, t)) ) ).t
val drmY = drmX + err1 + 1
var model = new OrdinaryLeastSquares[Int]().fit(drmX, drmY)
// конец произвольной модели

val err1 =  drmParallelize( dense((0.0 until 5.0 by 0.1).toArray) ).t
val syntheticResiduals = err1
model = AutocorrelationTests.DurbinWatson(model, syntheticResiduals)
val myAnswer: Double = model.testResults.getOrElse('durbinWatsonTestStatistic, -1.0).asInstanceOf[Double]

Алгоритмы кластеринга

Canopy clustering - очень простой, быстрый и удивительно точный метод группировки объектов в кластеры. Все объекты представлены в виде точек в многомерном пространстве признаков. Алгоритм использует быструю приблизительную метрику расстояния и два порога расстояния T1> T2 для обработки. Основной алгоритм состоит в том, чтобы начать с набора точек и удалить их наугад. Создайте Canopy, содержащий эту точку, и выполните итерацию по оставшейся части набора точек. В каждой точке, если ее расстояние от первой точки <T1, добавьте эту точку в кластер. Если, кроме того, расстояние <T2, то удалите точку из набора. Таким образом, точки, которые очень близки к оригиналу, позволят избежать дальнейшей обработки. Алгоритм зацикливается до тех пор, пока начальный набор не станет пустым, накапливая набор навесов, каждый из которых содержит одну или несколько точек. Заданная точка может встречаться в нескольких куполах.

Canopy clustering часто используется в качестве начального шага в более строгих методах кластеризации, таких как K-Means Clustering. Начав с начальной кластеризации, можно значительно сократить количество более дорогих измерений расстояний, игнорируя точки за пределами первоначальных куполов.

//Пример 
val drmA = drmParallelize(dense((1.0, 1.2, 1.3, 1.4), (1.1, 1.5, 2.5, 1.0), (6.0, 5.2, -5.2, 5.3), (7.0,6.0, 5.0, 5.0), (10.0, 1.0, 20.0, -10.0)))

import org.apache.mahout.math.algorithms.clustering.CanopyClustering

val model = new CanopyClustering().fit(drmA, 't1 -> 6.5, 't2 -> 5.5, 'distanceMeasure -> 'Chebyshev)
model.cluster(drmA).collect

Метрики расстояния

Можно получить доступ к удаленным метрикам напрямую, чтобы измерить расстояние между двумя произвольными векторами, или указать, какую метрику расстояния использовать в качестве части алгоритма. В последнем случае метрика расстояния называется символом, мы никогда не передаем метрики расстояния непосредственно в алгоритм. Этот выбор конструкции, отчасти, связан с сериализацией объекта и обеспечением максимально простой привязки двигателя. За кулисами единственное, что сериализуется и отправляется рабочим, - это число, которое указывает, какую удаленную метрику использовать. Это гораздо более абстрактно и проще для поддержки на бэкэнде, чем уверенность, что каждая функция может быть сериализована любым произвольным двигателем. Мы считаем, что с точки зрения пользователя, это может показаться странным, но не приводит к снижению удобства использования. Если пользователь хочет использовать собственную метрику расстояния, просто добавьте ее в math-scala / src / main / org / apache / mahout / math / common / DistanceMetrics.scala и перекомпилируйте.

//Пример 1 
//Расстояние между двумя произвольными векторами

import org.apache.mahout.math.algorithms.common.distance._

val v1 = dvec(1.0, 1.5, -1.2, 3.5)
val v2 = dvec(0.1, -1.4, 10.5, 3.2)

Cosine.distance(v1, v2)
//Пример 2 
//Использование расстояний в кластеринге

import org.apache.mahout.math.algorithms.clustering.CanopyClustering

val drmA = drmParallelize(dense((1.0, 1.2, 1.3, 1.4), 
                                (1.1, 1.5, 2.5, 1.0), 
                                (6.0, 5.2, -5.2, 5.3), 
                                (7.0,6.0, 5.0, 5.0), 
                                (10.0, 1.0, 20.0, -10.0)))
                                
val model = new CanopyClustering().fit(drmA, 'distanceMeasure -> 'Cosine)

Начало работы с Apache Mahout

Установка будет производиться на чистую систему Ubuntu 18.04 LTS. Листинг команд используемых в видео доступен ниже. Пример установки:

Установка может быть проведена с помощью install.sh, содержащего следующие команды:

  • Mahout требует наличия Java, добавляем её репозиторий
$  sudo add-apt-repository ppa:webupd8team/java
  • Подгружаем список доступных пакетов
$  sudo apt-get update
  • Устанавливаем Java
$  sudo apt-get install oracle-java8-installer
  • Прописываем путь до директории содержащей бинарный файл bin/java
$  export JAVA_HOME=/usr
  • Скачиваем последнюю верстю Mahout из официального репозитория
$  wget http://archive.apache.org/dist/mahout/0.13.0/apache-mahout-distribution-0.13.0.tar.gz
  • Распаковываем архив
$  tar xvfz apache-mahout-distribution-0.13.0.tar.gz
  • Прописываем в переменные окружения местонахождение Mahout и флаг для локального использования
$  export MAHOUT_HOME=/root/apache-mahout-distribution-0.13.0/
$  export MAHOUT_LOCAL=true


  • Скачиваем последнюю версию Hadoop из официального репозитория, поверх него работает Mahout
$  wget http://apache-mirror.rbc.ru/pub/apache/hadoop/common/hadoop-3.1.2/hadoop-3.1.2.tar.gz
  • Распаковываем архив
$  tar xvfz hadoop-3.1.2.tar.gz
  • Прописываем в переменные окружения местонахождения корневой папки и конфигов
$  export HADOOP_HOME=/root/hadoop-3.1.2
$  export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

Далее создаем табличный файл и тестируем установку.

Cборка приложения

Чтобы собрать и запустить CooccurrenceDriver, вам необходимо установить следующее[Источник 4]:

Установите Java 7 JDK из Oracle. Пользователи Mac смотрят здесь: Java SE Development Kit 7u72. Установите sbt (простой инструмент для сборки) 0.13.x для Mac, Linux или вручную. Установите Spark 1.1.1. Не забудьте настроить SPARK_HOME Установите Mahout 0.13.0. Не забудьте настроить MAHOUT_HOME и MAHOUT_LOCAL Определенные двоичные файлы и сценарии требуются библиотекам для получения информации об окружающей среде, например, при обнаружении расположения jar-файлов.

Spark требует набор jar-файлов на пути к классам для клиентской части приложения, а другой набор jar-файлов должен быть передан в Spark Context для запуска распределенного кода. В примере должны быть автоматически обнаружены все необходимые классы.

Использование Mahout в качестве библиотеки в приложении потребует немного кода Scala. У Scala есть свойство App, поэтому мы создадим объект, который наследуется от App.

object CooccurrenceDriver extends App {
}

Это будет выглядеть немного иначе, чем в Java, так как App выполняет отложенную инициализацию, что приводит к выполнению тела при запуске приложения, так же как в Java вы создадите основной метод.

Прежде чем мы сможем выполнить что-то в Spark, нам нужно создать контекст. Мы могли бы использовать необработанные вызовы Spark, но значения по умолчанию устанавливаются для контекста Mahout с помощью вспомогательной функции Mahout.

implicit val mc = mahoutSparkContext(masterUrl = "local", 
  appName = "CooccurrenceDriver")

Нам нужно прочитать три файла, содержащих разные типы взаимодействия. Каждый файл будет прочитан в Mahed IndexedDataset. Это позволяет нам сохранять специфичные для приложения идентификаторы пользователей и элементов в течение всех вычислений.

Например, вот data / purchase.csv:

u1,iphone
u1,ipad
u2,nexus
u2,galaxy
u3,surface
u4,iphone
u4,galaxy

Mahout имеет вспомогательную функцию, которая читает текстовые файлы с разделителями SparkEngine.indexedDatasetDFSReadElements. Функция считывает одноэлементные кортежи (user-id, item-id) распределенным способом для создания IndexedDataset. Матрицы распределенных строк (DRM) и векторы являются важными типами данных, предоставляемыми Mahout, а IndexedDataset похож на очень легкий Dataframe в R, он упаковывает DRM с HashBiMaps для идентификаторов строк и столбцов.

В этом примере следует отметить одну важную вещь: мы читаем во всех наборах данных, прежде чем скорректировать количество строк в них, чтобы они соответствовали общему количеству пользователей в данных. Это так, что математика работает (A’A, A’B, A’C), даже если некоторые пользователи предприняли одно действие, но не другое, во всех матрицах должно быть одинаковое количество строк.

/**
 * Read files of element tuples and create IndexedDatasets one per action. These 
 * share a userID BiMap but have their own itemID BiMaps
 */
def readActions(actionInput: Array[(String, String)]): Array[(String, IndexedDataset)] = {
  var actions = Array[(String, IndexedDataset)]()

  val userDictionary: BiMap[String, Int] = HashBiMap.create()

  // The first action named in the sequence is the "primary" action and 
  // begins to fill up the user dictionary
  for ( actionDescription <- actionInput ) {// grab the path to actions
    val action: IndexedDataset = SparkEngine.indexedDatasetDFSReadElements(
      actionDescription._2,
      schema = DefaultIndexedDatasetElementReadSchema,
      existingRowIDs = userDictionary)
    userDictionary.putAll(action.rowIDs)
    // put the name in the tuple with the indexedDataset
    actions = actions :+ (actionDescription._1, action) 
  }

  // After all actions are read in the userDictonary will contain every user seen, 
  // even if they may not have taken all actions . Now we adjust the row rank of 
  // all IndxedDataset's to have this number of rows
  // Note: this is very important or the cooccurrence calc may fail
  val numUsers = userDictionary.size() // one more than the cardinality

  val resizedNameActionPairs = actions.map { a =>
    //resize the matrix by, in effect by adding empty rows
    val resizedMatrix = a._2.create(a._2.matrix, userDictionary, a._2.columnIDs).newRowCardinality(numUsers)
    (a._1, resizedMatrix) // return the Tuple of (name, IndexedDataset)
  }
  resizedNameActionPairs // return the array of Tuples
}

Теперь, когда у нас есть считанные данные, мы можем выполнить расчет коэффициента совпадения.

// actions.map creates an array of just the IndeedDatasets
val indicatorMatrices = SimilarityAnalysis.cooccurrencesIDSs(
  actions.map(a => a._2))

Все, что нам нужно сделать сейчас, это написать индикаторы.

// zip a pair of arrays into an array of pairs, reattaching the action names
val indicatorDescriptions = actions.map(a => a._1).zip(indicatorMatrices)
writeIndicators(indicatorDescriptions)

Метод writeIndicators использует функцию записи по умолчанию dfsWrite.

/**
 * Write indicatorMatrices to the output dir in the default format
 * for indexing by a search engine.
 */
def writeIndicators( indicators: Array[(String, IndexedDataset)]) = {
  for (indicator <- indicators ) {
    // create a name based on the type of indicator
    val indicatorDir = OutputPath + indicator._1
    indicator._2.dfsWrite(
      indicatorDir,
      // Schema tells the writer to omit LLR strengths 
      // and format for search engine indexing
      IndexedDatasetWriteBooleanSchema) 
  }
}

Смотрите проект Github для полного источника. Теперь мы создаем build.sbt для построения примера.

name := "cooccurrence-driver"

organization := "com.finderbots"

version := "0.1"

scalaVersion := "2.10.4"

val sparkVersion = "1.1.1"

libraryDependencies ++= Seq(
  "log4j" % "log4j" % "1.2.17",
  // Mahout's Spark code
  "commons-io" % "commons-io" % "2.4",
  "org.apache.mahout" % "mahout-math-scala_2.10" % "0.10.0",
  "org.apache.mahout" % "mahout-spark_2.10" % "0.10.0",
  "org.apache.mahout" % "mahout-math" % "0.10.0",
  "org.apache.mahout" % "mahout-hdfs" % "0.10.0",
  // Google collections, AKA Guava
  "com.google.guava" % "guava" % "16.0")

resolvers += "typesafe repo" at " http://repo.typesafe.com/typesafe/releases/"

resolvers += Resolver.mavenLocal

packSettings

packMain := Map(
  "cooc" -> "CooccurrenceDriver")

Сборка примеров из корневой папки проекта:

$ sbt pack

Это автоматически настроит некоторые сценарии запуска для драйвера. Чтобы запустить, выполните следующую команду:

$ target/pack/bin/cooc

Драйвер будет работать в автономном режиме Spark и помещать данные в / path / to / 3-input-cooc / data / Indicators / Indicator-type

Источники

  1. Downloads// mahout.apache.org [2017-2017]. Дата обновления: 02.06.2017. URL: http://mahout.apache.org/general/downloads/ (дата обращения: 03.02.2019).
  2. Apache Mahout Overview // mahout.apache.org [2017-2017]. Дата обновления: 02.06.2017. URL: http://mahout.apache.org/ (дата обращения: 24.12.2017)
  3. Deprecated MapReduce Algorithms // mahout.apache.org [2017-2017]. Дата обновления: 02.06.2017. URL: http://mahout.apache.org/docs/latest/algorithms/map-reduce/ (дата обращения: 24.12.2017)
  4. Mahout Samsara In Core// mahout.apache.org [2017-2017]. Дата обновления: 02.06.2017. URL: http://mahout.apache.org/docs/latest/tutorials/misc/how-to-build-an-app.html (дата обращения: 03.02.2019).