HDFS(⼆)--客户端API操作及HDFS读写流程⼀、HDFS客户端环境准备
1)根据⾃⼰电脑的操作系统拷贝对应的编译后的hadoop jar包到⾮中⽂路径
2)配置HADOOP_HOME环境变量
3)创建maven⼯程,引⼊依赖及⽇志
pom依赖:
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId&ls</groupId>
<artifactId&ls</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>D:/jdk/Java/jdk1.8.0_192/lib/tools.jar</systemPath>
</dependency>
</dependencies>
log4j.properties:
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
编写程序测试:
public class HDFSClient {
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException {
// 1 获取⽂件系统
Configuration configuration = new Configuration();
// 配置在集上运⾏
//        configuration.set("fs.defaultFS", "hdfs://hadoop102:9000");
/
/        FileSystem fs = (configuration);
FileSystem fs = (new URI("hdfs://hadoop102:9000"), configuration, "hadoop");
// 2 创建⽬录
fs.mkdirs(new Path("/1108/daxian/lawrence"));
// 3 关闭资源
fs.close();
System.out.println("over");
}
}
⼆、HDFS客户端API操作
2.1、HDFS⽂件上传(测试参数优先级)
1)编写源代码
/**
* ⽂件上传
*/
@Test
public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {
// 1 获取⽂件系统
Configuration configuration = new Configuration();
//        configuration.set("plication", "2");
FileSystem fs = (new URI("hdfs://hadoop102:9000"), configuration, "hadoop");
// 2 上传⽂件
// 3 关闭资源
fs.close();
System.out.println("over");
}
2)将l拷贝到项⽬的根⽬录下
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name&plication</name>
<value>1</value>
</property>
</configuration>
3)参数优先级
参数优先级排序:(1)客户端代码中设置的值 >(2)ClassPath下的⽤户⾃定义配置⽂件 >(3)然后是服务器的默认配置2.2、HDFS⽂件下载
1)编写源代码
/**
* ⽂件下载
*/
@Test
public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException {
// 1 获取⽂件系统
Configuration configuration = new Configuration();
FileSystem fs = (new URI("hdfs://hadoop102:9000"), configuration, "atguigu");
// 2 执⾏下载操作
// boolean delSrc 指是否将原⽂件删除
// Path src 指要下载的⽂件路径
// Path dst 指将⽂件下载到的路径
// boolean useRawLocalFileSystem 是否开启⽂件校验
// 3 关闭资源
fs.close();
System.out.println("over");
}
2.3、HDFS⽂件夹删除
/**
* ⽂件删除
*/
@Test
public void testDelete() throws IOException, InterruptedException, URISyntaxException {
// 1 获取⽂件系统
Configuration configuration = new Configuration();
FileSystem fs = (new URI("hdfs://hadoop102:9000"), configuration, "hadoop");
/
/ 2 执⾏删除
fs.delete(new Path("/1108/"), true);
// 3 关闭资源
fs.close();
System.out.println("over");
}
2.4、HDFS⽂件名更改
/**
* ⽂件重命名
*/
@Test
public void testRename() throws IOException, InterruptedException, URISyntaxException {
// 1 获取⽂件系统
Configuration configuration = new Configuration();
FileSystem fs = (new URI("hdfs://hadoop102:9000"), configuration, "hadoop");
// 2 修改⽂件名称
// 3 关闭资源
fs.close();
System.out.println("over");
}
2.5、HDFS⽂件详情查看
/
**
* 获取⽂件详情
* 查看⽂件名称、权限、长度、块信息
*/
@Test
public void testListFiles() throws IOException, InterruptedException, URISyntaxException {
// 1获取⽂件系统
Configuration configuration = new Configuration();
FileSystem fs = (new URI("hdfs://hadoop102:9000"), configuration, "hadoop");
// 2 获取⽂件详情
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus status = ();
// 输出详情
// ⽂件名称
System.out.Path().getName());
// 长度
System.out.Len());
// 权限
System.out.Permission());
// 分组
System.out.Group());
/
/ 获取存储的块信息
BlockLocation[] blockLocations = BlockLocations();
for (BlockLocation blockLocation : blockLocations) {
// 获取块存储的主机节点
String[] hosts = Hosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("-----------分割线----------");
}
/
/ 3 关闭资源
fs.close();
}
2.6、HDFS⽂件和⽂件夹判断
@Test
public void testListStatus() throws IOException, InterruptedException, URISyntaxException {
// 1 获取⽂件配置信息
Configuration configuration = new Configuration();
FileSystem fs = (new URI("hdfs://hadoop102:9000"), configuration, "hadoop");
// 2 判断是⽂件还是⽂件夹
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for (FileStatus fileStatus : listStatus) {
// 如果是⽂件
if (fileStatus.isFile()) {
System.out.println("file:" + Path().getName());
} else {
System.out.println("dir:" + Path().getName());
}
}
// 3 关闭资源
fs.close();
}
2.7、HDFS的I/O流操作
上⾯API操作HDFS系统都是框架封装好的,如果我们想⾃⼰实现上述API的操作该怎么实现呢?我们可以采⽤IO流的⽅式实现数据的上传和下载。
2.7.1、⽂件上传
/**
* 把本地盘上的⽂件上传到HDFS根⽬录
*/
@Test
public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
// 1 获取⽂件系统
Configuration configuration = new Configuration();
FileSystem fs = (new URI("hdfs://hadoop102:9000"), configuration, "hadoop");
// 2 创建输⼊流
FileInputStream fis = new FileInputStream(new File("d:/"));
// 3 获取输出流
FSDataOutputStream fos = fs.create(new Path("/"));
// 4 流对拷
// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}
2.7.2、⽂件下载
/**
* ⽂件下载
*/
@Test
public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
// 1 获取⽂件系统
Configuration configuration = new Configuration();
FileSystem fs = (new URI("hdfs://hadoop102:9000"), configuration, "hadoop");
// 2 获取输⼊流
FSDataInputStream fis = fs.open(new Path("/"));
/
/ 3 获取输出流
FileOutputStream fos = new FileOutputStream(new File("d:/"));
// 4 流的对拷
// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}
2.7.3、定位⽂件读取
分块读取HDFS上的⼤⽂件,⽐如根⽬录下的/hadoop-2.7.
/
**
* 定位⽂件读取
* 先上传⼀个⼤⽂件
* hadoop fs -put /opt/software/hadoop-2.7. /
*/
@Test
public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException {
// 1 获取⽂件系统
Configuration configuration = new Configuration();
FileSystem fs = (new URI("hdfs://hadoop102:9000"), configuration, "hadoop");
// 2 获取输⼊流
FSDataInputStream fis = fs.open(new Path("/hadoop-2.7."));
// 3 创建输出流
FileOutputStream fos = new FileOutputStream(new File("d:/hadoop-2.7.part1"));
// 4 流的拷贝
byte[] buf = new byte[1024];
for (int i = 0; i < 1024 * 128; i++) {
fos.write(buf);
}
// 5关闭资源
IOUtils.closeStream(fis);
log4j2 delete
IOUtils.closeStream(fos);
fs.close();
}
@Test
public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException {
// 1 获取⽂件系统
Configuration configuration = new Configuration();
FileSystem fs = (new URI("hdfs://hadoop102:9000"), configuration, "hadoop");
// 2 打开输⼊流
FSDataInputStream fis = fs.open(new Path("/hadoop-2.7."));
// 3 定位输⼊数据位置
fis.seek(1024 * 1024 * 128);
// 4 创建输出流
FileOutputStream fos = new FileOutputStream(new File("d:/hadoop-2.7.part2"));
// 5 流的对拷
// 6 关闭资源
IOUtils.closeStream(fis);
IOUtils.closeStream(fos);
fs.close();
}
合并⽂件,在Window命令窗⼝中进⼊到⽬录D:\,然后执⾏如下命令,对数据进⾏合并
type hadoop-2.7.part2 >> hadoop-2.7.part1
合并完成后,将hadoop-2.7.part1重新命名为hadoop-2.7.。解压发现该tar包⾮常完整。
三、HDFS数据流
3.1、HDFS写数据流程
3.1.1、剖析⽂件写⼊
1)客户端通过Distributed FileSystem模块向NameNode请求上传⽂件,NameNode检查⽬标⽂件是否已存在,⽗⽬录是否存在。
2)NameNode返回是否可以上传。
3)客户端请求第⼀个 Block上传到哪⼏个DataNode服务器上。
4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。
5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调⽤dn2,然后dn2调⽤dn3,将这个通信管道建⽴完成。
6)dn1、dn2、dn3逐级应答客户端。
7)客户端开始往dn1上传第⼀个Block(先从磁盘读取数据放到⼀个本地内存缓存),以Packet为单位,dn1收到⼀个Packet就会传给dn2,dn2传给dn3;dn1每传⼀个packet会
放⼊⼀个应答队列等待应答。
8)当⼀个Block传输完成之后,客户端再次请求NameNode上传第⼆个Block的服务器。(重复执⾏3-7步)。
计算
距离计算
节点距离
3.1.2、⽹络拓扑-节点
在HDFS写数据的过程中,NameNode会选择距离待上传数据最近距离的DataNode接收数据。那么这个最近距离怎么计算呢?
节点距离:两个节点到达最近的共同祖先的距离总和。
类似于如下的⽹络拓扑图:
3.1.3、机架感知(副本存储节点选择)
机架感知说明:
For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on one node in the local rack, another on a different node in the local rack, and the last on a different node in a different rack.对于hadoop2.7.2版本:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。