大数据实践(一)数仓采集项目

1 数仓概念

1637651564709

2 项目需求及架构设计

2.1 项目需求分析

1)用户行为数据采集平台搭建(存在日志服务器-用flume采集到数据仓库(hdfs)中)
2)业务数据采集平台搭建(存在mysql,oracle):通过sqoop采集到数据仓库(hdfs)中
3)数据仓库维度建模:建库建表
4)分析,设备 会员 商品 地区 活动等电商核心主题,统计的报表指标近100个。
5)采用即席查询工具,随时进行指标分析
6)对集群性能进行监控,发送异常需要报警
7)元数据管理:管理Hive的元数据(数仓的核心就是hive)
8)质量监控:数仓分析的质量 有没有丢数据等

2.2 项目框架

2.2.1 技术选型

技术选型主要考虑因素:数据量大小,业务需求,行业内经验,技术成熟度、开发维护成本、总成本预算。下面加粗的是本次选择的。

  • 数据采集传输:Flume,Logstash(elk系列),Kafka,Sqoop(开源的),DataX(阿里技术,比Sqoop强大)
  • 数据存储:Mysq,HDFS,HBase,Redis,MongoDB
  • 数据计算:Hive,Tez,Spark,Flink,Storm(Hive底层用Tex或者Spark),Storm较早 不用
  • 数据查询:Presto,Kylin(Apache),Impala,Druid
  • 数据可视化:Echarts(百度的,现在已经是Apache的了),Superset(开源),QuickBI,DataV(后两个都是阿里)
  • 任务调度:Azkaban(Apache的,可视化界面),Oozie(CDH生态,HUE,功能比azkaban强大)
  • 集群监控:Zabbix
  • 元数据管理:Atlas

2.2.2 系统数据流程设计

1637652372079

多个Flume往HDFS中写数据,会有压力,因此中间部署Kafka,来缓冲。

Hive通过写sql对数据进行处理,主要有5层处理,分别是ods,dwd,dws,dwt,ads。

对整个仓库的任务:利用什么时候导入数据等用Azkaban来调度。

2.2.3 框架版本选型

1)如何选择Apache/CDH/HDP版本?

Apache:运维麻烦;组件间兼容性需要自己调研(一般大厂使用,技术实力雄厚,有专业的运维人员)

CDH:国内使用最多的版本,但是CM不开源。2021年开始收费,一个节点1万美金

HDP:开源,可以进行二次开发,但是没有CDH稳定,国内使用较少

目前CDH和HDP已经合并了。

2)框架版本参考

产品版本
Hadoop3.1.3
Flume1.9.0
Kafka2.4.1
Hive3.1.2
Sqoop1.4.6
Java1.8
Zookeeper3.5.7
Presto0.189

2.2.4 服务器选型

服务器选择物理机还是云主机?

1)物理机:

以128G内存,20核物理CPU,40线程,戴尔品牌单台报价4W出头,一般物理机寿命5年左右。

需要有专业的运维任意,平均一个月1万。电费也是不少的开销。

2)云主机:

以阿里云为例,差不多配置,每年5W。运维工作都有阿里云完成,运维相对较轻松。

3)企业选择

金融有限公司和阿里没有直接冲突的公司选择阿里云。

中小公司,为了融资上市,选择阿里云,拉到融资后买物理机。

有长期打算,资金比较足,选择物理机。

2.2.5 集群资源规划设计

1)如何确认集群规模?(假设服务器8T磁盘,128G内存)

(1)每天日活跃用户100万,每人一天平均100条:100万*100条=1亿条

(2)每条日志1k左右,每天1亿条:100000000/1024/1024=约100G

(3)半年内不扩容服务器来算:100G*180天=约18T

(4)保留3副本:18T*3=54T

(5)保留20%-30%buf=54T/0.7=77T

(6)算到这里:约8T*10台服务器

2)测试集群服务器规划

服务名称子服务服务器 hadoop102服务器 hadoop103服务器 hadoop104
HDFSNameNode
DataNode
SecondaryNameNode
YarnNodeManager
Resourcemanager
ZookeeperZookeeper Server
Flume(采集日志)Flume
KafkaKafka
Flume(消费Kafka)Flume
HiveHive
MySQLMySQL
SqoopSqoop
PrestoCoordinator
Worker
AzkabanAzkabanWebServer
AzkabanExecutorServer
DruidDruid
Kylin
HbaseHMaster
HRegionServer
Superset
Atlas
SolrJar
服务数总计1899

3 数据生成模块

3.1 目标数据

我们要收集和分析的数据主要包括页面数据、事件数据、曝光数据、启动数据和错误数据。

3.1.1 页面

页面数据主要记录一个页面的用户访问情况,包括访问时间、停留时间、页面路径等信息。

1)所有页面id如下

home("首页"),
category("分类页"),
discovery("发现页"),
top_n("热门排行"),
favor("收藏页"),
search("搜索页"),
good_list("商品列表页"),
good_detail("商品详情"),
good_spec("商品规格"),
comment("评价"),
comment_done("评价完成"),
comment_list("评价列表"),
cart("购物车"),
trade("下单结算"),
payment("支付页面"),
payment_done("支付完成"),
orders_all("全部订单"),
orders_unpaid("订单待支付"),
orders_undelivered("订单待发货"),
orders_unreceipted("订单待收货"),
orders_wait_comment("订单待评价"),
mine("我的"),
activity("活动"),
login("登录"),
register("注册");

2)所有页面对象类型如下:

sku_id("商品skuId"),
keyword("搜索关键词"),
sku_ids("多个商品skuId"),
activity_id("活动id"),
coupon_id("购物券id");

3)所有来源类型如下:

promotion("商品推广"),
recommend("算法推荐商品"),
query("查询结果商品"),
activity("促销活动");

3.1.2 事件

事件数据主要记录应用内一个具体操作行为,包括操作类型、操作对象、操作对象描述等信息。

1)所有动作类型如下:

favor_add("添加收藏"),
favor_canel("取消收藏"),
cart_add("添加购物车"),
cart_remove("删除购物车"),
cart_add_num("增加购物车商品数量"),
cart_minus_num("减少购物车商品数量"),
trade_add_address("增加收货地址"),
get_coupon("领取优惠券");

注:对于下单、支付等业务数据,可从业务数据库获取。

2)所有动作目标类型如下:

sku_id(“商品”),
coupon_id(“购物券”);

3.1.3 曝光

曝光数据主要记录页面所曝光的内容,包括曝光对象,曝光类型等信息。

