kafka学习笔记(一) kafka搭建

1 Kafka概述

1.1 定义

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

1.2 消息队列

1.2.1 传统消息队列的应用场景

1637153923628

使用消息队列的好处

1)解耦

允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2)可恢复性

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

3)缓冲

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

4)灵活性 & 峰值处理能力

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

5)异步通信

很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

1.2.2 消息队列的两种模式

(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。(这里注意是消费者主动拉取的

消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

1637153951339

(2)发布/订阅模式(一对多,消费者消费数据之后不会清除消息)

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。(这里注意数据也是消费者拉取的,因为消费者会一直轮询topic是否有消息

1637153963262

1.3 Kafka基础架构

1637154054233

1)Producer :消息生产者,就是向kafka broker发消息的客户端;

2)Consumer :消息消费者,向kafka broker取消息的客户端;

3)Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

4)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个topic;

6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;

7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。

8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。

9)follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的leader。

10)kafka集群依赖于zookeeper管理。

2 Kafka安装部署

2.1 集群规划

hadoop102hadoop103hadoop104
zkzkzk
kafkakafkakafka

2.2 Kafka 下载

http://kafka.apache.org/downloads.html

2.3 集群部署

1)解压安装包

[molly@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/

2)修改解压后的文件名称

[molly@hadoop102 module]$ mv kafka_2.11-2.4.1.tgz kafka

3)在/opt/module/kafka目录下创建logs文件夹

[molly@hadoop102 kafka]$ mkdir logs

4)修改配置文件

[molly@hadoop102 kafka]$ cd config/
[molly@hadoop102 config]$ vi server.properties

输入以下内容:

#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能,当前版本此配置默认为true,已从配置文件移除
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

5)配置环境变量

[molly@hadoop102 module]$ sudo vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[molly@hadoop102 module]$ source /etc/profile

6)分发安装包

[molly@hadoop102 module]$ xsync kafka/

​ 注意:分发之后记得配置其他机器的环境变量

7)分别在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2

​ 注:broker.id不得重复

8)启动集群

​ 先启动Zookeeper集群,然后启动kafaka

[molly@hadoop102  kafka]$ zk.sh start 

依次在hadoop102、hadoop103、hadoop104节点上启动kafka

[molly@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

[molly@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon  config/server.properties

[molly@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon  config/server.properties

9)关闭集群

[molly@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[molly@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
[molly@hadoop104 kafka]$ bin/kafka-server-stop.sh stop

10)kafka群起脚本

#!/bin/bash
if [ $# -lt 1 ]
then 
  echo "Input Args Error....."
  exit
fi
for i in hadoop102 hadoop103 hadoop104
do

case $1 in
start)
  echo "==================START $i KAFKA==================="
  ssh $i /opt/module/kafka_2.11-2.4.1/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties
;;
stop)
  echo "==================STOP $i KAFKA==================="
  ssh $i /opt/module/kafka_2.11-2.4.1/bin/kafka-server-stop.sh stop
;;

*)
 echo "Input Args Error....."
 exit
;;  
esac
done

3 Kafka命令行操作

kafka提供了测试脚本kafka-topics.sh用来测试。

1)查看当前服务器中的所有topic

[molly@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

2)创建topic

[molly@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first

选项说明:

–topic 定义topic名
–replication-factor 定义副本数
–partitions 定义分区数

3)删除topic

[molly@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first

4)发送消息:生产消息– 9092是kafka默认端口

[molly@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world
>molly molly

5)消费消息

[molly@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --topic first

[molly@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic first

–from-beginning:会把主题中现有的所有的数据都读取出来

6)查看某个Topic的详情

[molly@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe –-topic first

7)修改分区数 alter只能修改

[molly@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter –-topic first --partitions 6

4 Kafka监控

我们知道一个叫kafka manager的kafka管理工具,这个工具管理kafka确实很强大,但是没有安全认证,随便都可以创建,删除,修改topic,而且告警系统,流量波动做的不好。所以,在这里浪尖,再给大家推荐一款kafka 的告警监控管理工具,kafka-eagle。Kafka Eagle是一款开源的Kafka集群监控系统。能够实现broker级常见的JMX监控;能对consumer消费进度进行监控;能在页面上直接对多个集群进行管理;安装方式简单,二进制包解压即用;可以配置告警(钉钉、微信、email均可)。

kafka-eagle主要是有几个我们关注 但kafkamanager不存在的点,值得一提:

  • 流量,最长可以查看最近七天的流量波动图
  • lag size邮件告警
  • 可以用kafkasql分析

相关官方地址:

1)修改kafka启动命令

修改kafka-server-start.sh命令中

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
    #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

注意:修改之后在启动Kafka之前要分发之其他节点

2)上传压缩包kafka-eagle-bin-1.4.5.tar.gz到集群/opt/software目录

3)解压到本地

[molly@hadoop102 software]$ tar -zxvf kafka-eagle-bin-1.4.5.tar.gz

4)进入刚才解压的目录

[molly@hadoop102 kafka-eagle-bin-1.4.5]$ ll
总用量 82932
-rw-rw-r--. 1 molly molly 84920710 8月 13 23:00 kafka-eagle-web-1.4.5-bin.tar.gz

5)将kafka-eagle-web-1.3.7-bin.tar.gz解压至/opt/module

[molly@hadoop102 kafka-eagle-bin-1.4.5]$ tar -zxvf kafka-eagle-web-1.4.5-bin.tar.gz -C /opt/module/

6)修改名称

[molly@hadoop102 module]$ mv kafka-eagle-web-1.4.5/ eagle

7)给启动文件执行权限

[molly@hadoop102 eagle]$ cd bin/
[molly@hadoop102 bin]$ ll

总用量 12

-rw-r--r--. 1 molly molly 1848 8月 22 2017 ke.bat

-rw-r--r--. 1 molly molly 7190 7月 30 20:12 ke.sh

[molly@hadoop102 bin]$ chmod 777 ke.sh

8)修改配置文件 conf/system-config.properties

######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181

######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka

######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false

######################################
# kafka jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456

9)添加环境变量

export KE_HOME=/opt/module/eagle
export PATH=$PATH:$KE_HOME/bin
#注意:source /etc/profile

10)启动

[atguigu@hadoop102 eagle]$ bin/ke.sh start
... ...
... ...
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://192.168.202.102:8048/ke'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************

注意:启动之前需要先启动ZK以及KAFKA

11)登录页面查看监控数据

http://192.168.202.102:8048/ke

1637153214475

5 Flume对接Kafka

5.1 简单实现

1)配置flume

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F  /opt/module/data/flume.log

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) 启动kafka消费者

3) 进入flume根目录下,启动flume

$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

4) 向 /opt/module/data/flume.log里追加数据,查看kafka消费者消费情况

$ echo hello >> /opt/module/data/flume.log

5.2 数据分离

0)需求

将flume采集的数据按照不同的类型输入到不同的topic中

​ 将日志数据中带有molly的,输入到Kafka的first主题中,

​ 将日志数据中带有shangguigu的,输入到Kafka的second主题中,

​ 其他的数据输入到Kafka的third主题中

1) 编写Flume的Interceptor

package com.atguigu.kafka.flumeInterceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import javax.swing.text.html.HTMLEditorKit;
import java.util.List;
import java.util.Map;

public class FlumeKafkaInterceptor implements Interceptor &#123;
    @Override
    public void initialize() &#123;

    &#125;

    /**
     * 如果包含"atguigu"的数据,发送到first主题
     * 如果包含"sgg"的数据,发送到second主题
     * 其他的数据发送到third主题
     * @param event
     * @return
     */
    @Override
    public Event intercept(Event event) &#123;
        //1.获取event的header
        Map<String, String> headers = event.getHeaders();
        //2.获取event的body
        String body = new String(event.getBody());
        if(body.contains("atguigu"))&#123;
            headers.put("topic","first");
        &#125;else if(body.contains("sgg"))&#123;
            headers.put("topic","second");
        &#125;
        return event;

    &#125;

    @Override
    public List<Event> intercept(List<Event> events) &#123;
        for (Event event : events) &#123;
          intercept(event);
        &#125;
        return events;
    &#125;

    @Override
    public void close() &#123;

    &#125;

    public static class MyBuilder implements  Builder&#123;

        @Override
        public Interceptor build() &#123;
            return  new FlumeKafkaInterceptor();
        &#125;

        @Override
        public void configure(Context context) &#123;

        &#125;
    &#125;
&#125;

2)将写好的interceptor打包上传到Flume安装目录的lib目录下

3)配置flume

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666


# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = third
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.kafka.flumeInterceptor.FlumeKafkaInterceptor$MyBuilder

# # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4) 启动kafka消费者

5) 进入flume根目录下,启动flume

$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

6) 向6666端口写数据,查看kafka消费者消费情况

Hive学习笔记(一) Hive安装

前面学习到利用mapreduce去计算,但是mapreduce写起来麻烦,并且代码重复度高,可以进行封装,所以就出来了Hive,hive工具通过执行类SQL来启动写好的mapreduce,进一步执行hdfs中的资源。

1 Hive是什么

1.1 概念

1) hive简介

Hive:由Facebook开源用于解决海量结构化日志的数据统计工具。

Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并提供类SQL查询功能。

2) Hive本质:将HQL转化成MapReduce程序如下图所示。需要注意的是以下三点:

(1)Hive处理的数据存储在HDFS

(2)Hive分析数据底层的实现是MapReduce

(3)执行程序运行在Yarn上

1636945549117

1.2 Hive架构

Hive通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口。具体架构图如下所示。

1636945658710

1)用户接口:Client

CLI(command-line interface)、JDBC/ODBC(jdbc访问hive)、WEBUI(浏览器访问hive)

2)元数据:Metastore

元数据包括:表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;

默认存储在自带的derby数据库中,推荐使用MySQL存储Metastore

3)Hadoop

使用HDFS进行存储,使用MapReduce进行计算。

4)驱动器:Driver

(1)解析器(SQL Parser):将SQL字符串转换成抽象语法树AST,这一步一般都用第三方工具库完成,比如antlr;对AST进行语法分析,比如表是否存在、字段是否存在、SQL语义是否有误。

(2)编译器(Physical Plan):将AST编译生成逻辑执行计划。

(3)优化器(Query Optimizer):对逻辑执行计划进行优化。

(4)执行器(Execution):把逻辑执行计划转换成可以运行的物理计划。对于Hive来说,就是MR/Spark。

2 Hive 安装

2.1 Hive安装地址

1)Hive官网地址

http://hive.apache.org/

2)文档查看地址

https://cwiki.apache.org/confluence/display/Hive/GettingStarted

3)下载地址

http://archive.apache.org/dist/hive/

4)github地址

https://github.com/apache/hive

2.2 MySql安装

0)为什么需要Mysql

原因在于Hive默认使用的元数据库为derby,开启Hive之后就会占用元数据库,且不与其他客户端共享数据,如果想多窗口操作就会报错,操作比较局限。以我们需要将Hive的元数据地址改为MySQL,可支持多窗口操作。

1)检查当前系统是否安装过Mysql

[molly@hadoop102 ~]$ rpm -qa|grep -I -E mysql\|mariadb
mariadb-libs-5.5.56-2.el7.x86_64 //如果存在通过如下命令卸载
[molly@hadoop102 ~]$ sudo rpm -e --nodeps mariadb-libs  //用此命令卸载mariadb
[molly@hadoop102 ~]$ rpm -qa|grep -I -E mysql\|mariadb | xargs -n1 sudo rpm -e --nodeps#卸载所有

2)将MySQL安装包拷贝到/opt/software目录下

[molly@hadoop102 software]# ll
总用量 528384
-rw-r--r--. 1 root root 609556480 3月 21 15:41 mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar

3)解压MySQL安装包

[molly@hadoop102 software]# tar -xf mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar

4)在安装目录下执行rpm安装

[molly@hadoop102 software]$ sudo rpm -ivh mysql-community-common-5.7.28-1.el7.x86_64.rpm
[molly@hadoop102 software]$ sudo rpm -ivh mysql-community-libs-5.7.28-1.el7.x86_64.rpm
[molly@hadoop102 software]$ sudo rpm -ivh mysql-community-libs-compat-5.7.28-1.el7.x86_64.rpm
[molly@hadoop102 software]$ sudo rpm -ivh mysql-community-client-5.7.28-1.el7.x86_64.rpm
[molly@hadoop102 software]$ sudo rpm -ivh mysql-community-server-5.7.28-1.el7.x86_64.rpm

注意:按照顺序依次执行

如果Linux是最小化安装的,在安装mysql-community-server-5.7.28-1.el7.x86_64.rpm时可能会出 现如下错误

[molly@hadoop102 software]$ sudo rpm -ivh mysql-community-server-5.7.28-1.el7.x86_64.rpm

警告:mysql-community-server-5.7.28-1.el7.x86_64.rpm: 头V3 DSA/SHA1 Signature, 密钥 ID 5072e1f5: NOKEY

错误:依赖检测失败:

​ libaio.so.1()(64bit) 被 mysql-community-server-5.7.28-1.el7.x86_64 需要

​ libaio.so.1(LIBAIO_0.1)(64bit) 被 mysql-community-server-5.7.28-1.el7.x86_64 需要

​ libaio.so.1(LIBAIO_0.4)(64bit) 被 mysql-community-server-5.7.28-1.el7.x86_64 需要

通过yum安装缺少的依赖,然后重新安装mysql-community-server-5.7.28-1.el7.x86_64 即可

[molly@hadoop102 software] yum install -y libaio

5)删除/etc/my.cnf文件中datadir指向的目录下的所有内容,如果有内容的情况下:

查看datadir的值:

[mysqld]

datadir=/var/lib/mysql

删除/var/lib/mysql目录下的所有内容:

[molly@hadoop102 mysql]# cd /var/lib/mysql
[molly@hadoop102 mysql]# sudo rm -rf ./*  //注意执行命令的位置

6)初始化数据库

[molly@hadoop102 opt]$ sudo mysqld --initialize --user=mysql

