搜档网
当前位置:搜档网 › 尚硅谷大数据技术之ELK

尚硅谷大数据技术之ELK

尚硅谷大数据技术之ELK
尚硅谷大数据技术之ELK

第1章 Elasticsearch 概述

1.1 什么是搜索?

百度:我们比如说想找寻任何的信息的时候,就会上百度去搜索一下,比如说找一部自己喜欢的电影,或者说找一本喜欢的书,或者找一条感兴趣的新闻(提到搜索的第一印象)。百度 != 搜索

1)互联网的搜索:电商网站,招聘网站,新闻网站,各种app

2)IT 系统的搜索:OA 软件,办公自动化软件,会议管理,日程管理,项目管理。 搜索,就是在任何场景下,找寻你想要的信息,这个时候,会输入一段你要搜索的关键字,然后就期望找到这个关键字相关的有些信息

1.2 如果用数据库做搜索会怎么样?

如果用数据库做搜索会怎么样?

select * from products where product_name list “%牙膏%”商品id

商品名称商品描述

1高露洁牙膏2中华牙膏3

佳洁士牙膏4其他牙膏京东商城搜索框

5 ……

1万条

京东商城后台商品表逐条遍历

1)比如说“商品描述”字段的长度,有长达数千个,甚至数万个字符,

这个时候,每次都要对每条记录的所有文本进行扫描,判断包不包含我指定的这个关键词(比如说“牙膏”),效率非常低。

select * from products where product_name list “%生化机%”

2)还不能将搜索词拆分开来,尽可能去搜索更多的符合你的期望的结果,

比如输入“生化机”,就搜索不出来“生化危机”。

用数据库来实现搜索,是不太靠

谱的。通常来说,性能会很差的。

1千字的商品描述

1千字的商品描述

用数据库来实现搜索,是不太靠谱的。通常来说,性能会很差的。

1.3 什么是全文检索和Lucene ?

1)全文检索,倒排索引

全文检索是指计算机索引程序通过扫描文章中的每一个词,对每一个词建立一个索引,指明该词在文章中出现的次数和位置,当用户查询时,检索程序就根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方式。这个过程类似于通过字典中的检索字表查字的过程。全文搜索搜索引擎数据库中的数据。

倒排索引原理简介

关键词

ids

商品描述1生化危机电影商品描述2生化危机海报商品描述3生化危机文章商品描述4

生化危机新闻

生化危机电影生化危机海报生化危机文章生化

危机

新闻

生化1,2,3,4危机1,2,3,4电影

1

海报2文章3新闻

4

1 数据库里的数据

2 切词

3 倒排索引总结1:数据库里的数据,一共100万条,按照之前的思路,其实就要扫描100万次,而且每次扫描,都需要匹配那个文本所有的字符,确认是否包含搜索的关键词,而且还不能将搜索词拆解开来进行检索

总结2:利用倒排索引,进行搜索的话,假设100万条数据,拆分出来的词语,假设有1000万个词语,那么在倒排索引中,就有1000万行,我们可能并不需要搜索1000万次,很可能,在搜索到第一次的时候,我们就可以找到这个搜索词对应的数据。也可能滴100次,或者第1000次

查找:生化机

返回1,2,3,4商品

2)lucene ,就是一个jar 包,里面包含了封装好的各种建立倒排索引,以及进行搜索的代码,包括各种算法。我们就用java 开发的时候,引入lucene jar ,然后基于lucene 的api 进行去进行开发就可以了。

1.4 什么是Elasticsearch ?

Elasticsearch ,基于Lucene ,隐藏复杂性,提供简单易用的RestfulAPI 接口、JavaAPI 接口(还有其他语言的API 接口)。

关于Elasticsearch 的一个传说,有一个程序员失业了,陪着自己老婆去英国伦敦学习厨师课程。程序员在失业期间想给老婆写一个菜谱搜索引擎,觉得Lucene 实在太复杂了,就开发了一个封装了Lucene 的开源项目:Compass 。后来程序员找到了工作,是做分布式的高性能项目的,觉得Compass 不够,就写了Elasticsearch ,让Lucene 变成分布式的系统。

Elasticsearch 是一个实时分布式搜索和分析引擎。它用于全文搜索、结构化搜索、分析。 全文检索:将非结构化数据中的一部分信息提取出来,重新组织,使其变得有一定结构,然后对此有一定结构的数据进行搜索,从而达到搜索相对较快的目的。

结构化检索:我想搜索商品分类为日化用品的商品都有哪些,select * from products where category_id='日化用品'

数据分析:电商网站,最近7天牙膏这种商品销量排名前10的商家有哪些;新闻网站,最近1个月访问量排名前3的新闻版块是哪些

1.5 Elasticsearch 的适用场景

1)维基百科,类似百度百科,牙膏,牙膏的维基百科,全文检索,高亮,搜索推荐。

2)The Guardian(国外新闻网站),类似搜狐新闻,用户行为日志(点击,浏览,收藏,评论)+ 社交网络数据(对某某新闻的相关看法),数据分析,给到每篇新闻文章的作者,让他知道他的文章的公众反馈(好,坏,热门,垃圾,鄙视,崇拜)。

3)Stack Overflow(国外的程序异常讨论论坛),IT问题,程序的报错,提交上去,有人会跟你讨论和回答,全文检索,搜索相关问题和答案,程序报错了,就会将报错信息粘贴到里面去,搜索有没有对应的答案。

4)GitHub(开源代码管理),搜索上千亿行代码。

5)国内:站内搜索(电商,招聘,门户,等等),IT系统搜索(OA,CRM,ERP,等等),数据分析(ES热门的一个使用场景)。

1.6 Elasticsearch的特点

1)可以作为一个大型分布式集群(数百台服务器)技术,处理PB级数据,服务大公司;也可以运行在单机上,服务小公司

2)Elasticsearch不是什么新技术,主要是将全文检索、数据分析以及分布式技术,合并在了一起,才形成了独一无二的ES;lucene(全文检索),商用的数据分析软件(也是有的),分布式数据库(mycat)

3)对用户而言,是开箱即用的,非常简单,作为中小型的应用,直接3分钟部署一下ES,就可以作为生产环境的系统来使用了,数据量不大,操作不是太复杂4)数据库的功能面对很多领域是不够用的(事务,还有各种联机事务型的操作);特殊的功能,比如全文检索,同义词处理,相关度排名,复杂数据分析,海量数据的近实时处理;Elasticsearch作为传统数据库的一个补充,提供了数据库所不能提供的很多功能

1.7 Elasticsearch的核心概念

1.7.1 近实时

近实时,两个意思,从写入数据到数据可以被搜索到有一个小延迟(大概1秒);基于es执行搜索和分析可以达到秒级。

1.7.2 Cluster(集群)

集群包含多个节点,每个节点属于哪个集群是通过一个配置(集群名称,默认是elasticsearch)来决定的,对于中小型应用来说,刚开始一个集群就一个节点很正常

1.7.3 Node(节点)

集群中的一个节点,节点也有一个名称(默认是随机分配的),节点名称很重要(在执行运维管理操作的时候),默认节点会去加入一个名称为“elasticsearch”的集群,如果直接启动一堆节点,那么它们会自动组成一个elasticsearch集群,当然一个节点也可以组成一个elasticsearch集群。

1.7.4 Index(索引-数据库)

