此文是根据酷狗音乐大数据架构师王劲在【QCON高可用架构群】中的分享内容整理而成,转发请注明出处。
王劲:目前就职酷狗音乐,大数据架构师,负责酷狗大数据技术规划、建设、应用。 11年的IT从业经验,2年分布式应用开发,3年大数据技术实践经验,主要研究方向流式计算、大数据存储计算、分布式存储系统、NoSQL、搜索引擎等。
编辑整理:陈刚@北京智识
本次分享的主要内容包括:什么是大数据,大数据技术架构,大数据技术实现,持续改进四个方面。
大数据平台是一个庞大的系统工程,整个建设周期很长,涉及的生态链很长(包括:数据采集、接入,清洗、存储计算、数据挖掘,可视化等环节,每个环节都当做一个系统建设),风险也很大。
1.什么是大数据
- 所谓“大数据”(big data) 指的是这样一种现象:一个公司日常运营所生成和积累用户行为数据“增长”如此之快, 以至于难以使用现有的数据库管理工具来驾驭,困难存在于数据的获取、存储、搜索、共享、分析和可视化等方面。这些数据量是如此之大,已经不是以我们所熟知的多少G和多少T为单位来衡量,而是以P(1000个T), E(一百万个T)或Z(10亿个T)为计量单位,所以称之为大数据。
(插图1)
- 大数据来源:半个世纪以来,随着计算机技术全面融入社会生活,信息爆炸已经积累到了一个开始引发变革的程度。它不仅使世界充斥着比以往更多的信息,而且其增长速度也在加快。信息爆炸的学科如天文学和基因学,创造出了“大数据”这个概念。如今,这个概念几乎应用到了所有人类智力与发展的领域中。21世纪是数据信息大发展的时代,移动互联、社交网络、电子商务等极大拓展了互联网的边界和应用范围,各种数据正在迅速膨胀并变大。 互联网(社交、搜索、电商)、移动互联网(微博)、物联网(传感器,智慧地球)、车联网、GPS、医学影像、安全监控、金融(银行、股市、保险)、电信(通话、短信)都在疯狂产生着数据。
(插图2)
- 大数据特征:“大量化(Volume)、多样化(Variety)、快速化(Velocity)、价值密度低(Value)”就是“大数据”显著的4V特征,或者说,只有具备这些特点的数据,才是大数据。
(插图3)
- 大数据技术要解决的问题:大数据技术被设计用于在成本可承受的条件下,通过非常快速(velocity)地采集、发现和分析,从大量(volumes)、多类别(variety)的数据中提取价值(value),将是IT领域新一代的技术与架构。
(插图4)
- 大数据的应用前景:从应用方向上看,通过对大数据的储存、挖掘与分析,大数据在营销、企业管理、数据标准化与情报分析等领域大有作为。从应用行业来看,大数据一方面可以应用于客户服务水平提升及营销方式的改进,另一方面可以助力行业内企业降低成本,提升运营效益,同时还能帮助企业进行商业模式的创新及发现新的市场商机。从对整个社会的价值来看,大数据在智慧城市、智慧交通及灾难预警等方面都有巨大的潜在应用价值。专业机构预测,随着互联网技术的高速发展,云计算、物联网应用的日益丰富,大数据未来发展前景将更为广阔。
(插图5)
大数据现在在生活中无处不在了,那作为IT技术人员,酷狗是怎么通过技术来怎么解决大数据的问题呢?
2.酷狗数据中心大数据技术架构
第一代大数据架构:
(插图6)
主要基于Hadoop1.x+hive做离线计算(T+1),缺点:等所有数据到达后才开始计算,集群资源使用率分布不均衡,凌晨1点到中午12点,集群资源最繁忙;中午12点到晚上12点,集群资源属于空闲状态。
在大数据中,数据的时效性越高,数据越有价值(如:实时个性化推荐系统,RTB系统,实时预警系统等),因此,又研发了第二代技术架构。目前数据中心并行运行两套集群(hadoop1.x,hadoop2.6),新业务直接接入新集群,旧集群的业务数据正在迁移到新集群中;新集群的结果与旧集群的结果对比(很快会全部切换到新集群)。
第二代大数据技术架构
从数据处理流程看,分数据源、数据接入、数据清洗、存储计算、数据服务、数据消费等环节,大数据处理流程,如下图:
(插图7)
第二代大数据技术整体架构图如下:
(插图8)
大数据计算分实时计算与离线计算,在整个集群中,奔着能实时计算的,一定走实时计算流处理,通过实时计算流来减轻集群的资源使用率集中现象。
离线计算(批处理):通过spark,spark SQL实现【关于Spark就不详细介绍了,近几年发展最快的大数据处理框架】,整体性能比hive提高5—10倍,hive脚本都在转换为Spark SQL。
实时计算:基于Storm,Drools,Esper。【关于storm的详细内容,这里不做介绍了,想要查看介绍,请关注本公众号,并查看历史消息即可。Drools(详解见:http://www.drools.org/),Esper(详解见:http://blog.csdn.net/luonanqin/article/category/1557469)】
HBase/MySQL:用于实时计算,离线计算结果存储服务。
Redis:用于中间计算结果存储或字典数据等。
ElasticSearch:用于明细数据实时查询及HBase的二级索引存储。
通过新的技术架构实现比原有的数据时效性明显提高,现在能实时提供DAU,PV等数据,整体离线计算的整体时长缩短了50%。
3.酷狗大数据技术实现
下面给大家讲解下,数据采集接入、数据清洗、实时监控系统、明细查询的实现细节。
组件之数据采集
从数据处理流图中,可以知道,数据源分为前端日志,服务端日志,业务数据。下面讲解数据是怎么采集接入的。
a.前端日志采集接入:
前端日志采集要求实时,可靠性,高可用性等特性。技术选型时,对开源的数据采集工具flume,scribe,chukwa测试对比,发现基本满足不了业务场景需求。
(插图9)
详细的对比,大家可以看看(http://www.ttlsa.com/log-system/scribe-chukwa-kafka-flume-log-system-contrast/) 。所以,选择基于kafka,自己开发前置kafka代理网关,来完成数据采集需求。前置代理网关的开发过程中走了一些弯路,最后采用nginx+lua开发,基于lua实现了kafka生产者协议。kafka的生产者协议实现,有兴趣可以去https://github.com/doujiang24/lua-resty-kafka看看,另一同事实现的,现在的github上还比较活跃,提需求的人也有不少。
采集网关的具体实现如下:
[数据可靠性]
采集网关,对数据可靠性,提供以下两点保障:
- 数据由sdk 发往采集网关,网关收到数据即返回成功,由网关确保数据发往kafka。 1)sdk 未收到成功,就会重试(安全:带上上次task id); 2)js,flash 端没有重试逻辑
- 网关与kafka通讯,提供两种不同类别保障: 1)at most once (最多一次发送成功) 2) at least once (至少一次发送成功)
[网关与kafka通讯可靠性保障细节]
前提:由于kafka 本身,在整体设计以及通讯协议上,并不提供强一致性保证(exactly once)[http://kafka.apache.org/documentation.html#semantics] 所以,网关与kafka通讯中,每次发送数据区分为三种状态: 1)kafka写入成功(两份) 2)可以安全重试(kafka肯定没收到) 3)不可安全重试(kakfa可能写入了或者没有)
网关为保障数据的准确性,采用以下策略: 1)可安全重试的,由网关本地缓存,再发送kafka(优先redis,再磁盘) 2)不可安全重试的,将进入指定容错topic
通过以上策略,确保在正常topic 里提供[at most once] 保障; 算上容错topic 提供[at least once] 保障;
不可安全重试的状态包括: 1)网络错误(发送超时,等待响应超时) 2)RequestTimedOut 状态(该错误码,在动态迁移partition 时,数据并未被成功写入;其他则会被成功写入)
有了两层的保障,再配合网关上报的监控(收到的量,成功发送量,本地缓存量),可以 1)从每个topic 的量,监控整个系统的运行情况 2) 对于普通业务场景,只需要正常使用topic 3)如果网络有长时间的抖动,或者kafka出现宕机(都将导致出现较多不可安全重试内容),可能需要特殊处理容错topic内的数据
b.后端日志采集接入:
FileCollect,考虑到很多线上环境的环境变量不能改动,为减少侵入式,目前是采用Go语言实现文件采集。
前端,服务端的数据整体架构如下图:
(插图10)
c.业务数据接入
Canal:利用Canal通过MySQL的binlog机制实时同步业务增量数据。(有关canal的介绍,参考:http://agapple.iteye.com/blog/1796633)
组件之数据清洗(ETL)
上面介绍了,数据采集接入的实现方式,接下来介绍的都是基于Storm框架实现的,先介绍数据清洗(ETL), 此处的ETL只是做简单的数据转义,补全,异常数据处理。
(插图11)
Storm(数据清洗) Kafka Spout 负责消费Kafka数据
IsDecode Bolt 负责判断数据是否解码
Decode Bolt 负责数据解码
Rules Bolt 负责数据规则解析,引入规则引擎(Drools),解决数据变更需求。
FormatRule 负责数据格式化规则,适应不同的格式。
DataAdapter 负责数据存储适配,需要适配HDFS,HBase,Spark,数据库等。
Error Bolt 负责异常数据写入HDFS,方便异常数据明细查询。
Stat Bolt 统计从kafka消费数据量
在使用Storm中遇到了,业务配置需要变更时,怎么实现动态变更的问题?基于事件驱动的方式解决配置动态变更需求,最初考虑kafka创建事件队列,通过监控队列的事件数据来实现。后来在万能的GitHub找到了Storm-Signal,使用Storm-Signal实现,详细介绍见:https://github.com/ptgoetz/storm-signals。基于Drools实现规则引擎时,需要解决怎么不重启topology的情况下,让修改的规则文件生效,也是基于Storm-Signal组件和把drl文件转换为流存储在redis中,这样动态获取drl流文件。
组件之实时监控
接下来,给大家介绍下实时监控系统基于Storm的实现。在介绍之前,先介绍下OpenTSDB,实时监控的存储是采用OpenTSDB来处理的。
a.OpenTSDB
OpenTSDB是基于HBase存储时间序列数据的一个开源数据库,确切地说,它只是一个HBase的应用而已,其对于时间序列数据的处理可以供其他系统参考和借鉴。OpenTSDB使用HBase作为存储中心,它无须采样,可以完整的收集和存储上亿的数据点,支持秒级别的数据监控,得益于HBase的分布式列式存储,HBase可以灵活的支持metrics的增加,可以支持上万机器和上亿数据点的采集。在openTSDB中,TSD是HBase对外通信的daemon程序,没有master/slave之分,也没有共享状态,因此利用这点和HBase集群的特点就可以消除单点。用户可以通过telnet或者http协议直接访问TSD接口,也可以通过rpc访问TSD。每一个需要获取metrics的Servers都需要设置一个Collector用来收集时间序列数据。这个Collector就是你收集数据的脚本。
(插图12)
如果想快速地展示mysql中在一段时间内执行delete子句的数量,慢查询的数量,创建的临时文件数量以及99%的延迟数量等等。OpenTSDB则可以非常容易存储和处理百万级别以上的数据点,并能实时动态的生成对应的图,如下图:
(插图13)
OpenTSDB使用async HBase,这是个完全异步、非阻塞、线程安全、HBase api,使用更少的线程、锁以及内存可以提供更高的吞吐量,特别对于大量的写操作。下图为读写流程:
(插图14)
在HBase中,表结构的设计对性能具有很大的影响,其中tsdb-uid表和tsdb表见表1和表2
b.实时监控系统
总体架构图:
(插图17)
Kafka Spout 负责读Kafka数据
Decode Bolt 负责日志解码
Detail Bolt 负责原始数据存储ES集群,提示实时原始日志查询。
Stat Rules Bolt负责日志格式解析,引入规则引擎(Drools),解决数据格式变更需求。
TSD Bolt 负责多维度统计结果存储,通过TSD服务建立统计指标与Rowkey的映射关系
Alarm Bolt 负责多维度统计结果写入kafka Alarm Queue。
在实时监控系统的瓶颈不是在实时计算上,而是在结果存储方面?在存储方面,花了大量的时间去调优测试,其中也参考了携程对OpenTSDB的一些建议(携程的OpenTSDB使用的很好,好像还申请了专利)。存储这块,我们主要对它做了以下改进和优化,跟我们的需求进行定制修改源代码。OpenTSDB的改进和优化
(1)去除聚合时的分组插值,直接聚合
(2)修改了startkey和endkey中时间戳,只查询需要的行,对查询结果时间戳添加时区的偏移。
(3)添加降采样表,供采样粒度较大的查询使用,减少了rowkey数,提升了查询性能。
(4)添加协处理器支持,减少io和序列化/反序列化开销。
第4点的改动比较大,把OpenTSDB的查询处理功能有客户端搬迁到服务端(协处理器),大大减少了减少io和序列化/反序列化开销,性能提升明显。还有点,我们通过降维,添加分区标识来提高性能,这点主要利用HBase的分区特性。
组件之明细查询
接下再给大家介绍下,明细数据查询方案,引入了搜索引擎,架构如下图:
(插图18)
目前实现了实时监控系统的明细日志查询功能。基于Storm把明细数据转换成相应的格式,通过ElasticSearch-Storm组件实现日志实时写入ES,再通过ES实时查询。我们现在还利用ES解决HBase的二级索引问题。
关于Storm,我们抽出了一些公共组件,提高开发效率。
(插图19)
4.持续改进
目前我们数据中心存在的问题:
1、业务代码开发量大
2、海量数据查询问题
3、数据的解读问题
针对以上问题,从以下方面改进:
1、基于SummingBird实现Lambda架构
2、大数据存储与查询优化
3、数据可视化应用。大家对SummingBird有兴趣的,可以参考:http://clockfly.diandian.com/post/2014-05-19/summingbirdintroductionapplication
Q&A
Q1、想问下王总:spark目前主要使用在什么场景下,批处理,流处理,数据仓库sql这种all in one的模式比较起来有优势吗?还有就是Spark是如何和其他平台共享集群资源的?感谢!
spark我们目前的大部分新业务都是用spark与spark SQL实现,例如:用户画像。批处理,流处理这块主要是针对海量数据,在大数据中sql的支持度还不是很完善,spark主要基于yarn,资源管理由yarn来负责,我们通过公平调度的策略管理资源。
Q2、数据可视化的时候,不同角色的数据权限控制是怎样做的?
我们通过角色对应的功能点,目前还没做到数据权限,后续会通过acl去实现,在hadoop2.6已经提供了,我们集群的版本已经是最新的了
Q3、提个问题,我们在使用 spark sql 中发现其很不稳定,经常崩溃,而hive目前还是很稳定的,请问您是如何解决的呢?
我们采用的是spark 1.3版本,使用spark sql也是也会遇到一些问题,通过调整内存参数解决,一般通过spark原始api。
Q4、收集日志的sdk,与gateway之间是通过什么通信的?长连接,还是http?怎么协调多个不同的client?
通过http协议,定义了自己的通信协议,不是长连接。手机,pc不同的客户端,都是标识
Q5、storm和spark都有实时计算特性,各适合什么场景?
对稳定性,实时性要求高的,我们会采用storm,spark Streaming目前版本时延比较大,还有对kafka的支持不是很完善,坑多
Q6、这个架构多少人在维护,感觉语言,框架都用的挺多;修改过框架后,如果遇到版本升级,在升级版本的时候,如何处理修改过的内容
十多人维护,语言主要是java,scala。我们的改动比较大,所以后续会自己添加一些新的特性,机会成熟也可以考虑开源
Q7、关于大数据实现用户画像方面的成果、大概方案,能再介绍下不?感兴趣。
其实画像这块,目前还是第一版本,数据训练与分类是分开的,通过spark利用训练好的模型通过行为数据跑用户标签,二期,会基于spark Streaming spark mllib HBase完善标签系统,方案已定了。
Q8、spark目前批处理,流处理,数据仓库sql这种all in one的模式和hadoop生态比较,在实际使用中起来有优势吗?还有就是Spark是如何和其他平台共享集群资源?
相比mr,hive,作业的执行效率高,在相同的时间内处理的任务都要多,但hive的稳定性目前比spark SQL要好。spark部署在yarn上,由yarn负责集群资源调度。
补充:就是说spark在批处理的效率仍然高于mr,而all in one仍然还是各自处理数据,存储到外部。并没有在spark内直接实现数据互通吧?
是的,最终还是存储在hdfs,引入了tachyon,tachyon提高数据的读取。
补充:我本人现在做一个小项目,目的想实现在spark多应用之间的内存缓存数据的互通,例如streaming形成一个history不落地直接给sql使用。不知道有没有适合场景?
kafka Spark streaming有很多应用场景,后期,我们会画像会采用该方案