朴素贝叶斯算法

1、摘要

在所有的机器学习分类算法中,朴素贝叶斯和其他绝大多数的分类算法都不同。对于大多数的分类算法,比如决策树,KNN,逻辑回归,支持向量机等,他们都是判别方法,也就是直接学习出特征输出Y和特征X之间的关系,要么是决策函数Y=f(X),要么是条件分布P(Y|X)。但是朴素贝叶斯却是生成方法,也就是直接找出特征输出Y和特征X的联合分布P(X,Y),然后用P(Y|X)=P(X,Y)/P(X)得出。

2、分类问题综述

对于分类问题,其实谁都不会陌生,说我们每个人每天都在执行分类操作一点都不夸张,只是我们没有意识到罢了。例如,当你看到一个陌生人,你的脑子下意识判断TA是男是女;你可能经常会走在路上对身旁的朋友说“这个人一看就很有钱、那边有个非主流”之类的话,其实这就是一种分类操作。

从数学角度来说,分类问题可做如下定义:

已知集合:C0bdB9.pngC0bw7R.png,确定映射规则y=f(x),使得任意C0bDtx.png有且仅有一个C0b61O.png使得C0bRnH.png成立。(不考虑模糊数学里的模糊集情况)

其中C叫做类别集合,其中每一个元素是一个类别,而I叫做项集合,其中每一个元素是一个待分类项,f叫做分类器。分类算法的任务就是构造分类器f。

这里要着重强调,分类问题往往采用经验性方法构造映射规则,即一般情况下的分类问题缺少足够的信息来构造100%正确的映射规则,而是通过对经验数据的学习从而实现一定概率意义上正确的分类,因此所训练出的分类器并不是一定能将每个待分类项准确映射到其分类,分类器的质量与分类器构造方法、待分类数据的特性以及训练样本数量等诸多因素有关。

例如,医生对病人进行诊断就是一个典型的分类过程,任何一个医生都无法直接看到病人的病情,只能观察病人表现出的症状和各种化验检测数据来推断病情,这时医生就好比一个分类器,而这个医生诊断的准确率,与他当初受到的教育方式(构造方法)、病人的症状是否突出(待分类数据的特性)以及医生的经验多少(训练样本数量)都有密切关系。

3、贝叶斯分类的基础——贝叶斯定理

每次提到贝叶斯定理,我心中的崇敬之情都油然而生,倒不是因为这个定理多高深,而是因为它特别有用。这个定理解决了现实生活里经常遇到的问题:已知某条件概率,如何得到两个事件交换后的概率,也就是在已知P(A|B)的情况下如何求得P(B|A)。这里先解释什么是条件概率:

P(A|B)表示事件B已经发生的前提下,事件A发生的概率,叫做事件B发生下事件A的条件概率。其基本求解公式为:C0HteA.png

贝叶斯定理之所以有用,是因为我们在生活中经常遇到这种情况:我们可以很容易直接得出P(A|B),P(B|A)则很难直接得出,但我们更关心P(B|A),贝叶斯定理就为我们打通从P(A|B)获得P(B|A)的道路。

下面不加证明地直接给出贝叶斯定理:C0HsyQ.png

4、朴素贝叶斯分类的原理与流程

朴素贝叶斯分类是一种十分简单的分类算法,叫它朴素贝叶斯分类是因为这种方法的思想真的很朴素,朴素贝叶斯的思想基础是这样的:对于给出的待分类项,求解在此项出现的条件下各个类别出现的概率,哪个最大,就认为此待分类项属于哪个类别。通俗来说,就好比这么个道理,你在街上看到一个黑人,我问你你猜这哥们哪里来的,你十有八九猜非洲。为什么呢?因为黑人中非洲人的比率最高,当然人家也可能是美洲人或亚洲人,但在没有其它可用信息下,我们会选择条件概率最大的类别,这就是朴素贝叶斯的思想基础。

朴素贝叶斯分类的正式定义如下:
C0LpdA.png

那么现在的关键就是如何计算第3步中的各个条件概率。我们可以这么做:

1、找到一个已知分类的待分类项集合,这个集合叫做训练样本集。

2、统计得到在各类别下各个特征属性的条件概率估计。即C0LiJP.md.png

3、如果各个特征属性是条件独立的,则根据贝叶斯定理有如下推导:C0Lkz8.png

因为分母对于所有类别为常数,因为我们只要将分子最大化皆可。又因为各特征属性是条件独立的,所以有:

C0LVsg.png

根据上述分析,朴素贝叶斯分类的流程可以由下图表示(暂时不考虑验证):

C0Lnds.md.png

可以看到,整个朴素贝叶斯分类分为三个阶段:

第一阶段——准备工作阶段,这个阶段的任务是为朴素贝叶斯分类做必要的准备,主要工作是根据具体情况确定特征属性,并对每个特征属性进行适当划分,然后由人工对一部分待分类项进行分类,形成训练样本集合。这一阶段的输入是所有待分类数据,输出是特征属性和训练样本。这一阶段是整个朴素贝叶斯分类中唯一需要人工完成的阶段,其质量对整个过程将有重要影响,分类器的质量很大程度上由特征属性、特征属性划分及训练样本质量决定。

第二阶段——分类器训练阶段,这个阶段的任务就是生成分类器,主要工作是计算每个类别在训练样本中的出现频率及每个特征属性划分对每个类别的条件概率估计,并将结果记录。其输入是特征属性和训练样本,输出是分类器。这一阶段是机械性阶段,根据前面讨论的公式可以由程序自动计算完成。

第三阶段——应用阶段。这个阶段的任务是使用分类器对待分类项进行分类,其输入是分类器和待分类项,输出是待分类项与类别的映射关系。这一阶段也是机械性阶段,由程序完成。

5、朴素贝叶斯分类实例:检测SNS社区中不真实账号

下面讨论一个使用朴素贝叶斯分类解决实际问题的例子,为了简单起见,对例子中的数据做了适当的简化。

这个问题是这样的,对于SNS社区来说,不真实账号(使用虚假身份或用户的小号)是一个普遍存在的问题,作为SNS社区的运营商,希望可以检测出这些不真实账号,从而在一些运营分析报告中避免这些账号的干扰,亦可以加强对SNS社区的了解与监管。

如果通过纯人工检测,需要耗费大量的人力,效率也十分低下,如能引入自动检测机制,必将大大提升工作效率。这个问题说白了,就是要将社区中所有账号在真实账号和不真实账号两个类别上进行分类,下面我们一步一步实现这个过程。

首先设C=0表示真实账号,C=1表示不真实账号。

5.1、确定特征属性及划分

这一步要找出可以帮助我们区分真实账号与不真实账号的特征属性,在实际应用中,特征属性的数量是很多的,划分也会比较细致,但这里为了简单起见,我们用少量的特征属性以及较粗的划分,并对数据做了修改。

我们选择三个特征属性:a1:日志数量/注册天数,a2:好友数量/注册天数,a3:是否使用真实头像。在SNS社区中这三项都是可以直接从数据库里得到或计算出来的。

下面给出划分:a1:{a<=0.05, 0.05<a<0.2, a>=0.2},a1:{a<=0.1, 0.1<a<0.8, a>=0.8},a3:{a=0(不是),a=1(是)}。

5.2、获取训练样本

这里使用运维人员曾经人工检测过的1万个账号作为训练样本。

5.3、计算训练样本中每个类别的频率

用训练样本中真实账号和不真实账号数量分别除以一万,得到:

C0LwJx.png

5.4、计算每个类别条件下各个特征属性划分的频率

C0LDSK.png

5.5、使用分类器进行鉴别

下面我们使用上面训练得到的分类器鉴别一个账号,这个账号使用非真实头像,日志数量与注册天数的比率为0.1,好友数与注册天数的比率为0.2。

C0LceH.png

可以看到,虽然这个用户没有使用真实头像,但是通过分类器的鉴别,更倾向于将此账号归入真实账号类别。这个例子也展示了当特征属性充分多时,朴素贝叶斯分类对个别属性的抗干扰性。

6、朴素贝叶斯分类实例:性别分类的例子

本例摘自维基百科,关于处理连续变量的另一种方法。

下面是一组人类身体特征的统计资料。

  性别  身高(英尺) 体重(磅)  脚掌(英寸)

  男    6       180     12
  男    5.92     190     11
  男    5.58     170     12
  男    5.92     165     10
  女    5       100     6
  女    5.5      150     8
  女    5.42     130     7
  女    5.75     150     9

已知某人身高6英尺、体重130磅,脚掌8英寸,请问该人是男是女?

根据朴素贝叶斯分类器,计算下面这个式子的值。

P(身高|性别) x P(体重|性别) x P(脚掌|性别) x P(性别)

这里的困难在于,由于身高、体重、脚掌都是连续变量,不能采用离散变量的方法计算概率。而且由于样本太少,所以也无法分成区间计算。怎么办?

这时,可以假设男性和女性的身高、体重、脚掌都是正态分布,通过样本计算出均值和方差,也就是得到正态分布的密度函数。有了密度函数,就可以把值代入,算出某一点的密度函数的值。

比如,男性的身高是均值5.855、方差0.035的正态分布。所以,男性的身高为6英尺的概率的相对值等于1.5789(大于1并没有关系,因为这里是密度函数的值,只用来反映各个值的相对可能性)。
C0LhfP.png

有了这些数据以后,就可以计算性别的分类了。

  P(身高=6|男) x P(体重=130|男) x P(脚掌=8|男) x P(男)
    = 6.1984 x e-9

  P(身高=6|女) x P(体重=130|女) x P(脚掌=8|女) x P(女)
    = 5.3778 x e-4

可以看到,女性的概率比男性要高出将近10000倍,所以判断该人为女性。

7、分类器的评价

虽然后续还会提到其它分类算法,不过这里我想先提一下如何评价分类器的质量。

首先要定义,分类器的正确率指分类器正确分类的项目占所有被分类项目的比率。

通常使用回归测试来评估分类器的准确率,最简单的方法是用构造完成的分类器对训练数据进行分类,然后根据结果给出正确率评估。但这不是一个好方法,因为使用训练数据作为检测数据有可能因为过分拟合而导致结果过于乐观,所以一种更好的方法是在构造初期将训练数据一分为二,用一部分构造分类器,然后用另一部分检测分类器的准确率。

HIVE 总结

HIVE 总结:

内部表(管理表)

内部表也称之为 MANAGED_TABLE;默认存储在/user/hive/warehouse下,也可以通过location指定;
**重点**:删除表时也会删除元数据和数据本身

外部表(托管表)

外部表也称之为 EXTERNAL_TABLE;在创建表时可以自己指定目录位置(location);
**重点**:删除表时只会删除元数据不会删除数据本身

分区表:

分区表实际上就是对应一个hdfs文件系统上的独立的文件夹,该文件夹下是该分区的所有数据文件。
hive中的分区就是分目录,把大的数据集根据业务分割成更小的数据集。
**作用**:在查询时通过where子句中的表达式来选择查询所需要的指定分区,这样的查询效率会提高很高。
分区表的修复:
面试题:
    方法一[msck repair table emp]:加入创建了一个hive分区表,然后用hdfs的方式去-mkdir分区字段的目录,然后向这个目录下-put数据文件,然后去select,会显示表的内容为空,但是数据在的,这个时候查看元数据的partitions里面是没有这个分区字段的,此时可以用 **msck repair table emp** 命令来进行修复后就可以查到内容
    方法二[alter table emp add partition(day = 15)]

hive中的高级查询:

1. order by  ---> 对全局数据的排序,仅仅只有一个reduce ***数据量比较大的时候慎用!!!
2. sort by --->对每一个reduce内部数据进行排序,对于全局结果集来说不是排序 使用前先设置reduce的个数:set mapreduce.job.reduces = 3
3. distribute by ---> 作用就是分区(partition) ,类似于MapReduce中分区partition,对数据进行分区,结合sort by进行使用 如:select * from emp distribute by depno sort by empno asc;
    注意:distribute by 要放在 sort by 前面[先分区再排序]
4.cluster by ---> 当distribute by 和 sort by字段相同时可以使用cluster by

UDF【user defined function】:

hive自带了很多UDF,如:max、min、split,但是往往在开发中不能满足我们的也无需求
UDF 一进一出;         
UDAF(aggregation) 多进一出;  
UDTF(table-generating) 一进多出

UDF开发步骤:

1.继承hive.ql.UDF类
2.实现一个或多个evaluate方法 即:重载
3.把程序打成jar包 
4. [jar包在本地]: 
   4.11 add jar /xxx/xxx.jar 
   4.12 create temporary function funcname as "类的全路径" 
   [jar包在hdfs] 
   4.2 create function funcname as "类的全路径" using jar 'hdfs://xxx/xx.jar'

UDF开发注意事项:

1.UDF必须要有返回值,不能为void,且可以返回null
2.UDF中常用Text/LongWritable等类型,不推荐使用java类型,因为hive底层是用MapReduce,而mr的实现里面都是他自己独有的可序列化类型

Hive的优化:

优化1.数据压缩 [使用snappy格式] **snappy是谷歌开源的

作用:数据量小,减少网络IO流
hive底层还是走的MapReduce,也就是压缩mr阶段的数据;
具体--> 1.1 CompressedInput上传之前进行压缩;  然后客户端进行InputSplit分片,这个阶段不需要配置,因为如果是压缩后的snappy文件,map会自动进行解压
       1.2map接收数据后进行DecompressInput[解压缩]对数据进行处理
       1.3 map阶段SpillToDisk的时候进行压缩 
               配置:mapreduce.map.output.compress=true;
    mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Defaul tCodec
       1.4 reduce 来map的disk取数据后进行解压缩
       1.5 reduce 进行数据汇总 然后 进行压缩 输出[CompressReduceOutput]
       配置:mapreduce.output.fileoutputformat.compress=true;
    mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.Defaul tCodec

优化2.数据的存储格式【storage format】:列式存储的Parquet是Apache的顶级项目

我们一般select name,age from table group by xx..都是查询的列,如果读取文件时按照列来读取,那么效率会高很多
SequenceFile、TextFile:按行存储数据
ParquetFile、ORCFile :按列存储数据 【ORC里面是 strip index,索引查找】
**重要**测试显示存储在hdfs上的相同文件 create table name(...) stored as orc;和 create...stored as textfile;文件大小比例:orc : text = 1:7 大大的减少了存储的数据大小
小结:1.orc和parquet的存储格式使得存储相同的数据占得空间变小,尤其是orc
     2.而列式存储相对于text、sequence的行式存储查询也是很占优势的
以上两点:压缩+数据存储格式 两者结合效果更好
create table page_views_orc_snappy(...) row format ... stored as orc tblproperties ('orc.compress'='SNAPPY');
insert into table page_views_orc_snappy select * from page_views;
//注释:这个插入语句会执行三个job: 查询page_views + 转化为orc格式 + snappy compress[压缩]

总结:

在实际的项目开发中,hive表的数据
    *存储格式
        orcfile / parquet
    *数据压缩
        snappy

优化3.Fetch-Task 值改为 more