索引包含一堆有相似结构的文档数据,比如可以有一个客户索引,商品分类索引,订单索引,索引有一个名称。一个index包含很多document,一个index就代表了一类类似的或者相同的document。比如说建立一个product index,商品索引,里面可能就存放了所有的商品数据,所有的商品document。

1.7.5 Type(类型-表)

6.0版本之前每个索引里都可以有多个type,6.0版本之后每个索引里面只能有一个Type,一般使用_doc代替了。

商品index,里面存放了所有的商品数据,商品document

商品type:product_id,product_name,product_desc,category_id,category_name,service_period

每一个type里面,都会包含一堆document

{

"product_id": "1",

"product_name": "长虹电视机",

"product_desc": "4k高清",

"category_id": "3",

"category_name": "电器",

"service_period": "1年"

}

{

"product_id": "2",

"product_name": "基围虾",

"product_desc": "纯天然,冰岛产",

"category_id": "4",

"category_name": "生鲜",

"eat_period": "7天"

}

1.7.6 Document(文档-行)

文档是es中的最小数据单元,一个document可以是一条客户数据,一条商品分类数据,

一条订单数据,通常用JSON数据结构表示,每个index下的type中,都可以去存储多个document。

1.7.7 Field(字段-列)

Field是Elasticsearch的最小单位。一个document里面有多个field,每个field就是一个数据字段。

product document

{

"product_id": "1",

"product_name": "高露洁牙膏",

"product_desc": "高效美白",

"category_id": "2",

"category_name": "日化用品"

}

1.7.8 Mapping(映射-约束)

数据如何存放到索引对象上,需要有一个映射配置,包括:数据类型、是否存储、是否分词等。

Mapping用来定义Document中每个字段的类型,即所使用的分词器、是否索引等属性,非常关键等。创建Mapping 的代码示例如下:

PUT student

{

"mappings": {

"_doc":{

"properties":{

"stu_id":{

"type":"keyword",

"store":"true"

},

"name":{

"type":"keyword"

},

"birth":{

"type":"date" (yyyy-MM-dd)

}

}

}

}

}

1.7.9 ElasticSearch与数据库的类比

1.7.10 ElasticSearch 存入数据和搜索数据机制

Elasticsearch 存入数据和搜索数据机制

Article 文章(Document 对象){

id:1

title:学习Elasticsearch

content:Elasticsearch 是一个非常不错的全文检索的搜索服务器}

Mapping

index : 'blog',type : 'article',body : {article: {

properties: {id: {

type: 'string',analyzer: 'ik',store: ‘yes',

创建索引的对象blog

Article 类型(文章)Comment 类型(评论)

索引区域数据区域学习12

Elasticsearch1非常1不错1全文1检索1搜索1 2服务器1 2

Docid=1

映射:字段类型、是否存储、是否分词

Article 文章(Document 对象){

id:2

title:学习

content:搜索服务器}

Docid=2

待存储的内容

1)索引对象(blog ):存储数据的表结构 ,任何搜索数据,存放在索引对象上 。 2)映射(mapping ):数据如何存放到索引对象上,需要有一个映射配置, 包括:数据类型、是否存储、是否分词等。

3)文档(document ):一条数据记录,存在索引对象上。

4)文档类型(type ):一个索引对象,存放多种类型数据,数据用文档类型进行标识。

第2章 Elasticsearch 快速入门

2.1 安装包下载

1)ElasticSearch 官网: https://www.elastic.co/cn/downloads/elasticsearch

2.2 Elasticsearch安装

1)解压elasticsearch-6.3.1.tar.gz到/opt/module目录下

[atguigu@hadoop102 software]$ tar -zxvf elasticsearch-6.3.1.tar.gz -C /opt/module/

2)在/opt/module/elasticsearch-6.3.1路径下创建data和logs文件夹

[atguigu@hadoop102 elasticsearch-6.3.1]$ mkdir data

[atguigu@hadoop102 elasticsearch-6.3.1]$ mkdir logs

3)修改配置文件/opt/module/elasticsearch-6.3.1/config/elasticsearch.yml

[atguigu@hadoop102 config]$ pwd

/opt/module/elasticsearch-6.3.1/config

[atguigu@hadoop102 config]$ vi elasticsearch.yml

#-----------------------Cluster-----------------------

https://www.sodocs.net/doc/0412751243.html,: my-application

#-----------------------Node-----------------------

https://www.sodocs.net/doc/0412751243.html,: node-102

#-----------------------Paths-----------------------

path.data: /opt/module/elasticsearch-6.3.1/data

path.logs: /opt/module/elasticsearch-6.3.1/logs

#-----------------------Memory-----------------------

bootstrap.memory_lock: false

bootstrap.system_call_filter: false

#-----------------------Network-----------------------

network.host: 192.168.9.102

#-----------------------Discovery-----------------------

discovery.zen.ping.unicast.hosts: ["192.168.9.102"]

(1)https://www.sodocs.net/doc/0412751243.html,

如果要配置集群需要两个节点上的elasticsearch配置的https://www.sodocs.net/doc/0412751243.html,相同,都启动可以自动组成集群,这里如果不改https://www.sodocs.net/doc/0412751243.html,则默认是https://www.sodocs.net/doc/0412751243.html,=my-application,

(2)nodename随意取但是集群内的各节点不能相同

(3)修改后的每行前面不能有空格,修改后的“:”后面必须有一个空格

分发至hadoop103以及hadoop104,分发之后修改:

https://www.sodocs.net/doc/0412751243.html,: node-103

network.host: 192.168.9.103

https://www.sodocs.net/doc/0412751243.html,: node-104

network.host: 192.168.9.104

5)配置linux系统环境(参考:https://www.sodocs.net/doc/0412751243.html,/satiling/article/details/59697916)

(1)切换到root用户,编辑limits.conf 添加类似如下内容

[root@hadoop105 elasticsearch-6.3.1]# vi /etc/security/limits.conf 添加如下内容:

* soft nofile 65536

* hard nofile 131072

* soft nproc 2048

* hard nproc 4096

(2)切换到root用户,进入limits.d目录下修改配置文件。

[root@hadoop102 elasticsearch-6.3.1]# vi /etc/security/limits.d/90-nproc.conf

修改如下内容:

* soft nproc 1024

#修改为

* soft nproc 4096

(3)切换到root用户修改配置sysctl.conf

[root@hadoop102 elasticsearch-6.3.1]# vi /etc/sysctl.conf

添加下面配置:

vm.max_map_count=655360

并执行命令:

[root@hadoop102 elasticsearch-6.3.1]# sysctl -p

以上修改的Linux配置需要分发至其他节点。

然后,重新启动Linux,必须重启!!!

6)启动elasticsearch

[atguigu@hadoop102 elasticsearch-6.3.1]$ bin/elasticsearch

7)测试elasticsearch

[atguigu@hadoop102 elasticsearch-6.3.1]$ curl http://hadoop102:9200 {

"name" : "node-102",

"cluster_name" : "my-application",

"cluster_uuid" : "KOpuhMgVRzW_9OTjMsHf2Q",

"version" : {

"number" : "6.3.1",

"build_flavor" : "default",

"build_type" : "tar",

"build_hash" : "eb782d0",

"build_date" : "2018-06-29T21:59:26.107521Z",

"build_snapshot" : false,

"lucene_version" : "7.3.1",

"minimum_index_compatibility_version" : "5.0.0"

},

"tagline" : "You Know, for Search"

}

8)停止集群

kill -9 进程号

9)群起脚本