7)查看临时生成的root用户的密码

[molly@hadoop102 opt]$ cat /var/log/mysqld.log |grep password

8)启动MySQL服务

[molly@hadoop102 opt]$ sudo systemctl start mysqld

9)登录MySQL数据库

[molly@hadoop102 opt]$ mysql -uroot -p
Enter password:  输入临时生成的密码

登录成功.

10)必须先修改root用户的密码,否则执行其他的操作会报错

mysql> set password = password("000000")

11)修改mysql库下的user表中的root用户允许任意ip连接

mysql> update mysql.user set host='%' where user='root';
mysql> flush privileges;

2.3 Hive安装部署

1)把apache-hive-3.1.2-bin.tar.gz上传到linux的/opt/software目录下

2)解压apache-hive-3.1.2-bin.tar.gz到/opt/module/目录下面

[molly@hadoop102 software]$ tar -zxvf /opt/software/apache-hive-3.1.2-bin.tar.gz -C /opt/module/

3)修改apache-hive-3.1.2-bin.tar.gz的名称为hive

[molly@hadoop102 software]$ mv /opt/module/apache-hive-3.1.2-bin/ /opt/module/hive-3.1.2

4)修改/etc/profile.d/my_env.sh,添加环境变量

[molly@hadoop102 software]$ sudo vim /etc/profile.d/my_env.sh

5)添加内容

#HIVE_HOME
HIVE_HOME=/opt/module/hive-3.1.2
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin
export PATH JAVA_HOME HADOOP_HOME HIVE_HOME

6)解决日志Jar包冲突:Hive日志与Hadoop默认日志冲突,可以直接删除hive日志JAR

[molly@hadoop102 software]$ rm -rf /opt/module/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar 

2.4 Hive元数据配置到MySql

因为Hive默认使用的元数据库为derby,为了想多窗口操作,我们需要将Hive的元数据地址改为MySQL。下面安装好mysql后,进行将Hive元数据配置到mysql上。其中HIVE_HOME=/opt/module/hive-3.1.2

2.4.1 拷贝驱动

将MySQL的JDBC驱动拷贝到Hive的lib目录下

[molly@hadoop102 software]$ cp /opt/software/mysql-connector-java-5.1.48.jar $HIVE_HOME/lib

2.4.2 配置Metastore到MySql

在$HIVE_HOME/conf目录下新建hive-site.xml文件

[molly@hadoop102 software]$ vim $HIVE_HOME/conf/hive-site.xml

添加如下内容

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <!-- jdbc连接的URL -->
  <property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false</value>
</property>
  <!-- jdbc连接的Driver-->
  <property><name>javax.jdo.option.ConnectionDriverName</name><value>com.mysql.jdbc.Driver</value>
</property>
  <!-- jdbc连接的username-->
  <property><name>javax.jdo.option.ConnectionUserName</name><value>root</value>
  </property>
  <!-- jdbc连接的password -->
  <property><name>javax.jdo.option.ConnectionPassword</name><value>123456</value>
</property>
  <!-- Hive默认在HDFS的工作目录 -->
  <property><name>hive.metastore.warehouse.dir</name><value>/user/hive/warehouse</value>
  </property>
  <!-- Hive元数据存储的验证 -->
  <property><name>hive.metastore.schema.verification</name><value>false</value>
  </property>
  <!-- 元数据存储授权 -->
  <property><name>hive.metastore.event.db.notification.api.auth</name><value>false</value>
  </property>
</configuration>

2.5 启动Hive

2.5.1 初始化元数据库

1)登陆MySQL

[molly@hadoop102 software]$ mysql -uroot -p000000

2)新建Hive元数据库

mysql> create database metastore;
mysql> quit;

3)初始化Hive元数据库

[molly@hadoop102 software]$ schematool -initSchema -dbType mysql -verbose

2.5.2 启动Hive

0)先启动hadoop集群

[molly@hadoop102 bin]$ start-dfs.sh
[molly@hadoop102 bin]$ start-yarn.sh
[molly@hadoop102 bin]$ myjps.sh

浏览器中查看hdfs:http://Hadoop102:9870

浏览器中查看yarn :http://Hadoop103:8088

1)启动Hive

[molly@hadoop102 hive]$ bin/hive

2)使用Hive

hive> show databases;
hive> show tables;
hive> create table test (id int);
hive> insert into test values(1);
hive> select * from test;

2.5.3 使用元数据服务的方式访问Hive

原始方法:Hive直接访问mysql

使用元数据服务方式:Hive—》元数据服务,元数据服务—》访问mysql。

1)在hive-site.xml文件中添加如下配置信息

  <!-- 指定存储元数据要连接的地址 -->
  <property><name>hive.metastore.uris</name><value>thrift://hadoop102:9083</value>
  </property>

2)启动metastore

[molly@hadoop202 hive]$ hive --service metastore
2020-04-24 16:58:08: Starting Hive Metastore Server

注意: 启动后窗口不能再操作,需打开一个新的shell窗口做别的操作

3)启动 hive

[molly@hadoop202 hive]$ bin/hive

2.5.4 使用JDBC方式访问Hive

客户端是beeline(JDBC协议去访问);

服务端:Hive使用hiveserver2提供JDBC协议:

所以访问数据流是:blleline通过hiveserver2去访问Hive。

1)在hive-site.xml文件中添加如下配置信息

2)启动hiveserver2

[molly@hadoop102 hive]$ bin/hive --service hiveserver2

3)启动beeline客户端(需要多等待一会)

[molly@hadoop102 hive]$ bin/beeline -u jdbc:hive2://hadoop102:10000 -n molly

4)看到如下界面

Connecting to jdbc:hive2://hadoop102:10000
Connected to: Apache Hive (version 3.1.2)
Driver: Hive JDBC (version 3.1.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.2 by Apache Hive
0: jdbc:hive2://hadoop102:10000>

3 常用命令

3.1 Hive常用交互命令

[molly@hadoop102 hive]$ bin/hive -help

1)“-e”不进入hive的交互窗口执行sql语句

[molly@hadoop102 hive]$ bin/hive -e "select id from student;"

2)“-f”执行脚本中sql语句

(1)在/opt/module/hive/下创建datas目录并在datas目录下创建hivef.sql文件

[molly@hadoop102 datas]$ touch hivef.sql

(2)文件中写入正确的sql语句

select *from student;

(3)执行文件中的sql语句

[molly@hadoop102 hive]$ bin/hive -f /opt/module/hive/datas/hivef.sql

(4)执行文件中的sql语句并将结果写入文件中

[molly@hadoop102 hive]$ bin/hive -f /opt/module/hive/datas/hivef.sql > /opt/module/datas/hive_result.txt

3.2 Hive其他命令操作

1)退出hive窗口:

hive(default)>exit;
hive(default)>quit;

在新版的hive中没区别了,在以前的版本是有的:

exit:先隐性提交数据,再退出;

quit:不提交数据,退出;

2)在hive cli命令窗口中如何查看hdfs文件系统

hive(default)>dfs -ls /;

3)查看在hive中输入的所有历史命令

(1)进入到当前用户的根目录/root或/home/molly

(2)查看. hivehistory文件

[atguig2u@hadoop102 ~]$ cat .hivehistory

3.3 Hive常见属性配置

3.3.1 hive窗口打印默认库和表头

1)打印 当前库 和 表头

在hive-site.xml中加入如下两个配置:

<property>
  <name>hive.cli.print.header</name>
  <value>true</value>
 </property>
  <property>
  <name>hive.cli.print.current.db</name>
  <value>true</value>
 </property>

3.3.2 Hive运行日志信息配置

1)Hive的log默认存放在/tmp/molly/hive.log目录下(当前用户名下)

2)修改hive的log存放日志到/opt/module/hive/logs

(1)修改/opt/module/hive/conf/hive-log4j2.properties.template文件名称为

hive-log4j2.properties

[molly@hadoop102 conf]$ pwd
/opt/module/hive/conf
[molly@hadoop102 conf]$ mv hive-log4j2.properties.template hive-log4j2.properties

(2)在hive-log4j.properties文件中修改log存放位置

property.hive.log.dir=/opt/module/hive/logs

3.3.3 参数配置方式

1)查看当前所有的配置信息

hive>set;

2)参数的配置三种方式

(1)配置文件方式

默认配置文件:hive-default.xml

用户自定义配置文件:hive-site.xml

注意:用户自定义配置会覆盖默认配置。另外,Hive也会读入Hadoop的配置,因为Hive是作为Hadoop的客户端启动的,Hive的配置会覆盖Hadoop的配置。配置文件的设定对本机启动的所有Hive进程都有效。

(2)命令行参数方式

启动Hive时,可以在命令行添加-hiveconf param=value来设定参数。

例如:

[molly@hadoop103 hive]$ bin/hive -hiveconf mapred.reduce.tasks=10;

注意:仅对本次hive启动有效

查看参数设置:

hive (default)> set mapred.reduce.tasks;

(3)参数声明方式

可以在HQL中使用SET关键字设定参数

例如:

hive (default)> set mapred.reduce.tasks=100;

注意:仅对本次hive启动有效。

查看参数设置

hive (default)> set mapred.reduce.tasks;

上述三种设定方式的优先级依次递增。即配置文件<命令行参数<参数声明。注意某些系统级的参数,例如log4j相关的设定,必须用前两种方式设定,因为那些参数的读取在会话建立以前已经完成了。

Hadoop 教程(五)mapreduce架构解析

1 MapReduce框架原理

本文介绍MapReduce框架原理。

1 InputFormat数据输入

1638352994475

1.1 切片与MapTask并行度决定机制

1)问题引出

MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。

思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

2)MapTask并行度决定机制

数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。

数据切片与MapReduce并行度决定机制如下图所示。

1638353087535

1.2 Job提交流程源码和切片源码详解

1.2.1 Job提交流程源码详解

1638353195677

waitForCompletion()

submit();

// 1建立连接
    connect();    
        // 1)创建提交Job的代理
        new Cluster(getConfiguration());
            // (1)判断是本地运行环境还是yarn集群运行环境
            initialize(jobTrackAddr, conf); 

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
    // 1)创建给集群提交数据的Stag路径
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取jobid ,并创建Job路径
    JobID jobId = submitClient.getNewJobID();

    // 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);    
    rUploader.uploadFiles(job, jobSubmitDir);

// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
        maps = writeNewSplits(job, jobSubmitDir);
        input.getSplits(job);

// 5)向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
    conf.writeXml(out);

// 6)提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

1.2.2 FileInputFormat切片源码解析(input.getSplits(job))

1638353267613

1.3 FileInputFormat切片机制

1638353303839

1638353324835

1.4 TextInputFormat

File InputFormat实现类如下图所示

1638353351028

1638353386562

1.5 CombineTextInputFormat切片机制

框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。

1)应用场景:

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

2)虚拟存储切片最大值设置

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

3)切片机制

生成切片过程包括:虚拟存储过程和切片过程二部分。

combineTestInputFormat切片机制如下图所示。

1638353429951

(1)虚拟存储过程:

将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)

例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。

(2)切片过程:

(a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。

(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。

(c)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:

1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)

最终会形成3个切片,大小分别为:

(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

1.6 CombineTextInputFormat案例实操

1)需求

将输入的大量小文件合并成一个切片统一处理。

(1)输入数据

准备4个小文件

1638353518528

(2)期望

期望一个切片处理4个文件

2)实现过程

(1)不做任何处理,运行1.6节的WordCount案例程序,观察切片个数为4。

1638353530002

(2)在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为3。

(a)驱动类中添加代码如下:

// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

​ (b)运行如果为3个切片。1638353550913

(3)在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为1。

​ (a)驱动中添加代码如下:

// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置20m
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);

(b)运行如果为1个切片。1638353569666

2 MapReduce工作流程

MapReduce工作流程图1:

1638353610352

MapReduce工作流程图2:

1638353644031

上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:

(1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中

(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

(3)多个溢出文件会被合并成大的溢出文件

(4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序

(5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据

(6)ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)

(7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)

注意:

(1)Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb默认100M。

(3)源码解析流程


=================== MapTask ===================
context.write(k, NullWritable.get());   //自定义的map方法的写出,进入
output.write(key, value);  
    //MapTask727行,收集方法,进入两次 
collector.collect(key, value,partitioner.getPartition(key, value, partitions));
    HashPartitioner(); //默认分区器
collect()  //MapTask1082行 map端所有的kv全部写出后会走下面的close方法
    close() //MapTask732行
    collector.flush() // 溢出刷写方法,MapTask735行,提前打个断点,进入
sortAndSpill() //溢写排序,MapTask1505行,进入
    sorter.sort()   QuickSort //溢写排序方法,MapTask1625行,进入
mergeParts(); //合并文件,MapTask1527行,进入

collector.close();
=================== ReduceTask ===================
if (isMapOrReduce())  //reduceTask324行,提前打断点
initialize()   // reduceTask333行,进入
init(shuffleContext);  // reduceTask375行,走到这需要先给下面的打断点
        totalMaps = job.getNumMapTasks(); // ShuffleSchedulerImpl第120行,提前打断点
         merger = createMergeManager(context); //合并方法,Shuffle第80行
            // MergeManagerImpl第232 235行,提前打断点
            this.inMemoryMerger = createInMemoryMerger(); //内存合并
            this.onDiskMerger = new OnDiskMerger(this); //磁盘合并
        eventFetcher.start();  //开始抓取数据,Shuffle第107行,提前打断点
        eventFetcher.shutDown();  //抓取结束,Shuffle第141行,提前打断点
        copyPhase.complete();   //copy阶段完成,Shuffle第151行
        taskStatus.setPhase(TaskStatus.Phase.SORT);  //开始排序阶段,Shuffle第152行
    sortPhase.complete();   //排序阶段完成,即将进入reduce阶段 reduceTask382行
reduce();  //reduce阶段调用的就是我们自定义的reduce方法,会被调用多次
    cleanup(context); //reduce完成之前,会最后调用一次Reducer里面的cleanup方法

3 Shuffle机制

3.1 Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

1638353745842

3.2 Partition分区

1638353755493

1638353813686

1638353821140

3.3 Partition分区案例实操

1)需求
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据

(2)期望输出数据
手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
2)需求分析

1638353840886

3)在案例2.4的基础上,增加一个分区类


