Apache Mahout
Последнее изменение этой страницы: 14:45, 26 февраля 2019.
![]() | |
Разработчики: | Apache Software Foundation |
---|---|
Выпущена: | 2013 |
Постоянный выпуск: | 0.13.0[Источник 1] / April 2017 |
Состояние разработки: | Активный |
Операционная система: | cross-platform |
Тип ПО: | Machine learning |
Лицензия: | Apache License 2.0 |
Веб-сайт |
mahout |
Apache Mahout (TM) - это распределенная структура линейной алгебры и математически выразительный Scala DSL, предназначенная для того, чтобы математики, статистики и ученые данных могли быстро реализовать свои собственные алгоритмы. Apache Spark - это рекомендованный распределенный серверный модуль, который можно распространить на другие распределенные серверные части.
- Математически выразительная Scala DSL
- Поддержка нескольких распределенных бэкэндов (включая Apache Spark)
- Модульные собственные решения для ускорения CPU / GPU / CUDA[Источник 2]
Основные алгоритмы Mahout рассчитаны на кластерный анализ, классификация и совместная фильтрация на основе пакетов реализованы поверх Apache Hadoop с использованием парадигмы map / reduce. Также приветствуются вклады, которые выполняются на одном узле или в кластере, отличном от Hadoop.
Начиная с версии 0.10.0, проект переключает свое внимание на создание независимой от программного обеспечения среды программирования под названием «Самсара». Среда состоит из алгебраического, независимого от программного обеспечения оптимизатора и алгебраического Scala DSL, объединяющего находящихся в памяти и распределенных алгебраических операторов. Ранее также поддерживались алгебраические платформы Apache Spark, H2O и Apache Flink. Поддержка алгоритмов MapReduce постепенно прекращается.[Источник 3]
Содержание
Особенности
Алгоритмы Mahout написаны поверх Hadoop, поэтому он хорошо работает в распределенной среде. Использует библиотеку Apache Hadoop, Mahout способен эффективно выполнять масштабирование в облаке, а также предлагает инженеру готовую к использованию инфраструктуру для выполнения задач интеллектуального анализа данных на больших объемах данных. Mahout позволяет приложениям быстро и эффективно анализировать большие наборы данных и включает несколько реализаций кластеризации с поддержкой MapReduce, таких как k-средства, нечеткие k-средства, Canopy, Dirichlet и Mean-Shift. Поставляется с возможностями распределенной функциональности для эволюционного программирования, а также включает матричные и векторные библиотеки. Mahout имеет ряд «алгоритмов» распределенной линейной алгебры, которые в сочетании с математически выразительным DSL R-Like Scala позволяют пользователям быстро «раскручивать свои» распределенные алгоритмы. Архитектура системы представлена на рисунке 1:
Клиенты
Mahout используют такие компании, как Adobe, Facebook, LinkedIn, Foursquare, Twitter и Yahoo.
- Foursquare использует механизм рекомендаций Mahout для определения адресов.
- Twitter использует Mahout для моделирования интересов пользователей.
- Yahoo! использует Mahout для разработки шаблонов.
QR-разложение
Mahout имеет распределенную реализацию QR-разложения для высоких тонких матриц. Метод scala dqrThin (...)
можно легко вызвать в любом приложении Spark или H2O, созданном с помощью библиотеки math-scala и соответствующего модуля Spark или H2O, следующим образом:
import org.apache.mahout.math._
import decompositions._
import drm._
val(drmQ, inCoreR) = dqrThin(drma)
Stochastic PCA
Mahout имеет распределенную реализацию Stochastic PCA. Этот алгоритм вычисляет точный эквивалент Mahout's dssvd (\ (\ mathbf {A-1 \ mu ^ \ top} \))
, модифицируя алгоритм dssvd, чтобы избежать формирования \ (\ mathbf {A-1 \ mu ^ \ top } \)
, что увеличило бы разреженный ввод. Таким образом, он подходит для работы как с плотными, так и с разреженными входами. Метод scala dspca (...)
можно легко вызвать в любом приложении Spark, Flink или H2O, созданном с помощью библиотеки math-scala и соответствующего модуля Spark, Flink или H2O, следующим образом:
import org.apache.mahout.math._
import decompositions._
import drm._
val (drmU, drmV, s) = dspca(drmA, k=200, q=1)
Обратите внимание, что параметр является необязательным, а его значение по умолчанию равно нулю.
Stochastic Singular Value Decomposition
Mahout имеет распределенную реализацию стохастического разложения сингулярных значений с использованием стратегии распараллеливания, всесторонне определенной в диссертации Натана Халко «Рандомизированные методы вычисления аппроксимаций матриц низкого ранга».Метод scala dssvd (...)
можно легко вызвать в любом приложении Spark или H2O, созданном с помощью библиотеки math-scala и соответствующего модуля Spark или H2O, следующим образом:
import org.apache.mahout.math._
import decompositions._
import drm._
val(drmU, drmV, s) = dssvd(drma, k = 40, q = 1)
Препроцессоры
Препроцессор AsFactor используется для преобразования целочисленных значений столбцов в разреженные векторы, где значение равно 1 в индексе, который соответствует «категории» этого столбца. Это также известно как «One Hot Encoding» во многих других пакетах.
//Пример
import org.apache.mahout.math.algorithms.preprocessing.AsFactor
val A = drmParallelize(dense(
(3, 2, 1, 2),
(0, 0, 0, 0),
(1, 1, 1, 1)), numPartitions = 2)
// 0 -> 2, 3 -> 5, 6 -> 9
val factorizer: AsFactorModel = new AsFactor().fit(A)
val factoredA = factorizer.transform(A)
MeanCenter центрирует значения относительно среднего значения столбца.
//Пример
import org.apache.mahout.math.algorithms.preprocessing.MeanCenter
val A = drmParallelize(dense(
(1, 1, -2),
(2, 5, 2),
(3, 9, 0)), numPartitions = 2)
val scaler: MeanCenterModel = new MeanCenter().fit(A)
val centeredA = scaler.transform(A)
StandardScaler центрирует значения каждого столбца по их среднему значению и масштабирует их до единичной дисперсии.
//Пример
import org.apache.mahout.math.algorithms.preprocessing.StandardScaler
val A = drmParallelize(dense(
(1, 1, 5),
(2, 5, -15),
(3, 9, -2)), numPartitions = 2)
val scaler: StandardScalerModel = new StandardScaler().fit(A)
val scaledA = scaler.transform(A)
Алгоритмы регрессий
Последовательная корреляция слагаемых ошибок может привести к смещенным оценкам параметров регрессии, предусмотрены следующие корректирующие процедуры:
Процедура Кокрейна-Оркутта используется в экономике для корректировки линейной модели для последовательной корреляции в терминах ошибки.
//Пример
val alsmBlaisdellCo = drmParallelize( dense(
(20.96, 127.3),
(21.40, 130.0),
(21.96, 132.7),
(21.52, 129.4),
(22.39, 135.0),
(22.76, 137.1),
(23.48, 141.2),
(23.66, 142.8),
(24.10, 145.5),
(24.01, 145.3),
(24.54, 148.3),
(24.30, 146.4),
(25.00, 150.2),
(25.64, 153.1),
(26.36, 157.3),
(26.98, 160.7),
(27.52, 164.2),
(27.78, 165.6),
(28.24, 168.7),
(28.78, 171.7) ))
val drmY = alsmBlaisdellCo(::, 0 until 1)
val drmX = alsmBlaisdellCo(::, 1 until 2)
var coModel = new CochraneOrcutt[Int]().fit(drmX, drmY , ('iterations -> 2))
println(coModel.rhos)
println(coModel.summary)
Тест Дурбина-Ватсона - это тест на последовательную корреляцию в терминах ошибок. Тестовая статистика Durbin Watson d может принимать значения от 0 до 4, и в целом
- d <1,5
подразумевает положительную автокорреляцию
- d> 2,5
подразумевает отрицательную автокорреляцию
- 1,5 <d <2,5
подразумевает автокорреляцию.
//Пример
// Тест Дурбина-Уотсона должен быть выполнен на модели. Модель не имеет значения.
val drmX = drmParallelize( dense((0 until 50).toArray.map( t => Math.pow(-1.0, t)) ) ).t
val drmY = drmX + err1 + 1
var model = new OrdinaryLeastSquares[Int]().fit(drmX, drmY)
// конец произвольной модели
val err1 = drmParallelize( dense((0.0 until 5.0 by 0.1).toArray) ).t
val syntheticResiduals = err1
model = AutocorrelationTests.DurbinWatson(model, syntheticResiduals)
val myAnswer: Double = model.testResults.getOrElse('durbinWatsonTestStatistic, -1.0).asInstanceOf[Double]
Алгоритмы кластеринга
Canopy clustering - очень простой, быстрый и удивительно точный метод группировки объектов в кластеры. Все объекты представлены в виде точек в многомерном пространстве признаков. Алгоритм использует быструю приблизительную метрику расстояния и два порога расстояния T1> T2 для обработки. Основной алгоритм состоит в том, чтобы начать с набора точек и удалить их наугад. Создайте Canopy, содержащий эту точку, и выполните итерацию по оставшейся части набора точек. В каждой точке, если ее расстояние от первой точки <T1, добавьте эту точку в кластер. Если, кроме того, расстояние <T2, то удалите точку из набора. Таким образом, точки, которые очень близки к оригиналу, позволят избежать дальнейшей обработки. Алгоритм зацикливается до тех пор, пока начальный набор не станет пустым, накапливая набор навесов, каждый из которых содержит одну или несколько точек. Заданная точка может встречаться в нескольких куполах.
Canopy clustering часто используется в качестве начального шага в более строгих методах кластеризации, таких как K-Means Clustering. Начав с начальной кластеризации, можно значительно сократить количество более дорогих измерений расстояний, игнорируя точки за пределами первоначальных куполов.
//Пример
val drmA = drmParallelize(dense((1.0, 1.2, 1.3, 1.4), (1.1, 1.5, 2.5, 1.0), (6.0, 5.2, -5.2, 5.3), (7.0,6.0, 5.0, 5.0), (10.0, 1.0, 20.0, -10.0)))
import org.apache.mahout.math.algorithms.clustering.CanopyClustering
val model = new CanopyClustering().fit(drmA, 't1 -> 6.5, 't2 -> 5.5, 'distanceMeasure -> 'Chebyshev)
model.cluster(drmA).collect
Метрики расстояния
Можно получить доступ к удаленным метрикам напрямую, чтобы измерить расстояние между двумя произвольными векторами, или указать, какую метрику расстояния использовать в качестве части алгоритма. В последнем случае метрика расстояния называется символом, мы никогда не передаем метрики расстояния непосредственно в алгоритм. Этот выбор конструкции, отчасти, связан с сериализацией объекта и обеспечением максимально простой привязки двигателя. За кулисами единственное, что сериализуется и отправляется рабочим, - это число, которое указывает, какую удаленную метрику использовать. Это гораздо более абстрактно и проще для поддержки на бэкэнде, чем уверенность, что каждая функция может быть сериализована любым произвольным двигателем. Мы считаем, что с точки зрения пользователя, это может показаться странным, но не приводит к снижению удобства использования. Если пользователь хочет использовать собственную метрику расстояния, просто добавьте ее в math-scala / src / main / org / apache / mahout / math / common / DistanceMetrics.scala и перекомпилируйте.
//Пример 1
//Расстояние между двумя произвольными векторами
import org.apache.mahout.math.algorithms.common.distance._
val v1 = dvec(1.0, 1.5, -1.2, 3.5)
val v2 = dvec(0.1, -1.4, 10.5, 3.2)
Cosine.distance(v1, v2)
//Пример 2
//Использование расстояний в кластеринге
import org.apache.mahout.math.algorithms.clustering.CanopyClustering
val drmA = drmParallelize(dense((1.0, 1.2, 1.3, 1.4),
(1.1, 1.5, 2.5, 1.0),
(6.0, 5.2, -5.2, 5.3),
(7.0,6.0, 5.0, 5.0),
(10.0, 1.0, 20.0, -10.0)))
val model = new CanopyClustering().fit(drmA, 'distanceMeasure -> 'Cosine)
Начало работы с Apache Mahout
Установка будет производиться на чистую систему Ubuntu 18.04 LTS. Листинг команд используемых в видео доступен ниже. Пример установки:
Установка может быть проведена с помощью install.sh, содержащего следующие команды:
- Mahout требует наличия Java, добавляем её репозиторий
$ sudo add-apt-repository ppa:webupd8team/java
- Подгружаем список доступных пакетов
$ sudo apt-get update
- Устанавливаем Java
$ sudo apt-get install oracle-java8-installer
- Прописываем путь до директории содержащей бинарный файл bin/java
$ export JAVA_HOME=/usr
- Скачиваем последнюю верстю Mahout из официального репозитория
$ wget http://archive.apache.org/dist/mahout/0.13.0/apache-mahout-distribution-0.13.0.tar.gz
- Распаковываем архив
$ tar xvfz apache-mahout-distribution-0.13.0.tar.gz
- Прописываем в переменные окружения местонахождение Mahout и флаг для локального использования
$ export MAHOUT_HOME=/root/apache-mahout-distribution-0.13.0/ $ export MAHOUT_LOCAL=true
- Скачиваем последнюю версию Hadoop из официального репозитория, поверх него работает Mahout
$ wget http://apache-mirror.rbc.ru/pub/apache/hadoop/common/hadoop-3.1.2/hadoop-3.1.2.tar.gz
- Распаковываем архив
$ tar xvfz hadoop-3.1.2.tar.gz
- Прописываем в переменные окружения местонахождения корневой папки и конфигов
$ export HADOOP_HOME=/root/hadoop-3.1.2 $ export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
Далее создаем табличный файл и тестируем установку.
Cборка приложения
Чтобы собрать и запустить CooccurrenceDriver, вам необходимо установить следующее[Источник 4]:
Установите Java 7 JDK из Oracle. Пользователи Mac смотрят здесь: Java SE Development Kit 7u72. Установите sbt (простой инструмент для сборки) 0.13.x для Mac, Linux или вручную. Установите Spark 1.1.1. Не забудьте настроить SPARK_HOME Установите Mahout 0.13.0. Не забудьте настроить MAHOUT_HOME и MAHOUT_LOCAL Определенные двоичные файлы и сценарии требуются библиотекам для получения информации об окружающей среде, например, при обнаружении расположения jar-файлов.
Spark требует набор jar-файлов на пути к классам для клиентской части приложения, а другой набор jar-файлов должен быть передан в Spark Context для запуска распределенного кода. В примере должны быть автоматически обнаружены все необходимые классы.
Использование Mahout в качестве библиотеки в приложении потребует немного кода Scala. У Scala есть свойство App, поэтому мы создадим объект, который наследуется от App.
object CooccurrenceDriver extends App {
}
Это будет выглядеть немного иначе, чем в Java, так как App выполняет отложенную инициализацию, что приводит к выполнению тела при запуске приложения, так же как в Java вы создадите основной метод.
Прежде чем мы сможем выполнить что-то в Spark, нам нужно создать контекст. Мы могли бы использовать необработанные вызовы Spark, но значения по умолчанию устанавливаются для контекста Mahout с помощью вспомогательной функции Mahout.
implicit val mc = mahoutSparkContext(masterUrl = "local",
appName = "CooccurrenceDriver")
Нам нужно прочитать три файла, содержащих разные типы взаимодействия. Каждый файл будет прочитан в Mahed IndexedDataset. Это позволяет нам сохранять специфичные для приложения идентификаторы пользователей и элементов в течение всех вычислений.
Например, вот data / purchase.csv:
u1,iphone
u1,ipad
u2,nexus
u2,galaxy
u3,surface
u4,iphone
u4,galaxy
Mahout имеет вспомогательную функцию, которая читает текстовые файлы с разделителями SparkEngine.indexedDatasetDFSReadElements. Функция считывает одноэлементные кортежи (user-id, item-id) распределенным способом для создания IndexedDataset. Матрицы распределенных строк (DRM) и векторы являются важными типами данных, предоставляемыми Mahout, а IndexedDataset похож на очень легкий Dataframe в R, он упаковывает DRM с HashBiMaps для идентификаторов строк и столбцов.
В этом примере следует отметить одну важную вещь: мы читаем во всех наборах данных, прежде чем скорректировать количество строк в них, чтобы они соответствовали общему количеству пользователей в данных. Это так, что математика работает (A’A, A’B, A’C), даже если некоторые пользователи предприняли одно действие, но не другое, во всех матрицах должно быть одинаковое количество строк.
/**
* Read files of element tuples and create IndexedDatasets one per action. These
* share a userID BiMap but have their own itemID BiMaps
*/
def readActions(actionInput: Array[(String, String)]): Array[(String, IndexedDataset)] = {
var actions = Array[(String, IndexedDataset)]()
val userDictionary: BiMap[String, Int] = HashBiMap.create()
// The first action named in the sequence is the "primary" action and
// begins to fill up the user dictionary
for ( actionDescription <- actionInput ) {// grab the path to actions
val action: IndexedDataset = SparkEngine.indexedDatasetDFSReadElements(
actionDescription._2,
schema = DefaultIndexedDatasetElementReadSchema,
existingRowIDs = userDictionary)
userDictionary.putAll(action.rowIDs)
// put the name in the tuple with the indexedDataset
actions = actions :+ (actionDescription._1, action)
}
// After all actions are read in the userDictonary will contain every user seen,
// even if they may not have taken all actions . Now we adjust the row rank of
// all IndxedDataset's to have this number of rows
// Note: this is very important or the cooccurrence calc may fail
val numUsers = userDictionary.size() // one more than the cardinality
val resizedNameActionPairs = actions.map { a =>
//resize the matrix by, in effect by adding empty rows
val resizedMatrix = a._2.create(a._2.matrix, userDictionary, a._2.columnIDs).newRowCardinality(numUsers)
(a._1, resizedMatrix) // return the Tuple of (name, IndexedDataset)
}
resizedNameActionPairs // return the array of Tuples
}
Теперь, когда у нас есть считанные данные, мы можем выполнить расчет коэффициента совпадения.
// actions.map creates an array of just the IndeedDatasets
val indicatorMatrices = SimilarityAnalysis.cooccurrencesIDSs(
actions.map(a => a._2))
Все, что нам нужно сделать сейчас, это написать индикаторы.
// zip a pair of arrays into an array of pairs, reattaching the action names
val indicatorDescriptions = actions.map(a => a._1).zip(indicatorMatrices)
writeIndicators(indicatorDescriptions)
Метод writeIndicators использует функцию записи по умолчанию dfsWrite.
/**
* Write indicatorMatrices to the output dir in the default format
* for indexing by a search engine.
*/
def writeIndicators( indicators: Array[(String, IndexedDataset)]) = {
for (indicator <- indicators ) {
// create a name based on the type of indicator
val indicatorDir = OutputPath + indicator._1
indicator._2.dfsWrite(
indicatorDir,
// Schema tells the writer to omit LLR strengths
// and format for search engine indexing
IndexedDatasetWriteBooleanSchema)
}
}
Смотрите проект Github для полного источника. Теперь мы создаем build.sbt для построения примера.
name := "cooccurrence-driver"
organization := "com.finderbots"
version := "0.1"
scalaVersion := "2.10.4"
val sparkVersion = "1.1.1"
libraryDependencies ++= Seq(
"log4j" % "log4j" % "1.2.17",
// Mahout's Spark code
"commons-io" % "commons-io" % "2.4",
"org.apache.mahout" % "mahout-math-scala_2.10" % "0.10.0",
"org.apache.mahout" % "mahout-spark_2.10" % "0.10.0",
"org.apache.mahout" % "mahout-math" % "0.10.0",
"org.apache.mahout" % "mahout-hdfs" % "0.10.0",
// Google collections, AKA Guava
"com.google.guava" % "guava" % "16.0")
resolvers += "typesafe repo" at " http://repo.typesafe.com/typesafe/releases/"
resolvers += Resolver.mavenLocal
packSettings
packMain := Map(
"cooc" -> "CooccurrenceDriver")
Сборка примеров из корневой папки проекта:
$ sbt pack
Это автоматически настроит некоторые сценарии запуска для драйвера. Чтобы запустить, выполните следующую команду:
$ target/pack/bin/cooc
Драйвер будет работать в автономном режиме Spark и помещать данные в / path / to / 3-input-cooc / data / Indicators / Indicator-type
Источники
- ↑ Downloads// mahout.apache.org [2017-2017]. Дата обновления: 02.06.2017. URL: http://mahout.apache.org/general/downloads/ (дата обращения: 03.02.2019).
- ↑ Apache Mahout Overview // mahout.apache.org [2017-2017]. Дата обновления: 02.06.2017. URL: http://mahout.apache.org/ (дата обращения: 24.12.2017)
- ↑ Deprecated MapReduce Algorithms // mahout.apache.org [2017-2017]. Дата обновления: 02.06.2017. URL: http://mahout.apache.org/docs/latest/algorithms/map-reduce/ (дата обращения: 24.12.2017)
- ↑ Mahout Samsara In Core// mahout.apache.org [2017-2017]. Дата обновления: 02.06.2017. URL: http://mahout.apache.org/docs/latest/tutorials/misc/how-to-build-an-app.html (дата обращения: 03.02.2019).
ISSN 2542-0356
Следуй за Полисом
Оставайся в курсе последних событий
Лицензия
Если не указано иное, содержание этой страницы доступно по лицензии Creative Commons «Attribution-NonCommercial-NoDerivatives» 4.0, а примеры кода – по лицензии Apache 2.0. Подробнее см. Условия использования.