[atguigu@hadoop102 bin]$ vi es.sh

#!/bin/bash

es_home=/opt/module/elasticsearch-6.3.1

case $1 in

"start") {

for i in hadoop102 hadoop103 hadoop104

do

echo "==============$i=============="

ssh $i "source /etc/profile;${es_home}/bin/elasticsearch >/dev/null 2>&1 &"

done

};;

"stop") {

for i in hadoop102 hadoop103 hadoop104

do

echo "==============$i=============="

ssh $i "ps -ef|grep $es_home |grep -v grep|awk '{print \$2}'|xargs kill" >/dev/null 2>&1

done

};;

esac

2.3 Elasticsearch操作工具

2.3.1 浏览器

2.3.2 Linux命令行

请求:

[root@linux1 es]curl -XPOST 'http://192.168.9.102:9200/atguigu/doc' -i -H "Content-Type:application/json" -d '{"name":"haha","age":"10"}'

响应:

HTTP/1.1 201 Created

Location: /atguig/doc/Hm7z1GwBEd-elda4prGr

content-type: application/json; charset=UTF-8

content-length: 172

{"_index":"atguigu","_type":"doc","_id":"Hm7z1GwBEd-elda4prGr","_ version":1,"result":"created","_shards":{"total":2,"successful":1 ,"failed":0},"_seq_no":0,"_primary_term":1}

2.3.3 Kibana的Dev Tools

第3章Elasticsearch高阶

3.1 数据类型

3.1.1 核心数据类型

字符串型:text、keyword

数值型:long、integer、short、byte、double、float、half_float、scaled_float

日期类型:date

布尔类型:boolean

二进制类型:binary

范围类型:integer_range、float_range、long_range、double_range、date_range

3.1.2 复杂数据类型

数组类型:array

对象类型:object

嵌套类型:nested object

3.1.3 地理位置数据类型

geo_point(点)、geo_shape(形状)

3.1.4 专用类型

记录IP地址ip

实现自动补全completion

记录分词数:token_count

记录字符串hash值murmur3

多字段特性multi-fields

3.2 Mapping

3.2.1 手动创建

1)创建操作

PUT my_index1

{

"mappings": {

"_doc":{

"properties":{

"username":{

"type": "text",

"fields": {

"pinyin":{

"type": "text"

}

}

}

}

}

}

}

2)创建文档

PUT my_index1/_doc/1

{

"username":"haha heihei"

}

3)查询

GET my_index1/_search

{

"query": {

"match": {

"username.pinyin": "haha" }

}

—————————————————————————————}

3.2.2 自动创建

ES可以自动识别文档字段类型,从而降低用户使用成本

1)直接插入文档

PUT /test_index/_doc/1

{

"username":"alfred",

"age":1,

"birth":"1991-12-15"

}

2)查看mapping

GET /test_index/doc/_mapping

{

"test_index": {

"mappings": {

"doc": {

"properties": {

"age": {

"type": "long"

},

"birth": {

"type": "date"

},

"username": {

"type": "text",

"fields": {

"keyword": {

"type": "keyword",

"ignore_above": 256

}

}

}

}

}

}

}

}

age自动识别为long类型,username识别为text类型

3)日期类型的自动识别

日期的自动识别可以自行配置日期格式,以满足各种需求。

(1)自定义日期识别格式

PUT my_index

{

"mappings":{

"_doc":{

"dynamic_date_formats": ["yyyy-MM-dd","yyyy/MM/dd"]

}

}

}

(2)关闭日期自动识别

PUT my_index

{

"mappings": {

"_doc": {

"date_detection": false

}

}

}

4)字符串是数字时,默认不会自动识别为整形,因为字符串中出现数字时完全合理的

Numeric_datection可以开启字符串中数字的自动识别

PUT my_index

{

"mappings":{

"doc":{

"numeric_datection": true

}

}

}

3.3 IK分词器

3.3.1 为什么使用分词器

分词器主要应用在中文上,在ES中字符串类型有keyword和text两种。keyword默认不进行分词,而text是将每一个汉字拆开称为独立的词,这两种都是不适用于生产环境,所以我们需要有其他的分词器帮助我们完成这些事情,其中IK分词器是应用最为广泛的一个分词器。

1)keyword类型的分词

GET _analyze

{

"keyword":"我是程序员"

}

结果展示(报错)

{

"error": {

"root_cause": [

{

"type": "illegal_argument_exception",

"reason": "Unknown parameter [keyword] in request body or parameter is of the wrong type[VALUE_STRING] "

}

],

"type": "illegal_argument_exception",

"reason": "Unknown parameter [keyword] in request body or parameter is of the wrong type[VALUE_STRING] "

},

"status": 400

}

2)text类型的分词

GET _analyze

{

"text":"我是程序员"

}

结果展示:

{

"tokens": [

{

"token": "我",

"start_offset": 0,

"end_offset": 1,

"type": "",

"position": 0

},

{

"token": "是",

"start_offset": 1,

"end_offset": 2,

"type": "",

"position": 1

},

{

"token": "程",

"start_offset": 2,

"end_offset": 3,

"type": "",

"position": 2

},

{

"token": "序",

"start_offset": 3,

"end_offset": 4,

"type": "",

"position": 3

},

{

"token": "员",

"start_offset": 4,

"end_offset": 5,

"type": "",

"position": 4

}

]

}

3.3.2 IK分词器安装

1)下载与安装的ES相对应的版本

2)解压elasticsearch-analysis-ik-6.3.1.zip,将解压后的IK文件夹拷贝到ES安装目录下的plugins目录下,并重命名文件夹为ik(什么名称都OK)

[atguigu@hadoop102 software]$ unzip elasticsearch-analysis-ik-6.3.1.zip -d /opt/module/elasticsearch-6.3.1/plugins/ik/

3)分发分词器目录

[atguigu@hadoop102 elasticsearch-6.3.1]$ xsync plugins/

4)重新启动Elasticsearch,即可加载IK分词器

3.3.3 IK分词器测试

IK提供了两个分词算法ik_smart 和ik_max_word,其中ik_smart 为最少切分,ik_max_word为最细粒度划分。

1)最少划分ik_smart

get _analyze

{

"analyzer": "ik_smart",

"text":"我是程序员"

}

结果展示

{

"tokens" : [

{

"token" : "我",

"start_offset" : 0,

"end_offset" : 1,

"type" : "CN_CHAR",

"position" : 0

},

{

"token" : "是",

"start_offset" : 1,

"end_offset" : 2,

"type" : "CN_CHAR",

"position" : 1

},

{

"token" : "程序员",

"start_offset" : 2,

"end_offset" : 5,

"type" : "CN_WORD",

"position" : 2

}

]

}

2)最细切分ik_max_word

get _analyze

{

"analyzer": "ik_max_word",

"text":"我是程序员"

}

输出的结果为:

"tokens" : [

{

"token" : "我",

"start_offset" : 0,

"end_offset" : 1,

"type" : "CN_CHAR",

"position" : 0

},

{

"token" : "是",

"start_offset" : 1,

"end_offset" : 2,

"type" : "CN_CHAR",

"position" : 1

},

{

"token" : "程序员",

"start_offset" : 2,

"end_offset" : 5,

"type" : "CN_WORD",

"position" : 2

},

{

"token" : "程序",

"start_offset" : 2,

"end_offset" : 4,

"type" : "CN_WORD",

"position" : 3

},

{

"token" : "员",

"start_offset" : 4,

"end_offset" : 5,

"type" : "CN_CHAR",

"position" : 4

}

]

}