存在的问题分析:select * from t 不会走mr,select name from t 就会走mr,为什么?
因为:
    在hive-default.xml里面有个<property>是hive.fetch.task.conversion = minimal
[默认的--只有三种情况下不会走MapReduce:1.select * from t 2.select * from t where partitionname = xx 3.select * from t limit n]
我们可以在hive-site.xml里配置这个属性的值为 more 

优化4.hive高级优化

1.大表拆成子表 
    create table table1... AS select * from table2
    比如订单表,都是几百列,当我们不同需求的时候只用一些字段的时候,就到大表创建的字表里取查询,这样就比较快
2.外部表 + 分区表   [因为有可能多个部门同时在用这些数据] + [一般二级分区表较多--月/日]
  create internal table name ... partitioned by (month string,day string) row format delimited...
3.数据存储格式 + 压缩   [orcfile/parquetfile   +   snappy]

    set parquet.compress = snappy;
    create table order_parquet_snappy(...) 
    row format ... 
    stored as parquet 
    as select * from order;
4.SQL语句的优化

优化5.数据倾斜问题:

Join:
    1.Shuffle/Reduce/Commen Join
        连接发生的阶段是 Reduce Task
        一般是大表对大表
        每个表的数据都是从文件中读取的
    2.Map Join
        连接发生在 Map Task
        一般是小表对大表
        大表的数据从文件中读取,小表的数据从内存中读取
        用到了 DistributedCache,把小表缓存到了个个节点的内存中
    3.SMB Join   [Sort + Merge + Bucket]
        一般在大公司里会用到,其实就是对reduce join的优化,因为两个大表都特别大,那么会吃不消的
        set hive.enforce.bucketing = true;
        set mapreduce.job.reduces = 4  [举例]
        create table tablename(...) clustered by (cid) into 4 buckets row format ...
        load data inpath 'xxx' into ...实质上是 hdfs dfs -put xxx 不会走MapReduce的
        所以插入分桶表bucket需要用 insert into table table_bucket select * from xxx cluster by (cid),这样才会走mr
    4.group by 和 count(distinct)容易造成数据倾斜 //todo:待解决

hdfs dfs -du -h /root/xxx 是看目录或者文件有多大

Markdown 编辑阅读器

欢迎使用 Cmd Markdown 编辑阅读器


nihao

nihao

nihao

232

nidshfisdfjksdfjksdf
dsfjaksdfj df
&dfsdf
nihao

CY82sH.md.png

我们理解您需要更便捷更高效的工具记录思想,整理笔记、知识,并将其中承载的价值传播给他人,Cmd Markdown 是我们给出的答案 —— 我们为记录思想和分享知识提供更专业的工具。 您可以使用 Cmd Markdown:

  • 整理知识,学习笔记
  • 发布日记,杂文,所见所想
  • 撰写发布技术文稿(代码支持)
  • 撰写发布学术论文(LaTeX 公式支持)

cmd-markdown-logo

除了您现在看到的这个 Cmd Markdown 在线版本,您还可以前往以下网址下载:

Windows/Mac/Linux 全平台客户端

请保留此份 Cmd Markdown 的欢迎稿兼使用说明,如需撰写新稿件,点击顶部工具栏右侧的 新文稿 或者使用快捷键 Ctrl+Alt+N


什么是 Markdown

Markdown 是一种方便记忆、书写的纯文本标记语言,用户可以使用这些标记符号以最小的输入代价生成极富表现力的文档:譬如您正在阅读的这份文档。它使用简单的符号标记不同的标题,分割不同的段落,粗体 或者 斜体 某些文字,更棒的是,它还可以

1. 制作一份待办事宜 Todo 列表

  • 支持以 PDF 格式导出文稿
  • 改进 Cmd 渲染算法,使用局部渲染技术提高渲染效率
  • 新增 Todo 列表功能
  • 修复 LaTex 公式渲染问题
  • 新增 LaTex 公式编号功能

2. 书写一个质能守恒公式[^LaTeX]

$$E=mc^2$$

3. 高亮一段代码[^code]

1
2
3
4
5
6
7
@requires_authorization
class SomeClass:
pass

if __name__ == '__main__':
# A comment
print 'hello world'

4. 高效绘制 流程图

1
2
3
4
5
6
7
8
st=>start: Start
op=>operation: Your Operation
cond=>condition: Yes or No?
e=>end

st->op->cond
cond(yes)->e
cond(no)->op

5. 高效绘制 序列图

1
2
3
Alice->Bob: Hello Bob, how are you?
Note right of Bob: Bob thinks
Bob-->Alice: I am good thanks!

6. 高效绘制 甘特图

1
2
3
4
5
6
7
8
9
10
11
12
13
title 项目开发流程
section 项目确定
需求分析 :a1, 2016-06-22, 3d
可行性报告 :after a1, 5d
概念验证 : 5d
section 项目实施
概要设计 :2016-07-05 , 5d
详细设计 :2016-07-08, 10d
编码 :2016-07-15, 10d
测试 :2016-07-22, 5d
section 发布验收
发布: 2d
验收: 3d

7. 绘制表格

项目 价格 数量
计算机 \$1600 5
手机 \$12 12
管线 \$1 234

8. 更详细语法说明

想要查看更详细的语法说明,可以参考我们准备的 Cmd Markdown 简明语法手册,进阶用户可以参考 Cmd Markdown 高阶语法手册 了解更多高级功能。

总而言之,不同于其它 所见即所得 的编辑器:你只需使用键盘专注于书写文本内容,就可以生成印刷级的排版格式,省却在键盘和工具栏之间来回切换,调整内容和格式的麻烦。Markdown 在流畅的书写和印刷级的阅读体验之间找到了平衡。 目前它已经成为世界上最大的技术分享网站 GitHub 和 技术问答网站 StackOverFlow 的御用书写格式。


什么是 Cmd Markdown

您可以使用很多工具书写 Markdown,但是 Cmd Markdown 是这个星球上我们已知的、最好的 Markdown 工具——没有之一 :)因为深信文字的力量,所以我们和你一样,对流畅书写,分享思想和知识,以及阅读体验有极致的追求,我们把对于这些诉求的回应整合在 Cmd Markdown,并且一次,两次,三次,乃至无数次地提升这个工具的体验,最终将它演化成一个 编辑/发布/阅读 Markdown 的在线平台——您可以在任何地方,任何系统/设备上管理这里的文字。

1. 实时同步预览

我们将 Cmd Markdown 的主界面一分为二,左边为编辑区,右边为预览区,在编辑区的操作会实时地渲染到预览区方便查看最终的版面效果,并且如果你在其中一个区拖动滚动条,我们有一个巧妙的算法把另一个区的滚动条同步到等价的位置,超酷!

2. 编辑工具栏

也许您还是一个 Markdown 语法的新手,在您完全熟悉它之前,我们在 编辑区 的顶部放置了一个如下图所示的工具栏,您可以使用鼠标在工具栏上调整格式,不过我们仍旧鼓励你使用键盘标记格式,提高书写的流畅度。

tool-editor

3. 编辑模式

完全心无旁骛的方式编辑文字:点击 编辑工具栏 最右侧的拉伸按钮或者按下 Ctrl + M,将 Cmd Markdown 切换到独立的编辑模式,这是一个极度简洁的写作环境,所有可能会引起分心的元素都已经被挪除,超清爽!

4. 实时的云端文稿

为了保障数据安全,Cmd Markdown 会将您每一次击键的内容保存至云端,同时在 编辑工具栏 的最右侧提示 已保存 的字样。无需担心浏览器崩溃,机器掉电或者地震,海啸——在编辑的过程中随时关闭浏览器或者机器,下一次回到 Cmd Markdown 的时候继续写作。

5. 离线模式

在网络环境不稳定的情况下记录文字一样很安全!在您写作的时候,如果电脑突然失去网络连接,Cmd Markdown 会智能切换至离线模式,将您后续键入的文字保存在本地,直到网络恢复再将他们传送至云端,即使在网络恢复前关闭浏览器或者电脑,一样没有问题,等到下次开启 Cmd Markdown 的时候,她会提醒您将离线保存的文字传送至云端。简而言之,我们尽最大的努力保障您文字的安全。

6. 管理工具栏

为了便于管理您的文稿,在 预览区 的顶部放置了如下所示的 管理工具栏

tool-manager

通过管理工具栏可以:

发布:将当前的文稿生成固定链接,在网络上发布,分享
新建:开始撰写一篇新的文稿
删除:删除当前的文稿
导出:将当前的文稿转化为 Markdown 文本或者 Html 格式,并导出到本地
列表:所有新增和过往的文稿都可以在这里查看、操作
模式:切换 普通/Vim/Emacs 编辑模式

7. 阅读工具栏

tool-manager

通过 预览区 右上角的 阅读工具栏,可以查看当前文稿的目录并增强阅读体验。

工具栏上的五个图标依次为:

目录:快速导航当前文稿的目录结构以跳转到感兴趣的段落
视图:互换左边编辑区和右边预览区的位置
主题:内置了黑白两种模式的主题,试试 黑色主题,超炫!
阅读:心无旁骛的阅读模式提供超一流的阅读体验
全屏:简洁,简洁,再简洁,一个完全沉浸式的写作和阅读环境

8. 阅读模式

阅读工具栏 点击 或者按下 Ctrl+Alt+M 随即进入独立的阅读模式界面,我们在版面渲染上的每一个细节:字体,字号,行间距,前背景色都倾注了大量的时间,努力提升阅读的体验和品质。

9. 标签、分类和搜索

在编辑区任意行首位置输入以下格式的文字可以标签当前文档:

标签: 未分类

标签以后的文稿在【文件列表】(Ctrl+Alt+F)里会按照标签分类,用户可以同时使用键盘或者鼠标浏览查看,或者在【文件列表】的搜索文本框内搜索标题关键字过滤文稿,如下图所示:

file-list

10. 文稿发布和分享

在您使用 Cmd Markdown 记录,创作,整理,阅读文稿的同时,我们不仅希望它是一个有力的工具,更希望您的思想和知识通过这个平台,连同优质的阅读体验,将他们分享给有相同志趣的人,进而鼓励更多的人来到这里记录分享他们的思想和知识,尝试点击 (Ctrl+Alt+P) 发布这份文档给好友吧!


再一次感谢您花费时间阅读这份欢迎稿,点击 (Ctrl+Alt+N) 开始撰写新的文稿吧!祝您在这里记录、阅读、分享愉快!

作者 @ghosert
2016 年 07月 07日

[^LaTeX]: 支持 LaTeX 编辑显示支持,例如:$\sum_{i=1}^n a_i=0$, 访问 MathJax 参考更多使用方法。

[^code]: 代码高亮功能支持包括 Java, Python, JavaScript 在内的,四十一种主流编程语言。

nihao

Hive 数据倾斜解决方案

Hive 数据倾斜解决方案:

1.调节参数
    hive.map.aggr=true  Map端部分聚合,相当于combiner
    hive.groupby.skewindata=true
        有数据倾斜的时候进行负载均衡,当选项设定为true,生成的查询计划会有两个mr job,
        第一个job中,map的输出结果集合会随机分不到reduce中,每个reduce做部分聚合操作,
        并输出结果,这样的处理结果是相同的groupByKey会分到不同的reduce中,从而达到敷在君更的目的;
        第二个job再根据预处理的数据结果按照GroupByKey分不到redcue,最终完成聚合
2. 小表和大表进行join操作
    使用map join让小的维度表(1000条以下的记录数)先进内存[distributedCache],在map端完成reduce

3.1 空值产生的数据倾斜
    赋予空值新的key值
        select *
        from log a
        left join users b
        on
        case when 
                a.user_id is null 
                then 
                concat('hive',rand())
                else a.user_id
                end = b.user_id
    好处:这个方法只有一个job,把空值的key变成了字符串加上随机数,就能把倾斜的数据分到不同的reduce上,
    解决数据倾斜问题。
3.2 不同数据类型关联产生数据倾斜
    比如 用户表user_id为int类型,log表中的user_id 字段既有int也有string类型,
    当按照user_id进行两个表的join操作时,默认的hash操作会按照int类型的id来进行分配,
    这样就会导致所有string类型的id记录都分配到一个reducer中
    解决:把数字类型转换成字符串类型
        select * 
        from 
            users a
        left outer join logs b
        on
        a.user_id = cast(b.user_id as string)
3.3 users表有600w+的记录,把users分发到所有的map也是不小的开销,而且map join不支持这么大的小表,
如果用普通的join,又会碰到数据倾斜的问题
    解决:
        select * from log a
            left outer join 
                (
                    select d.* from (select distinct user_id from log) c
                    join users d
                    on c.user_id = d.user_id
                ) x
            on a.user_id = x.user_id;

spark解决数据倾斜:

1.增加并行度,也就是增加task的个数,可以缓解数据倾斜 这样可以将分配到统一task上的key散开,
2.自定义 partition 默认是 HashPartition,这样可以将不同的key分配到多个task上,
但是也只是缓解,而且也不灵活,不能解决同一个key数据量很好的场景
3.将reduce端join变成map端join -- broadcast 
    优势:
        避免了shuffle,彻底解除了数据倾斜
    劣势:
        要求join的一侧数据集合足够小,适用于join,不适用于聚合
4.在数据倾斜的key前面加前缀,让这个key分不到不同的task中
    将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,
    另外一个RDD每条数据分别与随机前缀结合形成新的RDD(相当于将其数据增到到原来的N    倍,N即为随机前缀的总个数),然后将二者Join并去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。
    最后将两次Join的结果集通过union合并,即可得到全部Join结果。

随笔

复习知识点:
spar全部知识点,kafka,hdfs,hive及预处理,azkaban,hbase(会搭集群,roykey,会设计建表,有哪些特性),redis,zookeeper java基础(罗海清脑图复习)

在hive中:
select * from 表名:
是查询该表名的所有字段记录

describe formatted 表名:
是查看该表的详细信息,而并查看不了表中的数据

十大经典数据挖掘算法之一:
kmeans
7、cos余弦相似度和欧式距离的区

solid converter PDF软件下载破解
Name: SolidConverterPDFv9
E-mail: user@ru.ru
Organization: any
Unlock Code: KTGK

HTTP 协议中 URI 和 URL 有什么区别?
统一资源标志符URI就是在某一规则下能把一个资源独一无二地标识出来
统一资源定位符URL同样标识出了唯一的一个人,起到了URI的作用,所以URL是URI的子集
可以用身份证号是uri(包含了url)可以确定一个人,而地址也可以确定一个人

本质而言Kylin麒麟系统就是Ubuntu 13.04

艾维奇电音大师:
代表之作:”wake me up” “levels” “x you” “a sky full of stars” “lay me down”

查看某个后台进程:
ps aux | grep redis

查看所有正在使用的端口:
netstat -ntlp

idea破解教程:http://blog.csdn.net/qq_38637558/article/details/78914772

http://tlias-stu.boxuegu.com/#/index
博学谷

