如何利用Storm完成实时分析处理数据

存储技术

609人已加入

描述

最近利用闲暇时间,又重新研读了一下Storm。认真对比了一下Hadoop,前者更擅长的是,实时流式数据处理,后者更擅长的是基于HDFS,通过MapReduce方式的离线数据分析计算。对于Hadoop,本身不擅长实时的数据分析处理。两者的共同点都是分布式架构,而且都类似有主/从关系的概念。

本文我不会具体阐述Storm集群和Zookeeper集群如何部署的问题,这里想通过一个实际的案例切入,分析一下如何利用Storm完成实时分析处理数据。

Storm本身是Apache托管的开源的分布式实时计算系统,它的前身是Twitter Storm。在Storm问世以前,处理海量的实时数据信息,大部分是类似于使用消息队列,加上工作进程/线程的方式。这使得构建这类的应用程序,变得异常的复杂。很多的业务逻辑中,你不得不考虑消息的发送和接收,线程之间的并发控制等等问题。而其中的业务逻辑可能只是占据整个应用的一小部分,而且很难做到业务逻辑的解耦。但是Storm的出现改变了这种局面,它首先抽象出数据流Stream的抽象概念,一个Stream指的是tuples组成的无边界的序列。后面又继续提出Spouts、Bolts的概念。Spouts在Storm里面是数据源,专门负责生成流。而Bolts则是以流作为输入,并重新生成流作为输出,并且Bolts还会继续指定它输入的流应该如何划分。最后Storm是通过拓扑(Topology)这种抽象概念,组织起若干个Spouts、Bolts构成的分布式数据处理网络。Storm设计的时候,就有意的把Spouts、Bolts组成的拓扑(Topology)网络通过Thrift服务方式进行封装,这个做法,使得Storm的Spouts、Bolts组件可以通过目前主流的任意语言实现,使得整个框架的兼容性和扩展性更加优秀。

在Storm里面拓扑(Topology)的概念,非常类似Hadoop里面MapReduce的Job的概念。不同的是Storm的拓扑(Topology)只要你启动了,它就会一直运行下去,除非你kill掉;而MapReduce的Job最终它是会结束的。基于这样的模式,使得Storm非常适合处理实时性的数据分析、持续计算、DRPC(分布式RPC)等。

下面就结合实际的案例,设计分析一下,如何利用Storm改善应用的处理性能。

某通信公司的垃圾短信监控平台,实时地上传每个省的疑似垃圾短信用户的垃圾短信内容文件,每个省则根据文件中垃圾短信的内容,解析过滤出,包含指定敏感关键字的垃圾短信进行入库。被入库的垃圾短信用户被列为敏感用户,是重点监控对象,毕竟乱发这些垃圾短信是非常不对的。垃圾短信监控平台生成的文件速度非常惊人,原来的传统做法是,根据每个省的每一个地市,对应一个独立应用,串行化地解析、过滤敏感关键字,来进行入库处理。但是,从现状来看,程序处理的性能并不高效,常常造成文件积压,没有及时处理入库。

现在,我们就通过Storm来重新梳理、组织一下上述的应用场景。

首先,我先说明一下,该案例中Storm集群和Zookeeper集群的部署情况,如下图所示:

zookeeper

Nimbus对应的主机是192.168.95.134是Storm主节点,其余两台从节点Supervisor对应的主机分别是192.168.95.135(主机名:slave1)、192.168.95.136(主机名:slave2)。同样的,Zookeeper集群也是部署在上述节点上。

Storm集群和Zookeeper集群会互相通信,因为Storm就是基于Zookeeper的。然后先启动每个节点的Zookeeper服务,其次分别启动Storm的Nimbus、Supervisor服务。具体可以到Storm安装的bin目录下面启动服务,启动命令分别为storm nimbus 》 /dev/null 2 》 &1 &和storm supervisor 》 /dev/null 2 》 &1 &。然后用jps观察启动的效果。没有问题的话,在Nimbus服务对应的主机上启动Storm UI监控对应的服务,在Storm安装目录的bin目录输入命令:storm ui 》/dev/null 2》&1 &。然后打开浏览器输入:http://{Nimbus服务对应的主机ip}:8080,这里就是输入:http://192.168.95.134:8080/。观察Storm集群的部署情况,如下图所示:

zookeeper

可以发现,我们的Storm的版本是0.9.5,它的从节点(Supervisor)有2个,分别是slave1、slave2。一共的woker的数量是8个(Total slots)。Storm集群我们已经部署完毕,也启动成功了。现在就利用Storm的方式,重新改写一下这种敏感信息实时监控过滤的应用。先看下Storm方式的拓扑结构图:

zookeeper

其中的SensitiveFileReader-591、SensitiveFileReader-592(用户短信采集器,分地市)代表的是Storm中的Spouts组件,表示一个数据的源头,这里是表示从服务器的指定目录下,读取疑似垃圾短信用户的垃圾短信内容文件。当然Spouts的组件你可以根据实际的需求,扩展出许多Spouts。

然后读取出文件中每一行的内容之后,就是分析文件的内容组件了,这里是指:SensitiveFileAnalyzer(监控短信内容拆解分析),它负责分析出文件的格式内容。

为了简单演示起见,我这里定义文件的格式为如下内容(随便写一个例子):home_city=591&user_id=5911000&msisdn=10000&sms_content=abc-slave1。每个列之间用&进行连接。其中home_city=591表示疑似垃圾短信的用户归属地市编码,591表示福州、592表示厦门;user_id=5911000表示疑似垃圾短信的用户标识;msisdn=10000表示疑似垃圾短信的用户手机号码;sms_content=abc-slave1代表的就是垃圾短信的内容了。SensitiveFileAnalyzer代表的就是Storm中的Bolt组件,用来处理Spouts“流”出的数据。