3.4 检索文档

向Elasticsearch增加数据

PUT /atguigu/doc/1

{

"first_name" : "John",

"last_name" : "Smith",

"age" : 25,

"about" : "I love to go rock climbing",

"interests": ["sports", "music"]

}

如果在关系型数据库Mysql中主键查询数据一般会执行下面的SQL语句select * from atguigu where id = 1;

但在Elasticsearch中需要采用特殊的方式

# 协议方法索引/类型/文档编号

GET /atguigu/doc/1

响应

{

"_index": "atguigu",

"_type": "doc",

"_id": "1",

"_version": 1,

"found": true,

"_source": { // 文档的原始数据JSON数据

"first_name": "John",

"last_name": "Smith",

"age": 25,

"about": "I love to go rock climbing",

"interests": [

"sports",

"music"

]

}

}

我们通过HTTP方法GET来检索文档,同样的,我们可以使用DELETE方法删除文档,使用HEAD方法检查某文档是否存在。如果想更新已存在的文档,我们只需再PUT一次。

3.4.1 元数据查询

GET /_cat/indices

3.4.2 全文档检索

如果在关系型数据库Mysql中查询所有数据一般会执行下面的SQL语句

select * from user;

但在Elasticsearch中需要采用特殊的方式

# 协议方法索引/类型/_search

GET /atguigu/_doc/_search

响应内容不仅会告诉我们哪些文档被匹配到,而且这些文档内容完整的被包含在其中—我们在给用户展示搜索结果时需要用到的所有信息都有了。

3.4.3 字段全值检索

如果在关系型数据库Mysql中查询多字段匹配数据(字段检索)

一般会执行下面的SQL语句

select * from atguigu where name = 'haha';

但在Elasticsearch中需要采用特殊的方式

GET atguigu/_search

{

"query": {

"bool": {

"filter": {

"term": {

"about": "I love to go rock climbing"

}

}

}

}

}

3.4.4 字段分词检索

GET atguigu/_search

{

"query": {

"match": {

"about": "I"

}

}

}

3.4.5 字段模糊检索

如果在关系型数据库Mysql中模糊查询多字段数据

一般会执行下面的SQL语句

select * from user where name like '%haha%'

但在Elasticsearch中需要采用特殊的方式,查询出所有文档字段值分词后包含haha的文档GET test/_search

{

"query": {

"fuzzy": {

"aa": {

"value": "我是程序"

}

}

}

}

3.4.6 聚合

GET test/_search

{

"aggs": {

"groupby_aa": {

"terms": {

"size": 10

}

}

}

}

3.5 API操作

新建工程并导入依赖:

org.apache.httpcomponents

httpclient

4.5.5

org.apache.httpcomponents

httpmime

4.3.6

io.searchbox

jest

5.3.3

net.java.dev.jna

jna

4.5.2

org.codehaus.janino

commons-compiler

2.7.8

3.5.1 写数据

//1.创建ES客户端连接池

JestClientFactory factory = new JestClientFactory();

//2.创建ES客户端连接地址

HttpClientConfig httpClientConfig = new HttpClientConfig.Builder("http://hadoop102:9200").build();

//3.设置ES连接地址

factory.setHttpClientConfig(httpClientConfig);

//4.获取ES客户端连接

JestClient jestClient = factory.getObject();

//5.构建ES插入数据对象

Index index = new Index.Builder("{\n" +

" \"name\":\"zhangsan\",\n" +

"}").index("test5").type("_doc").id("2").build();

//6.执行插入数据操作

jestClient.execute(index);

//7.关闭连接

jestClient.shutdownClient();

3.5.2 读数据

//1.创建ES客户端连接池

JestClientFactory factory = new JestClientFactory();

//2.创建ES客户端连接地址

HttpClientConfig httpClientConfig = new HttpClientConfig.Builder("http://hadoop102:9200").build();

//3.设置ES连接地址

factory.setHttpClientConfig(httpClientConfig);

//4.获取ES客户端连接

JestClient jestClient = factory.getObject();

//5.构建查询数据对象

Search search = new Search.Builder("{\n" +

" \"query\": {\n" +

" \"match\": {\n" +

" \"name\": \"zhangsan\"\n" +

" }\n" +

" }\n" +

"}").addIndex("test5").addType("_doc").build();

//6.执行查询操作

SearchResult searchResult = jestClient.execute(search);

//7.解析查询结果

System.out.println(searchResult.getTotal());

List> hits = searchResult.getHits(Map.class);

for (SearchResult.Hit hit : hits) {

System.out.println(hit.index + "--" + hit.id);

}

//8.关闭连接

jestClient.shutdownClient();

第4章Logstash

4.1 logstash架构

搜集---》过滤---》处理

尚硅谷大数据技术之ELK

第1章 Elasticsearch 概述 1.1 什么是搜索? 百度:我们比如说想找寻任何的信息的时候,就会上百度去搜索一下,比如说找一部自己喜欢的电影,或者说找一本喜欢的书,或者找一条感兴趣的新闻(提到搜索的第一印象)。百度 != 搜索 1)互联网的搜索:电商网站,招聘网站,新闻网站,各种app 2)IT 系统的搜索:OA 软件,办公自动化软件,会议管理,日程管理,项目管理。 搜索,就是在任何场景下,找寻你想要的信息,这个时候,会输入一段你要搜索的关键字,然后就期望找到这个关键字相关的有些信息 1.2 如果用数据库做搜索会怎么样? 如果用数据库做搜索会怎么样? select * from products where product_name list “%牙膏%”商品id 商品名称商品描述 1高露洁牙膏2中华牙膏3 佳洁士牙膏4其他牙膏京东商城搜索框 5 …… 1万条 京东商城后台商品表逐条遍历 1)比如说“商品描述”字段的长度,有长达数千个,甚至数万个字符, 这个时候,每次都要对每条记录的所有文本进行扫描,判断包不包含我指定的这个关键词(比如说“牙膏”),效率非常低。 select * from products where product_name list “%生化机%” 2)还不能将搜索词拆分开来,尽可能去搜索更多的符合你的期望的结果, 比如输入“生化机”,就搜索不出来“生化危机”。 用数据库来实现搜索,是不太靠 谱的。通常来说,性能会很差的。 1千字的商品描述 1千字的商品描述 用数据库来实现搜索,是不太靠谱的。通常来说,性能会很差的。 1.3 什么是全文检索和Lucene ? 1)全文检索,倒排索引 全文检索是指计算机索引程序通过扫描文章中的每一个词,对每一个词建立一个索引,指明该词在文章中出现的次数和位置,当用户查询时,检索程序就根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方式。这个过程类似于通过字典中的检索字表查字的过程。全文搜索搜索引擎数据库中的数据。

01_尚硅谷大数据之HBase简介

