HDFS Java Programming and SequenceFile Programming

Two ways to create a directory. The second passes the user name directly to FileSystem.get, so it does not run into permission problems.

I personally recommend the second.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.testng.annotations.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
//a directory that already exists will not be created again
public class 创建目录 {
	//first way: set fs.defaultFS on the Configuration
    @Test
    public void mkDirOnHDFS1() throws IOException {
        Configuration con = new Configuration();
        //point the client at the NameNode
        con.set("fs.defaultFS", "hdfs://node00:9000");
        FileSystem fileSystem = FileSystem.get(con);
        boolean mkdirs = fileSystem.mkdirs(new Path("/kkb/dir1"));
        fileSystem.close();
    }
    //second way: pass the URI and user name to FileSystem.get
    @Test
    public void mkDirONHDFS2() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), configuration,"root");

        //create the directory through the FileSystem handle
        boolean mkdirs = fileSystem.mkdirs(new Path("/kkb/dir2"));

        fileSystem.close();
    }
    //second way, plus setting permissions on the new directory
    @Test
    public void mkDirONHDFS3() throws IOException, URISyntaxException, InterruptedException {
        Configuration con = new Configuration();
        //build an FsPermission: user rw-, group r--, other r--
        FsPermission fsPermission = new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.READ);
        //create the FileSystem handle as user "root"
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), con,"root");
        boolean mkdirs = fileSystem.mkdirs(new Path("/kkb/dir4"),fsPermission);
        fileSystem.close();
    }
}
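
To check that the permissions set in mkDirONHDFS3 actually took effect, read them back with FileStatus.getPermission(). A minimal sketch, assuming the same cluster URI and user as above (the class name is just illustrative; the exact output also depends on the client umask):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class 查看目录权限 {
    public static void main(String[] args) throws Exception {
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), new Configuration(), "root");
        //read back the permission of the directory created above
        FileStatus status = fileSystem.getFileStatus(new Path("/kkb/dir4"));
        //with FsAction.READ_WRITE/READ/READ and the default umask this should print rw-r--r--
        System.out.println(status.getPermission());
        fileSystem.close();
    }
}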

Uploading and downloading files: methods file1 and file2

Viewing file information: method file3

Uploading and downloading via IO streams (input/output streams): methods file4 and file5

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.testng.annotations.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class 上传文件 {
    //upload a file from the local filesystem
    @Test
    public void uploadFile1HDFS() throws IOException, URISyntaxException, InterruptedException {
        //get the FileSystem handle
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), configuration,"root");
        //copy the file up to HDFS
        fileSystem.copyFromLocalFile(new Path("D:/12.txt"),new Path("/kkb"));
        fileSystem.close();
    }
    ///download a file from HDFS///
    @Test
    public void downloadFile2HDFS() throws IOException, URISyntaxException, InterruptedException {
        //get the FileSystem handle
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), configuration,"root");
        //copy the file down from HDFS
        fileSystem.copyToLocalFile(new Path("/kkb/12.txt"),new Path("E:"));
        fileSystem.close();
    }



    ///view file information; this is just one example, try the other accessors yourself (see the sketch after this class)///
    @Test
    public void viewFile3HDFS() throws IOException, URISyntaxException, InterruptedException {
        //get the FileSystem handle
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), configuration,"root");
        RemoteIterator<LocatedFileStatus> listFile = fileSystem.listFiles(new Path("/kkb"), true);
        while (listFile.hasNext()){
            LocatedFileStatus fileStatus = listFile.next();
            String name = fileStatus.getPath().getName();//get the file name
            System.out.println("name:"+name);
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();  //get the locations of each block
            for (BlockLocation bl : blockLocations) {
                String[] hosts = bl.getHosts();//nodes holding this block, typically three with default replication
                for (String host : hosts) {
                    System.out.println("host:"+host);
                }
            }
        }
        fileSystem.close();
    }

    // upload and download via IO streams //
	//upload a single file
    @Test
    public void putFile4HDFS() throws URISyntaxException, IOException, InterruptedException {
        //get the FileSystem handle
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), configuration, "root");
        //create the input stream; do not prefix the path with "file:", or it will throw an error
        FileInputStream fin = new FileInputStream(new File("D:/12.txt"));
        //create the output stream on HDFS
        FSDataOutputStream fout = fileSystem.create(new Path("/kkb/13.txt"));
        //copy the stream with commons-io IOUtils
        IOUtils.copy(fin,fout);
        IOUtils.closeQuietly(fin);
        IOUtils.closeQuietly(fout);
        fileSystem.close();
    }

    //merge several local files into a single HDFS file
    @Test
    public void putFile5HDFS() throws URISyntaxException, IOException, InterruptedException {
        //get the distributed FileSystem handle
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), new Configuration(), "root");
        FSDataOutputStream fout = fileSystem.create(new Path("/kkb/14.txt"));
        //get the local FileSystem handle and list the files under D:/txt
        LocalFileSystem localfileSystem = FileSystem.getLocal(new Configuration());
        FileStatus[] localfileStatus = localfileSystem.listStatus(new Path("D:/txt"));
        for (FileStatus fStatus : localfileStatus) {
            Path path = fStatus.getPath();
            FSDataInputStream fin = localfileSystem.open(path);
            IOUtils.copy(fin,fout);
            IOUtils.closeQuietly(fin);
        }

        IOUtils.closeQuietly(fout);
        fileSystem.close();
    }
}
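
As noted in viewFile3HDFS, LocatedFileStatus carries more metadata than just the name and block locations. A minimal sketch of a few other standard FileStatus accessors, assuming the same cluster URI and user as above (the class name is just illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

import java.net.URI;

public class 查看文件详情 {
    public static void main(String[] args) throws Exception {
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), new Configuration(), "root");
        RemoteIterator<LocatedFileStatus> listFile = fileSystem.listFiles(new Path("/kkb"), true);
        while (listFile.hasNext()) {
            LocatedFileStatus fileStatus = listFile.next();
            System.out.println("path:        " + fileStatus.getPath());
            System.out.println("length:      " + fileStatus.getLen());         //file size in bytes
            System.out.println("block size:  " + fileStatus.getBlockSize());   //128 MB by default in Hadoop 2.x
            System.out.println("replication: " + fileStatus.getReplication()); //usually 3
            System.out.println("permission:  " + fileStatus.getPermission());
        }
        fileSystem.close();
    }
}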

Writing a SequenceFile

package wl_02编程操作sequence_files;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class SequenceFiles_write {
    //mock data source; each array element represents the content of one file (DATA has 5 entries)
    private static final String[] DATA = {
            "The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models.",
            "It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.",
            "Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer",
            "o delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.",
            "Hadoop Common: The common utilities that support the other Hadoop modules."
    };
    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
        Configuration conf = new Configuration();
        //run as user "root"; the NameNode address is carried in the fully qualified Path below
        System.setProperty("HADOOP_USER_NAME","root");
        //build the writer options
        //which file to write to --> option
        SequenceFile.Writer.Option file = SequenceFile.Writer.file(new Path("hdfs://node00:9000/kkb/2.txt"));
        //each SequenceFile record is a key-value pair; the key class is also an option
        SequenceFile.Writer.Option keyClass = SequenceFile.Writer.keyClass(IntWritable.class);
        //the value class is an option too
        SequenceFile.Writer.Option valueClass = SequenceFile.Writer.valueClass(Text.class);

        //SequenceFile compression type: choose one of NONE | RECORD | BLOCK
        //option 1: RECORD, no codec specified
//        SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD);
//        SequenceFile.Writer writer = SequenceFile.createWriter(conf, file, keyClass, valueClass, compressOption);

        //option 2: BLOCK, no codec specified
//        SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK);
//        SequenceFile.Writer writer = SequenceFile.createWriter(conf, file, keyClass, valueClass, compressOption);

        //option 3: BLOCK with the BZip2Codec algorithm; compressing costs time
        //add the codec on top of the compression type
//        BZip2Codec codec = new BZip2Codec();
//        codec.setConf(conf);
//        SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, codec);

        //start writing the SequenceFile: RECORD compression, default codec
        SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD);
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, file, keyClass, valueClass, compressOption);
        IntWritable key = new IntWritable();
        Text value = new Text();
        for (int i = 0; i < 10000; i++) {
            //set key and value, then append the record to the file
            key.set(i);
            value.set(DATA[i % DATA.length]);
            System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
            writer.append(key,value);
        }
        IOUtils.closeStream(writer);
    }
}

Viewing the SequenceFile from the command line

hdfs dfs -text /kkb/2.txt (or view it in the NameNode web UI)

Reading the SequenceFile

package wl_02编程操作sequence_files;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class SequenceFiles_read {
    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
        //configuration for the reader
        Configuration conf = new Configuration();
        //run as user "root"
        System.setProperty("HADOOP_USER_NAME", "root");
        //the SequenceFile to read
        SequenceFile.Reader.Option file = SequenceFile.Reader.file(new Path("hdfs://node00:9000/kkb/2.txt"));
        //build the Reader object
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, file);
        //create key/value holders matching the classes recorded in the file
        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
        //position of the record about to be read
        long position = reader.getPosition();
        System.out.println("position: "+position);
        while (reader.next(key, value)) {
            //check whether a sync point was passed while reading this record
            String syncSeen = reader.syncSeen() ? "True" : "False";
            System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
            //advance position to the beginning of the next record
            position = reader.getPosition();
        }
        IOUtils.closeStream(reader);
    }
}
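
The sync points printed above are what make a SequenceFile splittable: a reader can jump to an arbitrary byte offset and resynchronize at the next record boundary. A minimal sketch using the standard SequenceFile.Reader.sync(long) call, assuming the same file as above (the offset 1024 is arbitrary):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFiles_seek {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "root");
        SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(new Path("hdfs://node00:9000/kkb/2.txt")));
        //jump to an arbitrary byte offset, then advance to the next sync point
        reader.sync(1024);
        IntWritable key = new IntWritable();
        Text value = new Text();
        if (reader.next(key, value)) {
            //first complete record after offset 1024
            System.out.println(key + "\t" + value);
        }
        IOUtils.closeStream(reader);
    }
}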