1)所有曝光类型如下:

promotion("商品推广"),
recommend("算法推荐商品"),
query("查询结果商品"),
activity("促销活动");

2)所有曝光对象类型如下:

sku_id("商品skuId"),
activity_id("活动id");

3.1.4 启动

启动数据记录应用的启动信息。

1)所有启动入口类型如下:

icon("图标"),
notification("通知"),
install("安装后启动");

3.1.5 错误

错误数据记录应用使用过程中的错误信息,包括错误编号及错误信息。

3.2 数据埋点

3.2.1 主流埋点方式(了解)

目前主流的埋点方式,有代码埋点(前端/后端)、可视化埋点、全埋点三种。

代码埋点是通过调用埋点SDK函数,在需要埋点的业务逻辑功能位置调用接口,上报埋点数据。例如,我们对页面中的某个按钮埋点后,当这个按钮被点击时,可以在这个按钮对应的 OnClick 函数里面调用SDK提供的数据发送接口,来发送数据。

可视化埋点只需要研发人员集成采集 SDK,不需要写埋点代码,业务人员就可以通过访问分析平台的“圈选”功能,来“圈”出需要对用户行为进行捕捉的控件,并对该事件进行命名。圈选完毕后,这些配置会同步到各个用户的终端上,由采集 SDK 按照圈选的配置自动进行用户行为数据的采集和发送。(三方埋点技术:神策大数据,GrowingIO)

全埋点是通过在产品中嵌入SDK,前端自动采集页面上的全部用户行为事件,上报埋点数据,相当于做了一个统一的埋点。然后再通过界面配置哪些数据需要在系统里面进行分析。

3.2.2 埋点数据日志结构

我们的日志结构大致可分为两类,一是普通页面埋点日志,二是启动日志。

普通页面日志结构如下,每条日志包含了,当前页面的页面信息,所有事件(动作)、所有曝光信息以及错误信息。除此之外,还包含了一系列公共信息,包括设备信息,地理位置,应用信息等,即下边的common字段。

1)普通页面埋点日志格式

{
  "common": {                  -- 公共信息
    "ar": "230000",              -- 地区编码
    "ba": "iPhone",              -- 手机品牌
    "ch": "Appstore",            -- 渠道
    "is_new": "1",--是否首日使用,首次使用的当日,该字段值为1,过了24:00,该字段置为0"md": "iPhone 8",            -- 手机型号
    "mid": "YXfhjAYH6As2z9Iq", -- 设备id
    "os": "iOS 13.2.9",          -- 操作系统
    "uid": "485",                 -- 会员id
    "vc": "v2.1.134"             -- app版本号
  },
"actions": [                     --动作(事件)  
    {
      "action_id": "favor_add",   --动作id
      "item": "3",                   --目标id
      "item_type": "sku_id",       --目标类型
      "ts": 1585744376605           --动作时间戳
    }
  ],
  "displays": [
    {
      "displayType": "query",        -- 曝光类型
      "item": "3",                     -- 曝光对象id
      "item_type": "sku_id",         -- 曝光对象类型
      "order": 1,                      --出现顺序
      "pos_id": 2                      --曝光位置
    },
    {
      "displayType": "promotion",
      "item": "6",
      "item_type": "sku_id",
      "order": 2, 
      "pos_id": 1
    },
    {
      "displayType": "promotion",
      "item": "9",
      "item_type": "sku_id",
      "order": 3, 
      "pos_id": 3
    },
    {
      "displayType": "recommend",
      "item": "6",
      "item_type": "sku_id",
      "order": 4, 
      "pos_id": 2
    },
    {
      "displayType": "query ",
      "item": "6",
      "item_type": "sku_id",
      "order": 5, 
      "pos_id": 1
    }
  ],
  "page": {                       --页面信息
    "during_time": 7648,        -- 持续时间毫秒
    "item": "3",                  -- 目标id
    "item_type": "sku_id",      -- 目标类型
    "last_page_id": "login",    -- 上页类型
    "page_id": "good_detail",   -- 页面ID
    "sourceType": "promotion"   -- 来源类型
  },
"err":{                     --错误
"error_code": "1234",      --错误码
    "msg": "***********"       --错误信息
},
  "ts": 1585744374423  --跳入时间戳
}

2)启动日志格式

启动日志结构相对简单,主要包含公共信息,启动信息和错误信息。

{
  "common": {
    "ar": "370000",
    "ba": "Honor",
    "ch": "wandoujia",
    "is_new": "1",
    "md": "Honor 20s",
    "mid": "eQF5boERMJFOujcp",
    "os": "Android 11.0",
    "uid": "76",
    "vc": "v2.1.134"
  },
  "start": {   
    "entry": "icon",         --icon手机图标  notice 通知   install 安装后启动
    "loading_time": 18803,  --启动加载时间
    "open_ad_id": 7,        --广告页ID
    "open_ad_ms": 3449,    -- 广告总共播放时间
    "open_ad_skip_ms": 1989   --  用户跳过广告时点
  },
"err":{                     --错误
"error_code": "1234",      --错误码
    "msg": "***********"       --错误信息
},
  "ts": 1585744304000
}

3.2.3 埋点数据上报时机

埋点数据上报时机包括两种方式。

方式一,在离开该页面时,上传在这个页面产生的所有数据(页面、事件、曝光、错误等)。优点,批处理,减少了服务器接收数据压力。缺点,不是特别及时。

方式二,每个事件、动作、错误等,产生后,立即发送。优点,响应及时。缺点,对服务器接收数据压力比较大。

3.3 服务器和JDK准备

3.3.1 服务器准备

安装hadoop集群,分别安装hadoop102、hadoop103、hadoop104三台主机。

3.3.2 阿里云服务器准备(可选)

3.3.3 JDK准备

1)卸载现有JDK(3台节点)

[molly@hadoop102 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
[molly@hadoop103 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
[molly@hadoop104 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps

2)用SecureCRT工具将JDK导入到hadoop102的/opt/software文件夹下面

3) “alt+p”进入sftp模式

4)选择jdk1.8拖入工具

5)在Linux系统下的opt目录中查看软件包是否导入成功

[molly@hadoop102 software]# ls /opt/software/

看到如下结果:

jdk-8u212-linux-x64.tar.gz

6)解压JDK到/opt/module目录下

[molly@hadoop102 software]# tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/

7)配置JDK环境变量

​ (1)新建/etc/profile.d/my_env.sh文件