mkdir -p /export/server
rm -rf jdk-8u65-linux-x64.tar.gz
mv zookeeper-3.4.5 zookeeper

js自调用匿名函数:
(function(){})();

shell命令:
-p 表示递归
-f 表示覆盖原有文件目录
-w 表示写的命令

有时间研究一下matlab
脱敏

有时间研究一下在简书,csdn,51cto,主要是github和脸书发表代码和作品的流程,还有有时间玩下阿里,腾讯的服务器,买个域名练练手熟悉一下。

菜鸟教程
http://www.runoob.com/linux/linux-tutorial.html
http://www.runoob.com/mysql/mysql-tutorial.html

https://www.csdn.net/nav/cloud
http://blog.csdn.net/superzyl

在线json生成java实体类
https://www.bejson.com/

https://www.cnblogs.com/aipan/p/7770611.html

启用hadoop本地模式:
在hive中设置(常用)
set hive.exec.mode.local.auto=true;

boss直聘、
拉钩

解决面试题中的脑筋急转弯
小袁搜题
作业帮

自古评论出奇才,
内涵佳句随口来。
若是生在隋唐代,
哪有诗仙李太白?

内事问百度,外事问谷歌,床事问天涯,绿事问虎扑”

C:\Windows\System32\drivers\etc hosts目录

31773766
31773767

http://blog.csdn.net/superzyl
周老师的CSDN博客,里面有老师总结的50列sql面试题,面试前做一做

hourenren 13:52:23
在定义变量的时候是引用,如int &a = b; a为b的一个引用
在表达式中为取地址如int *a = &b; a位指向b整型的一个指针
hourenren 14:15:41
https://ideone.com/4okUHi
hourenren 14:16:07
在线编译C++代码

八爪鱼爬虫

datagrip 智能sql编辑器

https://www.cnblogs.com/zhangyinhua/p/8037599.html
jsoup文档讲解

https://www.boxuegu.com/course/free/
redis免费学习面试热点

https://www.cnblogs.com/lizichao1991/p/7809156.html

http://blog.csdn.net/yao970953039/article/details/62047755

Intellij IDEA 2017 debug断点调试技巧与总结详解篇
http://blog.csdn.net/qq_27093465/article/details/64124330

import scala.collection.mutable.Map
scala导包

https://www.iteblog.com/archives/1542.html
定时在线激活idea

在线pdf转word网站
http://app.xunjiepdf.com/pdf2word

乔布简历,一个挺不错的简历样式模板网站

快递单号:
0491 2969 6061

某个zookeeper挂掉了,解决方案:
cd /export/data/zkdata
rm -rf v zoo
再重新启动zookeeper即可

linux中\转义空格
///两个//转义/,目录

农历2月12号 爸爸
农历4月初九 妈妈
农历四月一号 姐姐

13787692647

http://my.tv.sohu.com/us/254995980/79868928.shtml
hadoop年薪23万学员分享面试经验

万里面试
kafka 和spark 都会问的很多
数据库ods 层数据怎么清楚?

面试常见问题:
链接:https://pan.baidu.com/s/1hIWUx01oOcilf2O_p_Fg3Q?密码:7z7s

搭建外网服务器网站:
https://www.vultr.com/?ref=7323908
用crt连上后,依次输入以下三条命令:

wget –no-check-certificate -O shadowsocks-all.sh https://raw.githubusercontent.com/teddysun/shadowsocks_install/master/shadowsocks-all.sh

chmod +x shadowsocks-all.sh

./shadowsocks-all.sh 2>&1 | tee shadowsocks-all.log

图像识别物体
https://github.com/AlexeyAB/darknet

leetcode 刷题网站

l kafka删除 topic
bin/kafka-topics.sh –delete –zookeeper zk01:2181 –topic test
需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除或者直接重启。

你可以通过命令:./bin/kafka-topics –zookeeper 【zookeeper server】 –list 来查看所有topic

C:\Users\RongYue.jupyter\jupyter_notebook_config.py

kill和kill -9 和区别(原理)
有时候我们使用kill无法杀掉一个进程,但是用kill -9却可以,kill的作用是向进程发送一个信号(并没有说是杀掉进程哈)。具体发送什么信号由后面接的参数决定。
kill默认参数是TERM。也就是说,如果没指定具体的信号作为参数,则默认使用kill TERM pid。因此kill pid是可以杀掉一个进程
大多数信号可以被捕获的。而TERM信号就是在这个大多数里的,一些进程可能为了特殊的用途捕获了TERM信号,导致了你使用kill pid时无法杀掉进程。 另外《APUE》中也强调了,有两个信号不能被捕获,SIGKILL 和SIGSTOP
没错,kill -9 就是向进程发送SIGKILL信号

spark 的几种运行模式
spark的yarn管理资源不够用了怎么办
azkaban任务提交我只修改任务里的一部分,如何避免每次都上传zip包
shuffle的排序算法 归并排序
azkaban调度失败了该怎么检查和恢复,dug的思路
spark运行内存不够会发生什么,如何解决
你们集群中hdfs和yarn的使用率是多少
hive的调优,数据量大的情况和不仅仅数据倾斜

有没有做过cdh的升级和改造,cdh的版本
集群部署规划情况,内存,大小,磁盘,
工作中接触的数据量大小,我答500–1g

思维题,1亿个用户每个用户都有一个动态的打分值(1–100),不用排序,如果快速知道排名前十的用户是谁

JVM总结相关笔记

JVM

JVM的参数设置

1. -Xms 初始堆大小  -Xmx 最大堆大小  [一般这两个值设置的是一样的,防止GC后出现内存抖动]
2. -Xmn 年轻代大小
    a) 整个堆的大小 = 年轻代大小 + 老年代大小 + 持久代大小
    b) 持久代一般固定大小为64M
    c) 所以,增大年轻代后,将会较小老年代的大小。这个值对系统性能影响较大,
       Sun官方推荐配置为整个堆的3/8
3. -XX:NewSize 初始年轻代大小     -XX:MaxNewSize 最大年轻代大小
4. -XX:NewRatio 老年代和年轻代的比值
5. -XX:SurvivorRatio 设置年轻代中Eden区与Survivor区的大小比值 
   [默认是8:1:1,比如设置为6,那么就是 6:2:2]
6. -XX:MaxTenuringThreshold 设置年轻代的对象被回收多少次后才进入老年代,默认15次
    [控制对象在经过多少次minor GC后进入老年代,此参数只在Serial串行GC时有效]
7. -XX:PermSize 初始持久代大小  -XX:MaxPermSize 持久代最大值 
    [这个跟 -Xms 和 -Xmx 堆大小设置一样,都是相等,为了防止内存抖动]

调优

JVM调优主要针对内存管理方面,包括控制各个代的大小,GC策略。
由于GC开始垃圾回收时会挂起应用线程,严重影响性能,调优的目的就是
为了尽量降低GC所导致的应用线程暂停时间、减少Full GC的次数。

代调优:

 1) 避免新生代大小设置过小
     当新生代设置过小时,会产生两种比较明显的现象:
         一是minor GC次数频繁
         二是可能导致minor GC对象直接进入老年代。当老年代内存不足时,会出发Full GC。
2) 避免新生代设置过大
    新生代设置过大,会带来两个问题:
        一是老年代变小,可能导致Full GC频繁执行;
        二是minor GC 执行回收的时间大幅度增加
那怎么选择年轻代的大小呢?[分不同的应用场景]
    a. 响应时间优先的应用:
        尽可能设大,知道接近系统的最低响应时间限制(根据实际情况选择)。


说明:新new的对象会首先会进入年轻代的Eden中(如果对象太大可能直接进入年老代),在GC之前对象是存在Eden和from中的,进行GC的时候Eden中的对象被拷贝到To这样一个survive空间(survive(幸存)空间:包括from和to,他们的空间大小是一样的,又叫s1和s2)中(有一个拷贝算法),From中的对象(算法会考虑经过GC幸存的次数)到一定次数 阈值(如果说每次GC之后这个对象依旧在Survive中存在,GC一次他的Age就会加1,默认15就会放到OldGeneration。但是实际情况比较复杂,有可能没有到阈值就从Survive区域直接到Old Generation区域。)

1.哪些需要回收? —-> java堆内存、方法区内存

2.什么时候回收? —–>

2.1:引用计数法【引用count+1,引用失效时count-1,为0时不被引用】
【如果两个对象相互引用而且都没有被使用了,那么会造成内存泄漏】。

2.2:可达性分析【从根节点搜索,如果没有搜索到就是没有被使用,所以互相引用且搜索不到的也会被清除】

怎么回收?—-> 垃圾回收算法[3种]

1.标记清除算法 [Mark-Sweep]

遍历所有的GC Root,分别标记处可达的对象和不可达的对象,然后将不可达的对象回收。
**缺点**是:效率低、回收得到的空间不连续 【当比较大的对象被创建时由于被回收的是不连续的,
所以被回收的空间就存不下,造成了浪费】

2.标记整理算法

将所有可用的对象往前移动[标记谁是活跃对象,整理,会把内存对象整理成一棵树一个连续的空间],这样会很耗资源

3.复制算法

将内存分为两块,每次只使用一块。当这一块内存满了,就将还存活的对象复制到另一块上,并且严格按照内存地址排列,然后把已使用的那块内存统一回收。
**优点**是:能够得到连续的内存空间 
**缺点**是:浪费了一半内存
年轻代使用的是复制整理算法
有一点需要注意:

Spark性能调优

Spark性能调优:

1.分配更多的资源 – 性能调优的王道

真实项目里的脚本:
    bin/spark-submit \
    --class com.xx.xx \
    --num-executors 80 \
    --driver-memory 6g \
    --executor-cores 3 \
    --master yarn-cluster \
    --queue root.default \
    --conf spark.yarn.executor.memoryOverhead=2048 \
    --conf spark.core.connection.ack.waite.timeout=300 \
    /usr/xx/xx.jar \
    args
分配资源之前,要先看机器能够给我们提供多少资源,并且最大化的利用这些资源才是最好的;
1.standalone模式:
    根据实际情况分配spark作业资源,比如每台机器4G,2个cpu,20台机器
2.spark-on-yarn模式:
    要看spark作业要提交到的资源队列,大概有多少资源?
SparkContext、DAGScheduler、taskScheduler,会将我们的算子切割成大量的task提交到Application的executor上去执行,比如分配给其中一个的executor100个task,他的cpu只有2个,那么并行只能执行2个task,如果cpu为5个,那么久先执行5个再执行5个
a.增加executor的数量:
    如果executor的数量比较少,那么意味着可以并行执行的task的数量就比较少,
    就意味着Application的并行执行能力比较弱;
        比如:
            有3个executor,每个executor有2个cup core,
            那么能够并行执行的task就是6个,6个执行完换下一批6个
    增加executor的个数后,并行执行的task就会变多,性能得到提升
b.增加每个executor的cpu core
        比如:
            之前:3个exexutor,每个executor的cpu core为2个,那么并执行的task是6个
                 把cpu core增加到5个,那么并行执行的task就是15个,提高了性能
c.增加每个executor的内存量:
    1.如果需要对RDD进行cache,那么更多的内存就能缓存更多的数据,
      将更少的数据写入磁盘,甚至不写入磁盘,减少了磁盘IO。
    2.对于shuffle操作,reduce端会需要内存来存储拉取过来的数据进行聚合,如果内存不够,
      也会写入磁盘,增加executor内存,就会有更少的数据写入磁盘,较少磁盘IO,提高性能。
    3.对于task的执行,需要创建很多对象,如果内存比较小,可能导致JVM堆内存满了,
      然后频繁的GC,垃圾回收,minorGC和fullGC,速度会很慢,如果加大内存,
      带来更少的GC,速度提升。

2.调节并行度

并行度:spark作业中,各个stage的task个数,也就代表了saprk作业中各个阶段[stage]的并行度。
[spark作业,Application,jobs,action会触发一个job,每个job会拆成多个stage,发生shuffle的时候,会拆出一个stage]
如果不调节并行度,导致并行度过低,会怎么样???
比如:
    1.我们通过上面的分配好资源后,有50个executor,每个executor10G内存,每个executor有3个cpu core
      基本已经达到了集群或者yarn的资源上限
    2. 50个executor * 3个cpu = 150个task,即可以并行执行150个task;
      而我们设置的时候只设置了100个并行度的task,这时候每个executor分配到2个task,同时运行task的数量只有100个,导致每个executor剩下的1个cpu core在那空转,浪费资源。
    资源虽然够了,但是并行度没有和资源相匹配,导致分配下去的资源浪费掉了!!!
    **合理的并行度设置,应该要设置的足够大,大到完全合理的利用集群资源!而且减少了每个task要处理的数据量[比如150g的数据分别分发给100个task处理就是每个task处理1.5G,但是如果是150个task的话,每个task就处理1G]**
总结:
    a. task数量,至少设置成与Spark application的总cpu数相同(理想状态下,比如150个cpu core,分配150个task,差不多同时运行完毕)
    b.官方推荐做法:task的数量设置成 Spark application的cpu core的个数的3~5倍!!
      比如总共150个cpu core,设置成300~500个task
      为什么这么设置呢???
           实际情况下和理想状态下是不一样的,因为有些task运行的快,有些运行的慢,
           比如有些运行需要50s,有些需要几分钟运行完毕,如果刚好设置task数量和cpu core的数量相同,可能会导致资源的浪费;
           比如150个task,10个运行完了,还有140个在运行,那么势必会导致10个cpu core的闲置,
           所以如果设置task的数量为cpu数量的2~3倍,一旦有task运行完,另一个task就会立刻补上来,
           尽量让cpu core不要空闲,提升了spark作业运行效率,提升性能。

    c.如何设置一个 Spark application的并行度???
        SparkConf sc = new SparkConf()
                       .set("spark.default.parallelism","500");

3.重构RDD架构及RDD持久化:

默认情况下 RDD出现的问题:             
                                              RDD4
                                            /
           hdfs --> RDD1 --> RDD2 -->RDD3
                                            \
                                              RDD5
    以上情况: 执行RDD4和RDD5的时候都会从RDD1到RDD2然后到RDD3,执行期间的算子操作,
    而不会说到RDD3的算子操作后的结果给缓存起来,所以这样很麻烦,
    出现了RDD重复计算的情况,导致性能急剧下降!
结论:
    对于要多次计算和使用的公共RDD,一定要进行持久化!
    持久化也就是:BlockManager将RDD的数据缓存到内存或者磁盘上,后续无论对这个RDD进行多少次计算,都只需要到缓存中取就ok了。
    持久化策略:
        rdd.persist(StorageLevel.xx()) 或者 cache
        1.优先会把数据缓存到内存中 -- StorageLevel.MEMORY_ONLY
        2.如果纯内存空间无法支撑公共RDD的数据时,就会优先考虑使用序列化的机制在纯内存中存储,
        将RDD中的每个partition中的数据序列化成一个大的字节数组,也就是一个对象,
        序列化后,大大减少了内存空间的占用。-- StorageLevel.MEMORY_ONLY_SER
            序列化唯一的缺点:在获取数据的时候需要反序列化
        3.如果序列化纯内存的方式还是导致OOM,内存溢出的话,那就要考虑磁盘的方式。
          内存+磁盘的普通方式(无序列化) -- StorageLevel.MEMORY_AND_DISK
        4.如果上面的还是无法存下的话,就用 内存+磁盘+序列化 -- StorageLevel.MEMORY_AND_DISK_SER