第1章HBase简介 1.1 什么是HBase HBASE是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBASE 技术可在廉价PC Server上搭建起大规模结构化存储集群。 HBASE的目标是存储并处理大型的数据,更具体来说是仅需使用普通的硬件配置,就能够处理由成千上万的行和列所组成的大型数据。 HBASE是Google Bigtable的开源实现,但是也有很多不同之处。比如:Google Bigtable 利用GFS作为其文件存储系统,HBASE利用Hadoop HDFS作为其文件存储系统;Google 运行MAPREDUCE来处理Bigtable中的海量数据,HBASE同样利用Hadoop MapReduce来处理HBASE中的海量数据;Google Bigtable利用Chubby作为协同服务,HBASE利用Zookeeper作为对应。 1.2 HBase中的角色 1.2.1 HMaster 功能: 1) 监控RegionServer 2) 处理RegionServer故障转移 3) 处理元数据的变更 4) 处理region的分配或移除 5) 在空闲时间进行数据的负载均衡 6) 通过Zookeeper发布自己的位置给客户端 1.2.2 RegionServer 功能: 1) 负责存储HBase的实际数据 2) 处理分配给它的Region 3) 刷新缓存到HDFS 4) 维护HLog 5) 执行压缩

6) 负责处理Region分片 1.2.3 其他组件: 1) Write-Ahead logs HBase的修改记录,当对HBase读写数据的时候,数据不是直接写进磁盘,它会在内存中保留一段时间(时间以及数据量阈值可以设定)。但把数据保存在内存中可能有更高的概率引起数据丢失,为了解决这个问题,数据会先写在一个叫做Write-Ahead logfile的文件中,然后再写入内存中。所以在系统出现故障的时候,数据可以通过这个日志文件重建。 2) HFile 这是在磁盘上保存原始数据的实际的物理文件,是实际的存储文件。 3) Store HFile存储在Store中,一个Store对应HBase表中的一个列族。 4) MemStore 顾名思义,就是内存存储,位于内存中,用来保存当前的数据操作,所以当数据保存在WAL 中之后,RegsionServer会在内存中存储键值对。 5) Region Hbase表的分片,HBase表会根据RowKey值被切分成不同的region存储在RegionServer中,在一个RegionServer中可以有多个不同的region。 1.3 HBase架构

01_尚硅谷大数据之Hive基本概念

第1章Hive基本概念 1.1 什么是Hive Hive:由Facebook开源用于解决海量结构化日志的数据统计。 Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并提供类SQL查询功能。 本质是:将HQL转化成MapReduce程序 1)Hive处理的数据存储在HDFS 2)Hive分析数据底层的实现是MapReduce 3)执行程序运行在Yarn上 1.2 Hive的优缺点 1.2.1 优点 1)操作接口采用类SQL语法,提供快速开发的能力(简单、容易上手) 2)避免了去写MapReduce,减少开发人员的学习成本。 3)Hive的执行延迟比较高,因此Hive常用于数据分析,对实时性要求不高的场合; 4)Hive优势在于处理大数据,对于处理小数据没有优势,因为Hive的执行延迟比较高。5)Hive支持用户自定义函数,用户可以根据自己的需求来实现自己的函数。 1.2.2 缺点 1)Hive的HQL表达能力有限 (1)迭代式算法无法表达 (2)数据挖掘方面不擅长 2)Hive的效率比较低 (1)Hive自动生成的MapReduce作业,通常情况下不够智能化 (2)Hive调优比较困难,粒度较粗

1.3 Hive架构原理 HDFS MapReduce Meta store SQL Parser 解析器 Physical Plan 编译器Execution 执行器 Query Optimizer 优化器 Driver CLI JDBC Client Hive 架构 如图中所示,Hive通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口。 1)用户接口:Client CLI(hive shell)、JDBC/ODBC(java访问hive)、WEBUI(浏览器访问hive) 2)元数据:Metastore 元数据包括:表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等; 默认存储在自带的derby数据库中,推荐使用MySQL存储Metastore 3)Hadoop 使用HDFS进行存储,使用MapReduce进行计算。 4)驱动器:Driver (1)解析器(SQL Parser):将SQL字符串转换成抽象语法树AST,这一步一般都用第三方工具库完成,比如antlr;对AST进行语法分析,比如表是否存在、字段是否存在、SQL语义是否有误。 (2)编译器(Physical Plan):将AST编译生成逻辑执行计划。

尚硅谷大数据技术之数仓开发规范

1.背景 为了避免底层业务变动对上层需求影响过大,屏蔽底层复杂的业务逻辑,尽可能简单、完整的在接口层呈现业务数据,建设高内聚松耦合的数据组织,使数据从业务角度可分割,显得尤为重要。从整个集团业务条线出发,形成数据仓库总体概念框架,并对整个系统所需要的功能模块进行划分,明确各模块技术细节,建设一套完整的开发规范。 2.分层规范 ODS(原始数据层):ODS层是数据仓库准备区,为DWD层提供基础原始数据。 DWD(明细数据层):和ODS粒度一致的明细数据,对数据进行去重,脏数据过滤,空处理,保证数据质量。 DWS(服务数据层):轻度汇总数据及建宽表(按主题)存放数据。 ADS(应用数据层):存放应用类表数据。 3.表规范 3.1 命名 维表命名形式:dim_描述 事实表命名形式:fact_描述_[AB] 临时表命名形式:tmp_ 正式表名_ [C自定义序号] 宽表命名形式:dws_主题_描述_[AB] 备份表命名形式:正式表名_bak_yyyymmdd 表命名解释: 1)表名使用英文小写字母,单词之间用下划线分开,长度不超过40个字符,命名一般控制在小于等于6级。 2)其中ABC第一位"A"时间粒度:使用"c"代表当前数据,"h"代表小时数据,"d"代表天

数据,"w"代表周数据,"m"代表月数据,"q"代表季度数据, "y"代表年数据。 3)其中ABC的第二位"B"表示对象属性,用"t"表示表,用"v"表示视图。 4)其中ABC的第三位"C"自定义序号用于标识多个临时表的跑数顺序。 3.2 注释 注释要结合表的英文名,要求注释简洁明了,体现出表的业务出处、主题和用途。3.3 存储格式 所谓的存储格式就是在Hive建表的时候指定的将表中的数据按照什么样子的存储方式,如果指定了方式,那么在向表中插入数据的时候,将会使用该方式向HDFS中添加相应的数据类型。在数仓中建表默认用的都是PARQUET存储格式,相关语句如下所示:STORED AS INPUTFORMAT ‘org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat’ OUTPUTFORMAT ‘org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat’ 3.5 字符集 Hadoop和hive 都是用utf-8编码的,在建表时可能涉及到中文乱码问题,所以导入的文件的字符编码统一为utf-8格式。 3.6 约定 理论上在数仓落地的表不应该出现null未知类型,对于可能出现null的字段,如果为字符型统一为空字符串,如果是数值则给0。 4.字段规范 4.1 命名

尚硅谷大数据项目之实时项目2(日活需求)

第1章实时处理模块 1.1 模块搭建 添加scala框架 1.2 代码思路 1)消费kafka中的数据; 2)利用redis过滤当日已经计入的日活设备; 3)把每批次新增的当日日活信息保存到HBASE或ES中;

4)从ES中查询出数据,发布成数据接口,通可视化化工程调用。 1.3 代码开发1 ---消费Kafka 1.3.1 配置 1)config.properties # Kafka配置 kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092 # Redis配置 redis.host=hadoop102 rdis.port=6379 2)pom.xml com.atguigu.gmall2019.dw dw-common 1.0-SNAPSHOT org.apache.spark spark-core_2.11 org.apache.spark spark-streaming_2.11 org.apache.kafka kafka-clients 0.11.0.2 org.apache.spark spark-streaming-kafka-0-8_2.11 redis.clients jedis 2.9.0 io.searchbox

大数据技术之MySQL高级

