Обновляемый distinct count с помощью Hadoop

При работе с большими данными часто возникает задача оценить кол-во уникальных значений в наборе данных, к примеру пользователей или IP адресов. Обычно с самим подсчетом не возникает проблем, многие инструменты в стеке Hadoop могут это сделать, сложности начинаются когда подобные данные нужно вычислять периодически, к примеру, для ежедневных отчетов.

Проблема

Представим, что есть набор данных по событиям в мобильном приложении, каждый день создается 700 миллионов событий. Для отчета нам нужные данные по кол-ву уникальных пользователей на эвент за каждый день в течение месяца и их общее кол-во.

Подсчет под дням не выглядит сложным, каждый день нужно обрабатывать всего 700 миллионов строк. В первый день мы получили 110 миллионов уникальных пользователей, во второй 80 миллионов. Но как посчитать общее кол-во уникальных пользователей за весь месяц? Мы же не можем просто сложить кол-во уников за все дни, то есть мы не можем использовать предагрегацию.

Если решать задачу в лоб, то придется каждый раз повторно пересчитывать общее кол-во уникальных пользователей (31*700 = 22 миллиарда строк). Во-первых, это будет дорого, а во-вторых довольно медленно.

HyperLogLog

Для решения нужно обратиться к алгоритмам. HyperLogLog (HLL) — это вероятностный алгоритм для подсчета уникальных элементов другими словами оценки мощности набора данных. О том как работает алгоритм лучше, обратится к научным публикациям или к google, нам интересно другое.

По итогу, минимальное потребление памяти, и возможность повторно использовать sketch’и делают HyperLogLog идеальным для MapReduce.

Не всякий HyperLogLog подойдет

У HyperLogLog есть множество реализаций на всех языках, от самой крутой на C у Redis, до нативных на JavaScript. Но не все нам подойдут. Первый и самых важный критерий это доступ к sketch’ам, второй это переносимость между реализациями. За нас об этом позаботились другие и выпустили документ описывающий формат хранения hll-storage-spec. Опираясь на который на свет появились следующие реализации:

То есть мы можем посчитать наши данные на Apache Pig, загрузить их в Postgres, и вывести отчеты в реальном времени! Давайте, сделаем это!

Из Hadoop в Postgres

Воспользуемся Apache Pig и HyperLogLog для подсчета уникальных значений:

-- регистрируем HLL библиотеку
REGISTER /path/to/hll-pig-1.0.jar;

-- Объявляем функцию создания HLL с нужными нам параметрами
DEFINE HLL_CREATE com.xad.pig.udf.hll.HLL_CREATE('11', '5', '-1', 'true');

-- Объявляем данные
DATA_LOGS = LOAD '$input_data' AS (user_id:chararray, event_name:chararray);

-- Производим нужные операции
RESULT = GROUP DATA_LOGS BY event_name;
RESULT = FOREACH RESULT {
    GENERATE group AS event_name, HLL_CREATE(DATA_LOGS.user_id) AS hll_sketch;
};

-- Сохраняем
STORE RESULT INTO '$output' USING PigStorage('\t');

Загружаем данные в Postgres:

-- создаем временную таблицу
DROP TABLE IF EXISTS tmp_1;
CREATE UNLOGGED TABLE tmp_1(
    event_name integer,
    raw_hll TEXT
);

-- загружаем данные посчитанные в Pig
COPY tmp_1 FROM STDIN DELIMITER E'\t'

-- заливаем данные в реальную таблицу
-- используем CAST, чтобы залить sketch как тип hll 
INSERT INTO stat_table(created_at, event_name, user_ids)
SELECT NOW(), event_name, CAST(CONCAT('\x', raw_hll) AS hll)
FROM tmp_1;

DROP TABLE IF EXISTS tmp_1;

Теперь можно получить кол-во уникальных пользователей за месяц, не пересчитывая все данные:

SELECT hll_cardinality(hll_union_agg(user_ids)) 
FROM stat_table
WHERE event_name = 1 AND created_at BETWEEN date1 AND date2;

Опять же из-за одинаковых реализаций, можно достать данные из Postgres обновить из через go-hll, для последующих операций над ними.

Вывод

Отдельные подсчеты уникальных значений часто дороги для вычисления и сложны для постоянного обновления. Алгоритмы на подобии HyperLogLog позволяет делать это намного эффективнее.