另:在机器内存**极度充足**的情况下,可以使用双副本机制,来持久化,保证数据的高可靠性
    如果机器宕机了,那么还有一份副本数据,就不用再次进行算子计算了。[锦上添花--一般不要这么做] -- StorageLevel.MEMORY_ONLY_SER_2

4.广播大变量 [sc.broadcast(rdd.collect)]

问题情景:
    当我们在写程序用到外部的维度表的数据进行使用的时候,程序默认会给每个task都发送相同的这个数据,
    如果这个数据为100M,如果我们有1000个task,100G的数据,通过网络传输到task,
    集群瞬间因为这个原因消耗掉100G的内存,对spark作业运行速度造成极大的影响,性能想想都很可怕!!!
解决方案:
    sc.broadcast(rdd.collect)
    分析原理:
        [BlockManager:负责管理某个executor上的内存和磁盘上的数据]
        广播变量会在Driver上有一份副本,当一个task使用到广播变量的数据时,会在自己本地的executor的BlockManager去取数据,
        发现没有,BlockManager就会到Driver上远程去取数据,并保存在本地,
        然后第二个task需要的时候来找BlockManager直接就可以找到该数据,
        executor的Blockmanager除了可以到Driver远程的取数据,
        还可能到邻近节点的BlockManager上去拉取数据,越近越好!
    举例说明:
        50个executor,1000个task,外部数据10M,
        默认情况下,1000个task,1000个副本,10G数据,网络传输,集群中10G的内存消耗
        如果使用广播,50个executor,500M的数据,网络传输速率大大增加,
        10G=10000M 和 500M的对比 20倍啊。。。
**之前的一个测试[真实]:
        没有经过任何调优的spark作业,运行下来16个小时,
        合理分配资源+合理分配并行度+RDD持久化,作业下来5个小时,
        非常重要的一个调优Shuffle优化之后,2~3个小时,
        应用了其他的性能调优之后,JVM调参+广播等等,30分钟
        16个小时 和 30分钟对比,太可怕了!!!性能调优真的真的很重要!!!

5.Kryo序列化机制:

默认情况下,Spark内部使用java的序列化机制

ObjectOutPutStream/ObjectInputStream,对象输入输出流机制来进行序列化
这种序列化机制的好处:
    处理方便,只是需要在算子里使用的变量是实现Serializable接口即可
缺点在于:
    默认序列化机制效率不高,序列化的速度比较慢,序列化后的数据占用内存空间还比较大

解决:手动进行序列化格式的优化:Kryo [spark支持的]

Kryo序列化机制比默认的java序列化机制速度快,
序列化后的数据更小,是java序列化后数据的 1/10 。
所以Kryo序列化后,会让网络传输的数据更少,在集群中耗费的资源大大减少。

Kryo序列化机制:[一旦启用,会生效的几个地方]
    a.算子函数中使用到的外部变量[比如广播的外部维度表数据]
        优化网络传输性能,较少集群的内存占用和消耗
    b.持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER
        将每个RDD partition序列化成一个大的字节数组时,就会使用Kryo进一步优化序列化的效率和性能。
        持久化RDD占用的内存越少,task执行的时候,创建的对象,不至于频繁的占满内存,频繁的GC
    c.shuffle
        在stage间的task的shuffle操作时,节点与节点之间的task会通过网络拉取和传输数据,
        此时这些数据也是可能需要序列化的,就会使用Kryo

实现Kryo:

step1. 在SparkConf里设置 new SparkConf()
                       .set("spark.serializer","org.apache.spark.serializer.KyroSerializer")
                       .registerKryoClasses(new Class[]{MyCategory.class})
    [Kryo之所以没有没有作为默认的序列化类库,就是因为Kryo要求,如果要达到它的最佳效果的话]
    [一定要注册我们自定义的类,不如:算子函数中使用到了外部自定义的对象变量,这时要求必须注册这个类,否则Kyro就达不到最佳性能]
step2. 注册使用到的,需要Kryo序列化的一些自定义类

6.使用FastUtil优化数据格式:

FastUtil是什么??

fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue;
fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来替代自己平时使用的JDK的原生的Map、List、Set,
fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的map,实现了Java的Map接口),因此可以直接放入已有系统的任何代码中。 
fastutil还提供了一些JDK标准类库中没有的额外功能(比如双向迭代器)。 
fastutil除了对象和原始类型为元素的集合,fastutil也提供引用类型的支持,但是对引用类型是使用等于号(=)进行比较的,而不是equals()方法。 
fastutil尽量提供了在任何场景下都是速度最快的集合类库。

Spark中FastUtil运用的场景??

1.如果算子函数使用了外部变量,
    第一可以使用broadcast广播变量优化;
    第二可以使用Kryo序列化类库,提升序列化性能和效率;
    第三如果外部变量是某种比较大的集合(Map、List等),可以考虑fastutil来改写外部变量,
        首先从源头上就减少了内存的占用,通过广播变量进一步减少内存占用,
        再通过Kryo类库进一步减少内存占用
    避免了executor内存频繁占满,频繁唤起GC,导致性能下降的现象

使用步骤:

step1:导入pom依赖
    <dependency>
        <groupId>fastutil</groupId>
            <artifactId>fastutil</artifactId>
        <version>5.0.9</version>
    </dependency>
step2:
    List<Integer> => IntList
     基本都是类似于IntList的格式,前缀就是集合的元素类型,
     特殊的就是Map,Int2IntMap,代表了Key-Value映射的元素类型

7.调节数据本地化等待时长:

问题发生的场景:

spark在Driver上,对Application的每一个stage的task分配之前,
都会计算出每个task要计算的是哪个分片数据,RDD的某个partition;
spark的分配算法:
    a.优先把每一个task正好分配到他要计算的数据所在的节点,这样的话不用在网络间传输数据
    b.但是,task没有机会分配到数据所在的节点上,为什么呢???
        因为那个节点上的计算资源和计算能力都满了,这个时候 spark会等待一段时间,
        默认情况下是3s钟,到最后,实在等不了了,就会选择一个较差的本地化级别,
        比如说会把task分配到靠他要计算的数据的节点最近的节点,然后进行计算
    c.对于b来说肯定要发生网络传输,task会通过其所在节点的executor的BlockManager来获取数据,
    BlockManager发现自己本地没有,就会用getRemote()的方法,通过TransferService(网络数据传输组件)
    从数据所在节点的BlockManager中获取数据,通过网络传输给task所在的节点
总结:
  我们肯定是希望看到 task和数据都在同一个节点上,直接从本地的executor中的BlockManager中去获取数据,
  纯内存或者带点IO,如果通过网络传输,那么大量的网络传输和磁盘IO都是性能的杀手

本地化的级别类型:

1.PROCESS_LOCAL: 进程本地化,代码和数据都在同一个进程中,也就是在同一个executor进程中,
  task由executor来执行,数据在executor的BlockManager中,性能最好
2.NODE_LOCAL: 节点本地化,比如说一个节点上有两个executor,其中一个task需要第一个executor的数据,
  但是他被分配给了第二个executor,他会找第二个executor的BlockManager去取数据,但是没有,
  BlockManager会去第一个的executor的BlockManager去取数据,这是发生在进程中的
3.NOPREF: 数据从哪里获取都一样,没有好坏之分
4.RACK_LOCAL: 数据在同一个机架上的不同节点上,需要进行网络间的数据传输
5.ANY: 数据可能在集群中的任何地方,而且不在同一个机架,这种性能最差!!

开发时的流程:

观察spark作业时的日志,先测试,先用client模式,在本地就可以看到比较全的日志。
日志里面会显示:starting task...,PROCESS_LOCAL,或者是NODE_LOCAL,观察大部分数据本地化的级别
如果发现大部分都是PROCESS_LOCAL的级别,那就不用调了,如果大部分都是NODE_LOCAL或者ANY,那就要调节一下等待时长了
要反复调节,反复观察本地化级别是否提升,查看spark作业运行的时间有没有缩短
不要本末倒置,如果是 本地化级别提升了,但是因为大量的等待时间,spark作业的运行时常变大了,这就不要调节了
spark.locality.waite
spark.locality.waite.process
spark.locality.waite.node
spark.locality.waite.rack
默认等待时长都是3s
设置方法:
    new SparkConf().set("spark.locality.waite","10")//不要带s

8.JVM调优:1个executor对应1个JVM进程

A. 降低cache操作的内存占比

JVM模块:
    每一次存放对象的时候都会放入eden区域,其中有一个survivor区域,另一个survivor区域是空闲的[新生代],
    当eden区域和一个survivor区域放满了以后(spark运行产生的对象太多了),
    就会触发minor gc,小型垃圾回收,把不再使用的对象从内存中清空,给后面新创建的对象腾出空间
    清理掉了不在使用的对象后,还有一部分存活的对象(还要继续使用的对象),
    将存活的对象放入空闲的那个survivor区域里,这里默认eden:survivor1: survivor2 = 8:1:1,
    假如对象占了1.5放不下survivor区域了,那么就会放到[老年代]里;
    假如JVM的内存不够大的话,可能导致频繁的新生代内存满溢,频繁的进行minor gc,
    频繁的minor gc会导致短时间内,有些存活的对象,多次垃圾回收都没有回收掉,
    会导致这种短生命周期的对象(其实是不一定要长期使用的对象)年龄过大,
    垃圾回收次数太多,还没有回收到,就已经跑到了老年代;
    老年代中可能会因为内存不足,囤积一大堆短生命周期的对象(本来应该在年轻代中的),
    可能马上就要回收掉的对象,此时可能造成老年代内存满溢,造成频繁的full gc(全局/全面垃圾回收),full gc就会去老年代中回收对象;
    由于full gc算法的设计,是针对老年代中的对象,数量很少,满溢进行full gc的频率应该很少,
    因此采取了不太复杂的但是耗费性能和时间的垃圾回收算法。full gc 很慢很慢;
    full gc 和 minor gc,无论是快还是慢,都会导致JVM的工作线程停止工作,即 stop the world,
    简言之:gc的时候,spark停止工作,等待垃圾回收结束;
在spark中,堆内存被分为了两块:
    一块是专门用来给RDD cache和persist操作进行RDD数据缓存用的;
    一块是给spark算子函数的运行使用的,存放函数中自己创建的对象;
默认情况下,给RDD cache的内存占比是60%,但是在某些情况下,比如RDD cache不那么紧张,
而task算子函数中创建的对象过多,内存不太大,导致频繁的minor gc,甚至频繁的full gc,
导致spark频繁的暂停工作,性能影响会非常大,
解决办法:
    集群是spark-onyarn的话就可以通过spark ui来查看,spark的作业情况,
    可以看到每个stage的运行情况,包括每个task的运行时间,gc时间等等,
    如果发现gc太频繁,时间太长,此时可以适当调节这个比例;
总结:
    降低cache的内存占比,大不了用persist操作,选择将一部分的RDD数据存入磁盘,
    或者序列化方式Kryo,来减少RDD缓存的内存占比;
    对应的RDD算子函数的内存占比就增多了,就可以减少minor gc的频率,同时减少full gc的频率,提高性能
具体实现:0.6->0.5->0.4->0.2
    new SparkConf().set("spark.storage.memoryFraction","0.5")

B. executor堆外内存与连接时常

1. executor堆外内存[off-heap memory]:
   场景:
        比如两个stage,第二个stage的executor的task需要第一个executor的数据,
        虽然可以通过Driver的MapOutputTracker可以找到自己数据的地址[也就是第一个executor的BlockManager],
        但是第一个executor已经挂掉了,关联的BlockManager也没了,就没办法获取到数据;

    有时候,如果你的spark作业处理的数据量特别大,几亿数据量;
    spark作业一运行,是不是报错诸如:shuffle file cannot find,executor task lost,out of memory,
    这时候可能是executor的堆外内存不够用了,导致executor在运行的时候出现了内存溢出;
    导致后续的stage的task在运行的时候,可能从一些executor中拉取shuffle map output 文件,
    但是executor已经挂掉了,关联的BlockManager也没有了,所以可能会报shuffle output file not found,resubmitting task,executor lost,spark作业彻底失败;
  这个时候就可以考虑调节executor的堆外内存,堆外内存调节的比较大的话,也会提升性能;

    怎么调价堆外内存的大小??
        在spark-submit 的脚本中添加 
                    --conf spark.yarn.executor.memoryOverhead=2048
        注意:这个设置是在spark-submit脚本中,不是在 new SparkConf()里设置的!!!
        这个是在spark-onyarn的集群中设置的,企业也是这么设置的!
        默认情况下,堆外内存是300多M,我们在项目中通常都会出现问题,导致spark作业反复崩溃,
        我们就可以调节这个参数 ,一般来说至少1G(1024M),有时候也会2G、4G,
        来避免JVM oom的异常问题,提高整体spark作业的性能
2. 连接时常的等待:
          知识回顾:如果JVM处于垃圾回收过程,所有的工作线程将会停止,相当于一旦进行垃圾回收,
          spark/executor就会停止工作,无法提供响应
   场景:
          通常executor优先会从自己关联的BlockManager去取数据,如果本地没有,
          会通过TransferService,去远程连接其他节点上的executor的BlockManager去取;
          如果这个远程的executor正好创建的对象特别大,特别多,频繁的让JVM的内存满溢,进行垃圾回收,
          此时就没有反应,无法建立网络连接,会有卡住的现象。spark默认的网络连接超时时间是60s,
          如果卡住60秒都无法建立网络连接的话,就宣布失败;
          出现的现象:偶尔会出现,一串fileId诸如:hg3y4h5g4j5h5g5h3 not found,file lost,
          报错几次,几次都拉取不到数据的话,可能导致spark作业的崩溃!
          也可能会导致DAGScheduler多次提交stage,TaskScheduler反复提交多次task,
          大大延长了spark作业的运行时间
  解决办法:[注意是在shell脚本上不是在SparkConf上set!!]
          spark-submit 
                       --conf spark.core.connection.ack.waite.timeout=300

9.shuffle调优

shuffle的概念以及场景
    什么情况下会发生shuffle??
        在spark中,主要是这几个算子:groupByKey、reduceByKey、countByKey、join等
    什么是shuffle?
        a) groupByKey:把分布在集群中各个节点上的数据中同一个key,对应的values都集中到一块,
        集中到集群中的同一个节点上,更严密的说就是集中到一个节点上的一个executor的task中。
        集中一个key对应的values后才能交给我们处理,<key,iterable<value>>
          b) reduceByKey:算子函数对values集中进行reduce操作,最后变成一个value
          c) join  RDD<key,value>    RDD<key,value>,只要两个RDD中key相同的value都会到一个节点的executor的task中,供我们处理
      以reduceByKey为例:

