Spark UDF или всегда используйте explain
Всякий, кто хоть раз разбирался с User Defined Functions (UDF) в Spark натыкался на следующее утверждение:
Use the higher-level standard Column-based functions with Data set operators whenever possible before reverting to using your own custom UDF functions since UDFs are a blackbox for Spark and so it does not even try to optimize them.
И еще вереницу статей на тему, что использовать UDF крайне нежелательно и лучше работать только со встроенными функциями. И все это действительно так, но иногда без UDF обойтись нельзя. А материалов на тему “как правильно писать UDF” днем с огнем не найдешь.
Поэтому главное правило: если есть UDF всегда используйте explain
.
Часто Catalyst (движок оптимизаций) работает с UDF так, как если бы это было обычные встроенные функции, что иногда приводит к нежелательному поведению. Предположим, что нам нужно найти подсеть в наборе IP адресов. Мы определяем UDF и пишем набор инструкций для трансформации данных:
from pyspark.sql import functions as f
def to_subnet(v):
# body
return v
udf_to_subnet = f.udf(to_subnet)
df.select(
f.col('ip'),
f.col('unix_timestamp'),
).filter(
f.col('ip').isNotNull()
).groupBy(
f.col('ip'),
).agg(
f.min(f.col('ts')).alias('ts')
).withColumn(
'subnet', to_subnet(f.col('ip'))
).filter(
f.col('subnet').isNotNull()
)
При разборе плана запроса мы увидим, что наша функция была вызвана дважды (внимание на BatchEvalPython).
== Physical Plan ==
*(4) Project [ip#243, unix_timestamp#252L, pythonUDF0#261 AS subnet#256]
+- BatchEvalPython [to_subnet(ip#243)], [pythonUDF0#261]
+- *(3) HashAggregate(keys=[ip#243], functions=[min(unix_timestamp#244L)], output=[ip#243, unix_timestamp#252L, ip#243])
+- Exchange hashpartitioning(ip#243, 200), true, [id=#246]
+- *(2) HashAggregate(keys=[ip#243], functions=[partial_min(unix_timestamp#244L)], output=[ip#243, min#263L])
+- *(2) Project [_c2#136 AS ip#243, unix_timestamp(gettimestamp(_c3#137, dd/MMM/yyyy:HH:mm:ss Z, Some(Europe/Moscow)), yyyy-MM-dd HH:mm:ss, Some(Europe/Moscow)) AS unix_timestamp#244L]
+- *(2) Project [_c2#136, _c3#137]
+- *(2) Filter isnotnull(pythonUDF0#260)
+- BatchEvalPython [to_subnet(_c2#136)], [pythonUDF0#260]
+- *(1) Project [_c2#136, _c3#137]
+- *(1) Filter isnotnull(_c2#136)
+- FileScan csv [_c2#136,_c3#137] Batched: false, DataFilters: [isnotnull(_c2#136)], Format: CSV, Location: InMemoryFileIndex[hdfs://..., PartitionFilters: [], PushedFilters: [IsNotNull(_c2)], ReadSchema: struct<_c2:string,_c3:string>
В случае если функция выполняет несложные операции, то это не так критично, но иначе мы серьезно потеряем в производительности.
Ответ, почему так происходит прост: все UDF по умолчанию детерминированные, поэтому Catalyst не стесняется использовать их многократно. Чтобы избежать подобного поведения, функцию нужно пометить как недетерминированную:
udf_to_subnet = f.udf(to_subnet).asNondeterministic()
Теперь план выглядит не только иначе, но и порой работает в несколько раз быстрее:
== Physical Plan ==
*(3) Project [ip#264, unix_timestamp#273L, pythonUDF0#281 AS subnet#277]
+- *(3) Filter isnotnull(pythonUDF0#281)
+- BatchEvalPython [to_subnet(ip#264)], [pythonUDF0#281]
+- *(2) HashAggregate(keys=[ip#264], functions=[min(unix_timestamp#265L)], output=[ip#264, unix_timestamp#273L, ip#264])
+- Exchange hashpartitioning(ip#264, 200), true, [id=#290]
+- *(1) HashAggregate(keys=[ip#264], functions=[partial_min(unix_timestamp#265L)], output=[ip#264, min#283L])
+- *(1) Project [_c2#136 AS ip#264, unix_timestamp(gettimestamp(_c3#137, dd/MMM/yyyy:HH:mm:ss Z, Some(Europe/Moscow)), yyyy-MM-dd HH:mm:ss, Some(Europe/Moscow)) AS unix_timestamp#265L]
+- *(1) Filter isnotnull(_c2#136)
+- FileScan csv [_c2#136,_c3#137] Batched: false, DataFilters: [isnotnull(_c2#136)], Format: CSV, Location: InMemoryFileIndex[hdfs://..., PartitionFilters: [], PushedFilters: [IsNotNull(_c2)], ReadSchema: struct<_c2:string,_c3:string>
Как видите, правило обозначенное вначале помогло сэкономить время и ресурсы (а часто и деньги). Поэтому еще раз: если есть UDF всегда используйте explain
.