Обновляемый distinct count с помощью Hadoop
При работе с большими данными часто возникает задача оценить кол-во уникальных значений в наборе данных, к примеру пользователей или IP адресов. Обычно с самим подсчетом не возникает проблем, многие инструменты в стеке Hadoop могут это сделать, сложности начинаются когда подобные данные нужно вычислять периодически, к примеру, для ежедневных отчетов.
Проблема
Представим, что есть набор данных по событиям в мобильном приложении, каждый день создается 700 миллионов событий. Для отчета нам нужные данные по кол-ву уникальных пользователей на эвент за каждый день в течение месяца и их общее кол-во.
Подсчет под дням не выглядит сложным, каждый день нужно обрабатывать всего 700 миллионов строк. В первый день мы получили 110 миллионов уникальных пользователей, во второй 80 миллионов. Но как посчитать общее кол-во уникальных пользователей за весь месяц? Мы же не можем просто сложить кол-во уников за все дни, то есть мы не можем использовать предагрегацию.
Если решать задачу в лоб, то придется каждый раз повторно пересчитывать общее кол-во уникальных пользователей (31*700 = 22
миллиарда строк). Во-первых, это будет дорого, а во-вторых довольно медленно.
HyperLogLog
Для решения нужно обратиться к алгоритмам. HyperLogLog (HLL) — это вероятностный алгоритм для подсчета уникальных элементов другими словами оценки мощности набора данных. О том как работает алгоритм лучше, обратится к научным публикациям или к google, нам интересно другое.
- Во-первых, он значительно быстрее обычного
distinct count
, и использует значительно меньше памяти. К примеру,approx_count_distinct
в Apache Spark, как раз использует HyperLogLog под капотом. - Во-вторых, это вероятностный алгоритм, то есть он имеет погрешность, обычно приемлемую, около 4%. Как правило, реализации HLL предоставляют настройки, которые могут помочь уменьшить погрешность, но это ведет к увеличению используемой памяти.
- В-третьих, в ходе работы алгоритм использует последовательность битов часто называемым sketch. Так как это просто набор битов, его можно обновить используя туже реализацию HLL. A сами sketch’и могут быть мержиться (объединяться), то есть имея несколько sketch’ей, можно узнать их общую мощность.
По итогу, минимальное потребление памяти, и возможность повторно использовать sketch’и делают HyperLogLog идеальным для MapReduce.
Не всякий HyperLogLog подойдет
У HyperLogLog есть множество реализаций на всех языках, от самой крутой на C у Redis, до нативных на JavaScript. Но не все нам подойдут. Первый и самых важный критерий это доступ к sketch’ам, второй это переносимость между реализациями. За нас об этом позаботились другие и выпустили документ описывающий формат хранения hll-storage-spec. Опираясь на который на свет появились следующие реализации:
- postgresql-hll
- java-hll
- js-hll
- go-hll
- python-hll
- pig-hll
- как часть spark-alchemy
То есть мы можем посчитать наши данные на Apache Pig, загрузить их в Postgres, и вывести отчеты в реальном времени! Давайте, сделаем это!
Из Hadoop в Postgres
Воспользуемся Apache Pig и HyperLogLog для подсчета уникальных значений:
-- регистрируем HLL библиотеку
REGISTER /path/to/hll-pig-1.0.jar;
-- Объявляем функцию создания HLL с нужными нам параметрами
DEFINE HLL_CREATE com.xad.pig.udf.hll.HLL_CREATE('11', '5', '-1', 'true');
-- Объявляем данные
DATA_LOGS = LOAD '$input_data' AS (user_id:chararray, event_name:chararray);
-- Производим нужные операции
RESULT = GROUP DATA_LOGS BY event_name;
RESULT = FOREACH RESULT {
GENERATE group AS event_name, HLL_CREATE(DATA_LOGS.user_id) AS hll_sketch;
};
-- Сохраняем
STORE RESULT INTO '$output' USING PigStorage('\t');
Загружаем данные в Postgres:
-- создаем временную таблицу
DROP TABLE IF EXISTS tmp_1;
CREATE UNLOGGED TABLE tmp_1(
event_name integer,
raw_hll TEXT
);
-- загружаем данные посчитанные в Pig
COPY tmp_1 FROM STDIN DELIMITER E'\t'
-- заливаем данные в реальную таблицу
-- используем CAST, чтобы залить sketch как тип hll
INSERT INTO stat_table(created_at, event_name, user_ids)
SELECT NOW(), event_name, CAST(CONCAT('\x', raw_hll) AS hll)
FROM tmp_1;
DROP TABLE IF EXISTS tmp_1;
Теперь можно получить кол-во уникальных пользователей за месяц, не пересчитывая все данные:
SELECT hll_cardinality(hll_union_agg(user_ids))
FROM stat_table
WHERE event_name = 1 AND created_at BETWEEN date1 AND date2;
Опять же из-за одинаковых реализаций, можно достать данные из Postgres обновить из через go-hll, для последующих операций над ними.
Вывод
Отдельные подсчеты уникальных значений часто дороги для вычисления и сложны для постоянного обновления. Алгоритмы на подобии HyperLogLog позволяет делать это намного эффективнее.