Работа с C/C++ расширениями в PySpark
Последние годы Apache Spark зарекомендовал себя как отличный инструмент для работы с большими данными. Не только из-за высокой производительности, но и из-за простоты использования.
PySpark – отличный инструмент для работы со Spark на Python. Он отлично подходит для быстрого прототипирования обработчиков данных, и все бы ничего, но порой приходится использовать сторонние пакеты. Обычно достаточно упаковать код в .zip
или .egg
использовать --py-files foo.zip,bar.egg
, как сказано в документации. Но это работает не всегда, и огромную долю проблем вызывают python-пакеты с C/C++ кодом. К примеру, можно встретить такое:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/../lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 359, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/opt/../lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 64, in read_command
command = serializer._read_with_length(file)
File "/opt/../lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
return self.loads(obj)
File "/opt/../lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 577, in loads
return pickle.loads(obj, encoding=encoding)
File "./hll-py3.6-linux-x86_64.egg/hll.py", line 7, in <module>
__bootstrap__()
File "./hll-py3.6-linux-x86_64.egg/hll.py", line 3, in __bootstrap__
import sys, pkg_resources, imp
ModuleNotFoundError: No module named 'pkg_resources'
Конечно, можно поискать альтернативную pure-python реализацию, но это может привести к потере производительности. Можно использовать conda
, но не факт что там будет нужный вам пакет. Еще можно установить расширение на все ноды в кластере, но тогда мы будет зависеть от инфраструктуры, что тоже не хорошо.
В нашем проекте мы решили использовать другой подход – динамически загружать модуль в ходе работы.
- Для начала нужно скомпилировать C код в совместимую библиотеку, в ручную или в крайнем случае достать из яйца.
- Пробросить библиотеку в Spark, через
--files /path/lib/foo.so,/project/conf.ini
- С помощью
imp
импортировать модуль. У себя мы написали небольшую обертку для удобства:
import imp
import sys
from pyspark import SparkFiles
def load_lib(name, so_path):
if name not in sys.modules:
return imp.load_dynamic(name, SparkFiles.get(so_path))
return sys.modules[name]
Вызов будет следующим:
foo = load_lib('foo', 'foo.so')
data = foo.call_function('hello')
В итоге мы получаем все преимущества С расширений, независимость от инфраструктуры и решаем проблемы связанные зависимостями в замен на не совсем “удобный” вызов функций.