python-Spark UDF没有并行运行

我正在尝试使用Google phonenumbers库的Python端口标准化5000万个电话号码.我正在从S3上的Parquet文件读入一个SparkDataFrame,然后在该数据帧上运行操作.以下函数parsePhoneNumber表示为UDF:

def isValidNumber(phoneNum):
    try:
        pn = phonenumbers.parse(phoneNum, "US")
    except:
        return False
    else:
        return phonenumbers.is_valid_number(pn) and phonenumbers.is_possible_number(pn)

def parsePhoneNumber(phoneNum):
    if isValidNumber(phoneNum):
        parsedNumber = phonenumbers.parse(phoneNum, "US")
        formattedNumber = phonenumbers.format_number(parsedNumber, phonenumbers.PhoneNumberFormat.E164)

        return (True, parsedNumber.country_code, formattedNumber, parsedNumber.national_number, parsedNumber.extension)
    else:
        return (False, None, None, None)

以下是我如何使用UDF派生新列的示例:

newDataFrame = oldDataFrame.withColumn("new_column", parsePhoneNumber_udf(oldDataFrame.phone)).select("id", "new_column".national_number)

通过运行display(newDataFrame)或newDataFrame.show(5)或类似的东西来执行UDF,集群中仅使用一个执行程序,因此,似乎UDF中的某些内容不会导致它仅在一个工作程序上运行.

如果我正在采取任何措施阻止并行运行,那么您能否提供一些见解?

执行环境位于Databricks控制的云群集上.

编辑:下面是oldDataFrame.explain的输出

== Parsed Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Analyzed Logical Plan ==
id: string, person_id: string, phone: string, type: string, source_id: string, created_date: string, modified_date: string
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Optimized Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Physical Plan ==
*FileScan parquet [id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/mnt/person-data/parquet/phone], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,person_id:string,phone:string,type:string,source_id:string,created_date:strin...

解决方法:

你们都很好Display(带有默认参数)最多显示前1000行.同样,newDataFrame.show(5)仅显示前五行.

同时执行平原(oldDataFrame.explain)不会显示乱序,因此在这两种情况下,Spark都会仅评估最小数量的分区以获得所需的行数-对于这些值,它可能是一个分区.

如果您想确定:

>检查oldDataFrame.rdd.getNumPartitions()是否大于一.
>如果是,则使用df.foreach(lambda _:None)或newDataFrame.foreach(lambda _:None)强制执行所有分区.

您应该会看到更多活跃的执行者.

上一篇:python-将PySpark数据框列类型转换为字符串并替换方括号


下一篇:Visual Studio Code添加了对SQL Server 2019大数据群集PySpark开