Spark UDF на Rust

Apache Spark превосходный инструмент для обработки больших данных, мы используем PySpark (интерфейс для Spark на Python), у которого есть ряд ограничений по сравнению с версией на Scala, но это компенсируется гибкостью, скорость прототипирования и богатой экосистемой языка Python. Одна из главных головных болей в PySpark это UDF, они проигрывают в сравнении со Scala в скорости работы и иногда потребляют больше памяти. Если с первым можно смириться, то со втором приходится повозиться, так как ресурсы кластера небезграничны.

Чтобы бороться с этими проблемами применяют разные подходы такие как pandas_udf и вызов Scala из Python. pandas_udf хорош, когда UDF простой, так как сильно ограничен в типах; второй, конечно, гибкий, быстрый и вообще, но без знаний Scala не обойтись.

Последний каплей в моем терпении стало, странное поведение метода разбора URL в Java (соответственно и в Scala), из-за чего стало невозможно использовать его в работе, и пришлось прибегнуть к UDF. Реализация на Python почти в два раза замедлила работу джобы. В тот момент и родилась идея попробовать применить Rust. Да, идея безумна и граничит с бредом, но когда это кого-то останавливало?. Дальнейшее погружение в эту тему показало, что вызвать из Java методы Rust довольно просто и накладные расходы не такие значительные.

План в следующем: написать класс-обертку на Java, который будет вызывать реализацию на Rust, так как Java по сути родной язык Spark, мы не будем тратить лишние ресурсы на сериализацию объектов (если использовать Python->Rust) и позволит использовать UDF как в PySpark, так и версиях для других языков.

Далее мы погрузимся в реализацию, для этого мы разберем по шагам код в репозитории с примером готовой UDF.

Немного Java

Прежде всего, нам нужно написать обертку - UDF на Java, но место того, чтобы реализовывать логику, эта функция будет иметь специальный метод, вызов которого будет приводить к обращению к Rust. Чтобы иметь возможность работать с Java и в целом обеспечить связь двух технологий, мы будем использовать крейт jni-rs.

За основу UDF будет взят обобщенный (generic) класс UDF1 из пакета org.apache.spark. Раз он обобщенный, мы должны указать тип входных и выходные данных, в нашем случае это String и String[] , а также заимплементить метод call.

package com.github.silentsokolov;
import java.io.IOException;
import org.apache.spark.sql.api.java.UDF1;

public class App implements UDF1<String, String[]> {
    // метод "вызывающий" rust
    public static native String[] nativeUDF(String url);
	
    // чтобы иметь возможность обращаться к rust
    // мы должны подключить библиотеку, реализующую метод
    // иначе получим ошибку
    static {
        NativeUtils.loadLibraryFromJar("/libs/darwin-amd64.dylib");
    }

    // данный метод будет вызван при использовании UDF
    @Override
    public String[] call(String url) throws Exception {
        return App.nativeUDF(url);
    }
}

Связующее звено

Следующем шагом мы должны создать файл с заголовками, где будут указаны имя и типы, которых мы должны придерживаться при реализации в нашем коде Rust. К счастью, в Java сгенерировать заголовки довольно просто: javac -h . App. Для удобства, в коде проекта есть Makefile, позволяющий упростить эту операцию: make java_compile, после ее вызова будет создан файл src/main/native/include/com_github_silentsokolov_App.h со следующим содержимом:

/*
 * Class:     com_github_silentsokolov_App
 * Method:    nativeUDF
 * Signature: (Ljava/lang/String;)[Ljava/lang/String;
 */
JNIEXPORT jobjectArray JNICALL Java_com_github_silentsokolov_App_nativeUDF
  (JNIEnv *, jclass, jstring);

Это даем нам название функции, которое мы должны использовать, принимаемые ей аргументы и возвращаемый тип.

Непосредственно Rust

Из заголовков мы получили нужную информацию, теперь достаточно перенести ее на Rust:

pub extern "system" fn Java_com_github_silentsokolov_App_nativeUDF(
    env: JNIEnv,
    _class: JClass,
    url: JString,
) -> jobjectArray {
}

Как вы, наверно, уже поняли мы не можем напрямую использовать стандартные типы Rust в определении функции. Для этого jni предоставляет обертки: jstring, jobjectArray и другие. Благо они легко конвертируются к стандартным типами, поэтому при реализации непосредственно самой логики можно использовать их.

В коде также можете найти ряд полезных инструментов, как отлов ошибок и конвертация их к Java исключениям, запуск тестов и другие.

В заключении, мы должны скомпилировать библиотеку: cargo build --release.

Опять же, можно воспользоваться make rust_test - для запуска тестов и make rust_build - для сборки.

Готовим Jar

Чтобы сделать наш код переносимым, нужно создать jar-файл. Тут не все так просто, по-умолчанию в jar файл нельзя поместить подключаемые библиотеки. Но к счастью, это можно обойти, для этого воспользуемся NativeUtils.loadLibraryFromJar (thanks Adam Heinrich). У этого подхода есть свои минусы, о которых поговорим позже, но благодаря ему, отдельно переносить файл библиотеки между машинами не придется.

После компиляции, нужно перенести файл из target/release/**.so в src/main/resources/libs/ и скомпилировать jar: mvn package.

Уже догадались? Да, для удобства есть make java_package.

Все в одном месте

Уже не раз упомянутый Makefile можно назвать краеугольным камнем в нашем коде, благодаря ему репозитории становиться не просто примером, а настоящим шаблоном для создания UDF на Rust. Все что требуется поправить типы в Java, реализовать логику в Rust и запустить make build - на выходе будет создан jar готовый к использованию.

Подключаем к Spark

Осталось дело за малым - подключить нашу библиотеку к Spark и проверить ее в работе:

# spark-submit --jars /path/to/udf.jar ...

spark.udf.registerJavaFunction('parseUrl', 'com.github.silentsokolov.App', t.ArrayType(t.StringType(), True))

df = spark.createDataFrame(
    [
        ('https://www.github.com/rust-lang/rust/issues?labels=E-easy&state=open#hash'),
        ('https://google.com/'),
    ],
    ['url']
)
df = df.select(f.expr('parseUrl(url)'))
df.show()

Benchmark

Первое и главное, выше описанный подход имеет смысл только при сложных UDF, в случае конкатенации двух строк - UDF на Rust проиграет в скорости не только нативной реализации, но даже UDF на Python! Причина не в связке Rust-Java, а в том, что при работе нашего jar файла требуется “распаковать” файл библиотеки в систему! То есть файл, который был помещен в src/main/resources/libs/ , копируется во временную директорию, иначе Java просто неспособна к нему обратиться.

Теперь, можно и к тестам, у нас получилось следующее:

native: 7 s ± 197 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
rust: 7.66 s ± 459 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
python: 14 s ± 310 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

NOTE: под native понимается вызов встроенных функций Spark NOTE: 16 CPU, 64Gb RAM, Ubuntu 16.04, Spark 3.1.1 (default settings), Dataset 1kk urls.

Как видим версия на Rust +/- равна по скорости к нативной реализации, что по сути мы и добивались: без знаний Java реализовывать быстрые UDF с минимальным потреблением памяти.

Заключение

Во-первых, если вы знаете Java/Scala, то стоит использовать их! Не стоит городить костыли на ровном месте. Во-вторых, если вы, как и я, топите за Rust в больших данных, стоит присмотреться к таким инструментам как datafusion, ballista, polars. Иначе, если готовы к экспериментам, то надеюсь изыскания выше помогут в достижении ваших целей.