package com.atguigu.mapreduce.flowsum;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> &#123;
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) &#123;

        // 1 获取电话号码的前三位
        String preNum = key.toString().substring(0, 3);

        int partition = 4;

        // 2 判断是哪个省
        if ("136".equals(preNum)) &#123;
            partition = 0;
        &#125;else if ("137".equals(preNum)) &#123;
            partition = 1;
        &#125;else if ("138".equals(preNum)) &#123;
            partition = 2;
        &#125;else if ("139".equals(preNum)) &#123;
            partition = 3;
        &#125;

        return partition;
    &#125;
&#125;

4)在驱动函数中增加自定义数据分区设置和ReduceTask设置

package com.atguigu.mapreduce.flowsum;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver &#123;

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException &#123;

        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[]&#123;"e:/output1","e:/output2"&#125;;

        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 指定本程序的jar包所在的本地路径
        job.setJarByClass(FlowsumDriver.class);

        // 3 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 8 指定自定义数据分区
        job.setPartitionerClass(ProvincePartitioner.class);

        // 9 同时指定相应数量的reduce task
        job.setNumReduceTasks(5);

        // 6 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    &#125;
&#125;

3.4 WritableComparable排序

排序是MapReduce框架中最重要的操作之一。MapTask和ReduceTask均会对数据按照key进行排序。该操作属于hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值厚,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序

对于ReduceTask,它从每个MapTask上原创拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储到内存中。如果磁盘上文件数目达到一定阈值,则进行一次归排序以生产一个更大的文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序

自定义排序WritableComparable原理分析

bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

@Override
public int compareTo(FlowBean bean) &#123;
  int result;

  // 按照总流量大小,倒序排列
  if (this.sumFlow > bean.getSumFlow()) &#123;
​    result = -1;
  &#125;else if (this.sumFlow < bean.getSumFlow()) &#123;
​    result = 1;
  &#125;else &#123;
​    result = 0;

  &#125;
  return result;
&#125;

3.5 WritableComparable排序案例实操(全排序)

1)需求
根据案例2.3序列化案例产生的结果再次对总流量进行倒序排序。
(1)输入数据
原始数据 第一次处理后的数据

(2)期望输出数据
13509468723 7335 110349 117684
13736230513 2481 24681 27162
13956435636 132 1512 1644
13846544121 264 0 264
。。。 。。。
2)需求分析

1638354454570

3)代码实现
(1)FlowBean对象在在需求1基础上增加了比较功能