9.1. shuffle调优之 map端合并输出文件

默认的shuffle对性能有什么影响??
    实际生产环境的条件:
        100个节点,每个节点一个executor:100个executor,每个executor2个cpu core,
        总格1000个task,平均到每个executor是10个task;按照第二个stage的task个数和第一个stage的相同,
        那么每个节点map端输出的文件个数就是:10 * 1000 = 10000 个
        总共100个节点,总共map端输出的文件数:10000 * 100 = 100W 个
        100万个。。。太吓人了!!!
    shuffle中的写磁盘操作,基本上是shuffle中性能消耗最严重的部分,
    通过上面的分析可知,一个普通的生产环境的spark job的shuffle环节,会写入磁盘100万个文件,
    磁盘IO性能和对spark作业执行速度的影响,是极其惊人的!!
    基本上,spark作业的性能,都消耗在了shuffle中了,虽然不只是shuffle的map端输出文件这一部分,但是这也是非常大的一个性能消耗点。
怎么解决?
    开启map端输出文件合并机制:
        new SparkConf().set('spark.shuffle.consolidateFiles','true')
    实际开发中,开启了map端输出文件合并机制后,有什么变化?
        100个节点,100个executor,
        每个节点2个cpu core,
        总共1000个task,每个executor10个task,
        每个节点的输出文件个数:
            2*1000 = 2000 个文件
        总共输出文件个数:
            100 * 2000 = 20万 个文件
        相比开启合并之前的100万个,相差了5倍!!
合并map端输出文件,对spark的性能有哪些影响呢?
    1. map task写入磁盘文件的IO,减少:100万 -> 20万个文件
    2. 第二个stage,原本要拉取第一个stage的task数量文件,1000个task,第二个stage的每个task都会拉取1000份文件,走网络传输;合并以后,100个节点,每个节点2个cpu,第二个stage的每个task只需要拉取 100 * 2 = 200 个文件,网络传输的性能大大增强
    实际生产中,使用了spark.shuffle.consolidateFiles后,实际的调优效果:
        对于上述的生产环境的配置,性能的提升还是相当可观的,从之前的5个小时 降到了 2~3个小时
总结:
    不要小看这个map端输出文件合并机制,实际上在数据量比较大的情况下,本身做了前面的优化,
    executor上去了 -> cpu core 上去了 -> 并行度(task的数量)上去了,但是shuffle没调优,
    这时候就很糟糕了,大量的map端输出文件的产生,会对性能有比较恶劣的影响 

9.2. map端内存缓冲与reduce端内存占比

spark.shuffle.file.buffer,默认32k
spark.shuffle.memoryFraction,占比默认0.2
调优的分量:
    map端内存缓冲和reduce端内存占比,网上对他俩说的是shuffle调优的不二之选,其实这是不对的,
    因为以实际的生产经验来说,这两个参数没那么重要,但是还是有一点效果的,
    就像是很多小的细节综合起来效果就很明显了,

原理:

map:
    默认情况下,shuffle的map task输出到磁盘文件的时候,统一都会先写入每个task自己关联的一个内存缓冲区中,
    这个缓冲区默认大小是32k,每一次,当内存缓冲区满溢后,才会进行spill操作,溢写到磁盘文件中
reduce:
    reduce端task,在拉取数据之后,会用hashmap的数据格式来对每个key对应的values进行汇聚,
    针对每个key对应的value,执行我们自定义的聚合函数的代码,比如_+_,(把所有values相加)
    reduce task,在进行汇聚、聚合等操作的时候,实际上,使用的就是自己对应的executor的内存,
    executor(jvm进程,堆),默认executor内存中划分给reduce task进行聚合的比例是20%。
    问题来了,内存占比是20%,所以很有可能会出现,拉取过来的数据很多,那么在内存中,
    放不下,这个时候就会发生spill(溢写)到磁盘文件中取.

如果不调优会出现什么问题??

默认map端内存缓冲是32k,
默认reduce端聚合内存占比是20%
如果map端处理的数据比较大,而内存缓冲是固定的,会出现什么问题呢?
    每个task处理320k,32k的内存缓冲,总共向磁盘溢写10次,
    每个task处理32000k,32k的内存缓冲,总共向磁盘溢写1000次,
    这样就造成了多次的map端往磁盘文件的spill溢写操作,发生大量的磁盘IO,降低性能
map数据量比较大,reduce端拉取过来的数据很多,就会频繁的发生reduce端聚合内存不够用,
频繁发生spill操作,溢写到磁盘上去,这样一来,磁盘上溢写的数据量越大,
后面进行聚合操作的时候,很可能会多次读取磁盘中的数据进行聚合
默认情况下,在数据量比较大的时候,可能频繁的发生reduce端磁盘文件的读写;
这两点是很像的,而且有关联的,数据量变大,map端肯定出现问题,reduce也出现问题,
出的问题都是一样的,都是磁盘IO频繁,变多,影响性能

调优解决:

我们要看spark UI,
    1. 如果公司用的是standalone模式,那么很简单,把spark跑起来,会显示sparkUI的地址,
    4040端口号,进去看,依次点击可以看到,每个stage的详情,有哪些executor,有哪些task,
    每个task的shuffle write 和 shuffle read的量,shuffle的磁盘和内存,读写的数据量
    2. 如果是yarn模式提交,从yarn的界面进去,点击对应的application,进入spark ui,查看详情
如果发现磁盘的read和write很大,就意味着要调节一下shuffle的参数,进行调优,
首先当然要考虑map端输出文件合并机制
   调节上面两个的参数,原则是:
      spark.shuffle.buffer,每次扩大一倍,然后看看效果,64k,128k
    spark.shuffle.memoryFraction,每次提高0.1,看看效果
不能调节的过大,因为你这边调节的很大,相对应的其他的就会变得很小,其他环节就会出问题
调节后的效果:
    map task内存缓冲变大了,减少了spill到磁盘文件的次数;
    reduce端聚合内存变大了,减少了spill到磁盘的次数,而且减少了后面聚合时读取磁盘的数量
    new SparkConf()
    .set("spark.shuffle.file.buffer","64")
    .set("spark.shuffle.file.memoryFraction","0.3")

10.算子调优

1.算子调优之MapPartitons提升map的操作性能
在spark中最近本的原则:每个task处理RDD中的每一个partition
优缺点对比:
    普通Map:
        优点:比如处理了一千条数据,内存不够了,那么就可以将已经处理的一千条数据从内存里面垃圾回收掉,
        或者用其他办法腾出空间;通常普通的map操作不会导致内存OOM异常;
        缺点:比如一个partition中有10000条数据,那么function会执行和计算一万次
    MapPartitions:
        优点:一个task仅仅会执行一次function,一次function接收partition中的所有数据
        只要执行一次就可以了,性能比较高
        缺点:对于大数据量来说,比如一个partition100万条数据,一次传入一个function后,
        可能一下子内存就不够了,但是又没办法腾出空间来,可能就OOM,内存溢出
那么什么时候使用MapPartitions呢?
    当数据量不太大的时候,都可以使用MapPartitions来操作,性能还是很不错的,
    不过也有经验表明用了MapPartitions后,内存直接溢出,
    所以在项目中自己先估算一下RDD的数据量,以每个partition的量,还有分配给executor的内存大小,
    可以试一下,如果直接OOM了,那就放弃吧,如果能够跑通,那就可以使用。
2.算子调优之filter之后 filter 之后 用 coalesce来减少partition的数量
默认情况下,RDD经过filter之后,RDD中每个partition的数据量会不太一样,(原本partition里的数据量可能是差不多的)
问题:
    1.每一个partition的数据量变少了,但是在后面进行处理的时候,
    还是要和partition的数量一样的task数量去处理,有点浪费task计算资源
    2.每个partition的数据量不一样,后面会导致每个处理partition的task要处理的数据量不一样,
    这时候很容易出现**数据倾斜**
    比如说,有一个partition的数据量是100,而另一个partition的数据量是900,
    在task处理逻辑一样的情况下,不同task要处理的数据量可能差别就到了9倍,甚至10倍以上,
    同样导致速度差别在9倍或者10倍以上
    这样就是导致了有的task运行的速度很快,有的运行的很慢,这就是数据倾斜。
解决:
    针对以上问题,我们希望把partition压缩,因为数据量变小了,partition完全可以对应的变少,
    比如原来4个partition,现在可以变成2个partition,那么就只要用后面的2个task来处理,
    不会造成task资源的浪费(不必要针对只有一点点数据的partition来启动一个task进行计算)
    避免了数据倾斜的问题
3.算子调优之使用foreachPartition优化写入数据库性能
默认的foreach有哪些缺点?
    首先和map一样,对于每条数据都要去调一次function,task为每个数据,都要去执行一次task;
    如果一个partition有100万条数据,就要调用100万次,性能极差!
    如果每条数据都要创建一个数据库连接,那么就要创建100万个数据库连接,
    但是数据库连接的创建和销毁都是非常耗性能的,虽然我们用了数据库连接池,只要创建固定数量的连接,
    还是得多次通过数据库连接,往数据库里(mysql)发送一条sql语句,mysql需要去执行这条sql语句,
    有100万条数据,那么就是要发送100万次sql语句;
用了foreachPartition以后,有哪些好处?
    1.对于我们写的函数就调用一次就行了,一次传入一个partition的所有数据
    2.主要创建或者获取一个数据库连接就可以了
    3.只要向数据里发送一条sql语句和一组参数就可以了
在实际开发中,我们都是清一色使用foreachPartition算子操作,
但是有个问题,跟mapPartitions操作一样,如果partition的数据量非常大,
比如真的是100万条,那几本就不行了!一下子进来可能会发生OOM,内存溢出的问题
一组数据的对比:
    生产环境中:
        一个partition中有1000条数据,用foreach,跟用foreachPartition,
        性能提高了2~3分;
数据库里是:
    for循环里preparestatement.addBatch
    外面是preparestatement.executeBatch
4.算子调优之repartition解决SparkSQL低并行度的问题
并行度: 我们是可以自己设置的
    1.spark.default.parallelism
    2.sc.textFile(),第二个参数传入指定的数量(这个方法用的非常少)
在生产环境中,我们是要自己手动设置一下并行度的,官网推荐就是在spark-submit脚本中,
指定你的application总共要启动多少个executor,100个,每个executor多少个cpu core,
2~3个,假设application的总cpu core有200个;
官方推荐设置并行度要是总共cpu core个数的2~3倍,一般最大值,所以是 600;
设置的这个并行度,在哪些情况下生效?哪些情况下不生效?
    1.如果没有使用SparkSQL(DataFrame)的话,那么整个spark应用的并行度就是我们设置的那个并行度
    2.如果第一个stage使用了SparkSQL从Hive表中查询了一些数据,然后做了一些transformatin的操作,
    接着做了一个shuffle操作(groupByKey);下一个stage,在shuffle之后,做了一些transformation的操作
    如果Hive表对应了20个block,而我们自己设置的并行度是100,
    那么第一个stage的并行度是不受我们控制的,就只有20个task,第二个stage的才是我们设置的并行度100个
问题出在哪里了?
    SparkSQL 默认情况下,我们是没办法手动设置并行度的,所以可能造成问题,也可能不造成问题,
    SparkSQL后面的transformation算子操作,可能是很复杂的业务逻辑,甚至是很复杂的算法,
    如果SparkSQL默认的并行度设置的很少,20个,然后每个task要处理为数不少的数据量,
    还要执行很复杂的算法,这就导致第一个stage特别慢,第二个stage 1000个task,特别快!
解决办法:
    repartition:
        使用SparkSQL这一步的并行度和task的数量肯定是没办法改变了,但是可以将SparkSQL查出来的RDD,
        使用repartition算子进行重新分区,比如分多个partition,20 -> 100个;
        然后从repartition以后的RDD,并行度和task数量,就会按照我们预期的来了,
        就可以避免在跟SparkSQL绑定在一起的stage中的算子,只能使用少量的task去处理大量数据以及复杂的算法逻辑

5.算子操作reduceByKey:

reduceByKey相较于普通的shuffle操作(不如groupByKey),他的一个特点就是会进行map端的本地聚合;
对map端给下个stage每个task创建的输出文件中,写数据之前,就会进行本地的combiner操作,也就是多每个key的value,都会执行算子函数(_+_),减少了磁盘IO,较少了磁盘空间的占用,在reduce端的缓存也变少了

11.troubleshooting之控制reduce端缓冲大小以避免内存溢出(OOM)

new SparkConf().set("spark.reducer.maxSizeInFlight","24") //默认是48M
Map端的task是不断地输出数据的,数据量可能是很大的,
    但是其reduce端的task,并不是等到Map端task将属于自己的那个分数据全部写入磁盘后,再去拉取的
    Map端写一点数据,reduce端task就会去拉取一小部分数据,立刻进行后面的聚合,算子函数的应用;
    每次reduce能够拉取多少数据,是由reduce端buffer来定,因为拉取过来的数据都是放入buffer中的,
    然后采用后面的executor分配的堆内存占比(0.2),去进行后续的聚合,函数操作
reduce端buffer 可能会出现什么问题?
    reduce端buffer默认是48M,也许大多时候,还没有拉取满48M,也许是10M,就计算掉了,
    但是有时候,Map端的数据量特别大,写出的速度特别快,reduce端拉取的时候,全部到达了自己缓冲的最大极限48M,全部填满,
    这个时候,再加上reduce端执行的聚合函数代码,可能会创建大量的对象,也许一下子内存就撑不住了,就会造成OOM,reduce端的内存就会造成内存泄漏
如何解决?
    这个时候,我们应该减少reduce端task缓冲的大小,我们宁愿多拉取几次,但是每次同时能拉取到reduce端每个task的数据量比较少,就不容易发生OOM,比如调成12M;
    在实际生产中,这种问题是很常见的,这是典型的以性能换执行的原理,
    reduce的缓冲小了,不容易造成OOM了,但是性能一定是有所下降的,你要拉取的次数多了,
    就会走更多的网络IO流,这时候只能走牺牲性能的方式了;
曾经一个经验:
    曾经写了一个特别复杂的spark作业,写完代码后,半个月就是跑步起来,里面各种各样的问题,
    需要进行troubleshooting,调节了十几个参数,其中里面就有reduce端缓冲的大小,最后,
    总算跑起来了!

12. troubleshooting之解决JVM GC导致的shuffle拉取文件失败:

过程:
    第一个stage的task输出文件的同时 ,会像Driver上记录这些数据信息,然后下一个stage的task想要得到上个stage的数据,
    就得像Driver所要元数据信息,然后去像上一个的stage的task生成的文件中拉取数据。
问题场景:
    在spark作业中,有时候经常出现一种情况,就是log日志报出:shuffle file not found..,
    有时候他会偶尔出现一次,有的时候出现一次后重新提交stage、task,重新执行一遍 就好了。