尚硅谷大数据技术之MySQL高级 版本V3.0 第1章MySQL简介 1.1 什么是MySQL (1)MySQL是一个关系型数据库管理系统,由瑞典MySQL AB公司开发,目前属于Oracle 公司。 (2)Mysql是开源的,可以定制的,采用了GPL协议,你可以修改源码来开发自己的Mysql系统。 (3)MySQL使用标准的SQL数据语言形式。 (4)MySQL可以允许于多个系统上,并且支持多种语言。这些编程语言包括C、C++、Python、Java、Perl、PHP、Eiffel、Ruby和Tcl等。 (5)MySQL支持大型数据库,支持5000万条记录的数据仓库,32位系统表文件最大可支持4GB,64位系统支持最大的表文件为8TB。 1.2 在Linux上安装MySQL 1.2.1 检查当前系统是否安装过MySQL CentOS 6命令:rpm -qa|grep mysql 默认Linux在安装的时候,自带了mysql相关的组件。 先卸载系统自带的mysql,执行卸载命令rpm -e --nodeps mysql-libs 1

CentOS 7命令:rpm -qa|grep mariadb 不检查依赖卸载 检查/tmp目录的权限是否是满的 1.2.2 Mysql的安装 安装的版本是mysql 5.7,官网下载地址:https://www.sodocs.net/doc/0412751243.html,/downloads/mysql/ 1)通过Xft5文件传输工具将rpm安装包传输到opt目录下 2)执行rpm安装,必须按照下面的顺序安装 1)rpm -ivh mysql-community-common-5.7.16-1.el7.x86_64.rpm 2)rpm -ivh mysql-community-libs-5.7.16-1.el7.x86_64.rpm 3)rpm -ivh mysql-community-client-5.7.16-1.el7.x86_64.rpm 4)rpm -ivh mysql-community-server-5.7.16-1.el7.x86_64.rpm 2

07_尚硅谷大数据之HBase优化

第7章HBase优化 7.1 高可用 在HBase中Hmaster负责监控RegionServer的生命周期,均衡RegionServer的负载,如果Hmaster挂掉了,那么整个HBase集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以HBase支持对Hmaster的高可用配置。 1)关闭HBase集群(如果没有开启则跳过此步) [atguigu@hadoop102 hbase]$ bin/stop-hbase.sh 2)在conf目录下创建backup-masters文件 [atguigu@hadoop102 hbase]$ touch conf/backup-masters 3)在backup-masters文件中配置高可用HMaster节点 [atguigu@hadoop102 hbase]$ echo hadoop103 > conf/backup-masters 4)将整个conf目录scp到其他节点 [atguigu@hadoop102 hbase]$ scp -r conf/ hadoop103:/opt/modules/cdh/hbase-0.98.6-cdh5.3.6/ [atguigu@hadoop102 hbase]$ scp -r conf/ hadoop104:/opt/modules/cdh/hbase-0.98.6-cdh5.3.6/ 5)打开页面测试查看 0.98版本之前:http://hadooo102:60010 0.98版本及之后:http://hadooo102:16010 7.2 预分区 每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey 范围,则该数据交给这个region维护。那么依照这个原则,我们可以将数据索要投放的分区提前大致的规划好,以提高HBase性能。 1)手动设定预分区 hbase> create 'staff','info','partition1',SPLITS => ['1000','2000','3000','4000'] 2)生成16进制序列预分区 create 'staff2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'} 3)按照文件中设置的规则预分区 创建splits.txt文件内容如下: create 'staff3','partition3',SPLITS_FILE => 'splits.txt' 4)使用JavaAPI创建预分区

02_尚硅谷大数据之Hive安装

第2章Hive安装 2.1 Hive安装地址 1)Hive官网地址: https://www.sodocs.net/doc/0412751243.html,/ 2)文档查看地址: https://https://www.sodocs.net/doc/0412751243.html,/confluence/display/Hive/GettingStarted 3)下载地址: https://www.sodocs.net/doc/0412751243.html,/dist/hive/ 4)github地址: https://https://www.sodocs.net/doc/0412751243.html,/apache/hive 2.2 Hive安装部署 1)Hive安装及配置 (1)把apache-hive-1.2.1-bin.tar.gz上传到linux的/opt/software目录下 (2)解压apache-hive-1.2.1-bin.tar.gz到/opt/module/目录下面 [atguigu@hadoop102 software]$ tar -zxvf apache-hive-1.2.1-bin.tar.gz -C /opt/module/ (3)修改apache-hive-1.2.1-bin.tar.gz的名称为hive [atguigu@hadoop102 module]$ mv apache-hive-1.2.1-bin/ hive (4)修改/opt/module/hive/conf目录下的hive-env.sh.template名称为hive-env.sh [atguigu@hadoop102 conf]$ mv hive-env.sh.template hive-env.sh (5)配置hive-env.sh文件 (a)配置HADOOP_HOME路径 export HADOOP_HOME=/opt/module/hadoop-2.7.2 (b)配置HIVE_CONF_DIR路径 export HIVE_CONF_DIR=/opt/module/hive/conf 2)Hadoop集群配置 (1)必须启动hdfs和yarn [atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh [atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh

13_尚硅谷大数据之常见错误及解决方案

第13章常见错误及解决方案 1)SecureCRT 7.3出现乱码或者删除不掉数据,免安装版的SecureCRT 卸载或者用虚拟机直接操作或者换安装版的SecureCRT 2)连接不上mysql数据库 (1)导错驱动包,应该把mysql-connector-java-5.1.27-bin.jar导入/opt/module/hive/lib的不是这个包。错把mysql-connector-java-5.1.27.tar.gz导入hive/lib包下。 (2)修改user表中的主机名称没有都修改为%,而是修改为localhost 3)hive默认的输入格式处理是CombineHiveInputFormat,会对小文件进行合并。 hive (default)> set hive.input.format; hive.input.format=https://www.sodocs.net/doc/0412751243.html,bineHiveInputFormat 可以采用HiveInputFormat就会根据分区数输出相应的文件。 hive (default)> set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; 4)不能执行mapreduce程序 可能是hadoop的yarn没开启。 5)启动mysql服务时,报MySQL server PID file could not be found! 异常。 在/var/lock/subsys/mysql路径下创建hadoop102.pid,并在文件中添加内容:4396 6)报service mysql status MySQL is not running, but lock file (/var/lock/subsys/mysql[失败])异常。 解决方案:在/var/lib/mysql 目录下创建:-rw-rw----. 1 mysql mysql 5 12月22 16:41 hadoop102.pid 文件,并修改权限为777。 附录:Sqoop常用命令及参数手册 这里给大家列出来了一部分Sqoop操作时的常用参数,以供参考,需要深入学习的可以参看对应类的源代码。

7_尚硅谷大数据之HDFS概述

一HDFS概述 1.1 HDFS产生背景 随着数据量越来越大,在一个操作系统管辖的范围内存不下了,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。HDFS只是分布式文件管理系统中的一种。 1.2 HDFS概念 HDFS,它是一个文件系统,用于存储文件,通过目录树来定位文件;其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。 HDFS的设计适合一次写入,多次读出的场景,且不支持文件的修改。适合用来做数据分析,并不适合用来做网盘应用。 1.3 HDFS优缺点 1.3.1 优点 1)高容错性 (1)数据自动保存多个副本。它通过增加副本的形式,提高容错性; (2)某一个副本丢失以后,它可以自动恢复。 2)适合大数据处理 (1)数据规模:能够处理数据规模达到GB、TB、甚至PB级别的数据; (2)文件规模:能够处理百万规模以上的文件数量,数量相当之大。 3)流式数据访问,它能保证数据的一致性。 4)可构建在廉价机器上,通过多副本机制,提高可靠性。 1.3.2 缺点 1)不适合低延时数据访问,比如毫秒级的存储数据,是做不到的。 2)无法高效的对大量小文件进行存储。 (1)存储大量小文件的话,它会占用NameNode大量的内存来存储文件、目录和块信息。这样是不可取的,因为NameNode的内存总是有限的; (2)小文件存储的寻址时间会超过读取时间,它违反了HDFS的设计目标。 3)并发写入、文件随机修改。

