将Spark DataFrame写入Hive表中的内存分配问题

我正在尝试使用pySpark中的.saveAsTable()将Spark DataFrame保存到Hive表(Parquet)中,但仍然会遇到以下内存问题:

org.apache.hadoop.hive.ql.metadata.HiveException: parquet.hadoop.MemoryManager$1:
New Memory allocation 1034931 bytes is smaller than the minimum allocation size of 1048576 bytes.

第一个数字(1034931)通常在不同的运行中保持变化.我知道第二个数字(1048576)是1024 ^ 2,但是我不知道这是什么意思.

我在其他几个项目中使用了完全相同的技术(具有更大的DataFrame),并且该方法没有出现问题.在这里,我基本上复制粘贴了过程和配置的结构,但是遇到了内存问题!这一定是我想念的琐碎小事.

Spark DataFrame(我们称其为sdf)具有以下结构(〜10列和〜300k行,但如果运行正确,可能会更多):

+----------+----------+----------+---------------+---------------+
| col_a_str| col_b_num| col_c_num|partition_d_str|partition_e_str|
+----------+----------+----------+---------------+---------------+
|val_a1_str|val_b1_num|val_c1_num|     val_d1_str|     val_e1_str|
|val_a2_str|val_b2_num|val_c2_num|     val_d2_str|     val_e2_str|
|       ...|       ...|       ...|            ...|            ...|
+----------+----------+----------+---------------+---------------+

Hive表是这样创建的:

sqlContext.sql("""
                    CREATE TABLE IF NOT EXISTS my_hive_table (
                        col_a_str string,
                        col_b_num double,
                        col_c_num double
                    ) 
                    PARTITIONED BY (partition_d_str string,
                                    partition_e_str string)
                    STORED AS PARQUETFILE
               """)

使用以下命令尝试向该表中插入数据:

sdf.write \
   .mode('append') \
   .partitionBy('partition_d_str', 'partition_e_str') \
   .saveAsTable('my_hive_table')

Spark / Hive配置如下所示:

spark_conf = pyspark.SparkConf()
spark_conf.setAppName('my_project')

spark_conf.set('spark.executor.memory', '16g')
spark_conf.set('spark.python.worker.memory', '8g')
spark_conf.set('spark.yarn.executor.memoryOverhead', '15000')
spark_conf.set('spark.dynamicAllocation.maxExecutors', '64')
spark_conf.set('spark.executor.cores', '4')

sc = pyspark.SparkContext(conf=spark_conf)

sqlContext = pyspark.sql.HiveContext(sc)
sqlContext.setConf('hive.exec.dynamic.partition', 'true')
sqlContext.setConf('hive.exec.max.dynamic.partitions', '5000')
sqlContext.setConf('hive.exec.dynamic.partition.mode', 'nonstrict')
sqlContext.setConf('hive.exec.compress.output', 'true')

我尝试将.partitionBy(‘partition_d_str’,’partition_e_str’)更改为.partitionBy([‘partition_d_str’,’partition_e_str’])),增加内存,将DataFrame拆分为较小的块,然后重新创建表和DataFrame,但是似乎没有任何工作.我也找不到在线任何解决方案.是什么引起内存错误(我不完全了解这两个错误来自何处),以及如何更改代码以写入Hive表?谢谢.

解决方法:

事实证明,我正在使用可为空的字段进行分区,该字段使.saveAsTable()无效.当我将RDD转换为Spark DataFrame时,我提供的架构是这样生成的:

from pyspark.sql.types import *

# Define schema
my_schema = StructType(
                    [StructField('col_a_str', StringType(), False),
                     StructField('col_b_num', DoubleType(), True),
                     StructField('col_c_num', DoubleType(), True),
                     StructField('partition_d_str', StringType(), False),
                     StructField('partition_e_str', StringType(), True)])

# Convert RDD to Spark DataFrame
sdf = sqlContext.createDataFrame(my_rdd, schema=my_schema)

由于partition_e_str被声明为nullable = True(该StructField的第三个参数),因此在写入Hive表时会遇到问题,因为它被用作分区字段之一.我将其更改为:

# Define schema
my_schema = StructType(
                    [StructField('col_a_str', StringType(), False),
                     StructField('col_b_num', DoubleType(), True),
                     StructField('col_c_num', DoubleType(), True),
                     StructField('partition_d_str', StringType(), False),
                     StructField('partition_e_str', StringType(), False)])

一切都恢复了!

课程:确保分区字段不可为空!

上一篇:实用经验 34 运算符引发的混乱


下一篇:34岁Java程序员裸辞,突围金三银四面试季