分析问题:
    executor在JVM进程中,可能内存不太够用,那么此时就很可能执行GC,minor gc 或者 full gc,
    总之一旦发生gc后,就会导致所有工作线程全部停止,比如BlockManager,基于netty的网络通信。
    第二个stage的task去拉取数据的时候,上一个executor正好在进行gc,就导致拉取了半天也没拉取到数据,
    那为什么第二次提交stage的时候,就又可以了呢?
        因为第二次提交的时候,上一个executor已经完成了gc。
解决:
    spark.shuffle.io.maxRetries 3[默认3次]
        shuffle 文件拉取时,如果没有拉取到,最多或者重试几次,默认3次
    spark.shuffle.io.retryWait 5s [默认5s]
        每一次重新拉取文件的时间间隔,默认5s
    默认情况下,第一个stage的executor正在漫长的full gc,第二个stage的executor尝试去拉取数据,
    结果没拉取到,这样会反复重试拉取3次,中间间隔时间5s,也就是总共15s,拉取不成功,就报 shuffle file not found
        我们可以增大上面两个参数的值:
            spark.shuffle.io.maxRetries 60次
            spark.shuffle.io.retryWait 60s
            最多可以忍受一个小时没有拉取到shuffle file,这只是一个设置最大的可能值,
            full gc 也不可能一个小时都没结束把,
            这样就解决了因为gc 而无法拉取到数据的问题

13. troubleshooting之解决yarn-cluster模式的JVM栈内存溢出问题

yarn-cluster运行流程:
    1.本地机器执行spark-submit脚本[yarn-cluster模式],提交spark application给resourceManager
    2. resourceManager找到一个节点[nodeManager]启动applicationMaster[Driver进程]
    3. applicationMaster找resourceManager申请executor
    4. resourceManager分配container(内存+cpu)
    5. applicationMaster找到对应nodeManager申请启动executor
    6. nodeManager启动executor
    7. executor找applicationMaster进行反向注册
    到这里为止,applicationMaster(Driver)就知道自己有哪些资源可以用(executor),
    然后就会去执行job,拆分stage,提交stage的task,进行task调度,
    分配到各个executor上面去执行。
yarn-client 和 yarn-cluster的区别:
    yarn-client模式Driver运行在本地机器上;yarn-cluster模式Driver是运行在yarn集群上的某个nodeManager节点上的;
    yarn-client模式会导致本地机器负责spark作业的调用,所以网卡流量会激增,yarn-cluster没有这个问题;
    yarnclient的Driver运行在本地,通常来说本地机器和yarn集群都不会在一个机房,所以性能不是特别好;
    yarn-cluster模式下,Driver是跟yarn集群运行在一个机房内,性能上也会好很好;
实践经验碰到的yarn-cluster的问题:
    有时候运行了包含spark sql的spark作业,可能会遇到 在yarn-client上运行好好地,在yarn-cluster模式下,
    可能无法提交运行,会报出JVM的PermGen(永久代)的内存溢出-OOM;
    Yarn-client模式下,Driver是运行在本地机器的,spark使用的JVM的PerGen的配置,是本地的spark-class文件,
    (spark客户端是默认有配置的),JVM的永久代大小默认是128M,这个是没问题的;
    但是在Yarn-cluster模式下,Driver是运行在yarn集群的某个节点上的,使用的是没有经过配置的默认设置82M(PerGen永久代大小)
    spark sql内部会进行很负责的sl语义解析、语法树的转换,特别复杂,在这种情况下,如果sql特别复杂,
    很可能会导致性能的消耗,内存的消耗,可能对PermGen永久代的内存占比就很大
    所以此时,如果对PermGen的内存占比需求多与82M,但是又小于128M,就会出现类似上面的情况,
    yarn-client可以正常运行因为他的默认permgen大小是128M,但是yarn-cluster的默认是82M,就会出现PermGen OOM -- PermGen out of memory
解决:
    spark-submit脚本中加入参数:
        --conf spark.driver.extraJavaOptions='-XX:PermSize=128M -XX:MxPermSize=256M'
        这样就设置了永久代的大小默认128M,最大256M,那么这样的话,就可以保证spark作业不会出现上面的PermGen out of memory

客户端向HDFS读写数据流程

客户端向HDFS写数据流程

1. 客户端向Namenode请求上传文件 /aaa/xxx.log
2. Namenode 检查自己的元数据,看看元数据下有没有这个目录,假设满足条件(没有这个目录可以上传)
    响应客户端的请求[客户端可以上传]
3. 客户端RPC请求上传1个Block(0-128M),请返回DataNode(返回几个DataNode是客户端配置的,默认3个)
4. 返回(dn1,dn3,dn4)给客户端
    为什么要给客户端返回这三台机器呢?
        考虑因素: 空间/距离
            1. 第1个副本(dn1),要看DataNode的空间(剩余存储空间)和距离(同一个机架上的距离一样,而不是谁的网线长短)
            假如这几台机器的空间差不多,NameNode就随机返回一台;
            2. 第2个副本(dn3),考虑跨机架挑选1个DataNode,增加副本的可靠性
            3. 第3个副本就在第1个副本同机架另外挑选1台DataNode存放
        怎么能让NameNode知道哪个datanode在1个机架,哪个datanode在另1个机架,这个要配个文件,
        配机架感知。[就是在配置文件里写死了,哪个机架有哪几台DataNode(网咯拓扑)]
5. 客户端会找最近的(将要上传副本到的那台机器)一台DataNode[假如找的是dn1],请求建立Block传输通道channel[本地流,以供写入数据]
    5.1 dn1接收到请求看到还有2台机器(dn3、dn4),dn1就向dn3发送请求建立管道
    5.2 dn3接收到请求看到还有一台dn4,就向dn4发送请求建立管道
6.1 dn4建立管道成功后,给dn3发送响应,建立管道成功
6.2 dn3再给dn1发送响应,建立管道成功
    这样一来,管道 PipeLine就建立成功了
7. 客户端读取本地的1块数据(默认128M),向dn1传输数据(数据是以1个1个packet[默认64k]的形式上传的)
    packet是以chunk(Byte)为单位校验
    7.1 每个DataNode写入磁盘之前都会有1个缓冲区(ByteBuf),先写入缓冲区,再写入本地磁盘
         因为每个packet只有64k,所以dn写入之后,基本上后面的dn3和dn4也都复制上传完成了[几乎同步完成]
         每个DataNode上传成功packet 都会向前面的DataNode反馈说上传成功,第一个DataNode才给客户端说上传成功
    假如说是后面两台DataNode节点没有上传成功,但是只要是第1台上传成功了,客户端就认为上传成功了
    [因为NameNode最后会帮客户端异步去复制,省得客户端一直阻塞,说没成功再来,没成功再来];
    假如第1台也失败了,客户端就会找NameNode,告诉他这台DataNode不行,再换,然后重新返回DataNode,重新上传;
8. 上传完了第1个Block,客户端会再次向NameNode发送请求说要上传第2块Block,然后重新走刚刚的流程
总结:
    这样一来,NameNode的元数据就记录了,上传的文件的 路径、文件有几个Block、Block副本在哪些机器

客户端向HDFS读数据流程

1. 客户端向NameNode请求下载/aa/xx.log
2. NameNode的元数据上面查看/aa/xx.log,然后查询得到:3个Block(Block1,Block2,Block3)、{Block1:dn1,dn3,dn4 Block2:dn1,dn4,dn5...}
    并返回给客户端目标文件的元数据信息
3. 客户端要下载第1个Block,然后就去找副本所在节点距离它最近的(dn1),建立管道,请求读取Block1
4. dn1建立本地流,fileInpuStream,dn1建立 NIO socket - socketOutPutStream
5. 客户端建立socketInputStream,还有fileOutPutStream 写入本地 如:c:/xxx.log
6. dn1 传输通过通道传输数据
Block1上传到客户端完成后,客户端会再去找第2块Block 跟找Block1是一样的,还有Block3

MR Yarn的提交流程

Yarn:
    只负责程序运行所需资源的分配回收等调度任务,与应用程序的内部运行工作机制完全无关,
    所以Yarn已经成为一个通用的资源调度平台,许许多多的运算框架都可以借助他来实现资源管理,
    比如:MR/SPARK/FLINK/TEZ...
NodeManager分配container基于:
Linux的资源隔离机制cgroup  比如:docker也是基于此

1. 客户端所在节点,运行jar包,main方法里,job.submit() 
  YarnRunner(Proxy[实现了ClientProtocol])找Yarn集群的老大ResourceManager申请提交1个Application
2. ResourceManager返回Application提交资源路径:hdfs://xxx/xx.staging 和 application_id
3. YarnRunner提交job运行所需的资源文件,提交到hdfs://xxx/xx.staging/application_id
                                                                                    /job.split
                                                                                    /job.xml
                                                                                    wc.jar
4. 客户端通过RPC调用 告诉ResourceManager 资源提交完毕,申请运行mrAppMaster [后面的事情就和客户端没关系了]
ResourceManager运行的时候不止是接收这1个程序,还可能接收其他的程序提交,所以ResourceManager也有不同的调度策略[三种策略]:
    4.1 FIFO[先进先出]:任务在队列里,要先运行完第1个,再运行第2个
        [这个时候是可以接收其他的任务提交的,只不过要等][这个在老版本里是默认的]
    4.2 Fair:每个job提交后,都会分配一点点资源给他们
    4.3 capacity:第1个job提交之后,把资源全部给他,当第2个job来的时候,第1个job的一些task可能就已经运行完了,
        这样的话空余出来的资源就可以给后来提交的job了,如果后来的job需要的资源很少,可能直接就运行完了,运行完了,            还可以把这些资源给之前的应用继续使用
        [新版本的默认就是capacity调度策略]
5. 将用户的请求初始化成一个task
6. 因为nodeManager和NamenodeManager一直保持着心跳,所以会去领取任务(task),假如nm1领取到任务
7. nm1就会生成1个container容器[分配了cpu + ram],到HDFS上下载资源并启动mrAppMatser
8. mrAppMatser向NamenodeManager申请运行MapTask的容器
9. 假如这时候nm2和nm3领取到task任务,然后分别在各自的节点生成1个容器[这个容器就是1个进程叫做YarnChild,可以通过jps查看]
10. mrAppMatser发送程序启动脚本[java -cp...]给nm2和nm3来运行MapTask
        mrAppMatser会监管这些MapTask的运行情况,如果哪个MapTask运行失败,
        他还会去找NamenodeManager重新申请1个nodeManager来生成container来运行MapTask
        如果他发现有一个MapTask运行的特别慢,他还会去重新申请1个新的nodeManager生成container,
        两个同时处理一个切片,看哪个先执行完,就用哪个的结果文件作为reduce的输入 **[推测执行]**
11. 当MapTask运行完成之后,mrAppMatser知道MapTask的输出文件[分区且排序,不如有3个分区]在哪[container里的工作目录]
12. mrAppMatser向NamenodeManager申请3个容器,来运行reduceTask程序
13. 这3个容器运行的reduce task会向相对应map task的nodeManager拉取相对应分区的数据过来
14. reduce 对数据进行merge + 排序 ,调用reduce方法 ,输出结果
15. 当mrAppMatser执行完了之后就会去找ResourceManager申请注销自己

总结hadoop1和hadoop2的比较:

a) 1里面没有yarn,只有jobTracker,负责资源调度和应用运算流程管理,
如果后面还有任务提交,还是他来管理和调度资源,这样的话都在1个节点上,
很消耗性能,而且当jobTracker挂掉之后,任务都不能运行了。
b) 2里面有yarn,只要把任务提交给yarn ,然后yarn进行资源管理就可以了,
appMaster来负责应用运算流程管理,当一个appMaster挂了之后,也不会影响
其他应用的作业                                                 

kafka总结相关笔记

kafka - scala 语言写的 版本 1.0.0 scala 2.11 官方推荐

kafka 是什么?

1.kafka是一个消息队列(生产者消费者模式)
2.目标:构建企业中统一的、高通量、低延时的消息平台
3.大多的是消息队列(消息中间件)都是基于JMS标准实现的,Kafka类似于JMS的实现

kafka 有什么用?(消息队列有什么用?)

作为缓冲,来异构、解耦系统
    a. 用户注册需要多个步骤,每个步骤执行都需要很长时间,代表用户等待时间是所有步骤的累计时间
    b. 为了减少用户等待的时间,使用并行执行,有多少步骤,就开启多少个线程来执行
        代表用户等待时间是所有步骤中耗时最多的那个步骤时间
    c. 问题:开启多个线程执行每个步骤,如果以一个步骤执行异常,或者严重超时,
        用户的等待时间就不可控了
使用消息队列来保证
    1.注册时,立刻返回成功
    2.发送注册成功的消息到消息平台
    3.对注册信息感兴趣的程序,可以消费消息

kafka的基本架构


kafka cluster:由多个服务器组成,每个服务器单独的名字broker(server)
kafka producer:生产者、负责生产数据
kafka consumer:消费者、负责消费数据
kafka topic:主题,一类消息的名称。存储数据时将一类数据存放在某个topic下,消费数据也是消费一类数据
订单系统:创建一个topic,叫做order
用户系统:创建一个topic,叫做user
商品系统:创建一个topic,叫做product

配置kafka需要修改配置文件的三个地方:
1.broker.id
2.数据存放的目录,注意:目录如果不存在,需要新建
3.zookeeper的地址信息
查看kafka集群
由于kafka集群没有UI界面,需要借助外部工具,来查看kafka的集群
这个工具是一个java程序,必须要安装好jdk --- ZooInspector
1) 创建一个订单的topic。
bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic order
2)编写代码启动一个生产者,生产数据
bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic order
3) 编写代码启动给一个消费者,消费数据
bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic order

kafka原理

1.分片与副本机制

分片:
    当数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,
    存放在多台服务器上。每个服务器上的数据,叫做一个分片。
        问题:
            如果一个partition中有10T数据,如何存放?是放在一个文件还是多个文件?
                kafka的解决方案是多个文件!
                这里说的多个文件就是segment段
                    [我们在kafka配置文件中配置的数据存储目录:/export/data/kafka]
                    里面有topicname-index [如:order-0,意思是order这个topic的第0个副本]
                    这个order-0目录下就是存放着诸如:
                        00000000000000000.index
                        00000000000000000.log
                        segment段包含了这两个文件,segment段默认是1G大小
                        segment段中有两个核心的文件.log和.index,当log文件等于1G的时候,
                        新的数据会写入到下个segment中,同时我们也可以看到segment段中有.timeindex文件生成,
                        而且通过查看,可以看到一个segment段差不多会存储70万条数据。


如上图所述:

                *Segment文件命名规则:
                    partition全局的第一个segment从0开始,后续每个sgment文件名为上一个segment文件最后一条消息的offset值。
                    数值最大的为64位long大小,19位数字字符长度,没有数字用0填充。
                *索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中
                message的物理偏移地址
                结论:
                    kafka查找segment file 只需要两步
                        1.先找到这个数据对应的segment[通过查看segment的区间,offset在哪个segment段中]
                        2.根据某个segment段中的.index 索引文件中查找该条数据所在.log文件中的位置
        kafka为什么要对文件进行切分,保存多个文件中?
            kafka作为消息中间件,只负责消息的临时存储,并不是永久存储,
            需要删除过期的数据。
            如果将所有的数据都存放在一个文件中,要删除过期数据的时候,就麻烦了。
            因为文件有日期属性,删除过期数据,只需要根据文件的日属性删除就好了

