python – 向Spark DataFrame添加一个空列

如在Web上的many other locations中所述,向现有DataFrame添加新列并不简单.不幸的是,拥有此功能非常重要(即使它在分布式环境中效率低下),尤其是在尝试使用unionAll连接两个DataFrame时.

将空列添加到DataFrame以便于unionAll的最优雅的解决方法是什么?

我的版本是这样的:

from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction
to_none = UserDefinedFunction(lambda x: None, StringType())
new_df = old_df.withColumn('new_column', to_none(df_old['any_col_from_old']))

解决方法:

你需要的只是文字和演员:

from pyspark.sql.functions import lit

new_df = old_df.withColumn('new_column', lit(None).cast(StringType()))

一个完整的例子:

df = sc.parallelize([row(1, "2"), row(2, "3")]).toDF()
df.printSchema()

## root
##  |-- foo: long (nullable = true)
##  |-- bar: string (nullable = true)

new_df = df.withColumn('new_column', lit(None).cast(StringType()))
new_df.printSchema()

## root
##  |-- foo: long (nullable = true)
##  |-- bar: string (nullable = true)
##  |-- new_column: string (nullable = true)

new_df.show()

## +---+---+----------+
## |foo|bar|new_column|
## +---+---+----------+
## |  1|  2|      null|
## |  2|  3|      null|
## +---+---+----------+

可以在此处找到Scala等效项:Create new Dataframe with empty/null field values

上一篇:Android Architecture Components


下一篇:mysql – 使用for循环数组时使用INSERT INTO表ON DUPLICATE KEY时出错