【梦溪笔谈】6.spark-sql相关代码

import os
import sys
#import datetime
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Session setup: automatic broadcast joins are turned off
# (spark.sql.autoBroadcastJoinThreshold = -1) and speculative execution is
# turned on (spark.speculation = true).
spark = (
    SparkSession.builder
    .appName("app_test.py")
    .enableHiveSupport()
    .config("spark.dynamicAllocation.maxExecutors", "400")
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .config("spark.yarn.executor.memoryOverhead", 3702)
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.repartition.enabled", "true")
    .config("spark.log.level", "ERROR")
    .config("spark.speculation", "true")
    .config("spark.sql.hive.convertMetastoreOrc", "true")
    .getOrCreate()
)

# Hive-side session settings: allow dynamic partition inserts in nonstrict
# mode and use the ETL split strategy for ORC.
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.orc.split.strategy=ETL")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

from datetime import datetime, timedelta
def get_date(dt, time_delta=0):
    """Return the date ``time_delta`` days before ``dt`` as a ``YYYY-MM-DD`` string.

    :param dt: a ``date``/``datetime`` object, or a string formatted as
        ``%Y-%m-%d`` or ``%Y%m%d``.
    :param time_delta: number of days to subtract; a negative value moves
        the date forward.
    :return: the shifted date formatted as ``%Y-%m-%d``.
    :raises ValueError: if ``dt`` is a string in neither supported format.
    """
    if isinstance(dt, str):
        # Strings are parsed explicitly instead of relying on a failed
        # ``str + timedelta`` to detect them; only a format mismatch
        # (ValueError) triggers the fallback to the compact format.
        try:
            dt = datetime.strptime(dt, "%Y-%m-%d")
        except ValueError:
            dt = datetime.strptime(dt, "%Y%m%d")
    result = dt + timedelta(days=-time_delta)
    return result.strftime("%Y-%m-%d")

def insert_tab(df, tab, spark, partition_cols=('dt', 'data_type'), overwrite=True):
    """Align ``df`` with the columns of table ``tab`` and insert it.

    Columns that exist in the target table but not in ``df`` are added as
    NULL literals, then the frame is projected to the target table's column
    order before being written with ``insertInto``.

    :param df: DataFrame to write.
    :param tab: name of the target table.
    :param spark: SparkSession used to read the target table's columns.
    :param partition_cols: columns to repartition by before writing;
        defaults to ``('dt', 'data_type')``. Pass an empty sequence to skip
        the repartition.
    :param overwrite: forwarded to ``insertInto``; defaults to ``True``.
    """
    col_target = spark.sql("""select * from {tab} limit 1""".format(tab=tab)).columns
    existing = set(df.columns)
    for name in col_target:
        if name not in existing:
            # Fill columns missing from the source with NULL.
            df = df.withColumn(name, F.lit(None))
    # Project to the target's column order (and drop any extra columns)
    # before handing the frame to insertInto.
    df2 = df.select(col_target)
    if partition_cols:
        df2 = df2.repartition(*partition_cols)
    df2.write.insertInto(tab, overwrite=overwrite)

def search_dt(partitions_list, dt):
    """Choose the partition to read for a requested date.

    Returns ``dt`` itself when it is one of the partitions. Otherwise returns
    the closest partition before ``dt``; if there is none, the closest
    partition after ``dt``.

    :param partitions_list: partition values as ``%Y-%m-%d`` strings. Entries
        equal to ``'ACTIVE'`` are ignored. The list is not modified.
    :param dt: requested partition, in any form accepted by ``get_date``.
    :return: the chosen partition as a ``%Y-%m-%d`` string.
    :raises ValueError: if there is no dated partition to choose from, or a
        partition value is not in ``%Y-%m-%d`` format.
    """
    dt = get_date(dt, 0)
    # Build a filtered copy rather than calling remove(): the caller's list
    # stays untouched and every 'ACTIVE' entry is skipped, not just the first.
    candidates = [p_dt for p_dt in partitions_list if p_dt != 'ACTIVE']
    if dt in candidates:
        return dt
    dt_date = datetime.strptime(dt, '%Y-%m-%d').date()
    partition_dates = [datetime.strptime(p_dt, '%Y-%m-%d').date() for p_dt in candidates]
    earlier = [p_date for p_date in partition_dates if p_date < dt_date]
    if earlier:
        nearest = max(earlier)
    else:
        later = [p_date for p_date in partition_dates if p_date > dt_date]
        if not later:
            raise ValueError(
                "no dated partition available to match %s" % dt
            )
        nearest = min(later)
    return nearest.strftime('%Y-%m-%d')

def _partition_dt_value(spec):
    """Extract the ``dt`` value from one ``show partitions`` entry.

    Handles both single-level entries (``dt=2024-01-01``) and multi-level
    entries (``dt=2024-01-01/data_type=a``).
    """
    parts = spec.split('/')
    for part in parts:
        if part.startswith('dt='):
            return part[3:]
    # No dt= component: fall back to dropping the first three characters of
    # the leading component, as the single-level handling always did.
    return parts[0][3:]


def get_nearest_dt(table_name, dt, spark):
    """Return the partition of ``table_name`` to use for the requested ``dt``.

    Lists the table's partitions and delegates the choice to ``search_dt``:
    ``dt`` itself when that partition exists, otherwise the nearest one.

    :param table_name: table whose partitions are listed.
    :param dt: requested partition date, in any form ``search_dt`` accepts.
    :param spark: SparkSession used to run ``show partitions``.
    :return: the chosen partition date as returned by ``search_dt``.
    """
    partitions = spark.sql("show partitions %s" % table_name).collect()
    partitions_list = []
    seen = set()
    for row in partitions:
        dt_value = _partition_dt_value(row['partition'])
        # Multi-level partitioning repeats the same dt once per
        # sub-partition; keep each value only once.
        if dt_value not in seen:
            seen.add(dt_value)
            partitions_list.append(dt_value)
    dt_result = search_dt(partitions_list, dt)
    return dt_result

 

上一篇:Spark基础篇-Spark-Core核心模型


下一篇:NASA丨登陆火星六大前沿技术