副本:
    当数据只保存一份的时候,有丢失的风险,为了更好的容错和容灾,
    将数据拷贝几份,保存到不同的机器上。

kafka生产数据的分发策略

kafka在生产数据的实惠,有一个数据分发策略。默认的情况使用DefaultPartitoner.class类
这个类就定义数据分发的策略。
    1.如果用指定partition,生产就不会条用DefaultPartitoner.partition()方法,直接发到指定的分区
    [这种不常用!]
    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }

    2.当用户指定key,使用hash算法。如果key一直不变,同一个key算出来的hash值是一个固定值。
    如果是固定值,这种hash取模就没意义。
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }

    3.当用户既没有指定partition也没有指定key时,使用轮询[round-robin]的方式发送数据
    public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    }

kafka-消费者的负载均衡

举例说明:[问题重现]
    当每秒钟有400条数据过来,分了3个partition来存储,但是只有1个消费者并且每秒能消费100条,这样的话,生产者的速度很快,但是消费者跟不上,怎么办?
    造成了数据大量滞后和延时!
解决:
    多几个消费者,共同来消费数据
        比如3个消费者来共同消费数据这样就解决了。
新的问题:
    消费组中消费者的数量和partition的数量一致,但是消费者消费的熟读还是跟不上[比如每个消费者只能消费100条],怎么办?
        再加个消费者吗? 答案是 no!!!!
        因为根据kafka负载均衡策略规定,多出来的笑着是处于空闲状态的!
    也就是1个partition只能被1个消费者消费
真是解决办法:
    要么修改topic的partition数量;
    要么减少消费者处理时间,提高处理速度;

kafka消息不丢失机制:

1.producer端消息不丢失机制:

如果有多个副本,就需要选择一个leader出来,负责消息的读写请求。
比如:
    有一条数据经过partitioner.class计算把数据发送给了broker2[producerRecord ---> 2]
    关于ack的响应有3个状态值:
        0:生产者只管发数据,并不关心数据是否丢失
        1:partition的leader收到数据后,就返回响应码状态
        -1:所有的从节点和leader都收到数据后,才返回响应码状态
    问题:
        如果broker端一直不给ack状态码,producer永远不知道是否成功。
            producer可以设置一个超时时间  10s,超过时间就认为失败。
    问题又来了:
        如果一条消息发送一次,得到一次ack相应,在大量数据情况下会占用很多带宽怎么办?
    解决:
        生产者将数据线缓存到producer端,达到一定的数量阈值或者时间阈值之后发送
        [比如:设置缓冲池中可以放2万条数据,或者等待时间设置成500ms]
    问题:
        如果设置buffer,按照500条每个批次发送数据到broker,但是broker迟迟不给相应,buffer中的数据如何处理?
        [而且producer端还源源不断的生产数据,这时候就造成了阻塞情况]
    解决:
        可以对buffer进行设置,如果满了,并不确定是否发送,
        如果需要继续生产数据,就可以选择buffer清空,或者不清空 [消息不丢失,一般会设置不清空]
    同步模式和异步模式:[异步就是没有设置缓冲池 buffer]
        在同步模式下:
            1. 生产者等待10s,如果broker没有给出ack响应,就认为失败。
            2. 生产者重置3次,如果还没响应,就报错。
        在异步模式下:[认为设置]
            1. 现将数据保存在生产者端的buffer中。buffer大小是2万条
            2. 满足数据阈值或者数量阈值其中的一个条件就可以发送数据
            3. 发送一批数据的大小是500条
        如果broker迟迟不给ack,而buffer又满了,开发者自己设置是否直接清空buffer中的数据。

2.broker端消息不丢失机制:

broker端的消息不丢失,其实就是用partition副本机制来保证的。

3.consumer端消息不丢失机制:

partition中有个segment段,log和index文件,log文件存放的是消息本身。
index文件存放的是消息offset值和存放在log文件的哪儿文件。
问题:[在kafka 0.8版本之前consumer消费数据的offset值是保存在zookeeper上的,但是
        这样会导致一种现象,就是consumer已经消费完了,但是等还没把offset值保存到zookeeper
        上的时候,consumer挂了,再次重启后,那么就会出现**重复**消费的问题]
解决:
    kafka从0.8版本以后,offset的值是保存在了kafka的内置topic上,
    这样就不会造成重复消费的问题了

补充:

Consumer Group [CG]:
    kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段
    一个topic可以有多个consumer group。topic的消息会复制(不是真的复制,只是概念上的)到所有的CG
    但每个partition只会把消息发给该CG中的一个consumer。
    用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic
broker:
    一台kafka服务器就是一个broker,一个集群由多个broker组成。
    一个broker可以容纳多个topic。
Partition:
    为了负债均衡
    kafka只保证按照一个partition中的顺序将消息发给consumer,
    不保证一个topic的整体(多个partition)的顺序。
leader:
    每一个replication集合中的partition都会选出一个唯一的leader,所有的读写请求都是由leader处理,
    其他的replicas从leader处把数据更新同步到本地。每个cluster当中会选举出一个broker来担任controller,
    负责处理partition的leader选举,协调partition迁移等工作。
ISR(In-Sync-Replica):
    是Replicas的一个子集,表示目前Alive且与leader能够‘catch-up’的replicas集合。
    由于读写都是首先落到leader上,
    所以一般来说通过同步机制从leader上拉取数据的replica都会和leader有一些延迟
    [包括延迟时间和延迟条数2个维度]
    任意一个超过阈值都会把该replica提出ISR。每个Partition都有他自己独立的ISR。
配置文件当中配置的kafka多久删除数据
The minimum age of a log file to be eligible for deletion
log.retention.hours=168 
定时检查周期,发现数据存了超过上面配置的时间,就干数据
log.retention.check.interval.ms=30000

点击流日志分析流程

点击流日志分析流程

流程图如下:

原始数据:
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
字段解析:
1、访客 ip 地址: 58.215.204.118
2、访客用户信息: - -
3、请求时间:[18/Sep/2013:06:51:35 +0000]
4、请求方式:GET
5、请求的 url:/wp-includes/js/jquery/jquery.js?ver=1.10.2 6、请求所用协议:HTTP/1.1
7、响应码:304
8、返回的数据流量:0
9、访客的来源 url:http://blog.fens.me/nodejs-socketio-chat/
10 、 访 客 所 用 浏 览 器 : Mozilla/5.0 (Windows NT 5.1; rv:23.0) Firefox/23.0
Gecko/20100101

1.根据原始数据生成点击流模型 PageViews 和 Visits

PageViews:
    根据IP判断是否是同一用户,根据前后两条日志时间相差是否在30分钟内,
    判断访问日志是否是属于同一个session[会话],按照时间顺序标上步骤,
    这样就构成了一条访问轨迹线;
Visits:
    侧重于体现用户在一次session中的进入离开时间、进入离开页面,
    还有统计出在本次session中用户总共访问了几个页面

2.漏斗模型

逐层递减

3.常见指标:

骨灰级指标:
    IP:1天内访问网站的不重复IP总数
    PV[PageView]:用户每打开1次网页,记录1个PV
    UV[Unique Pageview]:1天以内,访问网站不重复的用户数据(以cookie为依据),1天内同1访客多次访问网站只被计算1次
基础级指标:
    访问次数:
        访客从进入网站到离开网站一系列活动极为一次访问,也就是session
    网站停留时间:
        访问者在网站上花费的时间
    页面停留时间:
        访问者在某个特定页面或某组网页上所花费的时间
复合级指标:
    人均浏览页面:
        浏览次数/独立访客数  --体现网站对访客的吸引程度
    二跳率:
        二跳率的概念是当网站页面展开后,用户在页面上产生的首次点击被称为“二跳”,二跳的次数即为“二跳量”。二跳量与到达量(进入网站的人)的比值称为页面的二跳率。
    跳出率:
        跳出率是指在只访问了入口页面(例如网站首页)就离开的访问量与所产生总访问量的百分比。跳出率计算公式:跳出率=访问一个页面后离开网站的次数/总访问次数。
    二跳率越高越好,跳出率越低越好。
4.基础分析(PV,IP,UV)
5.来源分析
6.受访分析
7.访客分析
    终端详情[PC,移动端]新老访客、忠诚度、活跃度
8.转化路径分析

数据处理流程:

> 数据采集
    Flume采集需要在配置文件里配置source、channel、sink:
        source:
            1.spoolDir的作用是:监控文件夹,如果有新的文件产生,采集开始
            2.exec tail -f access.log 只能监听文件追加的内容
    以上1和2都没办法满足我们的log日志采集,因为既要监控文件也要监控文件夹,
    **好在Flume1.7的稳定版本提供了TAILDIR类型的source,
    可以监控一个目录,并且使用正则表达式匹配目录中的文件名进行实时收集,具体配置详情如下:
        a1.sources.r1.type = TAILDIR
        a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
        a1.sources.r1.filegroups = f1 f2
        a1.sources.r1.filegroups.f1 = /var/log/test1/example.log 
        a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
    解释:
        filegroups:指定 filegroups,可以有多个[每个还可以使用正则表达式来匹配],
        以空格分隔;(TailSource 可以同时监控 tail 多个目录中的文件)
      **positionFile:解决了机器重启后无法**断点续传**的问题[检查点文件会以 json 格式保存已经 tail 文件的位置]

> 数据预处理
    通过MapReduce程序对采集到的原始日志数据进行预处理,
    比如清洗,格式整理,滤除脏数据等,并且梳理成点击流模型数据
        1.一般来说,开发中针对不合法的数据,我们不是直接删除,而是打个标签 比如true或者false,
        因为这些数据可能对这个场景是无用的,但是对其他场景是有用的
        2.编写MapReduce程序,只有map没有reduce,因为输入一条数据,处理完后直接输出不需要聚合,setReduceNum = 0 ,输出的结果文件就是part-m
        3.编写相对应的Javabean的时候要实现Writable接口,
        重写toString方法的时候是按照Hive的默认分隔符'\001'进行分割的,
        导入hive表的时候直接按照默认的分隔符就ok了,
        注意:readFields 和 write 的方法写得时候要一致对应
        4.业务要求状态码为400以上的设置valid为false,还有时间不合法[为null或者双引号]
        5.过滤掉静态资源[图片/css/js],这个标准是根据业务来定的,一般会在mapper的set up 
        初始化方法里定义hashSet来存这些准则,然后进行标记清除
        6.PageViews数据生成: (session[UUID] + stayTime + step)
        ---------------------------------------------------------------------------------------
                    Session + IP + 地址 + 时间 + 访问页面 + URL + 停留时长 + 第几步
        ---------------------------------------------------------------------------------------
            a) 编写MapReduce程序,map端以Ip为key,Javabean为value,发送到reduce,
            reduce端进行values的排序,这里需要遍历values,每一次都new
            一个新的Javabean然后进行赋值[因为Javabean是引用类型,然后添加入新的ArrayList中,
            如果不重新new 一个新的对象的话,那么到最后ArrayList里面的对象就是同一个,因为他们指向的都是同一块堆内存!!!],
            再把这个Javabean添加到新的ArrayList中
            b) 按照时间排序,然后对新的ArrayList进行排序,Collections.sort(beansList,new Comparabtor(javabean){中间获取时间来进行升序排序})
            c) 从有序的beans中分辨出歌词visit,并对一次visit中所访问的page按顺序标号step
                核心思想:比较相邻两条记录中的时间差,如果时间差<30分钟,则该两条记录属于同一个session[生成UUID],否则属于不同的session
                只有1条的和大于1条的,他们的默认时间都是60秒
        7.Visits模型:数据来自PageViews模型
        ---------------------------------------------------------------------------------------
        Session + 起始时间 + 结束时间 + 进入页面URL + 离开页面URL + 访问页面数 + 停留时长 + IP + referer
        ---------------------------------------------------------------------------------------
             编写MapReduce程序:
                 mapper端:k为session,value:javabean
                 reduce端:
                     以step进行排序


> 数据入库
    将预处理之后的数据导入到Hive仓库中相应的库和表中
> 数据分析
    项目的核心内容,即根据需求开发ETL分析语句,得出各种统计结果
> 工作流调度:
    简单的任务调度:
        可以使用Linux的crontab -e 来设置调度,但是其缺点是无法设置依赖
    复杂的任务调度:
        推荐使用 : azkaban [Java语言实现的,他有管理页面,配置起来比较简单]
            azkaban是由LinkedIn公司推出的一个批量工作流任务调度器,
            用于在一个工作流内以一个特定的顺序运行一组工作和流程。
            使用job配置文件简历任务之间的依赖关系,并提供了一个已使用的web用户界面维护和跟踪工作流。
            支持command、Java、Hive、pig、Hadoop,而且是基于java开发,代码结构清晰,抑郁二次开发
                azkaban的组成:
                    1.mysql服务器
                        用于存储项目、日志或者执行计划[执行周期等]之类的信息
                    2.web服务器:
                        使用jetty[开源的serverlet容器]对外提供web服务,使用户可以通过wen页面方便管理
                    3.executor服务器:
                        负责具体的工作流的提交、执行
           配置azkaban步骤:[cluster模式]
                   1.生成keystore证书文件,mv到webserver下
                   2.配置为年修改一下时区:Asia/Shanghai
                   3.配置数据库mysql
                   4.配置user admin的登录
          使用azkaban的步骤:
                  1.创建a.job文件并且已经配置b.job ,里面的type为command,中间可以配置的dependencies=b,然后command=xxxx
                    期间要把这些job打成zip包,通过web提交上去配置立刻执行还是scheduler定期执行
                      # a.job
                    type=command
                    dependencies=b
                    command=echo hahaha
                这样的话a的job任务就会等待b结束后再执行
                2.hdfs操作任务
                    command=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop fs -mkdir /azaz
                  3.MapReduce操作任务
                      command=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop jar hadoop-mapreduce- examples-2.6.1.jar wordcount /wordcount/input /wordcount/azout
                  4.hive操作任务
                      执行一个命令是 command=/xx/hive -e 'show tables'
                      执行一个文件,里面是hive sql语句, commmand=/xx/hive -f 'test.sql'
                      Hive 脚本: test.sql
                        type=command
                        command=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop jar hadoop-mapreduce- examples-2.6.1.jar wordcount /wordcount/input /wordcount/azout
                        use default;
                        drop table aztest;
                        create table aztest(id int,name string) row format delimited fields terminated by ',';
                        load data inpath '/aztest/hiveinput' into table aztest;
                        create table azres as select * from aztest;
                        insert overwrite directory '/aztest/hiveoutput' select count(1) from aztest;
        不推荐使用: ooize[虽然是Apache旗下的,但是工作流的过程是编写大量的XML文件配置,而且代码复杂度比较高,不易于二次开发]