最后,就是我们根据解析好的数据,匹配业务规定的敏感关键字,进行过滤入库了。这里我们是把过滤好的数据存入MySQL数据库中。负责这项任务的组件是:SensitiveBatchBolt(敏感信息采集处理),当然它也是Storm中的Bolt组件。好了,以上就是完整的Storm拓扑(Topology)结构了。

现在,我们对于整个敏感信息采集过滤监控的拓扑结构,有了一个整体的了解之后,我们再来看下如何具体编码实现!先来看下整个工程的代码层次结构,它如下图所示:

zookeeper

首先来看下,我们定义的敏感用户的数据结构RubbishUsers,假设我们要过滤的敏感用户的短信内容中,要包含“racketeer”、“Bad”等敏感关键字。具体代码如下:

zookeeper

现在,我们看下敏感信息数据源组件SensitiveFileReader的具体实现,它负责从服务器的指定目录下面,读取疑似垃圾短信用户的垃圾短信内容文件,然后把每一行的数据,发送给下一个处理的Bolt(SensitiveFileAnalyzer),每个文件全部发送结束之后,在当前目录中,把原文件重命名成后缀bak的文件(当然,你可以重新建立一个备份目录,专门用来存储这种处理结束的文件),SensitiveFileReader的具体实现如下:

zookeeper

监控短信内容拆解分析器SensitiveFileAnalyzer,这个Bolt组件,接收到数据源SensitiveFileReader的数据之后,就按照上面定义的格式,对文件中每一行的内容进行解析,然后把解析完毕的内容,继续发送给下一个Bolt组件:SensitiveBatchBolt(敏感信息采集处理)。现在,我们来看下SensitiveFileAnalyzer这个Bolt组件的实现:

zookeeper

最后一个Bolt组件SensitiveBatchBolt(敏感信息采集处理)根据上游Bolt组件SensitiveFileAnalyzer发送过来的数据,然后跟业务规定的敏感关键字进行匹配,如果匹配成功,说明这个用户,就是我们要重点监控的用户,我们把它通过hibernate采集到MySQL数据库,统一管理。最后要说明的是,SensitiveBatchBolt组件还实现了一个监控的功能,就是定期打印出,我们已经采集到的敏感信息用户数据。现在给出SensitiveBatchBolt的实现:

zookeeper

由于是通过hibernate入库到MySQL,所以给出hibernate配置,首先是:hibernate.cfg.xml

zookeeper

对应的ORM映射配置文件rubbish-users.hbm.xml内容如下:

zookeeper

最后,还是通过Spring把hibernate集成起来,数据库连接池用的是:DBCP。对应的Spring配置文件jdbc-hibernate-bean.xml的内容如下:

zookeeper

到此为止,我们已经完成了敏感信息实时监控的所有的Storm组件的开发。现在,我们来完成Storm的拓扑(Topology),由于拓扑(Topology)又分为本地拓扑和分布式拓扑,因此封装了一个工具类StormRunner(拓扑执行器),对应的代码如下:

zookeeper

好了,现在我们把上面所有的Spouts/Bolts拼接成“拓扑”(Topology)结构,我们这里用的是分布式拓扑,来进行部署运行。具体的SensitiveTopology(敏感用户监控Storm拓扑)代码如下:

zookeeper

到此为止,所有的Storm组件已经开发完毕!现在,我们把上述工程打成jar包,放到Storm集群中运行,具体可以到Nimbus对应的Storm安装目录下面的bin目录,输入:storm jar + {jar路径}。

比如我这里是输入:storm jar /home/tj/install/SensitiveTopology.jar newlandframework.storm.topology.SensitiveTopology,然后,把疑似垃圾短信用户的垃圾短信内容文件放到指定的服务器下面的目录(/home/tj/data/591、/home/tj/data/592),最后打开刚才的Storm UI,观察任务的启动执行情况,这里如下图所示:

zookeeper

可以看到我们刚才提交的拓扑:SensitiveTopology已经成功提交到Storm集群里面了。这个时候,你可以鼠标点击SensitiveTopology,然后会打开如下的一个Spouts/Bolts的监控界面,如下图所示:

zookeeper

我们可以很清楚地看到:Spouts组件(用户短信采集器):SensitiveFileReader591、SensitiveFileReader592的线程数executors、任务提交emitted情况。以及Bolts组件:监控短信内容拆解分析器(SensitiveFileAnalyzer)、敏感信息采集处理(SensitiveBatchBolt)的运行情况,这样监控起来就非常方便。

此外,我们还可以到对应的Supervisor服务器对应的Storm安装目录下面的logs目录,查看一下worker的工作日志,我们来看下敏感信息监控过滤的处理情况,截图如下:

zookeeper

通过SensitiveBatchBolt模块的监控线程,可以看到,我们目前已经采集到了9个敏感信息用户了,再来看下,这些包含敏感关键字的用户有没有入库MySQL成功?

zookeeper

发现入库的结果也是9个,和日志打印的数量上是一致的。而且垃圾短信内容sms_content果然都包含了“racketeer”、“Bad”这些敏感关键字!完全符合我们的预期。而且,以后文件处理量上来了,我们可以通过调整设置Spouts/Bolts的并行度,和Worker的数量进行化解。当然,你还可以通过水平扩展集群的数量来解决这个问题。

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
评论(0)
发评论
zhangtiwei 2019-01-03
0 回复 举报
不错的文章,支持下 收起回复

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分