[molly@hadoop102 module]# sudo vim /etc/profile.d/my_env.sh

添加如下内容,然后保存(:wq)退出

#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

​ (2)让环境变量生效

[molly@hadoop102 software]$ source /etc/profile.d/my_env.sh

8)测试JDK是否安装成功

[molly@hadoop102 module]# java -version
#如果能看到以下结果、则Java正常安装
java version "1.8.0_212"

9)分发JDK

[molly@hadoop102 module]$ xsync /opt/module/jdk1.8.0_212/

10)分发环境变量配置文件

[molly@hadoop102 module]$ sudo /home/molly/bin/xsync /etc/profile.d/my_env.sh

11)分别在hadoop103、hadoop104上执行source

[molly@hadoop103 module]$ source /etc/profile.d/my_env.sh
[molly@hadoop104 module]$ source /etc/profile.d/my_env.sh

3.3.4 环境变量配置说明

Linux的环境变量可在多个文件中配置,如/etc/profile,/etc/profile.d/*.sh,/.bashrc,/.bash_profile等,下面说明上述几个文件之间的关系和区别。

bash的运行模式可分为login shell和non-login shell。

例如,我们通过终端,输入用户名、密码,登录系统之后,得到就是一个login shell,而当我们执行以下命令ssh hadoop103 command,在hadoop103执行command的就是一个non-login shell。

这两种shell的主要区别在于,它们启动时会加载不同的配置文件,login shell启动时会加载/etc/profile,/.bash_profile,/.bashrc,non-login shell启动时会加载~/.bashrc。

而在加载/.bashrc(实际是/.bashrc中加载的/etc/bashrc)或/etc/profile时,都会执行如下代码片段,

1637657318225

因此不管是login shell还是non-login shell,启动时都会加载/etc/profile.d/*.sh中的环境变量。

3.4 模拟数据

3.4.1 使用说明

1)将application.yml、gmall2020-mock-log-2021-01-22.jar、path.json、logback.xml上传到hadoop102的/opt/module/applog目录下

(1)创建applog路径

[molly@hadoop102 module]$ mkdir /opt/module/applog

(2)上传文件

2)配置文件

(1)application.yml文件

可以根据需求生成对应日期的用户行为日志。

[molly@hadoop102 applog]$ vim application.yml

修改如下内容

# 外部配置打开
# 外部配置打开
logging.config: "./logback.xml"
#业务日期
mock.date: "2020-06-14"

#模拟数据发送模式
#mock.type: "http"
#mock.type: "kafka"
mock.type: "log"

#http模式下,发送的地址
mock.url: "http://hdp1/applog"

#kafka模式下,发送的地址
mock:
  kafka-server: "hdp1:9092,hdp2:9092,hdp3:9092"
  kafka-topic: "ODS_BASE_LOG"

#启动次数
mock.startup.count: 200
#设备最大值
mock.max.mid: 500000
#会员最大值
mock.max.uid: 100
#商品最大值
mock.max.sku-id: 35
#页面平均访问时间
mock.page.during-time-ms: 20000
#错误概率 百分比
mock.error.rate: 3
#每条日志发送延迟 ms
mock.log.sleep: 10
#商品详情来源  用户查询,商品推广,智能推荐, 促销活动
mock.detail.source-type-rate: "40:25:15:20"
#领取购物券概率
mock.if_get_coupon_rate: 75
#购物券最大id
mock.max.coupon-id: 3
#搜索关键词  
mock.search.keyword: "图书,小米,iphone11,电视,口红,ps5,苹果手机,小米盒子"

(2)path.json,该文件用来配置访问路径

根据需求,可以灵活配置用户点击路径。

来到主页-搜索-上篇-下单-。。

[
  {"path":["home","good_list","good_detail","cart","trade","payment"],"rate":20 },
  {"path":["home","search","good_list","good_detail","login","good_detail","cart","trade","payment"],"rate":40 },
  {"path":["home","mine","orders_unpaid","trade","payment"],"rate":10 },
  {"path":["home","mine","orders_unpaid","good_detail","good_spec","comment","trade","payment"],"rate":5 },
  {"path":["home","mine","orders_unpaid","good_detail","good_spec","comment","home"],"rate":5 },
  {"path":["home","good_detail"],"rate":10 },
  {"path":["home"  ],"rate":10 }
]

(3)logback配置文件

可配置日志生成路径,修改内容如下

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="/opt/module/applog/log" />
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>$&#123;LOG_HOME&#125;/app.%d&#123;yyyy-MM-dd&#125;.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- 将某一个包下日志单独打印日志 -->
    <logger name="com.atgugu.gmall2020.mock.log.util.LogUtil"
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile" />
        <appender-ref ref="console" />
    </logger>

    <root level="error"  >
        <appender-ref ref="console" />
    </root>
</configuration>

3)生成日志

(1)进入到/opt/module/applog路径,执行以下命令

[molly@hadoop102 applog]$ java -jar gmall2020-mock-log-2021-01-22.jar

(2)在/opt/module/applog/log目录下查看生成日志

[molly@hadoop102 log]$ ll

3.4.2 集群日志生成脚本

在生成日志的时候模拟多台服务器生产的日志。在hadoop102的/home/molly目录下创建bin目录,这样脚本可以在服务器的任何目录执行。

[molly@hadoop102 ~]$ echo $PATH
/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/home/molly/.local/bin:/home/molly/bin

​ 1)在/home/molly/bin目录下创建脚本lg.sh

[molly@hadoop102 bin]$ vim lg.sh

​ 2)在脚本中编写如下内容

#!/bin/bash
for i in hadoop102 hadoop103; do
    echo "========== $i =========="
    ssh $i "cd /opt/module/applog/; java -jar gmall2020-mock-log-2021-01-22.jar >/dev/null 2>&1 &"
done 

注:

(1)/opt/module/applog/为jar包及配置文件所在路径

(2)/dev/null代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。

标准输入0:从键盘获得输入 /proc/self/fd/0

标准输出1:输出到屏幕(即控制台) /proc/self/fd/1

错误输出2:输出到屏幕(即控制台) /proc/self/fd/2

3)修改脚本执行权限

[molly@hadoop102 bin]$ chmod u+x lg.sh

4)将jar包及配置文件s上传至hadoop103的/opt/module/applog/路径

5)启动脚本

[molly@hadoop102 module]$ lg.sh 

6)分别在hadoop102、hadoop103的/opt/module/applog/log目录上查看生成的数据

[molly@hadoop102 logs]$ ls
app.2020-06-14.log
[molly@hadoop103 logs]$ ls
app.2020-06-14.log

4 数据采集模块

4.1 集群所有进程查看脚本

1)在/home/molly/bin目录下创建脚本xcall.sh

[molly@hadoop102 bin]$ vim xcall.sh

2)在脚本中编写如下内容

#! /bin/bash

for i in hadoop102 hadoop103 hadoop104
do
    echo --------- $i ----------
    ssh $i "$*"
done

3)修改脚本执行权限

[molly@hadoop102 bin]$ chmod 777 xcall.sh

4)启动脚本

[molly@hadoop102 bin]$ xcall.sh jps

4.2 Hadoop安装

详见:Hadoop 教程(二)安装hadoop集群-完全分布式部署

1)集群规划:

服务器hadoop102服务器hadoop103服务器hadoop104
HDFSNameNode DataNodeDataNodeDataNode SecondaryNameNode
YarnNodeManagerResourcemanager NodeManagerNodeManager

注意:尽量使用离线方式安装。第一次启动需要格式化namenode.

[molly@hadoop102 bin]hdfs namenode -format

4.2.1 项目经验之HDFS存储多目录

datanode和namenode都可以多目录存储。namenode不同目录的数据都是一样的,所以这里我们不配namenode的多目录。但是datanode的多目录中每个目录存储的数据是不一样的,可以多目录(磁盘挂载到的对应目录。因此用不同磁盘来存储Datanode,实现方式就是:datanode配置多目录)

1)生产环境服务器磁盘情况

从当前服务器可以看到:4个磁盘挂载在不同的目录。

1637661734977

2)在hdfs-site.xml文件中配置多目录,注意新挂载磁盘的访问权限问题。

HDFS的DataNode节点保存数据的路径由dfs.datanode.data.dir参数决定,其默认值为file://${hadoop.tmp.dir}/dfs/data,若服务器有多个磁盘,必须对该参数进行修改。如服务器磁盘如上图所示,则该参数应修改为如下的值。

<property>
  <name>dfs.datanode.data.dir</name>
<value>file:///dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>
</property>

注意:每台服务器挂载的磁盘不一样,所以每个节点的多目录配置可以不一致。单独配置即可。

4.2.2 集群数据均衡

1)节点间数据均衡:就是102 103 104之间的

下面由于人工操作会导致三台节点的存储数据不均衡,如下图所示:1637662122810

解决办法:开启数据均衡命令:

start-balancer.sh -threshold 10

对于参数10,代表的是集群中各个节点的磁盘空间利用率相差不超过10%,可根据实际情况进行调整。

停止数据均衡命令:

stop-balancer.sh

2)磁盘间数据均衡

hadoop3.x的新特性:针对磁盘间的数据均衡

(1)生成均衡计划(我们只有一块磁盘,不会生成计划)

hdfs diskbalancer -plan hadoop103

(2)执行均衡计划

hdfs diskbalancer -execute hadoop103.plan.json

(3)查看当前均衡任务的执行情况

hdfs diskbalancer -query hadoop103

(4)取消均衡任务

hdfs diskbalancer -cancel hadoop103.plan.json

4.2.3 项目经验之支持LZO压缩配置

1)hadoop本身并不支持lzo压缩,故需要使用twitter提供的hadoop-lzo开源组件。hadoop-lzo需依赖hadoop和lzo进行编译。

2)将编译好后的hadoop-lzo-0.4.20.jar 放入hadoop-3.1.3/share/hadoop/common/

[molly@hadoop102 common]$ pwd
/opt/module/hadoop-3.1.3/share/hadoop/common
[molly@hadoop102 common]$ ls
hadoop-lzo-0.4.20.jar

3)同步hadoop-lzo-0.4.20.jar到hadoop103、hadoop104

[molly@hadoop102 common]$ xsync hadoop-lzo-0.4.20.jar

4)core-site.xml增加配置支持LZO压缩

<configuration>
  <property><name>io.compression.codecs</name><value>
​      org.apache.hadoop.io.compress.GzipCodec,
​      org.apache.hadoop.io.compress.DefaultCodec,
​      org.apache.hadoop.io.compress.BZip2Codec,
​      org.apache.hadoop.io.compress.SnappyCodec,
​      com.hadoop.compression.lzo.LzoCodec,
​      com.hadoop.compression.lzo.LzopCodec
​    </value>
  </property>
  <property><name>io.compression.codec.lzo.class</name><value>com.hadoop.compression.lzo.LzoCodec</value>
  </property>
</configuration>

5)同步core-site.xml到hadoop103、hadoop104

[molly@hadoop102 hadoop]$ xsync core-site.xml

6)启动及查看集群

[molly@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
[molly@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh

4.2.4 测试LZO

(1)执行wordcount程序

[molly@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat /input /lzo-output

1637662603236

4.2.4 项目经验之LZO创建索引

1)创建LZO文件的索引,LZO压缩文件的可切片特性依赖于其索引,故我们需要手动为LZO压缩文件创建索引。若无索引,则LZO文件的切片只有一个。

hadoop jar /path/to/your/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer big_file.lzo

2)测试

​ (1)将bigtable.lzo(200M)上传到集群的根目录

[molly@hadoop102 module]$ hadoop fs -mkdir /input
[molly@hadoop102 module]$ hadoop fs -put bigtable.lzo /input

1637662802344

(2)执行wordcount程序

[molly@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat /input /output1

允许发现切片只有一个,因此想起原理,Lzo切片是依赖于索引的,因此我们需要建索引

(3)对上传的LZO文件建索引

[molly@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo

1637662962636

(4)再次执行WordCount程序

这里注意需要指定inputformat类为LzoText对应的类com.hadoop.mapreduce.LzoTextInputFormat

[molly@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat /input /output2

发现切片数为2。查看历史服务器去查看具体切片信息。1637662988690

map执行过程中切片信息。1637663231181

3)注意:如果以上任务,在运行过程中报如下异常

Container [pid=8468,containerID=container_1594198338753_0001_01_000002] is running 318740992B beyond the 'VIRTUAL' memory limit. Current usage: 111.5 MB of 1 GB physical memory used; 2.4 GB of 2.1 GB virtual memory used. Killing container.
Dump of the process-tree for container_1594198338753_0001_01_000002 :

解决办法:在hadoop102的/opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml文件中增加如下配置,然后分发到hadoop103、hadoop104服务器上,并重新启动集群。

<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>

4.2.5 项目经验之基准测试

1) 测试HDFS写性能

​ 测试内容:向HDFS集群写10个128M的文件

[molly@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 6 -fileSize 128MB

2020-04-16 13:41:24,724 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write
2020-04-16 13:41:24,724 INFO fs.TestDFSIO:       Date & time: Thu Apr 16 13:41:24 CST 2020
2020-04-16 13:41:24,724 INFO fs.TestDFSIO:     Number of files: 10
2020-04-16 13:41:24,725 INFO fs.TestDFSIO: Total MBytes processed: 1280
2020-04-16 13:41:24,725 INFO fs.TestDFSIO:    Throughput mb/sec: 8.88
2020-04-16 13:41:24,725 INFO fs.TestDFSIO: Average IO rate mb/sec: 8.96
2020-04-16 13:41:24,725 INFO fs.TestDFSIO:  IO rate std deviation: 0.87
2020-04-16 13:41:24,725 INFO fs.TestDFSIO:   Test exec time sec: 67.61

2)测试HDFS读性能

测试内容:读取HDFS集群10个128M的文件

[molly@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB
2020-04-16 13:43:38,857 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read
2020-04-16 13:43:38,858 INFO fs.TestDFSIO:   Date & time: Thu Apr 16 13:43:38 CST 2020
2020-04-16 13:43:38,859 INFO fs.TestDFSIO:         Number of files: 10
2020-04-16 13:43:38,859 INFO fs.TestDFSIO:  Total MBytes processed: 1280
2020-04-16 13:43:38,859 INFO fs.TestDFSIO:       Throughput mb/sec: 85.54
2020-04-16 13:43:38,860 INFO fs.TestDFSIO:  Average IO rate mb/sec: 100.21
2020-04-16 13:43:38,860 INFO fs.TestDFSIO:   IO rate std deviation: 44.37
2020-04-16 13:43:38,860 INFO fs.TestDFSIO:      Test exec time sec: 53.61

3)删除测试生成数据

[molly@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean

4)使用Sort程序评测MapReduce(要求性能特别好,普通性能不要去跑)

(1)使用RandomWriter来产生随机数,每个节点运行10个Map任务,每个Map产生大约1G大小的二进制随机数

[molly@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar randomwriter random-data

(2)执行Sort程序

[molly@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar sort random-data sorted-data

(3)验证数据是否真正排好序了

[molly@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data

4.2.6 项目经验之Hadoop参数调优

1)HDFS参数调优hdfs-site.xml

NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。

对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。

<property>
  <name>dfs.namenode.handler.count</name>
  <value>10</value>
</property>

dfs.namenode.handler.count=1637663793390 ,比如集群规模为8台时,此参数设置为41。

2)YARN参数调优yarn-site.xml

(1)情景描述:总共7台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive

面临问题:数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。

(2)解决办法:

内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。

(a)yarn.nodemanager.resource.memory-mb

表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。

(b)yarn.scheduler.maximum-allocation-mb

单个任务可申请的最多物理内存量,默认是8192(MB)。

4.3 Zookeeper安装

4.3.1 安装ZK

详见:

集群规划

服务器hadoop102服务器hadoop103服务器hadoop104
ZookeeperZookeeperZookeeperZookeeper

4.3.2 ZK集群启动停止脚本

1)在hadoop102的/home/molly/bin目录下创建脚本

[molly@hadoop102 bin]$ vim zk.sh

#在脚本中编写如下内容

#!/bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i 启动 ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i 停止 ------------    
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
    done
};;
"status"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i 状态 ------------    
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
    done
};;
esac

2)增加脚本执行权限

[molly@hadoop102 bin]$ chmod u+x zk.sh

3)Zookeeper集群启动脚本

[molly@hadoop102 module]$ zk.sh start

4)Zookeeper集群停止脚本

[molly@hadoop102 module]$ zk.sh stop

4.4 Kafka安装

4.4.1 Kafka集群安装

详见:kafka学习笔记(一) kafka搭建

配置文件:

log.dirs=/opt/moudule/kafka 2.11-2.4.1/datas
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

集群规划:

服务器hadoop102服务器hadoop103服务器hadoop104
KafkaKafkaKafkaKafka

4.4.2 Kafka集群启动停止脚本

1)在/home/molly/bin目录下创建脚本kf.sh

[molly@hadoop102 bin]$ vim kf.sh

在脚本中填写如下内容

#! /bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
    done
};;
esac

2)增加脚本执行权限

[molly@hadoop102 bin]$ chmod u+x kf.sh

3)kf集群启动脚本

[molly@hadoop102 module]$ kf.sh start

查看zookeeper信息1637664623705

4)kf集群停止脚本

[molly@hadoop102 module]$ kf.sh stop

4.4.3 Kafka常用命令

1)查看Kafka Topic列表

[molly@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --list
[molly@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
[molly@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe

2)创建Kafka Topic

进入到/opt/module/kafka/目录下创建日志主题

[molly@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 1 --partitions 1 --topic topic_log

3)删除Kafka Topic

[molly@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --topic topic_log

4)Kafka生产消息

[molly@hadoop102 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic topic_log
\>hello world
\>molly molly

5)Kafka消费消息

[molly@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic topic_log

–from-beginning:会把主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

6)查看Kafka Topic详情

[molly@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka \
--describe --topic topic_log

4.4.4 项目经验之Kafka压力测试

1)Kafka压测

用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。

kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh

2)Kafka Producer压力测试

(1)在/opt/module/kafka/bin目录下面有这两个文件。我们来测试一下

[molly@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

说明:

record-size是一条信息有多大,单位是字节。
num-records是总共发送多少条信息。
throughput 是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量。

(2)Kafka会打印下面的信息

100000 records sent, 95877.277085 records/sec (9.14 MB/sec), 187.68 ms avg latency, 424.00 ms max latency, 155 ms 50th, 411 ms 95th, 423 ms 99th, 424 ms 99.9th.

参数解析:本例中一共写入10w条消息,吞吐量为9.14 MB/sec,每次写入的平均延迟为187.68毫秒,最大的延迟为424.00毫秒。

3)Kafka Consumer压力测试

Consumer的测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能。

[molly@hadoop102 kafka]$ bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1

参数说明:

–zookeeper 指定zookeeper的链接信息
–topic 指定topic的名称
–fetch-size 指定每次fetch的数据的大小
–messages 总共要消费的消息个数

测试结果说明:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153

开始测试时间,测试结束数据,共消费数据9.5368MB,吞吐量2.0714MB/s,共消费100010条,平均每秒消费21722.4153条。

4.4.5 项目经验之Kafka机器数量计算

Kafka机器数量(经验公式)=2x(峰值生产速度x副本数/100)+1
先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。
比如我们的峰值生产速度是50M/s。副本数为2。
Kafka机器数量=2(502/100)+ 1=3台

4.4.6 项目经验值Kafka分区数计算

1)创建一个只有1个分区的topic

2)测试这个topic的producer吞吐量和consumer吞吐量。

3)假设他们的值分别是Tp和Tc,单位可以是MB/s。

4)然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc)

例如:producer吞吐量=20m/s;consumer吞吐量=50m/s,期望吞吐量100m/s;

分区数=100 / 20 =5分区

https://blog.csdn.net/weixin_42641909/article/details/89294698

分区数一般设置为:3-10个

5 项目1 采集用户行为数据

如下图所示,用户行为经过埋点进行收集然后存放到logserver上,这个时候利用Flume(第一层)从日志服务器logserver上采集数据送到kafka中,再通过一个flume(第二层)接收,最后存储到HDFS中。

再看flume架构:

第一层flume,我们source选择为taildirSource,channel选Kafka Channel(这里不需要sink,因为KafkaChanel直接将数据存到Kafka中了)。

第二层flume:我们source选择为KafkaSource,channel选fileChannel,sink选择HDFS Sink。

其中flume安装详见:flume学习笔记(一) flume搭建

1637724868076

日志采集Flume集群规划

我们将第一层flume安装在hadoop102和hadoop103上,第二层flume安装在hadoop104.

服务器hadoop102服务器hadoop103服务器hadoop104
Flume(采集日志)(第一层)Flume(第一层) Flume(第二层 Flum

5.1 第一层flume采集

5.1.1 项目经验之Flume组件选型

1637722930078

Flume直接读log日志的数据,log日志的格式是app.yyyy-mm-dd.log。注意其中logInterceptor主要对原始日志进行初步数据处理,删除空数据。

1)Source

(1)Taildir Source相比Exec Source、Spooling Directory Source的优势

TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。

Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

Spooling Directory Source监控目录,支持断点续传。

(2)batchSize大小如何设置?

+答:Event 1K左右时,500-1000合适(默认为100)

2)Channel

采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。

注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

5.1.2 日志采集Flume配置

1)Flume的具体配置如下:

​ (1)在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件

[molly@hadoop102 conf]$ vim file-flume-kafka.conf

在文件配置如下内容

#为各组件命名
a1.sources = r1
a1.channels = c1

#描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.molly.flume.interceptor.ETLInterceptor$Builder
#描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
#绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1

注意:com.molly.flume.interceptor.ETLInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。

5.1.3 Flume拦截器

在第一层flume中对原始数据进行清洗

1)创建Maven工程flume-interceptor

2)创建包名:com.molly.flume.interceptor

3)在pom.xml文件中添加如下配置

maven-compiler-plugin是打包插件。com.alibaba注意加compile,把该组件打到包中。

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4)在com.molly.flume.interceptor包下创建JSONUtils类

package com.molly.flume.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
public class JSONUtils &#123;
    public static boolean isJSONValidate(String log)&#123;
        try &#123;
            JSON.parse(log);
            return true;
        &#125;catch (JSONException e)&#123;
            return false;
        &#125;
    &#125;
&#125;

5)在com.molly.flume.interceptor包下创建LogInterceptor类

package com.molly.flume.interceptor;
import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
public class ETLInterceptor implements Interceptor &#123;
    @Override
    public void initialize() &#123;
    &#125;
    @Override
    public Event intercept(Event event) &#123;
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        if (JSONUtils.isJSONValidate(log)) &#123;
            return event;
        &#125; else &#123;
            return null;
        &#125;
    &#125;
    @Override
    public List<Event> intercept(List<Event> list) &#123;
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext())&#123;
            Event next = iterator.next();
            if(intercept(next)==null)&#123;
                iterator.remove();
            &#125;
        &#125;
        return list;
    &#125;
    public static class Builder implements Interceptor.Builder&#123;
        @Override
        public Interceptor build() &#123;
            return new ETLInterceptor();
        &#125;
        @Override
        public void configure(Context context) &#123;
       &#125;
    &#125;
    @Override
    public void close() &#123;
    &#125;
&#125;

6)打包

1637725114820

7)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

[molly@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

8)分发Flume到hadoop103、hadoop104

[molly@hadoop102 module]$ xsync flume/

9)hadoop102消费Flume数据

为了查看第一次flume是否起作用,我们开启一个kafka消费端来消费kafkaChannel中的数据。

[molly@hadoop102~]kafka-console-consumer.sh --topic topic_log --bootstrap-server hadoop102:9092

10)分别在hadoop102、hadoop103上启动Flume

[molly@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
[molly@hadoop103 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
[molly@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf  -n a1 -Dflume.root.logger=INFO,console

11)观看9)中的kafka消费端有数据在消费。

5.1.4 日志采集Flume启动停止脚本

1)在/home/molly/bin目录下创建脚本f1.sh

[molly@hadoop102 bin]$ vim f1.sh

​ 在脚本中填写如下内容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1  &"
        done
};;    
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac

说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。

说明2:awk 默认分隔符为空格

说明3:xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。

2)增加脚本执行权限

[molly@hadoop102 bin]$ chmod u+x f1.sh

3)f1集群启动脚本

[molly@hadoop102 module]$ f1.sh start

4)f1集群停止脚本

[molly@hadoop102 module]$ f1.sh stop

集群规划

服务器hadoop102服务器hadoop103服务器hadoop104
Flume(消费Kafka)Flume

5.2 第二层flume采集

集群规划,第二层flume是消费Kafka数据的Flume,部署在hadoop104上。

服务器hadoop102服务器hadoop103服务器hadoop104
Flume(消费Kafka)Flume

5.2.1 项目经验之Flume组件选型

第二次Flume主要作用是消费Kafka中的数据,然后存储到HDFS中,因此Source选择KafkaSource,sink选择HDFSsink。同时在source端使用一个拦截器:拦截器作用是获取日志中的实际时间。

1637738855416

1)FileChannel和MemoryChannel区别

MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。

FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。

选型:

金融类公司、对钱要求非常准确的公司通常会选择FileChannel

传输的是普通日志信息(京东内部一天丢100万-200万条,这是非常正常的),通常选择MemoryChannel。

2)FileChannel优化

通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。

官方说明如下:

checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

3)Sink:HDFS Sink

(1)HDFS存入大量小文件,有什么影响?

元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命

计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

(2)HDFS小文件处理

官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:

(1)文件在达到128M时会滚动生成新文件

(2)文件创建超3600秒时会滚动生成新文件

5.2.2 Flume拦截器

由于flume默认会用linux系统时间,作为输出到HDFS路径的时间。如果数据是23:59分产生的。Flume消费kafka里面的数据时,有可能已经是第二天了,那么这部门数据会被发往第二天的HDFS路径。我们希望的是根据日志里面的实际时间,发往HDFS的路径,所以下面拦截器作用是获取日志中的实际时间

1)在com.molly.flume.interceptor包下创建TimeStampInterceptor类

package com.molly.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TimeStampInterceptor implements Interceptor &#123;

    private ArrayList<Event> events = new ArrayList<>();

    @Override
    public void initialize() &#123;

    &#125;

    @Override
    public Event intercept(Event event) &#123;

        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    &#125;

    @Override
    public List<Event> intercept(List<Event> list) &#123;
        events.clear();
        for (Event event : list) &#123;
            events.add(intercept(event));
        &#125;

        return events;
    &#125;

    @Override
    public void close() &#123;

    &#125;

    public static class Builder implements Interceptor.Builder &#123;
        @Override
        public Interceptor build() &#123;
            return new TimeStampInterceptor();
        &#125;

        @Override
        public void configure(Context context) &#123;
        &#125;
    &#125;
&#125;

2)重新打包

1637739135017

3)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

[molly@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

4)分发Flume到hadoop103、hadoop104

[molly@hadoop102 module]$ xsync flume/

5.2.3 日志消费Flume配置

1)Flume的具体配置如下:

​ (1)在hadoop104的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

[molly@hadoop104 conf]$ vim kafka-flume-hdfs.conf

在文件配置如下内容

## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.molly.flume.interceptor.TimeStampInterceptor$Builder

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

5.2.4 日志消费Flume启动停止脚本

1)在/home/molly/bin目录下创建脚本f2.sh

[molly@hadoop102 bin]$ vim f2.sh

​ 在脚本中填写如下内容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop104
        do
                echo " --------启动 $i 消费flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt   2>&1 &"
        done
};;
"stop"){
        for i in hadoop104
        do
                echo " --------停止 $i 消费flume-------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
        done

};;
esac

2)增加脚本执行权限

[molly@hadoop102 bin]$ chmod u+x f2.sh

3)f2集群启动脚本

[molly@hadoop102 module]$ f2.sh start

4)f2集群停止脚本

[molly@hadoop102 module]$ f2.sh stop

5.3 项目经验之Flume内存优化

1)问题描述:如果启动消费Flume抛出如下异常

ERROR hdfs.HDFSEventSink: process failed

java.lang.OutOfMemoryError: GC overhead limit exceeded

2)解决方案步骤:

(1)在hadoop102服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

export JAVA_OPTS=”-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote”

(2)同步配置到hadoop103、hadoop104服务器

[molly@hadoop102 conf]$ xsync flume-env.sh

3)Flume内存参数设置及优化

JVM heap一般设置为4G或更高

-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

-Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。

5.4 采集通道启动/停止脚本

5.4.1 数据通道测试

根据需求分别生成2020-06-14和2020-06-15日期的数据

1)修改/opt/module/applog/application.yml中业务日期为2020-06-14

#业务日期

mock.date=2020-06-14

2)执行脚本,生成2020-06-14日志数据

[molly@hadoop102 ~]$ lg.sh

3)再次修改/opt/module/applog/application.yml中业务日期2020-06-15

#业务日期

mock.date=2020-06-15

4)执行脚本,生成2020-06-15日志数据

[molly@hadoop102 ~]$ lg.sh

5)在这个期间,不断观察Hadoop的HDFS路径上是否有数据

1637740975327

5.4.2 采集通道启动/停止脚本

1)在/home/molly/bin目录下创建脚本cluster.sh

[molly@hadoop102 bin]$ vim cluster.sh

在脚本中填写如下内容

#!/bin/bash
case $1 in
"start"){
        echo ================== 启动 集群 ==================
        #启动 Zookeeper集群
        zk.sh start
        #启动 Hadoop集群
        hdp.sh start
        #启动 Kafka采集集群
        kf.sh start
        #启动 Flume采集集群
        f1.sh start
        #启动 Flume消费集群
        f2.sh start
        };;
"stop"){
        echo ================== 停止 集群 ==================

        #停止 Flume消费集群
        f2.sh stop
        #停止 Flume采集集群
        f1.sh stop
        #停止 Kafka采集集群
        kf.sh stop
        #停止 Hadoop集群
        hdp.sh stop
        #停止 Zookeeper集群
        zk.sh stop
};;
esac

2)增加脚本执行权限

[molly@hadoop102 bin]$ chmod u+x cluster.sh

3)cluster集群启动脚本

[molly@hadoop102 module]$ cluster.sh start

4)cluster集群停止脚本

[molly@hadoop102 module]$ cluster.sh stop

6 项目2 采集业务数据

6.1 电商业务流程

电商的业务流程可以以一个普通用户的浏览足迹为例进行说明,用户点开电商首页开始浏览,可能会通过分类查询也可能通过全文搜索寻找自己中意的商品,这些商品无疑都是存储在后台的管理系统中的。

当用户寻找到自己中意的商品,可能会想要购买,将商品添加到购物车后发现需要登录,登录后对商品进行结算,这时候购物车的管理和商品订单信息的生成都会对业务数据库产生影响,会生成相应的订单数据和支付数据。

订单正式生成之后,还会对订单进行跟踪处理,直到订单全部完成。

电商的主要业务流程包括用户前台浏览商品时的商品详情的管理,用户商品加入购物车进行支付时用户个人中心&支付服务的管理,用户支付完成后订单后台服务的管理,这些流程涉及到了十几个甚至几十个业务数据表,甚至更多。

6.2 电商常识

6.2.1 SKU和SPU

SKU=Stock Keeping Unit(库存量基本单位)。现在已经被引申为产品统一编号的简称,每种产品均对应有唯一的SKU号。

SPU(Standard Product Unit):是商品信息聚合的最小单位,是一组可复用、易检索的标准化信息集合。

例如:iPhoneX手机就是SPU。一台银色、128G内存的、支持联通网络的iPhoneX,就是SKU。

SPU表示一类商品。同一SPU的商品可以共用商品图片、海报、销售属性等。

6.2.2 平台属性和销售属性

1.平台属性

1637745317255

2.销售属性

1637745332754

6.3 业务数据采集架构

从2.2.2可以看到整体架构;对于需求2 采集业务数据的结构图如下所示:

1637746045465

业务数据库是直接存储到mysql数据库的,我们可以通过sqoop组件使用JDBC将数据传输到HDFS上存储。

sqoop安装教程见:

Hive安装教程见:

6.4 同步策略

数据同步策略的类型包括:全量同步、增量同步、新增及变化同步、特殊情况

Ø 全量表:存储完整的数据。

Ø 增量表:存储新增加的数据。

Ø 新增及变化表:存储新增加的数据和变化的数据。

Ø 特殊表:只需要存储一次。

6.4.1 全量同步策略

1637746273106

6.4.2 增量同步策略

1637746298966

6.4.3 新增及变化策略

1637746311874

6.4.4 特殊策略

某些特殊的表,可不必遵循上述同步策略。

例如没变化的客观世界的数据(比如性别,地区,民族,政治成分,鞋子尺码)可以只存一份。

6.5 业务数据导入HDFS

6.5.1 分析表同步策略

在生产环境,个别小公司,为了简单处理,所有表全量导入。

中大型公司,由于数据量比较大,还是严格按照同步策略导入数据。

1637746489890

6.5.2 业务数据首日同步脚本

1)脚本编写

(1)在/home/atguigu/bin目录下创建

[molly@hadoop102 bin]$ vim mysql_to_hdfs_init.sh

添加如下内容:

#! /bin/bash
APP=gmall
sqoop=/opt/module/sqoop/bin/sqoop
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;then
   do_date=$2
else 
   echo "请传入日期参数"
   exit
fi 
import_data(){
$sqoop import \
--connect jdbc:mysql://hadoop102:3306/$APP \
--username root \
--password 123456 \
--target-dir /origin_data/$APP/db/$1/$do_date \
--delete-target-dir \
--query "$2 where \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/$APP/db/$1/$do_date
}
import_order_info(){
  import_data order_info "select
                            id, 
                            total_amount, 
                            order_status, 
                            user_id, 
                            payment_way,
                            delivery_address,
                            out_trade_no, 
                            create_time, 
                            operate_time,
                            expire_time,
                            tracking_no,
                            province_id,
                            activity_reduce_amount,
                            coupon_reduce_amount,                            
                            original_total_amount,
                            feight_fee,
                            feight_fee_reduce      
                        from order_info"
}

6.5.3 项目经验

Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用–input-null-string和–input-null-non-string两个参数。导入数据时采用–null-string和–null-non-string。

6.6 Hive进行业务数据访问

Hive安装教程详见:https://m01ly.github.io/2020/11/14/bigdata-hive1/

后面就是通过Hive去对数据进行初步的分析。

文章目录
  1. 1 数仓概念
  2. 2 项目需求及架构设计
    1. 2.1 项目需求分析
    2. 2.2 项目框架
      1. 2.2.1 技术选型
      2. 2.2.2 系统数据流程设计
      3. 2.2.3 框架版本选型
      4. 2.2.4 服务器选型
      5. 2.2.5 集群资源规划设计
  3. 3 数据生成模块
    1. 3.1 目标数据
      1. 3.1.1 页面
      2. 3.1.2 事件
      3. 3.1.3 曝光
      4. 3.1.4 启动
      5. 3.1.5 错误
    2. 3.2 数据埋点
      1. 3.2.1 主流埋点方式(了解)
      2. 3.2.2 埋点数据日志结构
      3. 3.2.3 埋点数据上报时机
    3. 3.3 服务器和JDK准备
      1. 3.3.1 服务器准备
      2. 3.3.2 阿里云服务器准备(可选)
      3. 3.3.3 JDK准备
      4. 3.3.4 环境变量配置说明
    4. 3.4 模拟数据
      1. 3.4.1 使用说明
      2. 3.4.2 集群日志生成脚本
  4. 4 数据采集模块
    1. 4.1 集群所有进程查看脚本
    2. 4.2 Hadoop安装
      1. 4.2.1 项目经验之HDFS存储多目录
      2. 4.2.2 集群数据均衡
      3. 4.2.3 项目经验之支持LZO压缩配置
      4. 4.2.4 测试LZO
      5. 4.2.4 项目经验之LZO创建索引
      6. 4.2.5 项目经验之基准测试
      7. 4.2.6 项目经验之Hadoop参数调优
    3. 4.3 Zookeeper安装
      1. 4.3.1 安装ZK
      2. 4.3.2 ZK集群启动停止脚本
    4. 4.4 Kafka安装
      1. 4.4.1 Kafka集群安装
      2. 4.4.2 Kafka集群启动停止脚本
      3. 4.4.3 Kafka常用命令
      4. 4.4.4 项目经验之Kafka压力测试
      5. 4.4.5 项目经验之Kafka机器数量计算
      6. 4.4.6 项目经验值Kafka分区数计算
  5. 5 项目1 采集用户行为数据
    1. 5.1 第一层flume采集
      1. 5.1.1 项目经验之Flume组件选型
      2. 5.1.2 日志采集Flume配置
      3. 5.1.3 Flume拦截器
      4. 5.1.4 日志采集Flume启动停止脚本
    2. 5.2 第二层flume采集
      1. 5.2.1 项目经验之Flume组件选型
      2. 5.2.2 Flume拦截器
      3. 5.2.3 日志消费Flume配置
      4. 5.2.4 日志消费Flume启动停止脚本
    3. 5.3 项目经验之Flume内存优化
    4. 5.4 采集通道启动/停止脚本
      1. 5.4.1 数据通道测试
      2. 5.4.2 采集通道启动/停止脚本
  6. 6 项目2 采集业务数据
    1. 6.1 电商业务流程
    2. 6.2 电商常识
      1. 6.2.1 SKU和SPU
      2. 6.2.2 平台属性和销售属性
    3. 6.3 业务数据采集架构
    4. 6.4 同步策略
      1. 6.4.1 全量同步策略
      2. 6.4.2 增量同步策略
      3. 6.4.3 新增及变化策略
      4. 6.4.4 特殊策略
    5. 6.5 业务数据导入HDFS
      1. 6.5.1 分析表同步策略
      2. 6.5.2 业务数据首日同步脚本
      3. 6.5.3 项目经验
    6. 6.6 Hive进行业务数据访问