> 数据展现
    将分析所得到的数据进行数据可视化,一般通过图表[百度的echarts]进行展示

模块开发-数据仓库的设计

1.纬度建模
    纬度表(demension)
        通常指 按照类别、区域或者时间等等来分析,维度表数据比较固定,数据量小
     事实表
        事实表的设计是以能够正确记录历史信息为准则也就是一条一条的数据,就像是消费记录里面有product_id
        维度表的设计是以能够以合适的角度来聚合主题内容为准则  这边有product_id对应的产品信息
2.纬度建模三种模式:
    2.1 星型模式 [像星星一样]
            由一个事实表和一组维度表组成    
                比如:
                    事实表里有地域主键、时间键、部门键、产品键 对应有4个维度表相关联
    2.2 雪花模式[不常用,因为不容易维护!!!]
            在星型模式基础上,维度表还有维度表
    2.3 星座模式 [开发常用!!!]
        基于多张事实表,而且共享纬度信息

本项目数据仓库的设计:

1.事实表的设计 ods_weblog_orgin => 对应mr清洗完之后的数据 【窄表】和【宽表或者明细表】
    窄表:对应原始数据表,字段跟数据中一一对应,但是不利于分析
---------------------------------------------------------------------------------------------------------------
        valid  remote_addr remote_user time_local  request status  body_bytes_sent http_referer  http_user_agent
        是否有效 访客IP         访客用户信息  请求时间     请求url  响应码    相应字节数       来源url         访客终端信息
---------------------------------------------------------------------------------------------------------------
    宽表:把某些融合各种信息的字段 提取出不同的信息作为新的字段
        相对于之前的窄表 字段增加了,所以叫宽表,
        比如时间戳,如果是之前的话 '2018-09-09 18:09:09'这种时间不利于分析,
        如果分成年,月,日,那么分析时直接group by day 或者 year 或者day 就ok了
        还有referer_url也是如此,可以拆分为host或者参数之类的

2.维度表的设计如:
    时间维度 t_dim_time: date_key year month day hour 
    访客地域纬度t_dim_area: area_ID 北京 上海 广州 深圳
    终端类型维度 t_dim_termination: uc firefox chrome safari ios android
    网站栏目纬度 t_dim_section: 进口食品、生鲜日配、时令果蔬、奶制品、
                                休闲保健、酒饮冲调茶叶、粮油副食、母婴玩具、个护清洁、家具家电
    维度表的数据一般要结合业务情况自己写脚本按照规则生成,也可以用工具来生成,方便后续关联分析
    比如事先生成时间维度表中的数据,跨度从业务需求的日期到当前的日期即可,具体根据分析粒度,
    库生成年,季,月,周,天,时等相关信息,用于分析

数据仓库三层架构:

ods层:数据就是通过mr清洗过的数据,带有标签valid或者标识的数据
    1.创建ODS层数据表
        1.1. 原始日志数据表 :创建按照时间来分区的hive 分区表
            drop table if exists ods_weblog_origin;
            create table ods_weblog_origin
            (
            valid string,remote_addr string,remote_user string, time_local string,request string,status string, body_bytes_sent string, http_referer string, http_user_agent string
            )
            partitioned by (datestr string)
            row format delimited 
            fields terminated by '\001';
        1.2. 点击流模型 PageViews表
            drop table if exists ods_click_pageviews;
            create table ods_click_pageviews
            (
            session string,remote_addr string,remote_user string, time_local string,request string,visit_step string, page_staylong string, http_referer string, http_user_agent string, body_bytes_sent string, status string
            )
            partitioned by (datestr string)
            row format delimited 
            fields terminated by '\001';
        1.3. 点击流模型 Visits
            drop table if exists ods_click_visits;
            create table ods_click_visits
            (
            )
            partitioned by (datestr string)
            row format delimited 
            fields terminated by '\001';
        1.4. 维度表创建(这里举例:时间,年、月、日、时)
            drop table if exists t_dim_time;
            create table t_dim_time 
            (
            date_key int,...
            )
            row format delimited 
            fields terminated by ',';
        1.5 创建明细宽表 ods_weblog_detail 时间可以明细为 年月日时分秒,
            referer_url 可以明细为 host、path、query、queryid
            从预清洗后的表中得到这些数据,如果是referer_url 需要使用Hive里定义的函数:
                lateral view parse_url_tuple(正则表达式)这个方法,自动把url转换为host、path等
            如果是时间拓宽明细表的话 就是 substring
dw层:ods通过ETL处理之后得到dw层
        多维度统计PV总量:
            b) 与时间维度表关联查询
                insert into table dw_pvs_everyday select count(*) as pvs,a.month as month,a.day as day 
                from 
                (select distinct month, day from t_dim_time) a 
                join 
                ods_weblog_detail b 
                on a.month=b.month and a.day=b.day group by a.month,a.day;
            c) 按照referer维度进行统计每小时各来访 url 产生的 PV 量
                insert into table dw_pvs_referer_everyhour partition(datestr='20130918')
                select http_referer,ref_host,month,day,hour,count(1) as pv_referer_cnt
                from 
                ods_weblog_detail
                group by http_referer,ref_host,month,day,hour
                having ref_host is not null
                order by hour asc,day asc,month asc,pv_referer_cnt desc;
            d) 人均浏览量
                统计今日所有来访者平均请求的页面数。
                    insert into table dw_avgpv_user_everyday
                    select 
                    '20130918',sum(b.pvs)/count(b.remote_addr) 
                    from
                    (select remote_addr,count(1) as pvs from ods_weblog_detail where datestr='20130918' group by remote_addr) b; 
            e)特别重要:分组求TopN ************非常重要**********
                row_number()函数
                    row_number() over (partition by xxx order by xxx) rank
                insert into table dw_pvs_refhost_topn_everyhour partition(datestr='20130918') 
                select t.hour,t.od,t.ref_host,t.ref_host_cnts 
                from(
                select ref_host,ref_host_cnts,concat(month,day,hour) as hour,row_number() over (partition by concat(month,day,hour) order by ref_host_cnts desc
                ) as od 
                from 
                dw_pvs_refererhost_everyhour) t 
                where od<=3;
            f) 受访分析(从页面的角度分析)
                热门页面统计
                    统计每日最热门的页面 top10
                        insert into table dw_hotpages_everydayselect '20130918',a.request,a.request_counts 
                        from
                        (select request as request,count(request) as request_counts 
                        from 
                        ods_weblog_detail 
                        where datestr='20130918' 
                        group by request having request is not null
                        ) a 
                        order by a.request_counts desc 
                        limit 10;
            g) 每小时独立访客及其产生的 pv
                insert into table dw_user_dstc_ip_h 
                select remote_addr,count(1) as pvs,concat(month,day,hour) as hour 
                from 
                ods_weblog_detail Where datestr='20130918' 
                group by concat(month,day,hour),remote_addr;
                    在以上的结果基础上,统计每小时独立访客总数
                        select count(1) as dstc_ip_cnts,hour from dw_user_dstc_ip_h group by hour;
                    统计每日独立访客总数
                        select remote_addr,count(1) as counts,concat(month,day) as day 
                        from 
                        ods_weblog_detail Where datestr='20130918' 
                        group by concat(month,day),remote_addr;
                    统计每月独立访客总数
                        select 
                        remote_addr,count(1) as counts,month 
                        from 
                        ods_weblog_detail 
                        group by month,remote_addr;
            h) 每日新访访客 today left join old ***************非常重要*************
                insert into table dw_user_new_d partition(datestr='20130918') 
                select tmp.day as day,tmp.today_addr as new_ip 
                from 
                ( select today.day as day,today.remote_addr as today_addr,old.ip as old_addr from (select distinct remote_addr as remote_addr,"20130918" as day from ods_weblog_detail where datestr="20130918") today left outer join dw_user_dsct_history old on today.remote_addr=old.ip ) tmp 
                where tmp.old_addr is null;
            注意:每日新用户追加到累计表
            i) 访客 Visit 分析(点击流模型)
                查询今日所有回头访客及其访问次数。
                    insert overwrite table dw_user_returning partition(datestr='20130918') 
                    select tmp.day,tmp.remote_addr,tmp.acc_cnt 
                    from 
                    (select '20130918' as day,remote_addr,count(session) as acc_cnt from ods_click_stream_visit group by remote_addr) tmp 
                    where tmp.acc_cnt>1;
            j) 人均访问频次
                统计出每天所有用户访问网站的平均次数(visit)
                    select sum(pagevisits)/count(distinct remote_addr) from ods_click_stream_visit where datestr='20130918';
            k) 关键路径转化率分析(漏斗模型) -- 基于PageViews模型
                定义好业务流程中的页面标识,下例中的步骤为[模型设计]: Step1、 /item
                                                          Step2、 /category
                                                         Step3、 /index
                                                        Step4、 /order
                --查询每一步人数存入 dw_oute_numbs
                create table dw_oute_numbs as
                select 'step1' as step,count(distinct remote_addr) and request like '/item%'
                union
                select 'step2' as step,count(distinct remote_addr) and request like '/category%'
                union
                select 'step3' as step,count(distinct remote_addr) and request like '/order%'
                union
                select 'step4' as step,count(distinct remote_addr) and request like '/index%';
                as numbs from ods_click_pageviews where datestr='20130920'
                as numbs from ods_click_pageviews where datestr='20130920'
                as numbs from ods_click_pageviews where datestr='20130920'
                as numbs from ods_click_pageviews where datestr='20130920'
                注:UNION 将多个 SELECT 语句的结果集合并为一个独立的结果集。
                ***利用级联求和自己和自己join ******************非常重要********************
                inner join
                select abs.step,abs.numbs,abs.rate as abs_ratio,rel.rate as leakage_rate
                from
                (
                select tmp.rnstep as step,tmp.rnnumbs as numbs,tmp.rnnumbs/tmp.rrnumbs as rate
                from
                (
                select rn.step as rnstep,rn.numbs as rnnumbs,rr.step as rrstep,rr.numbs as rrnumbs inner join
                dw_oute_numbs rr) tmp
                where tmp.rrstep='step1'
                ) abs
                left outer join
                (
                select tmp.rrstep as step,tmp.rrnumbs/tmp.rnnumbs as rate
                from
                (
                select rn.step as rnstep,rn.numbs as rnnumbs,rr.step as rrstep,rr.numbs as rrnumbs inner join
                dw_oute_numbs rr) tmp
                where cast(substr(tmp.rnstep,5,1) as int)=cast(substr(tmp.rrstep,5,1) as int)-1
                ) rel
                on abs.step=rel.step;
                from dw_oute_numbs rn
                其中 cast(substr(tmp.rnstep,5,1) as int) 是 把字符串截取字符 然后强制转化为int
            ) 还可以按照栏目纬度和UA(user agent)纬度来分析PV,
                为了说明PV是可以从各个纬度去分析的

app层:应用层来拿数据展示

Sqoop:是Hadoop和关系数据库服务器之间传送数据的一种工具-sql到Hadoop和Hadoop到sql

sqoop工作机制是将导入或导出命令翻译成MapReduce程序来实现,
在翻译出的MapReduce中主要是对inputformat和outputformat进行定制
1.从关系型数据库(mysql)导入到hadoop 是 DBIputformat,import命令
    bin/sqoop import \
    --connect jdbc:mysql://node-21:3306/sqoopdb \ --username root \
    --password hadoop \
    --target-dir /sqoopresult \ //--target-dir 可以用来指定导出数据存放至 HDFS 的目录;
    --table emp --m 1   //m 1 表示一个map来跑
2.导入 mysql 表数据到 HIVE
    2.1 将关系型数据的表结构复制到 hive 中
    bin/sqoop create-hive-table \
    --connect jdbc:mysql://node-21:3306/sqoopdb \ --table emp_add \
    --username root \
    --password hadoop \
    --hive-table test.emp_add_sp
    2.2 以上只是复制表的结构,并没有将数据导进去,将数据导入Hive表中
        bin/sqoop import \
        --connect jdbc:mysql://node-21:3306/sqoopdb \ --username root \
        --password hadoop \
        --table emp_add \
        --hive-table test.emp_add_sp \
        --hive-import \   ****
        --m 1
    2.3 复杂查询条件:如果不指定分隔符是默认逗号
        bin/sqoop import \
        --connect jdbc:mysql://node-21:3306/sqoopdb \ --username root \
        --password hadoop \
        --target-dir /wherequery12 \
        --query 'select id,name,deg from emp WHERE --split-by id \
        --fields-terminated-by '\t' \
        --m 1
    2.4 下面的命令用于在 EMP 表执行增量导入:
        bin/sqoop import \
        --connect jdbc:mysql://node-21:3306/sqoopdb \ --username root \
        --password hadoop \
        --table emp --m 1 \
        --incremental append \  ****
        --check-column id \  ****
        --last-value 1205    ****
    3. Sqoop 导出
        将数据从 HDFS 导出到 RDBMS 数据库导出前,目标表必须存在于目标数据库中。
        bin/sqoop export \
        --connect jdbc:mysql://node-21:3306/sqoopdb \ --username root \
        --password hadoop \
        --table employee \
        --export-dir /emp/emp_data
        还可以用下面命令指定输入文件的分隔符
        --input-fields-terminated-by '\t'

工作流调度:

整个项目的数据按照处理过程,从数据采集到数据分析,再到结果数据的到处,
一系列的任务可以分割成若干个azkaban的job单元,然后由工作流调度器调度执行。
调度脚本的编写难点在于shell脚本
shell脚本大体框架如下:
    #!/bin/bash
    #set java env
    #set hadoop env
    #设置一些主类、目录等常量
    #获取时间信息
    #shell 主程序、结合流程控制(if....else)去分别执行 shell 命令。 更多工作流及 hql 脚本定义见参考资料。
    hive -e执行sql语句

数据可视化

Echarts:
    百度前端技术部开发的,基于JavaScript的数据可视化图标库,
    可以构建折线图(区域图)、柱状 图(条状图)、散点图(气泡图)、饼图(环形图)、
    K 线图、地图、力导向布局图以及和弦图, 同时支持任意维度的堆积和多图表混合展现。
javaEE中web.xml 的<url-pattern>/</url-pattern> 是拦截所有,jsp除外

1.Mybatis example 排序问题 example.setOrderByClause("`dateStr` ASC");
查询结果便可以根据 dataStr 字段正序排列(从小到大)
如何区分不同数据仓库层的表:
2.Echarts 前端数据格式问题
注意,当异步加载数据的时候,前端一般需要的是数据格式是数组。一定要对应上。在 这里我们可以使用 Java Bean 封装数据,然后转换成 json 扔到前端,对应    上相应的字段即 可。
ObjectMapper om = new ObjectMapper(); beanJson = om.writeValueAsString(bean);
3.Controller 返回的 json @RequestMapping(value="/xxxx",produces="application/json;charset=UTF-8")
@ResponseBody    

一般使用第一种[业内默认的]
1.表之前加前缀 ods_T_access.log
                dw_T_access.log
2.针对不同的数据仓库层 建立对应的数据库 database