(1)一个文件只能有一个写,不允许多个线程同时写; (2)仅支持数据append(追加),不支持文件的随机修改。 1.4 HDFS组成架构 HDFS的架构图 这种架构主要由四个部分组成,分别为HDFS Client、NameNode、DataNode和Secondary NameNode。下面我们分别介绍这四个组成部分。 1)Client:就是客户端。 (1)文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行存储; (2)与NameNode交互,获取文件的位置信息; (3)与DataNode交互,读取或者写入数据; (4)Client提供一些命令来管理HDFS,比如启动或者关闭HDFS; (5)Client可以通过一些命令来访问HDFS; 2)NameNode:就是Master,它是一个主管、管理者。 (1)管理HDFS的名称空间; (2)管理数据块(Block)映射信息; (3)配置副本策略; (4)处理客户端读写请求。 3) DataNode:就是Slave。NameNode下达命令,DataNode执行实际的操作。 (1)存储实际的数据块;

高清现场:尚硅谷区块链、AI技术沙龙人气爆棚!

下一届技术沙龙即将举办 我们在北京相聚 敬请期待! 2018年4月1日,来自各企业的IT技术人员、编程爱好者、教育从业者等百余位专业技术人士,齐聚深圳西部硅谷大厦,参加了由尚硅谷教育举办的“渔人节”IT技术交流研讨会。尚硅谷教育知名讲师韩顺平、宋红康、封捷等做了主题演讲,尚硅谷教育创始人佟刚老师(学员昵称:刚哥)出席了研讨会。 签到处排成了长队

座无虚席的沙龙现场,不得不临时加座 此次沙龙,是深圳少有的IT互联网专业技术沙龙。原本规划30人的规模,因报名太多,不得不连夜紧急寻找更大的场地,以尽可能多地满足大家的参会需求。沙龙的议题从程序员创业,到AI(人工智能)、区块链,专属IT从业者的聚会,专业性和活跃感并存。 各位嘉宾具体都分享了什么内容呢?一起来看看吧。 一 刚刚录制了Linux教程的韩顺平老师做了开场演讲。

韩老师分享了自己的创业历程和心得,以及加入尚硅谷的前因后果(详见:创业不易,尚硅谷教育延续初心)。韩老师说,从事一个行业,经验累积非常重要,所有的经历都是收获;软件工程师作为专业技术人员,更应该选对方向,努力不懈。 二 科技发展瞬息万变,每个时代都有每个时代的领军人物和代表性的技术方向,人工智 能正是当今时代的主角之一。 人类制造人工智能,目的是为了要造出机器人,和人差不多吗?或者换句话说,人类 今天能够站在食物链的顶端,靠的是什么呢?在引人深思的发问中,宋红康老师的《无学习,不AI》演讲开始了。 宋老师对人工智能的发展历程、深度学习崛起的时代背景和应用场景,以及常见的机 器学习算法、深度学习网络结构的技术趋势及应用,进行了生动幽默的全景式展现。 最后,以是否出现不可避免的人机大战收尾,引发全场思考。

05_尚硅谷大数据之Kafka producer拦截器(interceptor)

第5章Kafka producer拦截器(interceptor) 5.1 拦截器原理 Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。 对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor 按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括: (1)configure(configs) 获取配置信息和初始化数据时调用。 (2)onSend(ProducerRecord): 该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算 (3)onAcknowledgement(RecordMetadata, Exception): 该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率 (4)close: 关闭interceptor,主要用于执行一些资源清理工作 如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。 5.2 拦截器案例 1)需求: 实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或

尚硅谷大数据技术之Phoenix

第1章Phoenix简介 1.1Phoenix定义 Phoenix是HBase的开源SQL皮肤。可以使用标准JDBC API代替HBase客户端API来创建表,插入数据和查询HBase数据。 1.2 Phoenix特点 1) 容易集成:如Spark,Hive,Pig,Flume和Map Reduce。 2) 操作简单:DML命令以及通过DDL命令创建和操作表和版本化增量更改。 3) 完美支持Hbase二级索引创建。 1.3 Phoenix架构 Phoenix架构

1.4 Phoenix的作用 第2章Phoenix快速入门 2.1 Phoenix安装部署 2.1.1 官方网址: 2.1.2 上传jar包到/opt/software/ 2.1.3 复制server和client这俩个包拷贝到各个节点的hbase/lib 在phoenix目录下 向每个节点发送server jar

向每个节点发送client jar 2.1.4 在root权限下给/etc/profile 下添加如下内容 2.1.5 启动Zookeeper,Hadoop,Hbase 2.1.6 启动Phoenix 2.2 phoenix表操作 2.2.1 显示所有表

2.2.2 创建表 如下显示: 在phoenix中,默认情况下,表名等会自动转换为大写,若要小写,使用双引号,如"us_population"。 2.2.3 插入记录 2.2.4 查询记录 2.2.5 删除记录 2.2.6 删除表 2.2.7 退出命令行 2.3 phoenix表映射 2.3.1 Phoenix和Hbase表的关系 默认情况下,直接在hbase中创建的表,通过phoenix是查看不到的。如图1和图2,US_POPULATION是在phoenix中直接创建的,而kylin相关表是在hbase中直接创建的,在phoenix中是查看不到kylin等表的。

大数据技术之Atlas(5元数据管理)

尚硅谷大数据技术之Atlas(元数据管理) 第1章Atlas入门 1.1 Atlas概述 Apache Atlas为组织提供开放式元数据管理和治理功能,用以构建其数据资产目录,对这些资产进行分类和管理,并为数据分析师和数据治理团队,提供围绕这些数据资产的协作功能。 1)表与表之间的血缘依赖 2)字段与字段之间的血缘依赖 1

2 1.2 Atlas 架构原理 Atlas 架构原理 类型系统(Type System): 用户为他们想要管理的元数据对象定义模型。Type System 称为“实体”的“类型”实例,表示受管理的实际元数据对象。 图形引擎(Graph Engine): Atlas 在内部使用Graph 模型持久保存它管理的元数据对象。 采集/导出(Ingest/Export):采集组件允许将元数据添加到Atlas 。同样,“导出”组件将Atlas 检测到的元数据导出。API: Atlas 的所有功能都通过REST API 向最终用户暴露,该API 允许创建,更新和删除类型和实体。它也是查询和发现Atlas 管理的类型和实体的主要机制。 Messaging: 除了API 之外,用户还可以选择使用基于Kafka 的消息传递接口与Atlas 集成。 Metadata Sources :目前,Atlas 支持从以下来源提取和管理元数据:HBase 、Hive 、Sqoop 、Storm 、Kafka Admin UI: 该组件是一个基于Web 的应用程序,允许数据管理员和科学家发现和注释元数据。这里最重要的是搜索界面和类似SQL 的查询语言,可用于查询Atlas 管理的元数据类型和对象。 Ranger Tag Based Policies :权限管理模块Business Taxonomy :业务分类 Metadata Store:采用HBase 来存储元数据Index Store:采用Solr 来建索引 第2章 Atlas 安装及使用 1) Atlas 官网地址:s:https://www.sodocs.net/doc/0412751243.html,/ 2)文档查看地址:s:https://www.sodocs.net/doc/0412751243.html,/0.8.4/index.html 3)下载地址:s:https://www.sodocs.net/doc/0412751243.html,/dyn/closer.cgi/atlas/0.8.4/apache-atlas-0.8.4-sources.tar.gz

