Processing CSV File Data with Spark

1. Use Spark to process CSV files and write the data into a MySQL table

Introduction to Spark

Spark is a fast (in-memory), general-purpose, scalable compute engine written in Scala. It was created in 2009 at UC Berkeley's AMPLab, open-sourced in 2010, and entered the Apache Incubator in June 2013; that same year, several founders of the Spark project at the AMPLab jointly founded Databricks, the commercial company behind Spark (known in Chinese industry circles as 数砖, "data bricks"). Spark became a top-level Apache project in 2014, and since 2009 more than 1,200 developers have contributed to it.

Spark supports Java, Scala, Python, R, and SQL, and ships with dozens of high-performance operators (80+ at the time of writing); building these ourselves would be all but impossible.
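
For a sense of what these operators look like, here is a word count as a minimal sketch in spark-shell (where the SparkContext sc is predefined); the input path is a hypothetical placeholder:

// flatMap, map and reduceByKey are three of Spark's built-in operators
sc.textFile("file:///tmp/input.txt")   // hypothetical input file
  .flatMap(_.split("\\s+"))            // split each line into words
  .map(word => (word, 1))              // pair each word with a count of 1
  .reduceByKey(_ + _)                  // sum the counts per word
  .take(10)
  .foreach(println)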

Spark is backed by many companies, including Alibaba, Tencent, JD.com, Ctrip, Baidu, Youku, Tudou, IBM, Cloudera, and Hortonworks.

Spark is an improvement on Hadoop: it is a general-purpose parallel computing framework, open-sourced by UC Berkeley's AMPLab, in the mold of Hadoop MapReduce. Spark implements distributed computation on the MapReduce model and keeps the strengths of Hadoop MapReduce, but unlike MapReduce, intermediate job output and results can stay in memory, removing the need to read and write HDFS between steps. This makes Spark a much better fit for iterative MapReduce-style algorithms, such as those used in data mining and machine learning.

Because Spark is an in-memory computing framework, computation is very fast; however, Spark only does computation and does not store data itself, so in practice it is connected to external data sources such as HDFS.
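
The speed-up from keeping data in memory is easy to demonstrate with RDD caching: after cache(), the first action materializes the dataset in memory, and later actions reuse it instead of re-reading the source. A minimal spark-shell sketch (the input path is hypothetical):

val data = sc.textFile("file:///tmp/input.txt").cache() // hypothetical input; kept in memory after the first action
println(data.count())                                   // first action: reads the file and fills the cache
println(data.filter(_.contains("ERROR")).count())       // second action: served from memory, no re-read from disk/HDFS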

1. Create the MySQL table

Create the database and the table in MySQL:

CREATE DATABASE /*!32312 IF NOT EXISTS*/`job_crawel` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `job_crawel`;

/*Table structure for table `jobdetail` */

DROP TABLE IF EXISTS `jobdetail`;

CREATE TABLE `jobdetail` (
  `job_id` int(11) NOT NULL AUTO_INCREMENT,
  `job_name` varchar(900) DEFAULT NULL,
  `job_url` varchar(900) DEFAULT NULL,
  `job_location` varchar(900) DEFAULT NULL,
  `job_salary` varchar(900) DEFAULT NULL,
  `job_company` text,
  `job_experience` text,
  `job_class` text,
  `job_given` text,
  `job_detail` text,
  `company_type` text,
  `company_person` text,
  `search_key` varchar(900) DEFAULT NULL,
  `city` varchar(900) DEFAULT NULL,
  PRIMARY KEY (`job_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
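
Before moving on, it is worth confirming that the table is reachable. A minimal sketch using plain JDBC; it assumes MySQL on localhost:3306 with the root/123456 credentials used later in this article, and mysql-connector-java on the classpath:

import java.sql.DriverManager

object CheckTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/job_crawel?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
    try {
      // Print the name and type of every column of the freshly created jobdetail table
      val rs = conn.createStatement().executeQuery("DESCRIBE jobdetail")
      while (rs.next()) println(s"${rs.getString(1)}\t${rs.getString(2)}")
    } finally conn.close()
  }
}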

2. Create a Maven project and add the dependencies

    <properties>
        <spark.version>2.3.3</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-annotations</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-math3</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>commons-net</groupId>
                    <artifactId>commons-net</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>commons-logging</groupId>
                    <artifactId>commons-logging</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>net.java.dev.jets3t</groupId>
                    <artifactId>jets3t</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.curator</groupId>
                    <artifactId>curator-recipes</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.code.findbugs</groupId>
                    <artifactId>jsr305</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jersey</groupId>
                    <artifactId>jersey-json</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-csv_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.8.1</version>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
<!-- Pin the version to 2.9.2 here -->
            <dependency>
                <groupId>com.fasterxml.jackson.module</groupId>
                <artifactId>jackson-module-scala_2.11</artifactId>
                <version>2.9.2</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

3. Develop the Spark program to process the CSV data

Use Spark to read the CSV file and write the data into the MySQL table created above:

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object CSVOperate {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[8]").setAppName("sparkCSV")

    val session: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    session.sparkContext.setLogLevel("WARN")

    // Read the CSV file: '@'-delimited, with a header row, fields that may span
    // multiple lines, and column types inferred by Spark
    val frame: DataFrame = session
      .read
      .format("csv")
      .option("inferSchema", "true")
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .option("header", "true")
      .option("delimiter", "@")
      .option("ignoreLeadingWhiteSpace", true)
      .option("multiLine", true)
      .load("file:///D:\\1、课程6.0版本课程资料\\20、公开课与训练营\\1、用数据告诉你职业发展之路该如何选择\\3、数据资料集\\job_detail4.csv")

    // Register a temp view so the data can also be queried with SQL
    frame.createOrReplaceTempView("job_detail")
    //session.sql("select job_name,job_url,job_location,job_salary,job_company,job_experience,job_class,job_given,job_detail,company_type,company_person,search_key,city from job_detail where job_company = '北京无极慧通科技有限公司'  ").show(80)

    // Append the DataFrame to the jobdetail table over JDBC
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")
    frame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/job_crawel?useUnicode=true&characterEncoding=UTF-8", "job_crawel.jobdetail", prop)

    session.stop()
  }
}
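
To sanity-check the load, the table can be read back through the same JDBC connection. A minimal sketch, meant to go right after the write call (before session.stop()) so that session and prop are still in scope:

// Read the freshly written table back over JDBC and inspect it
val written: DataFrame = session.read
  .jdbc("jdbc:mysql://localhost:3306/job_crawel?useUnicode=true&characterEncoding=UTF-8", "jobdetail", prop)
println(s"rows in jobdetail: ${written.count()}")
written.show(5)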