package com.atguigu.mapreduce.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> &#123;

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() &#123;
        super();
    &#125;

    public FlowBean(long upFlow, long downFlow) &#123;
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    &#125;

    public void set(long upFlow, long downFlow) &#123;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    &#125;

    public long getSumFlow() &#123;
        return sumFlow;
    &#125;

    public void setSumFlow(long sumFlow) &#123;
        this.sumFlow = sumFlow;
    &#125;    

    public long getUpFlow() &#123;
        return upFlow;
    &#125;

    public void setUpFlow(long upFlow) &#123;
        this.upFlow = upFlow;
    &#125;

    public long getDownFlow() &#123;
        return downFlow;
    &#125;

    public void setDownFlow(long downFlow) &#123;
        this.downFlow = downFlow;
    &#125;

    /**
     * 序列化方法
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException &#123;
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    &#125;

    /**
     * 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException &#123;
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    &#125;

    @Override
    public String toString() &#123;
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    &#125;

    @Override
    public int compareTo(FlowBean bean) &#123;

        int result;

        // 按照总流量大小,倒序排列
        if (sumFlow > bean.getSumFlow()) &#123;
            result = -1;
        &#125;else if (sumFlow < bean.getSumFlow()) &#123;
            result = 1;
        &#125;else &#123;
            result = 0;
        &#125;

        return result;
    &#125;
&#125;

(2)编写Mapper类

package com.atguigu.mapreduce.sort;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>&#123;

    FlowBean bean = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException &#123;

        // 1 获取一行
        String line = value.toString();

        // 2 截取
        String[] fields = line.split("\t");

        // 3 封装对象
        String phoneNbr = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);

        bean.set(upFlow, downFlow);
        v.set(phoneNbr);

        // 4 输出
        context.write(bean, v);
    &#125;
&#125;

(3)编写Reducer类

package com.atguigu.mapreduce.sort;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>&#123;

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)    throws IOException, InterruptedException &#123;

        // 循环输出,避免总流量相同情况
        for (Text text : values) &#123;
            context.write(text, key);
        &#125;
    &#125;
&#125;

(4)编写Driver类

package com.atguigu.mapreduce.sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSortDriver &#123;

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException &#123;

        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[]&#123;"e:/output1","e:/output2"&#125;;

        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 指定本程序的jar包所在的本地路径
        job.setJarByClass(FlowCountSortDriver.class);

        // 3 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        // 4 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    &#125;
&#125;

3.6 WritableComparable排序案例实操(区内排序)

1)需求
要求每个省份手机号输出的文件中按照总流量内部排序。
2)需求分析
基于前一个需求,增加自定义分区类,分区按照省份手机号设置。

1638354490441

3)案例实操
(1)增加自定义分区类

package com.atguigu.mapreduce.sort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<FlowBean, Text> &#123;

    @Override
    public int getPartition(FlowBean key, Text value, int numPartitions) &#123;

        // 1 获取手机号码前三位
        String preNum = value.toString().substring(0, 3);

        int partition = 4;

        // 2 根据手机号归属地设置分区
        if ("136".equals(preNum)) &#123;
            partition = 0;
        &#125;else if ("137".equals(preNum)) &#123;
            partition = 1;
        &#125;else if ("138".equals(preNum)) &#123;
            partition = 2;
        &#125;else if ("139".equals(preNum)) &#123;
            partition = 3;
        &#125;

        return partition;
    &#125;
&#125;

(2)在驱动类中添加分区类

// 加载自定义分区类
job.setPartitionerClass(ProvincePartitioner.class);

// 设置Reducetask个数
job.setNumReduceTasks(5);

3.7 Combiner合并

1638354530636

自定义Combiner实现步骤:
(a)自定义一个Combiner继承Reducer,重写Reduce方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>&#123;

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException &#123;

        // 1 汇总操作
        int count = 0;
        for(IntWritable v :values)&#123;
            count += v.get();
        &#125;

        // 2 写出
        context.write(key, new IntWritable(count));
    &#125;
&#125;

(b)在Job驱动类中设置:

job.setCombinerClass(WordcountCombiner.class);

3.8 Combiner合并案例实操

1)需求
统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。
(1)数据输入

(2)期望输出数据
期望:Combine输入数据多,输出时经过合并,输出数据降低。
2)需求分析

对每一个MapTask的输出进行局部汇总

1638354559335

3)案例实操-方案一
(1)增加一个WordcountCombiner类继承Reducer

package com.atguigu.mr.combiner;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>&#123;

IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException &#123;

        // 1 汇总
        int sum = 0;

        for(IntWritable value :values)&#123;
            sum += value.get();
        &#125;

        v.set(sum);

        // 2 写出
        context.write(key, v);
    &#125;
&#125;

(2)在WordcountDriver驱动类中指定Combiner
// 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordcountCombiner.class);
4)案例实操-方案二
(1)将WordcountReducer作为Combiner在WordcountDriver驱动类中指定
// 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑
job.setCombinerClass(WordcountReducer.class);
运行程序,如下图所示

1638354590504

4 MapTask工作机制

1638354667388

​ (1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value。

​ (2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

​ (3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

​ (4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

溢写阶段详情:

​ 步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

​ 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

​ 步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

​ (5)Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

​ 当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

​ 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

​ 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

5 ReduceTask工作机制

1638354702967

​ (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

​ (2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

​ (3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

​ (4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

1)设置ReduceTask并行度(个数)

ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4
job.setNumReduceTasks(4);

2)实验:测试ReduceTask多少合适

(1)实验环境:1个Master节点,16个Slave节点:CPU:8GHZ,内存: 2G

(2)实验结论:

表 改变ReduceTask (数据量为1GB)

MapTask =16
ReduceTask151015162025304560
总时间8921461109288100128101145104

3)注意事项

1638354728399

6 OutputFormat数据输出

6.1 OutputFormat接口实现类

1638354781642

6.2 自定义OutputFormat

1638354799834

6.3 自定义OutputFormat案例实操

1)需求

​ 过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

(1)输入数据

1638354813110

(2)期望输出数据

1638354808990

2)需求分析

1638354826027

3)案例实操

(1)编写FilterMapper类

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> &#123;
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException &#123;
        //不做任何处理,直接写出一行log数据
        context.write(value,NullWritable.get());
    &#125;
&#125;

(2)编写FilterReducer类

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer<Text, NullWritable,Text, NullWritable> &#123;
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException &#123;
        //防止有相同的数据,迭代写出
        for (NullWritable value : values) &#123;
            context.write(key,NullWritable.get());
        &#125;
    &#125;
&#125;

(3)自定义一个OutputFormat类

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> &#123;
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException &#123;
        //创建一个自定义的RecordWriter返回
        LogRecordWriter logRecordWriter = new LogRecordWriter(job);
        return logRecordWriter;
    &#125;
&#125;

(4)编写RecordWriter类

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class LogRecordWriter extends RecordWriter<Text, NullWritable> &#123;

    private FSDataOutputStream atguiguOut;
    private FSDataOutputStream otherOut;

    public LogRecordWriter(TaskAttemptContext job) &#123;
        try &#123;
            //获取文件系统对象
            FileSystem fs = FileSystem.get(job.getConfiguration());
            //用文件系统对象创建两个输出流对应不同的目录
            atguiguOut = fs.create(new Path("d:/hadoop/atguigu.txt"));
            otherOut = fs.create(new Path("d:/hadoop/other.txt"));
        &#125; catch (IOException e) &#123;
            e.printStackTrace();
        &#125;
    &#125;

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException &#123;
        String log = key.toString();
        //根据一行的log数据是否包含atguigu,判断两条输出流输出的内容
        if (log.contains("atguigu")) &#123;
            atguiguOut.writeBytes(log + "\n");
        &#125; else &#123;
            otherOut.writeBytes(log + "\n");
        &#125;
    &#125;

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException &#123;
        //关流
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);
    &#125;
&#125;

(5)编写FilterDriver类

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver &#123;
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException &#123;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //设置自定义的outputformat
        job.setOutputFormatClass(LogOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\input"));
        //虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
        //而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录
        FileOutputFormat.setOutputPath(job, new Path("D:\\logoutput"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    &#125;
&#125;

Hadoop 教程(二)安装hadoop集群-完全分布式部署及API使用

1 前言

本文来搭建hadoop集群,准备三台服务器,分别为hadoop102,hadoop103,hadoop104.其中hadoop 采用3.1.3版本,jdk 采用1.8.0_212 。

2 准备工作

2.1 映射

为了方便直接通过主机名去访问,下面进行映射

1)修改克隆机主机名,以下以hadoop102举例说明

(1)修改主机名称,:修改/etc/hostname文件

[root@hadoop100 ~]# vim /etc/hostname
hadoop102

(2)配置linux克隆机主机名称映射hosts文件,打开/etc/hosts

[root@hadoop100 ~]# vim /etc/hosts
192.168.1.102 hadoop102
192.168.1.103 hadoop103
192.168.1.104 hadoop104

2)重启hadoop102

[root@hadoop100 ~]# reboot

3)修改windows的主机映射文件(hosts文件)

操作系统是window10,先拷贝出来,修改保存以后,再覆盖即可

(a)进入C:\Windows\System32\drivers\etc路径

(b)拷贝hosts文件到桌面

(c)打开桌面hosts文件并添加如下内容

192.168.1.102 hadoop102
192.168.1.103 hadoop103
192.168.1.104 hadoop104

(d)将桌面hosts文件覆盖C:\Windows\System32\drivers\etc路径hosts文件

2.2 安装JDK

1)在Linux系统下的opt目录中下载软件包

[root@hadoop102 ~]$ ls /opt/software/
jdk-8u212-linux-x64.tar.gz

2)解压JDK到/opt/module目录下

[root@hadoop102 software]$ tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/

3)配置JDK环境变量

​ (1)新建/etc/profile.d/my_env.sh文件

[root@hadoop102 ~]$ sudo vim /etc/profile.d/my_env.sh

添加如下内容

#JAVA_HOME

export JAVA_HOME=/opt/module/jdk1.8.0_212

export PATH=$PATH:$JAVA_HOME/bin

​ (2)保存后退出:wq

​ (3)source一下/etc/profile文件,让新的环境变量PATH生效

[root@hadoop102 ~]$ source /etc/profile

4)测试JDK是否安装成功

[root@hadoop102 ~]$ java -version

如果能看到以下结果,则代表Java安装成功。

java version “1.8.0_212”

注意:重启(如果java -version可以用就不用重启)

[root@hadoop102 ~]$ sudo reboot

2.3 SSH免密码登录

免密登录原理如下图所示:

1636709256729

具体操作如下:

1)生成公钥和私钥:

[root@hadoop102 .ssh]$ ssh-keygen -t rsa

然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)

2)将公钥拷贝到要免密登录的目标机器上

[root@hadoop102 .ssh]$ ssh-copy-id hadoop102

[root@hadoop102 .ssh]$ ssh-copy-id hadoop103

[root@hadoop102 .ssh]$ ssh-copy-id hadoop104

这样hadoop102登录到hadoop103和hadoop104就不需要输入密码了。可以相互登录 还需要在hadoop103和hadoop104上做同样的操作。

2.4 编写集群分发脚本

为了在集群中各个主机中文件拷贝方便,我们可以写个脚本用于三台主机中分发文件。

(1)需求:循环复制文件到所有节点的相同目录下

(2)需求分析:

(a)rsync命令原始拷贝:

rsync -av   /opt/module     root@hadoop103:/opt/

(b)期望脚本:

xsync要同步的文件名称

(c)说明:在/home/root/bin这个目录下存放的脚本,root用户可以在系统任何地方直接执行。

(3)脚本实现

(a)在/home/root/bin目录下创建xsync文件

[root@hadoop102 opt]$ cd /home/root
[root@hadoop102 ~]$ mkdir bin
[root@hadoop102 ~]$ cd bin
[root@hadoop102 bin]$ vim xsync

在该文件中编写如下代码

#!/bin/bash
\#1. 判断参数个数
if [ $# -lt 1 ]
then
 echo Not Enough Arguement!
 exit;
fi
\#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
 echo ==================== $host ====================
 \#3. 遍历所有目录,挨个发送
 for file in $@
 do
  \#4. 判断文件是否存在
  if [ -e $file ]
  then
   \#5. 获取父目录
   pdir=$(cd -P $(dirname $file); pwd)
   \#6. 获取当前文件的名称
   fname=$(basename $file)
   ssh $host "mkdir -p $pdir"
   rsync -av $pdir/$fname $host:$pdir
  else
   echo $file does not exists!
  fi
 done
done

(b)修改脚本 xsync 具有执行权限

[root@hadoop102 bin]$ chmod +x xsync

(c)将脚本复制到/bin中,以便全局调用

[root@hadoop102 bin]$ sudo cp xsync /bin/

(d)测试脚本

[root@hadoop102 ~]$ xsync /home/root/bin
[root@hadoop102 bin]$ sudo xsync /bin/xsync

3 安装hadoop

Hadoop下载地址:https://archive.apache.org/dist/hadoop/common/hadoop-3.1.3/

1)下载hadoop并进入到Hadoop安装包路径下

[root@hadoop102 ~]$ cd /opt/software/

2)解压安装文件到/opt/module下面

[root@hadoop102 software]$ tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/

3)查看是否解压成功

[root@hadoop102 software]$ ls /opt/module/
hadoop-3.1.3

4)将Hadoop添加到环境变量

​ (1)获取Hadoop安装路径

[root@hadoop102 hadoop-3.1.3]$ pwd
/opt/module/hadoop-3.1.3

​ (2)打开/etc/profile.d/my_env.sh文件

sudo vim /etc/profile.d/my_env.sh

在my_env.sh文件末尾添加如下内容:(shift+g)

#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

(3)保存后退出:wq

(4)让修改后的文件生效

[root@hadoop102 hadoop-3.1.3]$ source /etc/profile

5)测试是否安装成功

[root@hadoop102 hadoop-3.1.3]$ hadoop version
Hadoop 3.1.3

6)重启(如果Hadoop命令不能用再重启)

[root@hadoop102 hadoop-3.1.3]$ sync
[root@hadoop102 hadoop-3.1.3]$ sudo reboot

4 Hadoop运行模式启动

Hadoop运行模式包括:本地模式、伪分布式模式以及完全分布式模式。本地允许模式很简单,公司用的大部分是完全分布式模式。

Hadoop官方网站:http://hadoop.apache.org/

4.1 本地运行模式

下面展示hadoop本地运行模式,并成功计算一个wordcout功能

1)创建在hadoop-3.1.3文件下面创建一个wcinput文件夹

[root@hadoop102 hadoop-3.1.3]$ mkdir wcinpu

2)在wcinput文件下创建一个word.txt文件

[root@hadoop102 hadoop-3.1.3]$ cd wcinput

3)编辑word.txt文件

[root@hadoop102 wcinput]$ vim word.txt
hadoop yarn
hadoop mapreduce
root
root

保存退出::wq

4)回到Hadoop目录/opt/module/hadoop-3.1.3

5)执行程序

[root@hadoop102 hadoop-3.1.3]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount wcinput wcoutput

6)查看结果

[root@hadoop102 hadoop-3.1.3]$ cat wcoutput/part-r-00000
root 2
hadoop 2
mapreduce    1
yarn  1

4.2 完全分布式模式

4.2.1 集群规划

准备三台机器,分别安装HDFS和yarn。

hadoop102hadoop103hadoop104
HDFSNameNode DataNodeDataNodeSecondaryNameNode DataNode
YARNNodeManagerResourceManager NodeManagerNodeManager

注意:NameNode和SecondaryNameNode不要安装在同一台服务器

注意:ResourceManager也很消耗内存,不要和NameNode、SecondaryNameNode配置在同一台机器上。

4.2.2 配置文件说明

Hadoop配置文件分两类:默认配置文件和自定义配置文件,只有用户想修改某一默认配置值时,才需要修改自定义配置文件,更改相应属性值。

(1)默认配置文件:

要获取的默认文件文件存放在Hadoop的jar包中的位置
[core-default.xml]hadoop-common-3.1.3.jar/ core-default.xml
[hdfs-default.xml]hadoop-hdfs-3.1.3.jar/ hdfs-default.xml
[yarn-default.xml]hadoop-yarn-common-3.1.3.jar/ yarn-default.xml
[mapred-default.xml]hadoop-mapreduce-client-core-3.1.3.jar/ mapred-default.xml

2)自定义配置文件:

core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml四个配置文件存放在$HADOOP_HOME/etc/hadoop这个路径上,用户可以根据项目需求重新进行修改配置。

(3)常用端口号说明

DaemonAppHadoop2Hadoop3
NameNode PortHadoop HDFS NameNode8020 / 90009820
Hadoop HDFS NameNode HTTP UI500709870
Secondary NameNode PortSecondary NameNode500919869
Secondary NameNode HTTP UI500909868
DataNode PortHadoop HDFS DataNode IPC500209867
Hadoop HDFS DataNode500109866
Hadoop HDFS DataNode HTTP UI500759864

4.2.3 配置集群

(1)核心配置文件:配置core-site.xml

[root@hadoop102 ~]$ cd $HADOOP_HOME/etc/hadoop
[root@hadoop102 hadoop]$ vim core-site.xml

文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <configuration>
  <!-- 指定NameNode的地址 -->
  <property><name>fs.defaultFS</name><value>hdfs://hadoop102:9820</value>
</property>
<!-- 指定hadoop数据的存储目录 -->
  <property><name>hadoop.tmp.dir</name><value>/opt/module/hadoop-3.1.3/data</value>
</property>
<!-- 配HDFS网页登录使用的静态用户为root -->
  <property><name>hadoop.http.staticuser.user</name><value>root</value>
</property>
<!-- 配置该root(superUser)允许通过代理访问的主机节点 -->
  <property><name>hadoop.proxyuser.root.hosts</name><value>*</value>
</property>
<!-- 配置该root(superUser)允许通过代理用户所属组 -->
  <property><name>hadoop.proxyuser.root.groups</name><value>*</value>
</property>
<!-- 配置该root(superUser)允许通过代理的用户-->
  <property><name>hadoop.proxyuser.root.groups</name><value>*</value>
</property>
</configuration>

(2)HDFS配置文件:配置hdfs-site.xml

[root@hadoop102 hadoop]$ vim hdfs-site.xml

文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <configuration>
  <!-- nn web端访问地址-->
  <property><name>dfs.namenode.http-address</name><value>hadoop102:9870</value>
  </property>
  <!-- 2nn web端访问地址-->
  <property><name>dfs.namenode.secondary.http-address</name><value>hadoop104:9868</value>
  </property>
</configuration>

(3)YARN配置文件:配置yarn-site.xml

[root@hadoop102 hadoop]$ vim yarn-site.xml

文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <!-- 指定MR走shuffle -->
  <property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value>
</property>
<!-- 指定ResourceManager的地址-->
  <property><name>yarn.resourcemanager.hostname</name><value>hadoop103</value>
</property>
<!-- 环境变量的继承 -->
  <property><name>yarn.nodemanager.env-whitelist</name>
  <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
<!-- yarn容器允许分配的最大最小内存 -->
  <property><name>yarn.scheduler.minimum-allocation-mb</name><value>512</value>
  </property>
  <property><name>yarn.scheduler.maximum-allocation-mb</name><value>4096</value>
</property>
<!-- yarn容器允许管理的物理内存大小 -->
  <property><name>yarn.nodemanager.resource.memory-mb</name><value>4096</value>
</property>
<!-- 关闭yarn对物理内存和虚拟内存的限制检查 -->
  <property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value>
  </property>
  <property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value>
  </property>
</configuration>

(4)MapReduce配置文件:配置mapred-site.xml

[root@hadoop102 hadoop]$ vim mapred-site.xml

文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <!-- 指定MapReduce程序运行在Yarn上 -->
  <property><name>mapreduce.framework.name</name><value>yarn</value>
  </property>
</configuration>

4)在集群上分发配置好的Hadoop配置文件,将配置文件同步到hadoop103和hadoop104

[root@hadoop102 hadoop]$ xsync /opt/module/hadoop-3.1.3/etc/hadoop/

5)去103和104上查看文件分发情况

[root@hadoop103 ~]$ cat /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml
[root@hadoop104 ~]$ cat /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml

4.2.4 启动集群

1)配置workers

[root@hadoop102 hadoop]$ vim /opt/module/hadoop-3.1.3/etc/hadoop/workers

在该文件中增加如下内容:

hadoop102
hadoop103
hadoop104

注意:该文件中添加的内容结尾不允许有空格,文件中不允许有空行。

同步所有节点配置文件

[root@hadoop102 hadoop]$ xsync /opt/module/hadoop-3.1.3/etc

2)启动集群

​ (1)如果集群是第一次启动,需要在hadoop102节点格式化NameNode(注意格式化NameNode,会产生新的集群id,导致NameNode和DataNode的集群id不一致,集群找不到已往数据。如果集群在运行过程中报错,需要重新格式化NameNode的话,一定要先停止namenode和datanode进程,并且要删除所有机器的data和logs目录,然后再进行格式化。)

[root@hadoop102 ~]$ hdfs namenode -format

(2)启动HDFS

[root@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh

(3)在配置了ResourceManager的节点(hadoop103)启动YARN

[root@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh

(4)Web端查看HDFS的NameNode

​ (a)浏览器中输入:http://hadoop102:9870

​ (b)查看HDFS上存储的数据信息

(5)Web端查看YARN的ResourceManager

​ (a)浏览器中输入:http://hadoop103:8088

​ (b)查看YARN上运行的Job信息

4.2.5 集群基本测试

HDFS相当于一个文件存储框架,搭好集群后,可以在集群去对文件进行操作,上传,下载,删除,查看等。

(1)上传文件到集群

​ 上传小文件

[root@hadoop102 ~]$ hadoop fs -mkdir /input
[root@hadoop102 ~]$ hadoop fs -put $HADOOP_HOME/wcinput/word.txt /input

​ 上传大文件

[root@hadoop102 ~]$ hadoop fs -put  /opt/software/jdk-8u212-linux-x64.tar.gz  /

(2)上传文件后查看文件存放在什么位置

(a)查看HDFS文件存储路径

[root@hadoop102 subdir0]$ pwd
/opt/module/hadoop-3.1.3/data/dfs/data/current/BP-938951106-192.168.10.107-1495462844069/current/finalized/subdir0/subdir0

(b)查看HDFS在磁盘存储文件内容

[root@hadoop102 subdir0]$ cat blk_1073741825
hadoop yarn
hadoop mapreduce 
root
root

(3)下载

[root@hadoop104 software]$ hadoop fs -get /jdk-8u212-linux-x64.tar.gz ./

(4)执行wordcount程序

[root@hadoop102 hadoop-3.1.3]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output

4.2.6 集群启动/停止方式总结

1)各个服务组件逐一启动/停止

​ (1)分别启动/停止HDFS组件

hdfs --daemon start/stop namenode/datanode/secondarynamenode

​ (2)启动/停止YARN

yarn --daemon start/stop resourcemanager/nodemanager

2)各个模块分开启动/停止(配置ssh是前提)常用

​ (1)整体启动/停止HDFS

start-dfs.sh/stop-dfs.sh

​ (2)整体启动/停止YARN

start-yarn.sh/stop-yarn.sh

4.2.7 编写hadoop集群常用脚本

(1)查看三台服务器java进程脚本:jpsall

[root@hadoop102 ~]$ cd /home/root/bin
[root@hadoop102 ~]$ vim jpsall

然后输入

#!/bin/bash
for host in hadoop102 hadoop103 hadoop104
doecho =============== $host ===============ssh $host jps $@ | grep -v Jps
done

保存后退出,然后赋予脚本执行权限

[root@hadoop102 bin]$ chmod +x jpsall

(2)hadoop集群启停脚本(包含hdfs,yarn,historyserver):myhadoop.sh

[root@hadoop102 ~]$ cd /home/root/bin
[root@hadoop102 ~]$ vim myhadoop.sh

然后输入

#!/bin/bash

if [ $# -lt 1 ]
then
  echo "No Args Input..."
  exit ;
fi
case $1 in
"start")echo " =================== 启动 hadoop集群 ==================
​    echo " --------------- 启动 hdfs ---------------"
​    ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
​    echo " --------------- 启动 yarn ---------------"
​    ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
​    echo " --------------- 启动 historyserver ---------------"
​    ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;

"stop")
​    echo " =================== 关闭 hadoop集群 ===================echo " --------------- 关闭 historyserver ---------------"ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"echo " --------------- 关闭 yarn ---------------"ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"echo " --------------- 关闭 hdfs ---------------"ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"

;;

*)
  echo "Input Args Error..."

;;
esac

保存后退出,然后赋予脚本执行权限

[root@hadoop102 bin]$ chmod +x myhadoop.sh

3)分发/home/root/bin目录,保证自定义脚本在三台机器上都可以使用

[root@hadoop102 ~]$ xsync /home/root/bin/

5 hdfs常用shell操作

5.1 基本语法

hadoop fs 具体命令 OR hdfs dfs 具体命令

两个是完全相同的。

5.2 命令大全

查看所有命令

[root@hadoop102 hadoop-3.1.3]$ bin/hadoop fs

5.3 常用命令实操

5.3.1 准备工作

1)启动Hadoop集群(方便后续的测试)

[root@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
[root@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh

2)-help:输出这个命令参数

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -help rm

5.3.2 上传

1)-moveFromLocal:从本地剪切粘贴到HDFS

[root@hadoop102 hadoop-3.1.3]$ touch kongming.txt
[root@hadoop102 hadoop-3.1.3]$ hadoop fs -moveFromLocal ./kongming.txt /sanguo/shuguo

2)-copyFromLocal:从本地文件系统中拷贝文件到HDFS路径去

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -copyFromLocal README.txt /

3)-appendToFile:追加一个文件到已经存在的文件末尾

[root@hadoop102 hadoop-3.1.3]$ touch liubei.txt
[root@hadoop102 hadoop-3.1.3]$ vi liubei.txt
san gu mao lu
[root@hadoop102 hadoop-3.1.3]$ hadoop fs -appendToFile liubei.txt /sanguo/shuguo/kongming.txt

4)-put:等同于copyFromLocal

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -put ./liubei.txt /user/root/test/

5.3.3 下载

1)-copyToLocal:从HDFS拷贝到本地

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -copyToLocal /sanguo/shuguo/kongming.txt ./

2)-get:等同于copyToLocal,就是从HDFS下载文件到本地

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -get /sanguo/shuguo/kongming.txt ./

3)-getmerge:合并下载多个文件,比如HDFS的目录 /user/root/test下有多个文件:log.1, log.2,log.3,…

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -getmerge /user/root/test/* ./zaiyiqi.txt

5.3.4 HDFS直接操作

1)-ls: 显示目录信息

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -ls /

2)-mkdir:在HDFS上创建目录

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -mkdir -p /sanguo/shuguo

3)-cat:显示文件内容

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -cat /sanguo/shuguo/kongming.txt

4)-chgrp 、-chmod、-chown:Linux文件系统中的用法一样,修改文件所属权限

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -chmod 666 /sanguo/shuguo/kongming.txt
[root@hadoop102 hadoop-3.1.3]$ hadoop fs -chown root:root  /sanguo/shuguo/kongming.txt

5)-cp :从HDFS的一个路径拷贝到HDFS的另一个路径

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -cp /sanguo/shuguo/kongming.txt /zhuge.txt

6)-mv:在HDFS目录中移动文件

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -mv /zhuge.txt /sanguo/shuguo/

7)-tail:显示一个文件的末尾1kb的数据

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -tail /sanguo/shuguo/kongming.txt

8)-rm:删除文件或文件夹

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -rm /user/root/test/jinlian2.txt

9)-rmdir:删除空目录

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -mkdir /test
[root@hadoop102 hadoop-3.1.3]$ hadoop fs -rmdir /test

10)-du统计文件夹的大小信息

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -du -s -h /user/root/test

11)-setrep:设置HDFS中文件的副本数量

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -setrep 10 /sanguo/shuguo/kongming.txt

这里设置的副本数只是记录在NameNode的元数据中,是否真的会有这么多副本,还得看DataNode的数量。因为目前只有3台设备,最多也就3个副本,只有节点数的增加到10台时,副本数才能达到10。

6 hdfs的API操作

6.1 准备Windows关于Hadoop的开发环境

1)找到资料目录下的Windows依赖目录,打开:

选择Hadoop-3.1.0,拷贝到其他地方(比如d:)。
2)配置HADOOP_HOME环境变量。

3)配置Path环境变量。然后重启电脑

4)创建一个Maven工程HdfsClientDemo,并导入相应的依赖坐标+日志添加

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>
在项目的src/main/resources目录下,新建一个文件,命名为“log4j2.xml”,在文件中填入
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- 类型名为Console,名称为必须属性 -->
        <Appender type="Console" name="STDOUT">
            <!-- 布局为PatternLayout的方式,
            输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%d&#123;yyyy-MM-dd HH:mm:ss&#125;][%c&#123;10&#125;]%m%n" />
        </Appender>

    </Appenders>

    <Loggers>
        <!-- 可加性为false -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>

        <!-- root loggerConfig设置 -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>
</Configuration>

5)创建包名:com.atguigu.hdfs
6)创建HdfsClient类

public class HdfsClient&#123;    
@Test
public void testMkdirs() throws IOException, InterruptedException, URISyntaxException&#123;

        // 1 获取文件系统
        Configuration configuration = new Configuration();
        // 配置在集群上运行
        // configuration.set("fs.defaultFS", "hdfs://hadoop102:9820");
        // FileSystem fs = FileSystem.get(configuration);

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9820"), configuration, "atguigu");

        // 2 创建目录
        fs.mkdirs(new Path("/1108/daxian/banzhang"));

        // 3 关闭资源
        fs.close();
    &#125;
&#125;

7)执行程序
运行时需要配置用户名称

客户端去操作HDFS时,是有一个用户身份的。默认情况下,HDFS客户端API会从JVM中获取一个参数来作为自己的用户身份:-DHADOOP_USER_NAME=atguigu,atguigu为用户名称。

6.2 HDFS的API操作

6.2.1 HDFS文件上传(测试参数优先级)

1)编写源代码

@Test
public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException &#123;

        // 1 获取文件系统
        Configuration configuration = new Configuration();
        configuration.set("dfs.replication", "2");
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "atguigu");

        // 2 上传文件
        fs.copyFromLocalFile(new Path("e:/banzhang.txt"), new Path("/banzhang.txt"));

        // 3 关闭资源
        fs.close();

        System.out.println("over");

2)将hdfs-site.xml拷贝到项目的根目录下

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>dfs.replication</name>
         <value>1</value>
    </property>
</configuration>

3)参数优先级
参数优先级排序:(1)客户端代码中设置的值 >(2)ClassPath下的用户自定义配置文件 >(3)然后是服务器的自定义配置(xxx-site.xml) >(4)服务器的默认配置(xxx-default.xml)

6.2.2 HDFS文件下载

@Test
public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException&#123;

        // 1 获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9820"), configuration, "atguigu");

        // 2 执行下载操作
        // boolean delSrc 指是否将原文件删除
        // Path src 指要下载的文件路径
        // Path dst 指将文件下载到的路径
        // boolean useRawLocalFileSystem 是否开启文件校验
        fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("e:/banhua.txt"), true);

        // 3 关闭资源
        fs.close();
&#125;

6.2.3 HDFS删除文件和目录

@Test
public void testDelete() throws IOException, InterruptedException, URISyntaxException&#123;

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9820"), configuration, "atguigu");

    // 2 执行删除
    fs.delete(new Path("/0508/"), true);

    // 3 关闭资源
    fs.close();
&#125;

6.2.4 HDFS文件更名和移动

@Test
public void testRename() throws IOException, InterruptedException, URISyntaxException&#123;

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9820"), configuration, "atguigu"); 

    // 2 修改文件名称
    fs.rename(new Path("/banzhang.txt"), new Path("/banhua.txt"));

    // 3 关闭资源
    fs.close();
&#125;

6.2.5 HDFS文件详情查看

//查看文件名称、权限、长度、块信息
@Test
public void testListFiles() throws IOException, InterruptedException, URISyntaxException&#123;

    // 1获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9820"), configuration, "atguigu"); 

    // 2 获取文件详情
    RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

    while(listFiles.hasNext())&#123;
        LocatedFileStatus status = listFiles.next();

        // 输出详情
        // 文件名称
        System.out.println(status.getPath().getName());
        // 长度
        System.out.println(status.getLen());
        // 权限
        System.out.println(status.getPermission());
        // 分组
        System.out.println(status.getGroup());

        // 获取存储的块信息
        BlockLocation[] blockLocations = status.getBlockLocations();

        for (BlockLocation blockLocation : blockLocations) &#123;

            // 获取块存储的主机节点
            String[] hosts = blockLocation.getHosts();

            for (String host : hosts) &#123;
                System.out.println(host);
            &#125;
        &#125;

        System.out.println("-----------班长的分割线----------");
    &#125;

// 3 关闭资源
fs.close();
&#125;

6.2.6 HDFS文件和文件夹判断

@Test
public void testListStatus() throws IOException, InterruptedException, URISyntaxException&#123;


    // 1 获取文件配置信息
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9820"), configuration, "atguigu");

    // 2 判断是文件还是文件夹
    FileStatus[] listStatus = fs.listStatus(new Path("/"));

    for (FileStatus fileStatus : listStatus) &#123;

        // 如果是文件
        if (fileStatus.isFile()) &#123;
                System.out.println("f:"+fileStatus.getPath().getName());
            &#125;else &#123;
                System.out.println("d:"+fileStatus.getPath().getName());
            &#125;
        &#125;

    // 3 关闭资源
    fs.close();
&#125;

Hadoop 教程(一)hadoop介绍

1 hadoop是什么

Hadoop是一个由Apache基金会所开发的分布式系统基础架构。它提供了一个海量数据存储分析计算的能力。广义上来说Hadoop这个词代表了Hadoop生态圈。

Hadoop的核心是YARN,HDFS和Mapreduce。随着处理任务不同,各种组件相继出现,丰富Hadoop生态圈,目前生态圈结构大致如图所示 。

1636705530947

2. Hadoop的特点

  • 扩容能力(Scalable) 能可靠地(reliably)存储和处理千兆字节(PB)数据
  • 成本低(Economical) 可以通过普通机器组成的服务器集群来分发以及处理数据。这些服务器几圈总计可以达到千个节点。
  • 高效率(Efficient) 通过分发数据,hadoop 可以在数据所在的节点上并行的(parallel)处理它们,这使得处理非常快。
  • 可靠性(Reliable) hadoop 能自动地维护数据的多份副本,并且在任务失败后能自动重新部署(redeploy)计算任务

3 Hadoop三大发行版本

Hadoop三大发行版本:Apache、Cloudera、Hortonworks。

Apache版本最原始(最基础)的版本,对于入门学习最好。

Cloudera内部集成了很多大数据框架。对应产品CDH。

Hortonworks文档较好。对应产品HDP。后面被Cloudera收购

4 Hadoop组成

Hadoop的核心组件分为:HDFS(分布式文件系统)、MapRuduce(分布式运算编程框架)、YARN(运算资源调度系统) 。

1636706148747

4.1 HDFS

​ 整个Hadoop的体系结构主要是通过HDFS(Hadoop分布式文件系统)来实现对分布式存储的底层支持,并通过MR来实现对分布式并行任务处理的程序支持。HDFS是Hadoop体系中数据存储管理的基础

一个HDFS集群是由一个NameNode和若干个DataNode组成的。NameNodee存储元数据,作为主服务器,管理文件系统命名空间和客户端对文件的访问操作。DataNode存储数据,DataNode管理存储的数据。HDFS支持文件形式的数据。

1636706441346

4.2 YARN架构

上面我们说了 Hadoop2.x 中增加了 Yarn(资源调度),那资源调度是在调度什么呢?在计算机中资源就是CPU和内存,CPU和内存都是有上限的,所以需要分配给更需要的进程来使用。

1636706565758

ResourceManager(RM)就是资源管理者,外部的客户端提交作业请求都会先到 ResourceManager(RM),他代表了集群所有的资源,并监控 NodeManager、启动或监控ApplicationMaster。

NodeManager(NM) 只管理一个节点的资源,处理来自ResourceManager(RM)的命令和来自ApplicationMaster的命令。

ApplicationMaster(AM)负责数据的切分、为应用程序申请资源分配内部任务和任务的监控容错。当一个任务提交到 ResourceManager(RM)时就会选择一个节点启动一个ApplicationMaster(AM)来负责这个任务的跟进,也就是对这个任务的一个负责人。也就是说有一个作业任务就会有对应的一个ApplicationMaster(AM)来跟进这个作业任务的执行和调度。

Container 是对资源的一个抽象封装,里面会包含内存、CPU、磁盘、网络等资源,NodeManager(NM) 就是通过打开和关闭 Container 来调度资源的。

4.3 MapReduce架构概述

MapReduce是一种编程模型,用于大规模数据集的并行计算,需要将数据分配到大量的机器上计算,每台机器运行一个子计算任务,最后再合并每台机器运算结果并输出。 MapReduce 的思想就是 『分而治之』.

MapReduce将计算过程分为两个阶段:Map和Reduce

1)Map阶段并行处理输入数据

2)Reduce阶段对Map结果进行汇总

1636706761153

5 大数据技术生态体系

1636706809649

图中涉及的技术名词解释如下:

1)Sqoop:Sqoop是一款开源的工具,主要用于在Hadoop、Hive与传统的数据库(MySql)间进行数据的传递,可以将一个关系型数据库(例如 :MySQL,Oracle 等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

2)Flume:Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;

3)Kafka:Kafka是一种高吞吐量的分布式发布订阅消息系统;

4)Spark:是当前最流行的开源大数据内存计算框架。可以基于Hadoop上存储的大数据进行计算。

5)Flink:Flink是当前最流行的开源大数据内存计算框架。用于实时计算的场景较多。

6)Oozie:Oozie是一个管理Hdoop作业(job)的工作流程调度管理系统。

7)Hbase:HBase是一个分布式的、面向列的开源数据库。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。

8)Hive:Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行。 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。

9)ZooKeeper:它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护、名字服务、分布式同步、组服务等。

参考文献:

https://cloud.tencent.com/developer/article/1661405

Hadoop 教程(六)yarn-架构解析

1 Yarn资源调度器

Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。

1.1 Yarn基本架构

 YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等组件构成。

1638343400280

1.2 Yarn工作机制

1638343444066

(1)MR程序提交到客户端所在的节点。
(2)YarnRunner向ResourceManager申请一个Application。
(3)RM将该应用程序的资源路径返回给YarnRunner。
(4)该程序将运行所需资源提交到HDFS上。
(5)程序资源提交完毕后,申请运行mrAppMaster。
(6)RM将用户的请求初始化成一个Task。
(7)其中一个NodeManager领取到Task任务。
(8)该NodeManager创建容器Container,并产生MRAppmaster。
(9)Container从HDFS上拷贝资源到本地。
(10)MRAppmaster向RM 申请运行MapTask资源。
(11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
(14)ReduceTask向MapTask获取相应分区的数据。
(15)程序运行完毕后,MR会向RM申请注销自己。

1.3 作业提交全过程

作业提交工作机制

1638343485216

作业提交全过程详解

(1)作业提交
第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。
第2步:Client向RM申请一个作业id。
第3步:RM给Client返回该job资源的提交路径和作业id。
第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。
第5步:Client提交完资源后,向RM申请运行MrAppMaster。
(2)作业初始化
第6步:当RM收到Client的请求后,将该job添加到容量调度器中。
第7步:某一个空闲的NM领取到该Job。
第8步:该NM创建Container,并产生MRAppmaster。
第9步:下载Client提交的资源到本地。
(3)任务分配
第10步:MrAppMaster向RM申请运行多个MapTask任务资源。
第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(4)任务运行
第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
第14步:ReduceTask向MapTask获取相应分区的数据。
第15步:程序运行完毕后,MR会向RM申请注销自己。
(5)进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。

1638343533411

1.4 资源调度器

目前,Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler和Fair Scheduler。Hadoop3.1.3默认的资源调度器是Capacity Scheduler。
具体设置详见:yarn-default.xml文件

<property>
    <description>The class to use as the resource scheduler.</description>
    <name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>

1.4.1 先进先出调度器(FIFO)

1638343565235

Hadoop最初设计目的是支持大数据批处理作业,如日志挖掘、Web索引等作业,
为此,Hadoop仅提供了一个非常简单的调度机制:FIFO,即先来先服务,在该调度机制下,所有作业被统一提交到一个队列中,Hadoop按照提交顺序依次运行这些作业。
但随着Hadoop的普及,单个Hadoop集群的用户量越来越大,不同用户提交的应用程序往往具有不同的服务质量要求,典型的应用有以下几种:
批处理作业:这种作业往往耗时较长,对时间完成一般没有严格要求,如数据挖掘、机器学习等方面的应用程序。
交互式作业:这种作业期望能及时返回结果,如SQL查询(Hive)等。
生产性作业:这种作业要求有一定量的资源保证,如统计值计算、垃圾数据分析等。
此外,这些应用程序对硬件资源需求量也是不同的,如过滤、统计类作业一般为CPU密集型作业,而数据挖掘、机器学习作业一般为I/O密集型作业。因此,简单的FIFO调度策略不仅不能满足多样化需求,也不能充分利用硬件资源。

1.4.2 容量调度器(Capacity Scheduler)

1638343621340

Capacity Scheduler Capacity Scheduler 是Yahoo开发的多用户调度器,它以队列为单位划分资源,每个队列可设定一定比例的资源最低保证和使用上限,同时,每个用户也可设定一定的资源使用上限以防止资源滥用。而当一个队列的资源有剩余时,可暂时将剩余资源共享给其他队列。
总之,Capacity Scheduler 主要有以下几个特点:
①容量保证。管理员可为每个队列设置资源最低保证和资源使用上限,而所有提交到该队列的应用程序共享这些资源。
②灵活性,如果一个队列中的资源有剩余,可以暂时共享给那些需要资源的队列,而一旦该队列有新的应用程序提交,则其他队列借调的资源会归还给该队列。这种资源灵活分配的方式可明显提高资源利用率。
③多重租赁。支持多用户共享集群和多应用程序同时运行。为防止单个应用程序、用户或者队列独占集群中的资源,管理员可为之增加多重约束(比如单个应用程序同时运行的任务数等)。
④安全保证。每个队列有严格的ACL列表规定它的访问用户,每个用户可指定哪些用户允许查看自己应用程序的运行状态或者控制应用程序(比如杀死应用程序)。此外,管理员可指定队列管理员和集群系统管理员。
⑤动态更新配置文件。管理员可根据需要动态修改各种配置参数,以实现在线集群管理。

1.4.3 公平调度器(Fair Scheduler)(了解)

1638343637281

Fair Scheduler Fair Schedulere是Facebook开发的多用户调度器。
公平调度器的目的是让所有的作业随着时间的推移,都能平均地获取等同的共享资源。当有作业提交上来,系统会将空闲的资源分配给新的作业,每个任务大致上会获取平等数量的资源。和传统的调度策略不同的是它会让小的任务在合理的时间完成,同时不会让需要长时间运行的耗费大量资源的任务挨饿!
同Capacity Scheduler类似,它以队列为单位划分资源,每个队列可设定一定比例的资源最低保证和使用上限,同时,每个用户也可设定一定的资源使用上限以防止资源滥用;当一个队列的资源有剩余时,可暂时将剩余资源共享给其他队列。
当然,Fair Scheduler也存在很多与Capacity Scheduler不同之处,这主要体现在以下几个方面:
① 资源公平共享。在每个队列中,Fair Scheduler 可选择按照FIFO、Fair或DRF策略为应用程序分配资源。其中,
FIFO策略
公平调度器每个队列资源分配策略如果选择FIFO的话,就是禁用掉每个队列中的Task共享队列资源,此时公平调度器相当于上面讲过的容量调度器。
Fair策略
Fair 策略(默认)是一种基于最大最小公平算法实现的资源多路复用方式,默认情况下,每个队列内部采用该方式分配资源。这意味着,如果一个队列中有两个应用程序同时运行,则每个应用程序可得到1/2的资源;如果三个应用程序同时运行,则每个应用程序可得到1/3的资源。

DRF策略
DRF(Dominant Resource Fairness),我们之前说的资源,都是单一标准,例如只考虑内存(也是yarn默认的情况)。但是很多时候我们资源有很多种,例如内存,CPU,网络带宽等,这样我们很难衡量两个应用应该分配的资源比例。
那么在YARN中,我们用DRF来决定如何调度:假设集群一共有100 CPU和10T 内存,而应用A需要(2 CPU, 300GB),应用B需要(6 CPU, 100GB)。则两个应用分别需要A(2%CPU, 3%内存)和B(6%CPU, 1%内存)的资源,这就意味着A是内存主导的, B是CPU主导的,针对这种情况,我们可以选择DRF策略对不同应用进行不同资源(CPU和内存)的一个不同比例的限制。
②支持资源抢占。当某个队列中有剩余资源时,调度器会将这些资源共享给其他队列,而当该队列中有新的应用程序提交时,调度器要为它回收资源。为了尽可能降低不必要的计算浪费,调度器采用了先等待再强制回收的策略,即如果等待一段时间后尚有未归还的资源,则会进行资源抢占:从那些超额使用资源的队列中杀死一部分任务,进而释放资源。
yarn.scheduler.fair.preemption=true 通过该配置开启资源抢占。
③提高小应用程序响应时间。由于采用了最大最小公平算法,小作业可以快速获取资源并运行完成

2 容量调度器多队列提交案例

2.1 需求

​ Yarn默认的容量调度器是一条单队列的调度器,在实际使用中会出现单个任务阻塞整个队列的情况。同时,随着业务的增长,公司需要分业务限制集群使用率。这就需要我们按照业务种类配置多条任务队列。

2.2 配置多队列的容量调度器

默认Yarn的配置下,容量调度器只有一条Default队列。在capacity-scheduler.xml中可以配置多条队列,并降低default队列资源占比:

<!-- 指定多队列,增加hive队列 -->
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,hive</value>
    <description>
      The queues at the this level (root is the root queue).
    </description>
</property>

<!-- 降低default队列资源额定容量为40%,默认100% -->
<property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>40</value>
</property>

<!-- 降低default队列资源最大容量为60%,默认100% -->
<property>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <value>60</value>
</property>
同时为新加队列添加必要属性:
<!-- 指定hive队列的资源额定容量 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.capacity</name>
    <value>60</value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
    <value>1</value>
</property>

<!-- 指定hive队列的资源最大容量 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
    <value></value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.hive.state</name>
    <value>RUNNING</value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
    <value>*</value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
    <value>*</value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
    <value>*</value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime</name>
    <value>-1</value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.hive.default-application-lifetime</name>
    <value>-1</value>
</property>

在配置完成后,重启Yarn或者执行yarn rmadmin -refreshQueues刷新队列,就可以看到两条队列:

1638343702545

在配置完成后,重启Yarn或者执行yarn rmadmin -refreshQueues刷新队列,就可以看到两条队列:

2.3 向Hive队列提交任务

​ 默认的任务提交都是提交到default队列的。如果希望向其他队列提交任务,需要在Driver中声明:

public class WcDrvier &#123;public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException &#123;
​        Configuration configuration = new Configuration();

​    configuration.set("mapreduce.job.queuename","hive");//1. 获取一个Job实例
​    Job job = Job.getInstance(configuration);//2. 设置类路径
​    job.setJarByClass(WcDrvier.class);//3. 设置Mapper和Reducer
​    job.setMapperClass(WcMapper.class);
​    job.setReducerClass(WcReducer.class);//4. 设置Mapper和Reducer的输出类型
​    job.setMapOutputKeyClass(Text.class);
​    job.setMapOutputValueClass(IntWritable.class);

​    job.setOutputKeyClass(Text.class);
​    job.setOutputValueClass(IntWritable.class);

​    job.setCombinerClass(WcReducer.class);//5. 设置输入输出文件
​    FileInputFormat.setInputPaths(job, new Path(args[0]));
​    FileOutputFormat.setOutputPath(job, new Path(args[1]));//6. 提交Jobboolean b = job.waitForCompletion(true);
​    System.exit(b ? 0 : 1);
&#125;

&#125;

这样,这个任务在集群提交时,就会提交到hive队列:

这样,这个任务在集群提交时,就会提交到hive队列:

1638343722576

Hadoop 教程(四)mapreduce介绍

第1章 MapReduce概述

1.1 MapReduce定义

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并行运行在一个Hadoop集群上。

1.2 MapReduce优缺点

1.2.1 优点

1)MapReduce 易于编程

它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。

2)良好的扩展性

当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

3)高容错性

MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

4)适合PB级以上海量数据的离线处理

可以实现上千台服务器集群并发工作,提供数据处理能力。

1.2.2 缺点

1)不擅长实时计算

MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。

2)不擅长流式计算

流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。

3)不擅长DAG(有向无环图)计算

多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

1.3 MapReduce核心思想

1638342393148

(1)分布式的运算程序往往需要分成至少2个阶段。

(2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。

(3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。

(4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。

总结:分析WordCount数据流走向深入理解MapReduce核心思想。

1.4 MapReduce进程

一个完整的MapReduce程序在分布式运行时有三类实例进程:

(1)MrAppMaster:负责整个程序的过程调度及状态协调。

(2)MapTask:负责Map阶段的整个数据处理流程。

(3)ReduceTask:负责Reduce阶段的整个数据处理流程。

1.5 官方WordCount源码

采用反编译工具反编译源码,发现WordCount案例有Map类、Reduce类和驱动类。且数据的类型是Hadoop自身封装的序列化类型。

1.6 常用数据序列化类型

Java类型Hadoop Writable类型
BooleanBooleanWritable
ByteByteWritable
IntIntWritable
FloatFloatWritable
LongLongWritable
DoubleDoubleWritable
StringText
MapMapWritable
ArrayArrayWritable
NullNullWritable

1.7 MapReduce编程规范

用户编写的程序分成三个部分:Mapper、Reducer和Driver。

1.7.1 Mapper阶段

(1)用户自定义的Mapper要继承自己的父类

(2)Mapper的输入数据是KV对的形式

(3)Mapper中的业务逻辑编写在map()方法中

(4)Mapper的输出数据是KV对的形式

(5)map()方法(MapTask进程)对每一个<K,V>调用一次

1.7.2 Reducer阶段

(1)用户自定义的Reducer要继承自己的父类

(2)Reducer的输入数据是Mapper的输出数据类型

(3)Reducer中的业务逻辑编写在reduce()方法中

(4)Mapper的输出数据是KV对的形式

(5)ReduceTask进程对每一组仙童的<K,V>调用reduce()一次

1.7.3 Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN,提交的是封装了MapReduce程序相关运行参数的job对象。

1.8 WordCount案例实操

1)需求

在给定的文本文件中统计输出每一个单词出现的总次数

(1)输入数据

1638342710639

(2)期望输出数据

molly 2

banzhang 1

cls 2

hadoop 1

jiao 1

ss 2

xue 1

2)需求分析

按照MapReduce编程规范,分别编写Mapper,Reducer,Driver。

1638342700100
3)环境准备
(1)创建maven工程
(2)在pom.xml文件中添加如下依赖

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>

(2)在项目的src/main/resources目录下,新建一个文件,命名为“log4j2.xml”,在文件中填入。

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- 类型名为Console,名称为必须属性 -->
        <Appender type="Console" name="STDOUT">
            <!-- 布局为PatternLayout的方式,
            输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%d&#123;yyyy-MM-dd HH:mm:ss&#125;][%c&#123;10&#125;]%m%n" />
        </Appender>

    </Appenders>

    <Loggers>
        <!-- 可加性为false -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>

        <!-- root loggerConfig设置 -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>

</Configuration>

4)编写程序
(1)编写Mapper类

package com.molly.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>&#123;

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException &#123;

        // 1 获取一行
        String line = value.toString();

        // 2 切割
        String[] words = line.split(" ");

        // 3 输出
        for (String word : words) &#123;

            k.set(word);
            context.write(k, v);
        &#125;
    &#125;
&#125;

(2)编写Reducer类

package com.molly.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>&#123;

int sum;
IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException &#123;

        // 1 累加求和
        sum = 0;
        for (IntWritable count : values) &#123;
            sum += count.get();
        &#125;

        // 2 输出
         v.set(sum);
        context.write(key,v);
    &#125;
&#125;

(3)编写Driver驱动类

package com.molly.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver &#123;

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException &#123;

        // 1 获取配置信息以及获取job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 关联本Driver程序的jar
        job.setJarByClass(WordcountDriver.class);

        // 3 关联Mapper和Reducer的jar
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 4 设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    &#125;
&#125;

5)本地测试
(1)需要首先配置好HADOOP_HOME变量以及Windows运行依赖
(2)在IDEA/Eclipse上运行程序
6)集群上测试
(0)用maven打jar包,需要添加的打包插件依赖

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

注意:如果工程上显示红叉。在项目上右键->maven->Reimport即可。
(1)将程序打成jar包,然后拷贝到Hadoop集群中
步骤详情:右键->Run as->maven install。等待编译完成就会在项目的target文件夹中生成jar包。如果看不到。在项目上右键->Refresh,即可看到。修改不带依赖的jar包名称为wc.jar,并拷贝该jar包到Hadoop集群。
(2)启动Hadoop集群
(3)执行WordCount程序

[molly@hadoop102 software]$ hadoop jar  wc.jar
 com.molly.wordcount.WordcountDriver /user/molly/input /user/molly/output

7)在Windows上向集群提交任务
(1)添加必要配置信息

public class WordcountDriver &#123;
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException &#123;

        // 1 获取配置信息以及封装任务
        Configuration configuration = new Configuration();
        //设置HDFS NameNode的地址
       configuration.set("fs.defaultFS", "hdfs://hadoop102:9820");
        // 指定MapReduce运行在Yarn上
       configuration.set("mapreduce.framework.name","yarn");
        // 指定mapreduce可以在远程集群运行
       configuration.set("mapreduce.app-submission.cross-platform","true");
        //指定Yarn resourcemanager的位置
    configuration.set("yarn.resourcemanager.hostname","hadoop103");

        Job job = Job.getInstance(configuration);

        // 2 设置jar加载路径
    job.setJar("F:\\idea_project\\main\\bigdata1214\\MapReduce\\target\\MapReduce-1.0-SNAPSHOT.jar");

        // 3 设置map和reduce类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 4 设置map输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    &#125;
&#125;

(2)编辑任务配置
1)检查第一个参数Main class是不是我们要运行的类的全类名,如果不是的话一定要修改!
2)在VM options后面加上 :-DHADOOP_USER_NAME=molly
3)在Program arguments后面加上两个参数分别代表输入输出路径,两个参数之间用空格隔开。如:hdfs://hadoop102:9820/input hdfs://hadoop102:9820/output

(3)打包,并将Jar包设置到Driver中

public class WordcountDriver &#123;

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException &#123;

        // 1 获取配置信息以及封装任务
        Configuration configuration = new Configuration();

       configuration.set("fs.defaultFS", "hdfs://hadoop102:9820");
       configuration.set("mapreduce.framework.name","yarn");
       configuration.set("mapreduce.app-submission.cross-platform","true");
       configuration.set("yarn.resourcemanager.hostname","hadoop103");

        Job job = Job.getInstance(configuration);

        // 2 设置jar加载路径
//job.setJarByClass(WordCountDriver.class);
        job.setJar("D:\IdeaProjects\mapreduce\target\mapreduce-1.0-SNAPSHOT.jar");

        // 3 设置map和reduce类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 4 设置map输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    &#125;
&#125;

(4)提交并查看结果

Hadoop 教程(三)hdfs-架构解析

1 HDFS的数据流

1.1 HDFS写数据流程

1.1.1 剖析文件写入

1638341373066

(1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。
(2)NameNode返回是否可以上传。
(3)客户端请求第一个 Block上传到哪几个DataNode服务器上。
(4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。
(5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
(6)dn1、dn2、dn3逐级应答客户端。
(7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。
(8)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。
源码解析:org.apache.hadoop.hdfs.DFSOutputStream

1.1.2 网络拓扑-节点距离计算

​ 在HDFS写数据的过程中,NameNode会选择距离待上传数据最近距离的DataNode接收数据。那么这个最近距离怎么计算呢?
节点距离:两个节点到达最近的共同祖先的距离总和。

1638341407745

例如,假设有数据中心d1机架r1中的节点n1。该节点可以表示为/d1/r1/n1。利用这种标记,这里给出四种距离描述。
大家算一算每两个节点之间的距离。

1638341416287

1.1.3 机架感知(副本存储节点选择)

1)官方IP地址
机架感知说明
http://hadoop.apache.org/docs/r3.1.3/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html#Data_Replication
For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on the local machine if the writer is on a datanode, otherwise on a random datanode, another replica on a node in a different (remote) rack, and the last on a different node in the same remote rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.
2)Hadoop3.1.3副本节点选择

1638341452275

1.2 HDFS读数据流程

1638341472416

(1)客户端通过DistributedFileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
(2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。
(3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。
(4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。

2 NameNode和SecondaryNameNode(面试开发重点)

2.1 NN和2NN工作机制

思考:NameNode中的元数据是存储在哪里的?
首先,我们做个假设,如果存储在NameNode节点的磁盘中,因为经常需要进行随机访问,还有响应客户请求,必然是效率过低。因此,元数据需要存放在内存中。但如果只存在内存中,一旦断电,元数据丢失,整个集群就无法工作了。因此产生在磁盘中备份元数据的FsImage
这样又会带来新的问题,当在内存中的元数据更新时,如果同时更新FsImage,就会导致效率过低,但如果不更新,就会发生一致性问题,一旦NameNode节点断电,就会产生数据丢失。因此,引入Edits文件(只进行追加操作,效率很高)。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到Edits中。这样,一旦NameNode节点断电,可以通过FsImage和Edits的合并,合成元数据。
但是,如果长时间添加数据到Edits中,会导致该文件数据过大,效率降低,而且一旦断电,恢复元数据需要的时间过长。因此,需要定期进行FsImage和Edits的合并,如果这个操作由NameNode节点完成,又会效率过低。因此,引入一个新的节点SecondaryNamenode,专门用于FsImage和Edits的合并

1638341524914

1)第一阶段:NameNode启动
(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode记录操作日志,更新滚动日志。
(4)NameNode在内存中对元数据进行增删改。
2)第二阶段:Secondary NameNode工作
(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。
(2)Secondary NameNode请求执行CheckPoint。
(3)NameNode滚动正在写的Edits日志。
(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。
(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件fsimage.chkpoint。
(7)拷贝fsimage.chkpoint到NameNode。
(8)NameNode将fsimage.chkpoint重新命名成fsimage。

NN和2NN工作机制详解:
Fsimage:NameNode内存中元数据序列化后形成的文件。
Edits:记录客户端更新元数据信息的每一步操作(可通过Edits运算出元数据)。
NameNode启动时,先滚动Edits并生成一个空的edits.inprogress,然后加载Edits和Fsimage到内存中,此时NameNode内存就持有最新的元数据信息。Client开始对NameNode发送元数据的增删改的请求,这些请求的操作首先会被记录到edits.inprogress中(查询元数据的操作不会被记录在Edits中,因为查询操作不会更改元数据信息),如果此时NameNode挂掉,重启后会从Edits中读取元数据的信息。然后,NameNode会在内存中执行元数据的增删改的操作。
由于Edits中记录的操作会越来越多,Edits文件会越来越大,导致NameNode在启动加载Edits时会很慢,所以需要对Edits和Fsimage进行合并(所谓合并,就是将Edits和Fsimage加载到内存中,照着Edits中的操作一步步执行,最终形成新的Fsimage)。SecondaryNameNode的作用就是帮助NameNode进行Edits和Fsimage的合并工作。
SecondaryNameNode首先会询问NameNode是否需要CheckPoint(触发CheckPoint需要满足两个条件中的任意一个,定时时间到和Edits中数据写满了)。直接带回NameNode是否检查结果。SecondaryNameNode执行CheckPoint操作,首先会让NameNode滚动Edits并生成一个空的edits.inprogress,滚动Edits的目的是给Edits打个标记,以后所有新的操作都写入edits.inprogress,其他未合并的Edits和Fsimage会拷贝到SecondaryNameNode的本地,然后将拷贝的Edits和Fsimage加载到内存中进行合并,生成fsimage.chkpoint,然后将fsimage.chkpoint拷贝给NameNode,重命名为Fsimage后替换掉原来的Fsimage。NameNode在启动时就只需要加载之前未合并的Edits和Fsimage即可,因为合并过的Edits中的元数据信息已经被记录在Fsimage中。

2.2 Fsimage和Edits解析

1638341553613

1)oiv查看Fsimage文件
(1)查看oiv和oev命令

(2)基本语法
hdfs oiv -p 文件类型 -i镜像文件 -o 转换后文件输出路径
(3)案例实操

[molly@hadoop102 current]$ pwd
/opt/module/hadoop-3.1.3/data/dfs/name/current

[molly@hadoop102 current]$ hdfs oiv -p XML -i fsimage_0000000000000000025 -o /opt/module/hadoop-3.1.3/fsimage.xml

[molly@hadoop102 current]$ cat /opt/module/hadoop-3.1.3/fsimage.xml

将显示的xml文件内容拷贝到IDEA中创建的xml文件中,并格式化。部分显示结果如下。

<inode>
    <id>16386</id>
    <type>DIRECTORY</type>
    <name>user</name>
    <mtime>1512722284477</mtime>
    <permission>molly:supergroup:rwxr-xr-x</permission>
    <nsquota>-1</nsquota>
    <dsquota>-1</dsquota>
</inode>
<inode>
    <id>16387</id>
    <type>DIRECTORY</type>
    <name>molly</name>
    <mtime>1512790549080</mtime>
    <permission>molly:supergroup:rwxr-xr-x</permission>
    <nsquota>-1</nsquota>
    <dsquota>-1</dsquota>
</inode>
<inode>
    <id>16389</id>
    <type>FILE</type>
    <name>wc.input</name>
    <replication>3</replication>
    <mtime>1512722322219</mtime>
    <atime>1512722321610</atime>
    <perferredBlockSize>134217728</perferredBlockSize>
    <permission>molly:supergroup:rw-r--r--</permission>
    <blocks>
        <block>
            <id>1073741825</id>
            <genstamp>1001</genstamp>
            <numBytes>59</numBytes>
        </block>
    </blocks>
</inode >

思考:可以看出,Fsimage中没有记录块所对应DataNode,为什么?
在集群启动后,要求DataNode上报数据块信息,并间隔一段时间后再次上报。
2)oev查看Edits文件
(1)基本语法
hdfs oev -p 文件类型 -i编辑日志 -o 转换后文件输出路径
(2)案例实操

[molly@hadoop102 current]$ hdfs oev -p XML -i edits_0000000000000000012-0000000000000000013 -o /opt/module/hadoop-3.1.3/edits.xml

[molly@hadoop102 current]$ cat /opt/module/hadoop-3.1.3/edits.xml

将显示的xml文件内容拷贝到Eclipse中创建的xml文件中,并格式化。显示结果如下。

<?xml version="1.0" encoding="UTF-8"?>
<EDITS>
    <EDITS_VERSION>-63</EDITS_VERSION>
    <RECORD>
        <OPCODE>OP_START_LOG_SEGMENT</OPCODE>
        <DATA>
            <TXID>129</TXID>
        </DATA>
    </RECORD>
    <RECORD>
        <OPCODE>OP_ADD</OPCODE>
        <DATA>
            <TXID>130</TXID>
            <LENGTH>0</LENGTH>
            <INODEID>16407</INODEID>
            <PATH>/hello7.txt</PATH>
            <REPLICATION>2</REPLICATION>
            <MTIME>1512943607866</MTIME>
            <ATIME>1512943607866</ATIME>
            <BLOCKSIZE>134217728</BLOCKSIZE>
            <CLIENT_NAME>DFSClient_NONMAPREDUCE_-1544295051_1</CLIENT_NAME>
            <CLIENT_MACHINE>192.168.1.5</CLIENT_MACHINE>
            <OVERWRITE>true</OVERWRITE>
            <PERMISSION_STATUS>
                <USERNAME>molly</USERNAME>
                <GROUPNAME>supergroup</GROUPNAME>
                <MODE>420</MODE>
            </PERMISSION_STATUS>
            <RPC_CLIENTID>908eafd4-9aec-4288-96f1-e8011d181561</RPC_CLIENTID>
            <RPC_CALLID>0</RPC_CALLID>
        </DATA>
    </RECORD>
    <RECORD>
        <OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
        <DATA>
            <TXID>131</TXID>
            <BLOCK_ID>1073741839</BLOCK_ID>
        </DATA>
    </RECORD>
    <RECORD>
        <OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
        <DATA>
            <TXID>132</TXID>
            <GENSTAMPV2>1016</GENSTAMPV2>
        </DATA>
    </RECORD>
    <RECORD>
        <OPCODE>OP_ADD_BLOCK</OPCODE>
        <DATA>
            <TXID>133</TXID>
            <PATH>/hello7.txt</PATH>
            <BLOCK>
                <BLOCK_ID>1073741839</BLOCK_ID>
                <NUM_BYTES>0</NUM_BYTES>
                <GENSTAMP>1016</GENSTAMP>
            </BLOCK>
            <RPC_CLIENTID></RPC_CLIENTID>
            <RPC_CALLID>-2</RPC_CALLID>
        </DATA>
    </RECORD>
    <RECORD>
        <OPCODE>OP_CLOSE</OPCODE>
        <DATA>
            <TXID>134</TXID>
            <LENGTH>0</LENGTH>
            <INODEID>0</INODEID>
            <PATH>/hello7.txt</PATH>
            <REPLICATION>2</REPLICATION>
            <MTIME>1512943608761</MTIME>
            <ATIME>1512943607866</ATIME>
            <BLOCKSIZE>134217728</BLOCKSIZE>
            <CLIENT_NAME></CLIENT_NAME>
            <CLIENT_MACHINE></CLIENT_MACHINE>
            <OVERWRITE>false</OVERWRITE>
            <BLOCK>
                <BLOCK_ID>1073741839</BLOCK_ID>
                <NUM_BYTES>25</NUM_BYTES>
                <GENSTAMP>1016</GENSTAMP>
            </BLOCK>
            <PERMISSION_STATUS>
                <USERNAME>molly</USERNAME>
                <GROUPNAME>supergroup</GROUPNAME>
                <MODE>420</MODE>
            </PERMISSION_STATUS>
        </DATA>
    </RECORD>
</EDITS >

思考:NameNode如何确定下次开机启动的时候合并哪些Edits?

2.3 CheckPoint时间设置

1)通常情况下,SecondaryNameNode每隔一小时执行一次。
[hdfs-default.xml]

<property>
  <name>dfs.namenode.checkpoint.period</name>
  <value>3600s</value>
</property>

2)一分钟检查一次操作次数,当操作次数达到1百万时,SecondaryNameNode执行一次。

<property>
  <name>dfs.namenode.checkpoint.txns</name>
  <value>1000000</value>
<description>操作动作次数</description>
</property>

<property>
  <name>dfs.namenode.checkpoint.check.period</name>
  <value>60s</value>
<description> 1分钟检查一次操作次数</description>
</property >

2.4 NameNode故障处理(扩展)

NameNode故障后,可以采用如下两种方法恢复数据。
1)将SecondaryNameNode中数据拷贝到NameNode存储数据的目录;
(1)kill -9 NameNode进程
(2)删除NameNode存储的数据(/opt/module/hadoop-3.1.3/data/tmp/dfs/name)

[molly@hadoop102 hadoop-3.1.3]$ rm -rf /opt/module/hadoop-3.1.3/data/dfs/name/*

(3)拷贝SecondaryNameNode中数据到原NameNode存储数据目录

[molly@hadoop102 dfs]$ scp -r molly@hadoop104:/opt/module/hadoop-3.1.3/data/dfs/namesecondary/* ./name/

(4)重新启动NameNode

[molly@hadoop102 hadoop-3.1.3]$ hdfs --daemon start namenode

2)使用-importCheckpoint选项启动NameNode守护进程,从而将SecondaryNameNode中数据拷贝到NameNode目录中。
(1)修改hdfs-site.xml中的

<property>
    <name>dfs.namenode.checkpoint.period</name>
    <value>120</value>
</property>

<property>
    <name>dfs.namenode.name.dir</name>
    <value>/opt/module/hadoop-3.1.3/data/dfs/name</value>
</property>

(2)kill -9 NameNode进程
(3)删除NameNode存储的数据(/opt/module/hadoop-3.1.3/data/dfs/name)

[molly@hadoop102 hadoop-3.1.3]$ rm -rf /opt/module/hadoop-3.1.3/data/dfs/name/*

(4)如果SecondaryNameNode不和NameNode在一个主机节点上,需要将SecondaryNameNode存储数据的目录拷贝到NameNode存储数据的平级目录,并删除in_use.lock文件

[molly@hadoop102 dfs]$ scp -r molly@hadoop104:/opt/module/hadoop-3.1.3/data/dfs/namesecondary ./

[molly@hadoop102 namesecondary]$ rm -rf in_use.lock

[molly@hadoop102 dfs]$ pwd
/opt/module/hadoop-3.1.3/data/dfs

[molly@hadoop102 dfs]$ ls
data  name  namesecondary

(5)导入检查点数据(等待一会ctrl+c结束掉)

[molly@hadoop102 hadoop-3.1.3]$ bin/hdfs namenode -importCheckpoint

(6)启动NameNode

[molly@hadoop102 hadoop-3.1.3]$ hdfs --daemon start namenode

2.5 集群安全模式

1638341980899

1)基本语法
集群处于安全模式,不能执行重要操作(写操作)。集群启动完成后,自动退出安全模式。
(1)bin/hdfs dfsadmin -safemode get (功能描述:查看安全模式状态)
(2)bin/hdfs dfsadmin -safemode enter (功能描述:进入安全模式状态)
(3)bin/hdfs dfsadmin -safemode leave (功能描述:离开安全模式状态)
(4)bin/hdfs dfsadmin -safemode wait (功能描述:等待安全模式状态)
2)案例
模拟等待安全模式
3)查看当前模式

[molly@hadoop102 hadoop-3.1.3]$ hdfs dfsadmin -safemode get
Safe mode is OFF

4)先进入安全模式

[molly@hadoop102 hadoop-3.1.3]$ bin/hdfs dfsadmin -safemode enter

5)创建并执行下面的脚本
在/opt/module/hadoop-3.1.3路径上,编辑一个脚本safemode.sh

[molly@hadoop102 hadoop-3.1.3]$ touch safemode.sh
[molly@hadoop102 hadoop-3.1.3]$ vim safemode.sh

#!/bin/bash
hdfs dfsadmin -safemode wait
hdfs dfs -put /opt/module/hadoop-3.1.3/README.txt /
[molly@hadoop102 hadoop-3.1.3]$ chmod 777 safemode.sh

[molly@hadoop102 hadoop-3.1.3]$ ./safemode.sh 

6)再打开一个窗口,执行

[molly@hadoop102 hadoop-3.1.3]$ bin/hdfs dfsadmin -safemode leave

7)观察
8)再观察上一个窗口
Safe mode is OFF
9)HDFS集群上已经有上传的数据了。

2.6 NameNode多目录配置(了解)

1)NameNode的本地目录可以配置成多个,且每个目录存放内容相同,增加了可靠性
2)具体配置如下
(1)在hdfs-site.xml文件中添加如下内容

<property>
<name>dfs.namenode.name.dir</name>
<value>file://$&#123;hadoop.tmp.dir&#125;/dfs/name1,file://$&#123;hadoop.tmp.dir&#125;/dfs/name2</value>
</property>

(2)停止集群,删除三台节点的data和logs中所有数据。

[molly@hadoop102 hadoop-3.1.3]$ rm -rf data/ logs/
[molly@hadoop103 hadoop-3.1.3]$ rm -rf data/ logs/
[molly@hadoop104 hadoop-3.1.3]$ rm -rf data/ logs/

(3)格式化集群并启动。

[molly@hadoop102 hadoop-3.1.3]$ bin/hdfs namenode –format
[molly@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh

(4)查看结果

[molly@hadoop102 dfs]$ ll
总用量 12
drwx------. 3 molly molly 4096 12月 11 08:03 data
drwxrwxr-x. 3 molly molly 4096 12月 11 08:03 name1
drwxrwxr-x. 3 molly molly 4096 12月 11 08:03 name2

3 DataNode(面试开发重点)

3.1 DataNode工作机制

1638342000541

(1)一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
(2)DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息。
(3)心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。
(4)集群运行中可以安全加入和退出一些机器。

3.2 数据完整性

思考:如果电脑磁盘里面存储的数据是控制高铁信号灯的红灯信号(1)和绿灯信号(0),但是存储该数据的磁盘坏了,一直显示是绿灯,是否很危险?同理DataNode节点上的数据损坏了,却没有发现,是否也很危险,那么如何解决呢?
如下是DataNode节点保证数据完整性的方法。
(1)当DataNode读取Block的时候,它会计算CheckSum。
(2)如果计算后的CheckSum,与Block创建时值不一样,说明Block已经损坏。
(3)Client读取其他DataNode上的Block。
(4)常见的校验算法 crc(32),md5(128),sha1(160)
(5)DataNode在其文件创建后周期验证CheckSum。

1638342020071

3.3 掉线时限参数设置

需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。

1638342040415

<property>
    <name>dfs.namenode.heartbeat.recheck-interval</name>
    <value>300000</value>
</property>
<property>
    <name>dfs.heartbeat.interval</name>
    <value>3</value>
</property>

3.4 服役新数据节点

1)需求
随着公司业务的增长,数据量越来越大,原有的数据节点的容量已经不能满足存储数据的需求,需要在原有集群基础上动态添加新的数据节点。
2)环境准备
(1)在hadoop104主机上再克隆一台hadoop105主机
(2)修改IP地址和主机名称
(3)删除原来HDFS文件系统留存的文件(/opt/module/hadoop-3.1.3/data和logs)
(4)source一下配置文件

[molly@hadoop105 hadoop-3.1.3]$ source /etc/profile

3)服役新节点具体步骤
(1)直接启动DataNode,即可关联到集群

[molly@hadoop105 hadoop-3.1.3]$ hdfs --daemon start datanode
[molly@hadoop105 hadoop-3.1.3]$ yarn --daemon start nodemanager

1638342075539

(2)在hadoop105上上传文件

[molly@hadoop105 hadoop-3.1.3]$ hadoop fs -put /opt/module/hadoop-3.1.3/LICENSE.txt /

(3)如果数据不均衡,可以用命令实现集群的再平衡

[molly@hadoop102 sbin]$ ./start-balancer.sh
starting balancer, logging to /opt/module/hadoop-3.1.3/logs/hadoop-molly-balancer-hadoop102.out
Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved

3.5 退役旧数据节点

3.5.1 添加白名单和黑名单

白名单和黑名单是hadoop管理集群主机的一种机制。
添加到白名单的主机节点,都允许访问NameNode,不在白名单的主机节点,都会被退出。添加到黑名单的主机节点,不允许访问NameNode,会在数据迁移后退出。
实际情况下,白名单用于确定允许访问NameNode的DataNode节点,内容配置一般与workers文件内容一致。 黑名单用于在集群运行过程中退役DataNode节点。
配置白名单和黑名单的具体步骤如下:
1)在NameNode节点的/opt/module/hadoop-3.1.3/etc/hadoop目录下分别创建whitelist 和blacklist文件

[molly@hadoop102 hadoop]$ pwd
/opt/module/hadoop-3.1.3/etc/hadoop
[molly@hadoop102 hadoop]$ touch whitelist
[molly@hadoop102 hadoop]$ touch blacklist
#在whitelist中添加如下主机名称,假如集群正常工作的节点为102 103 104 105
hadoop102
hadoop103
hadoop104
hadoop105

黑名单暂时为空。
2)在hdfs-site.xml配置文件中增加dfs.hosts和 dfs.hosts.exclude配置参数

<!-- 白名单 -->
<property>
<name>dfs.hosts</name>
<value>/opt/module/hadoop-3.1.3/etc/hadoop/whitelist</value>
</property>
<!-- 黑名单 -->
<property>
<name>dfs.hosts.exclude</name>
<value>/opt/module/hadoop-3.1.3/etc/hadoop/blacklist</value>
</property>

3)分发配置文件whitelist,blacklist,hdfs-site.xml (注意:105节点也要发一份)

[molly@hadoop102 etc]$ xsync hadoop/ 
[molly@hadoop102 etc]$ rsync -av hadoop/ molly@hadoop105:/opt/module/hadoop-3.1.3/etc/hadoop/

4)重新启动集群(注意:105节点没有添加到workers,因此要单独起停)

[molly@hadoop102 hadoop-3.1.3]$ stop-dfs.sh
[molly@hadoop102 hadoop-3.1.3]$ start-dfs.sh
[molly@hadoop105 hadoop-3.1.3]$ hdfs –daemon start datanode

5)在web浏览器上查看目前正常工作的DN节点

1638342091569

3.5.2 黑名单退役

1)编辑/opt/module/hadoop-3.1.3/etc/hadoop目录下的blacklist文件

[molly@hadoop102 hadoop] vim blacklist

添加如下主机名称(要退役的节点)

hadoop105

2)分发blacklist到所有节点

[molly@hadoop102 etc]$ xsync hadoop/ 
[molly@hadoop102 etc]$ rsync -av hadoop/ molly@hadoop105:/opt/module/hadoop-3.1.3/etc/hadoop/

3)刷新NameNode、刷新ResourceManager

[molly@hadoop102 hadoop-3.1.3]$ hdfs dfsadmin -refreshNodes
Refresh nodes successful

[molly@hadoop102 hadoop-3.1.3]$ yarn rmadmin -refreshNodes
17/06/24 14:55:56 INFO client.RMProxy: Connecting to ResourceManager at hadoop103/192.168.1.103:8033

4)检查Web浏览器,退役节点的状态为decommission in progress(退役中),说明数据节点正在复制块到其他节点

1638342103632

5)等待退役节点状态为decommissioned(所有块已经复制完成),停止该节点及节点资源管理器。注意:如果副本数是3,服役的节点小于等于3,是不能退役成功的,需要修改副本数后才能退役

1638342110735

[molly@hadoop105 hadoop-3.1.3]$ hdfs --daemon stop datanode
stopping datanode
[molly@hadoop105 hadoop-3.1.3]$ yarn --daemon stop nodemanager
stopping nodemanager

6)如果数据不均衡,可以用命令实现集群的再平衡

[molly@hadoop102 hadoop-3.1.3]$ sbin/start-balancer.sh 
starting balancer, logging to /opt/module/hadoop-3.1.3/logs/hadoop-molly-balancer-hadoop102.out
Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved

注意:不允许白名单和黑名单中同时出现同一个主机名称,既然使用了黑名单blacklist成功退役了hadoop105节点,因此要将白名单whitelist里面的hadoop105去掉。

3.6 DataNode多目录配置

1)DataNode可以配置成多个目录,每个目录存储的数据不一样。即:数据不是副本
2)具体配置如下
(1)在hdfs-site.xml文件中添加如下内容

<property>
        <name>dfs.datanode.data.dir</name>
<value>file://$&#123;hadoop.tmp.dir&#125;/dfs/data1,file://$&#123;hadoop.tmp.dir&#125;/dfs/data2</value>
</property>

(2)停止集群,删除三台节点的data和logs中所有数据。

[molly@hadoop102 hadoop-3.1.3]$ rm -rf data/ logs/
[molly@hadoop103 hadoop-3.1.3]$ rm -rf data/ logs/
[molly@hadoop104 hadoop-3.1.3]$ rm -rf data/ logs/

(3)格式化集群并启动。

[molly@hadoop102 hadoop-3.1.3]$ bin/hdfs namenode –format
[molly@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh

(4)查看结果

[molly@hadoop102 dfs]$ ll
总用量 12
drwx------. 3 molly molly 4096 4月   4 14:22 data1
drwx------. 3 molly molly 4096 4月   4 14:22 data2
drwxrwxr-x. 3 molly molly 4096 12月 11 08:03 name1
drwxrwxr-x. 3 molly molly 4096 12月 11 08:03 name2