Параллельный запуск заданий в PySpark

Порой требуется обработать небольшие куски данных используя разные подходы, к примеру применяя для каждого свой фильтр в зависимости от источника данных; или тренировка нескольких ML-моделей разом. Конечно, можно запустить обработку в отдельных приложениях (разные SparkContext), но это не оптимальный вариант (оверхед на создание контекста, не читаемый jobhistory, и тд), будет лучше использовать встроенные механизмы самого PySpark.

PySpark имеет планировщик задач, по-умолчанию, работающий в режиме FIFO. То есть каждое задание будет получать приоритет на использование всех ресурсов кластера. Это приводит к тому, что небольшие задания могут запускаться не параллельно, а последовательно, если мало внимания уделить работе с партициями. Поэтому в подобных случаях лучше отдать предпочтение “справедливому” (FAIR) планировщику, который равномерно распределит ресурсы кластера.

Наглядный пример, с использованием FAIR scheduler и ThreadPool:

from multiprocessing.pool import ThreadPool

from pyspark import SparkContext, StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t


THREADS = 5
SCHEMA = t.StructType([
    t.StructField('source', t.StringType(), False),
    t.StructField('city', t.StringType(), False),
    t.StructField('population', t.FloatType(), False),
])


def processing_source(df, source):
    df_local = df.filter(f.col('source') == source)
    df_local = '...'


sc = SparkContext(appName='city by source')
ss = SparkSession(sc)

df = ss.read.format('csv').load('/path/to/file.csv', schema=SCHEMA)
df = df.select('...')  # prepare data
df = df.persist(StorageLevel.DISK_ONLY_2)  # or df.cache()

pool = ThreadPool(THREADS)
pool.map(lambda s: processing_source(df, s), SOURCE_LIST)
pool.close()
pool.join()

# run
# spark-submit --conf spark.scheduler.mode=FAIR ... task.py