尚硅谷大数据项目之实时项目5(灵活分析需求)

1.1 灵活查询的场景 数仓中存储了大量的明细数据,但是hadoop存储的数仓计算必须经过mr ,所以即时交互性非常糟糕。为了方便数据分析人员查看信息,数据平台需要提供一个能够根据文字及选项等条件,进行灵活分析判断的数据功能。 2.2 需求详细 输入参数 返回结果

2.1 T+1 模式 2.1.1 实现步骤 1)利用sqoop等工具,从业务数据库中批量抽取数据; 2)利用数仓作业,在dws层组织宽表(用户购买行为); 3)开发spark的批处理任务,把dws层的宽表导入到ES中; 4)从ES读取数据发布接口,对接可视化模块。 2.1.2 特点 优点:可以利用在离线作业处理好的dws层宽表,直接导出一份到ES进行快速交互的分析。缺点:因为要用离线处理的后的结果在放入ES,所以时效性等同于离线数据。

2.2 T+0 模式 2.2.1 实现步骤 1)利用canal抓取对应的数据表的实时新增变化数据,推送到Kafka; 2)在spark-streaming中进行转换,过滤,关联组合成宽表的结构; 3)保存到ES中; 4)从ES读取数据发布接口,对接可视化模块。 2.2.2 特点 优点:实时产生数据,时效性非常高。 缺点:因为从kafka中得到的是原始数据,所以要利用spark-streaming要进行加工处理,相对来说要比批处理方式麻烦,比如join操作。 第3章实时采集数据 3.1 在canal 模块中增加要追踪的表 代码 public class CanalHandler { private List rowDatasList; String tableName; CanalEntry.EventType eventType;

尚硅谷大数据项目之实时项目4(预警需求)

第1章需求分析 1.1 简介 实时预警,是一种经常出现在实时计算中的业务类型。根据日志数据中系统报错异常,或者用户行为异常的检测,产生对应预警日志。预警日志通过图形化界面的展示,可以提醒监控方,需要及时核查问题,并采取应对措施。 1.2 需求说明 需求:同一设备,5分钟内三次及以上用不同账号登录并领取优惠劵,并且在登录到领劵过程中没有浏览商品。达到以上要求则产生一条预警日志。 同一设备,每分钟只记录一次预警。 1.3 预警日志格式

第2章整体流程设计 2.1 框架流程 2.2 开发思路 1)从kafka中消费数据,根据条件进行过滤筛选,生成预警日志;2)预警日志保存到ElasticSearch中; 3)利用Kibana快速搭建可视化图形界面。 第3章实时计算模块 3.1 筛选条件分析 同一设备(分组) 5分钟内(窗口) 三次不同账号登录(用户) 领取优惠券(行为) 没有浏览商品(行为)

同一设备每分钟只记录一次预警(去重)3.2 数据处理流程图 3.3 代码开发 3.3.1 事件日志样例类– EventInfo case class EventInfo(mid:String, uid:String, appid:String, area:String, os:String, ch:String, `type`:String, evid:String, pgid:String, npgid:String, itemid:String, var logDate:String, var logHour:String, var ts:Long)

3.3.2 预警日志样例类–CouponAlertInfo case class CouponAlertInfo(mid:String, uids:java.util.HashSet[String], itemIds:java.util.HashSet[String], events:java.util.List[String], ts:Long) 3.3.3 预警业务类– AlertApp import com.alibaba.fastjson.JSON import com.atguigu.gmall.constant.GmallConstants import com.atguigu.gmall2019.realtime.bean.{CouponAlertInfo, EventInfo} import com.atguigu.gmall2019.realtime.util.{MyEsUtil, MyKafkaUtil} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.util.control.Breaks._ object AlertApp { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("event_app") val ssc = new StreamingContext(sparkConf,Seconds(5)) val inputDstream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(GmallConstants.KAFKA_TOPIC_EVENT,ssc) //1 格式转换成样例类 val eventInfoDstream: DStream[EventInfo] = inputDstream.map { record => val jsonstr: String = record.value() val eventInfo: EventInfo = JSON.parseObject(jsonstr, classOf[EventInfo]) eventInfo } //2 开窗口 val eventInfoWindowDstream: DStream[EventInfo] = eventInfoDstream.window(Seconds(30),Seconds(5)) //3同一设备分组 val groupbyMidDstream: DStream[(String, Iterable[EventInfo])] = eventInfoWindowDstream.map(eventInfo=>(eventInfo.mid,eventInfo)). groupByKey() //4 判断预警 // 在一个设备之内 // 1 三次及以上的领取优惠券 (evid coupon) 且 uid都不相同 // 2 没有浏览商品(evid clickItem) val checkCouponAlertDStream: DStream[(Boolean, CouponAlertInfo)] = groupbyMidDstream.map { case (mid, eventInfoItr) =>

04_尚硅谷大数据之DDL数据定义

第4章DDL数据定义 4.1 创建数据库 1)创建一个数据库,数据库在HDFS上的默认存储路径是/user/hive/warehouse/*.db。 hive (default)> create database db_hive; 2)避免要创建的数据库已经存在错误,增加if not exists判断。(标准写法) 3)创建一个数据库,指定数据库在HDFS上存放的位置 hive (default)> create database db_hive2 location '/db_hive2.db'; 4.2 修改数据库 用户可以使用ALTER DATABASE命令为某个数据库的DBPROPERTIES设置键-值对属性值,来描述这个数据库的属性信息。数据库的其他元数据信息都是不可更改的,包括数据库名和数据库所在的目录位置。 hive (default)> alter database db_hive set dbproperties('createtime'='20170830'); 在mysql中查看修改结果 hive> desc database extended db_hive; db_name comment location owner_name owner_type parameters db_hive hdfs://hadoop102:8020/user/hive/warehouse/db_hive.db atguigu USER {createtime=20170830} 4.3 查询数据库 4.3.1 显示数据库 1)显示数据库

hive> show databases; 2)过滤显示查询的数据库 hive> show databases like 'db_hive*'; OK db_hive db_hive_1 4.3.2 查看数据库详情 1)显示数据库信息 hive> desc database db_hive; OK db_hive hdfs://hadoop102:8020/user/hive/warehouse/db_hive.db atguiguUSER 2)显示数据库详细信息,extended hive> desc database extended db_hive; OK db_hive hdfs://hadoop102:8020/user/hive/warehouse/db_hive.db atguiguUSER 4.3.3 切换当前数据库 hive (default)> use db_hive; 4.4 删除数据库 1)删除空数据库 hive>drop database db_hive2; 2)如果删除的数据库不存在,最好采用if exists判断数据库是否存在 hive> drop database if exists db_hive2; 3)如果数据库不为空,可以采用cascade命令,强制删除

相关主题