All posts by dotte

机器学习驱动编程:新世界的新编程

Where to buy 🚀 aged domains and backlinks 🔥 from Best-SEO-Domains | 0083-0608

如果今天要重新建立Google,这家公司的大部分系统将会是通过学习得来的,而非编写代码而来。Google的25,000名开发者中原本有大概10%精通机器学习,现在这一比例可能会达到100% — Jeff Dean

和天气一样,大家都会抱怨编程,但谁也没有为此做点什么。这种情况正在发生变化,就像突如其来的暴风雨一样,这些变化也来自一个出乎意料的方向:机器学习/深度学习。

我知道很多人对深度学习都听厌了。谁不是呢?但是编程技术很长时间来都处在一成不变的情况下,是时候做点什么改变这一情况了。

各种有关编程的“战斗”还在继续,但解决不了任何问题。函数还是对象,这种语言还是那种语言,这朵公有云还是那朵公有云或者这朵私有云亦或那朵“填补了空白”的云,REST还是Unrest,这种字节级编码还是另一种编码,这个框架还是那个框架,这种方法论还是那种方法论,裸机还是容器或者虚拟机亦或Unikernel,单层还是微服务或者NanoService,最终一致还是事务型,可变(Mutable)还是不可变(Immutable),DevOps还是NoOps或者SysOps,横向扩展还是纵向扩展,中心化还是去中心化,单线程还是大规模并行,同步还是异步。诸如此类永无止境。

年复一年,天天如此。我们只是创造了调用函数的不同方法,但最终的代码还需要我们人类来编写。更强大的方法应该是让机器编写我们需要的函数,这就是机器学习大展拳脚的领域了,可以为我们编写函数。这种年复一年天天如此的无聊活动还是交给机器学习吧。

机器学习驱动的编程

我最初是和Jeff Dean聊过之后开始接触用深度学习方式编程的。当时我针对聊天内容写了篇文章:《Jeff Dean针对Google大规模深度学习的看法》。强烈建议阅读本文了解深度学习技术。围绕本文的主题,本次讨论的重点在于深度学习技术如何有效地取代人工编写的程序代码:

Google的经验是对于大量子系统,其中一些甚至是通过机器学习方式获得的,使用更为通用的端到端机器学习系统取代它们。通常当你有大量复杂的子系统时,必须通过大量复杂代码将它们连接在一起。Google的目标是让大家使用数据和非常简单的算法取代这一切。

机器学习将成为软件工程的敏捷工具

Google研究总监Peter Norvig针对这个话题进行过详细的讨论:用深度学习与可理解性对抗软件工程和验证。他的大致想法如下:

  • 软件。你可以将软件看作为函数构建的规范,而实现相应的函数就可以满足规范的要求。
  • 机器学习。以(x,y)对为例,你会猜测某一函数会使用x作为参数并生成y作为结果,这样就能很好地概括出x这个新值。
  • 深度学习。依然是以(x,y)对为例,可以通过学习知道你所组建的表征(Representation)具备不同级别的抽象,而非简单直接的输入和输出。这样也可以很好地概括出x这个新值。

软件工程师并不需要介入这个循环,只需要将数据流入机器学习组件并流出所需的表征。

表征看起来并不像代码,而是类似这样:

这种全新类型的程序并非人类可以理解的函数分解(Functional decomposition),看起来更像是一堆参数。

在机器学习驱动的编程(MLDP)世界里,依然可以有人类的介入,不过这些人再也不叫“程序员”了,他们更像是数据科学家。

可以通过范例习得程序的部分内容吗?可以。

  • 有一个包含超过 2000 行代码的开源拼写检查软件的例子,这个软件的效果始终不怎么好。但只要通过包含 17 行代码的朴素贝叶斯分类器就能以同样性能实现这个软件的功能,但代码量减少了100倍,效果也变得更好。
  • AlphaGo是另一个例子,该程序的结构是手工编写的,所有参数则是通过学习得到的。

只通过范例可以习得完整的程序吗?简单的程序可以,但大型传统程序目前还不行。

  • 在Jeff Dean的讲话中,他介绍了一些有关使用神经网络进行序列学习的顺序的细节。借助这种方法可以从零开始构建最尖端的机器翻译程序,这将是一种端到端学习获得的完整系统,无需手工编写大量代码或机器学习的模型即可解决一系列细节问题。
  • 另一个例子是学习如何玩Atari出品的游戏,对于大部分游戏机器可以玩得和人一样好,甚至比人玩得更好。如果按照传统方式对不同组件进行长期的规划,效果肯定不会像现在这么好。
  • 神经图灵机(Neural Turing Machine)也是一次学习如何用程序编写更复杂程序的尝试,但Peter认为这种技术不会走的太远。

正如你所期待的,Norvig先生的这次讲话非常棒,很有远见,值得大家一看。

Google正在以开源代码的方式从GitHub收集数据

深度学习需要数据,如果你想要创建一个能通过范例学习如何编程的AI,肯定需要准备大量程序/范例作为学习素材。

目前Google和GitHub已经开始合作让开源数据更可用。他们已经从GitHub Archive将超过3TB的数据集上传至BigQuery,其中包含超过280万个开源GitHub代码库中包含的活动数据,以及超过1.45亿次提交,和超过20亿不同的文件路径。

这些都可以当作很棒的训练数据。

就真的完美无缺吗?

当然不是。神经网络如同“黑匣子”般的本质使其难以与其他技术配合使用,这就像试图将人类潜意识行为与意识行为背后的原因结合在一起一样。

Jeff Dean还提到了Google搜索评级团队在搜索评级研究工作中运用神经网络技术时的犹豫。对于搜索评级,他们想要了解整个模型,了解做出某一决策的原因。当系统出现错误后他们还想了解为什么会出现这样的错误。

为了解决这样的问题,需要创建配套的调试工具,而相关工具必须具备足够的可理解性。

这样的做法会有效的。针对搜索结果提供的机器学习技术RankBrain发布于2015年,现在已成为第三大最重要的搜索评级指标(指标共有100项)。详情可访问:Google将利润丰厚的网页搜索技术交给AI计算机处理

Peter还通过一篇论文《机器学习:技术债的高利率信用卡》进一步介绍了相关问题的更多细节。缺乏明确的抽象层,就算有Bug你也不知道到底在哪。更改任何细节都要改变全局,很难进行纠正,纠正后的结果更是难以预测。反馈环路,机器学习系统生成数据后会将这些数据重新送入系统,导致反馈环路。诱惑性损害(Attractive Nuisance),一旦一个系统使得所有人都想使用,这样的系统很可能根本无法在不同上下文中使用。非稳态(Non-stationarity),随着时间流逝数据会产生变化,因此需要指出到底需要使用哪些数据,但这个问题根本没有明确答案。配置依赖性,数据来自哪里?数据准确吗?其他系统中出现的改动是否会导致这些数据产生变化,以至于今天看到的结果和昨天不一样?训练用数据和生产用数据是否不同?系统加载时是否丢弃了某些数据?缺乏工具,标准化软件开发有很多优秀的工具可以使用,但机器学习编程是全新的,暂时还没有工具可用。

MLDP是未来吗?

这并不是一篇“天呐,世界末日到了”这样的文章,而是一篇类似于“有些事你可能没听说过,世界又将为此产生变化,这多酷啊”的文章。

这种技术依然处于非常早期的阶段。对程序的部分内容进行学习目前已经是可行的,而对大规模复杂程序进行学习目前还不实用。但是当我们看到标题类似于Google如何将自己重塑为一家“机器学习为先”的公司这样的文章,可能还不明白这到底真正意味着什么。这种技术不仅仅是为了开发类似AlphaGo这样的系统,最终结果远比这个还要深刻。

相关文章

查看英文原文Machine Learning Driven Programming: A New Programming for a New World

from:http://www.infoq.com/cn/articles/machine-learning-programing

Pinterest谈实战经验:如何在两年内实现零到数百亿的月访问

Pinterest一直保持着指数增长,每一个半月都会翻一翻。在两年内,他们实现了从0到数百亿的月PV;从开始的两个创始人加一个工程师增长到现在超过40个工程师,从一个小型的MySQL服务器增长到180个Web Enigne、240个API Enigne、88个MySQL DB(cc2.8xlarge,每个DB都会配置一个从属节点)、110个Redis Instance以及200个Mmecache Instance。

在一个名为 《Scaling Pinterest》 的主题演讲上,Pinterest的Yashwanth NelapatiMarty Weiner为我们讲述了这个戏剧性的过程。当然扩展到当下规模,Pinterest在众多选择中不可避免的走了许多的弯路,而Todd Hoff认为其中最宝贵的经验该归结于以下两点:

  1. 如果你的架构应对增长所带来的问题时,只需要简单的投入更多的主机,那么你的架构含金量十足。
  2. 当你把事物用至极限时,这些技术都会以各自不同的方式发生故障,这导致他们对工具的选择有着特殊的偏好:成熟、简单、优秀、知名、被更多的用户喜爱、更好的支持、稳定且杰出的表现、通常情况下无故障以及免费。使用这些标准,他们选择了MySQL、Solr、Memcache、Redis、Cassandra,同时还抛弃了MongoDB。

同样这两个点是有关联的,符合第二个原则的工具就可以通过投入更多的主机进行扩展。即使负载的增加,项目也不会出现很多故障。即使真的出现难以解决的问题,至少有一个社区去寻找问题解决的方案。一旦你选择过于复杂和挑剔的工具,在扩展的道路上将充满荆棘。

需要注意的是所有他们选择的工具都依靠增加分片来进行扩展,而非通过集群。讲话中还阐述了为什么分片优于集群以及如何进行分片,这些想法可能是之前你闻所未闻的。

下面就看一下Pinterest扩展的阶段性时间轴:

项目背景

  • Pins是由其它零零碎碎信息集合成的图片,显示了对客户重要的信息,并且链接到它所在的位置。
  • Pinterest是一个社交网络,你可以follow(关注)其他人以及board。
  • 数据库:Pinterest的用户拥有board,而每个board都包含pin;follow及repin人际关系、验证信息。

1. 2010年3月发布——寻找真我的时代

在那时候,你甚至不知道需要建立一个什么样的产品。你有想法,所以你快速的迭代以及演变。而最终你将得到一些很小的MySQL查询,而这些查询在现实生活中你从未进行过。

Pinterest初期阶段的一些数字:

  • 2个创始人
  • 1个工程师
  • Rackspace
  • 1个小的网络引擎
  • 1个小的MySQL数据库
  • 2011年11月

仍然是小规模,产品通过用户反馈进行演变后的数字是:

  • Amazon EC2 + S3 + CloudFront
  • 1 NGinX, 4 Web Engines (用于冗余,不全是负载)
  • 1 MySQL DB + 1 Read Slave (用于主节点故障情况)
  • 1 Task Queue + 2 Task Processors
  • 1 MongoDB (用于计数)
  • 2 Engineers

2. 贯穿2011年——实验的时代

迈上疯狂增长的脚步,基本上每1个半月翻一翻。

  • 当你增长的如此之快,每一天每一星期你可能都需要打破或者抛弃一些东西。
  • 在这个时候,他们阅读大量的论文,这些论文都阐述着只需要添加一台主机问题就会得以解决。他们着手添加许多技术,随后又不得不放弃。
  • 于是出现了一些很奇怪的结果
  • Amazon EC2 + S3 + CloudFront
  • 2NGinX, 16 Web Engines + 2 API Engines
  • 5 Functionally Sharged MySQL DB + 9 read slaves
  • 4 Cassandra Nodes
  • 15 Membase Nodes (3 separate clusters)
  • 8 Memcache Nodes
  • 10 Redis Nodes
  • 3 Task Routers + 4 Task Processors
  • 4 Elastic Search Nodes
  • 3 Mongo Clusters
  • 3个工程师
  • 5个主数据库技术,只为了独立其中的数据。
  • 增长太快以至于MySQL疲于奔命,所有其它的技术也达到了极限。
  • 当你把事物用至极限时,这些技术都会以各自不同的方式出错。
  • 开始抛弃一些技术,并且自我反省究竟需要些什么,基本上重做了所有的架构。

3. 2012年2月——成熟的时代

  • 在重做了所有的架构后,系统呈现了如下状态
  • Amazon EC2 + S3 + Akamai, ELB
  • 90 Web Engines + 50 API Engines
  • 66 MySQL DBs (m1.xlarge) +,每个数据库都配备了从属节点
  • 59 Redis Instances
  • 51 Memcache Instances
  • 1 Redis Task Manager + 25 Task Processors
  • Sharded Solr
  • 6个工程师
  • 现在采用的技术是被分片的MySQL、Redis、Memcache和Solr,有点在于这些技术都很简单很成熟。
  • 网络传输增长仍然保持着以往的速度,而iPhone传输开始走高。

4. 2012年10月12日 —— 收获的季节

大约是1月份的4倍

  • 现在的数据是:
  • Amazon EC2 + S3 + Edge Cast,Akamai, Level 3
  • 180 Web Engines + 240 API Engines
  • 88 MySQL DBs (cc2.8xlarge) ,同样每个数据库都有一个从属节点
  • 110 Redis Instances
  • 200 Memcache Instances
  • 4 Redis Task Manager + 80 Task Processors
  • Sharded Solr
  • 40个工程师(仍在增长)
  • 需要注意的是,如今的架构已趋近完美,应对增长只需要投入更多的主机。
  • 当下已开始转移至SSD

下面一览该演讲中的干货,决策的制定:

为什么会选择EC2和S3

  1. 相当好的可靠性,即使数据中心发生故障。多租户会增加风险,但是也不是太坏。
  2. 良好的报告和支持。它们(EC2和S3)有着良好的架构,并且知道问题所在。
  3. 完善的周边设施,特别是在你需要快速增长时。你可以从APP Engine处获得maged cache、负载均衡、MapReduce、数据库管理以及其它你不想自己动手编写的组件,这可以加速你应用程序的部署,而在你工程师空闲时,你可以着手编写你需要的一切。
  4. 新的实例可以在几秒内就绪,这就是云的力量;特别是在只有两个工程师的初期,不需要去担心容量规划,更不需要花两个星期去建立自己的Memcache,你可以在数分钟内添加10个Memcached。
  5. 缺点:有限的选择。直到最近,才可以选择使用SSD,同时无法获得太大的内存配置。
  6. 优点:你不需要给大量的主机进行不同的配置。

为什么会选择MySQL

  1. 非常成熟。
  2. 非常稳定。不会宕机,并且不会丢失数据。
  3. 在招聘上具有优势,市场上有大把的人才。
  4. 在请求呈直线上升时,仍能将相应时间控制在一定的范围内,有些数据库技术在面对请求的飙升时表现并不是很好。
  5. 非常好的周边软件支持——XtraBackup、Innotop、Maatkit。
  6. 可以从类似Percona这样的公司得到优秀的技术支持。
  7. 开源(免费)——这一点非常重要,特别是在资金缺乏的初期

为什么使用Memcache

  • 非常成熟。
  • 非常简单。可以当成是一个socket哈希表
  • 杰出稳定的表现
  • 知名并为大量用户喜爱
  • 永不崩溃
  • 开源

为什么选择Redis

  • 虽然还不够成熟,但是非常简单及优秀
  • 提供了大量的数据结构类型
  • 提供多种的选择进行持久化和备份:你可以备份而非持久化,选择备份的话你还可以选择多久备份一次;同样你还可以选择使用什么方式进行持久化,比如MySQL等。
  • Home feed被储存在Redis上,每3个小时保存一次;然而并不是3个小时持久化一次,只是简单的每3个小时备份一次。
  • 如果你存储数据的主机发生故障,丢失的也只是备份周期内的数据。虽然不是完全可靠,但是非常简单。避免了复杂的持久化及复制,这样的架构简单且便宜。
  • 知名并为大量用户喜爱
  • 稳定且杰出的表现
  • 很少出故障。有一些专有的故障模型,你需要学会解决。这也是成熟的优势,只需要学习就可以解决。
  • 开源

Solr

  1. 只需要几分钟的安装时间,就可以投入使用
  2. 不能扩展到多于一台的机器上(最新版本并非如此)
  3. 尝试弹性搜索,但是以Pinterest的规模来说,可能会因为零碎文件和查询太多而产生问题。
  4. 选择使用Websolr,但是Pinterest拥有搜索团队,将来可能会开发自己的版本。

集群vs.分片

  • 在迅速扩展的过程中,Pinterest认识到每次负载的增加,都需要均匀的传播他们的数据。
  • 针对问题先确定解决方案的范围,他们选择的范围是集群和分片之间的一系列解决方案。

集群——所有的操作都是通过自动化

  • 比如:Cassandra、MemBase、HBase
  • 结论:没有安全感,将来可能会比较成熟,但是当下这个解决方案中还存在太多的复杂性和故障点。
  • 特性:
  • 数据自动分布
  • 节点间转移数据
  • 需要平衡分配
  • 节点间的相互通信,需要做很多措施用于防止干扰、无效传递及协商。
  • 优点:
  • 自动扩展你的数据存储,最起码论文中是这么说的。
  • 便于安装
  • 数据上的空间分布及机房共置。你可以在不同区域建立数据中心,数据库会帮你打理好一切。
  • 高有效性
  • 负载平衡
  • 不存在单点故障
  • 缺点:
  • 仍然不成熟。
  • 本质上说还很复杂。一大堆的节点必须对称协议,这一点非常难以解决。
  • 缺少社区支持。社区的讨论因为产品方向的不同而不能统一,而在每个正营中也缺乏强有力的支持。
  • 缺乏领域内资深工程师,可能大多数的工程师都还未使用过Cassandra。
  • 困难、没有安全感的机制更新。这可能是因为这些技术都使用API并且只在自己的领域内通行,这导致了复杂的升级路径。
  • 集群管理算法本身就用于处理SPOF(单点故障),如果存在漏洞的话可能就会影响到每个节点。
  • 集群管理器代码非常复杂,并且需要在所有节点上重复,这就可能存在以下的故障模式:
  • 数据平衡失控。当给集群中添加新的主机时,可能因为数据的拷贝而导致集群性能下降。那么你该做什么?这里不存在去发现问题所在的工具。没有社区可以用来求助,同样你也被困住了,这也是Pinterest回到MySQL的原因。
  • 跨节点的数据损坏。如果这里存在一个漏洞,这个漏洞可能会影响节点间的日志系统和压缩等其它组件?你的读延时增加,所有的数据都会陷入麻烦以及丢失。
  • 错误负载平衡很难被修复,这个现象十分普遍。如果你有10个节点,并且你注意到所有的负载都被堆积到一个节点上。虽然可以手动处理,但是之后系统还会将负载都加之一个节点之上。
  • 数据所有权问题,主次节点转换时的数据丢失。集群方案是非常智能的,它们会在特定的情况下完成节点权利的转换,而主次节点切换的过程中可能会导致数据的部分丢失,而丢失部分数据可能比丢失全部还糟糕,因为你不可能知道你究竟丢失了哪一部分。

分片——所有事情都是手动的

  • 结论:它是获胜者。Todd Hoff还认为他们的分片架构可能与Flickr架构类似。
  • 特性:
  • 分片可以让你摆脱集群方案中所有不想要的特性。
  • 数据需要手动的分配。
  • 数据不会移动。Pinterest永远都不会在节点间移动,尽管有些人这么做,这让他们在一定范围内站的更高。
  • 通过分割数据的方式分配负载。
  • 节点并没有互相通信,使用一些主节点控制程序的运行。
  • 优点:
  • 可以分割你的数据库以提高性能。
  • 空间分布及放置数据
  • 高有效性
  • 负载平衡
  • 放置数据的算法非常简单。主要原因是,用于处理单点故障的代码只有区区的半页,而不是一个复杂的集群管理器。并且经过短暂的测试就知道它是否能够正常工作。
  • ID生成非常简单
  • 缺点:
  • 不可以执行大多数的join。
  • 失去所有事务的能力。在一个数据库上的插入可能会成功,而在另一个上会失败。
  • 许多约束必须放到应用程序层。
  • 模式的转变需要从长计议。
  • 报告需要在所有分片上执行查询,然后需要手动的进行聚合。
  • Join在应用程序层执行。
  • 应用程序必须容忍以上所有问题。

什么时候进行分片

  1. 如果你的项目拥有PB级的数据,那么你需要立刻对其进行分片。
  2. Pin表格拥有百万行索引,索引大小已经溢出内存并被存入了磁盘。
  3. Pinterest使用了最大的表格,并将它们(这些索引)放入自己的数据库。
  4. 然后果断的超过了单数据库容量。
  5. 接着Pinterest必须进行分片。

分片的过渡

  • 过渡从一个特性的冻结开始。
  • 确认分片该达到什么样的效果——希望尽少的执行查询以及最少数量的数据库去呈现一个页面。
  • 剔除所有的MySQL join,将要做join的表格加载到一个单独的分片去做查询。
  • 添加大量的缓存,基本上每个查询都需要被缓存。
  • 这个步骤看起来像:
  • 1 DB + Foreign Keys + Joins
  • 1 DB + Denormalized + Cache
  • 1 DB + Read Slaves + Cache
  • Several functionally sharded DBs+Read Slaves+Cache
  • ID sharded DBs + Backup slaves + cache
  • 早期的只读从属节点一直都存在问题,因为存在slave lag。读任务分配给了从属节点,然而主节点并没有做任何的备份记录,这样就像一条记录丢失。之后Pinterest使用缓存解决了这个问题。
  • Pinterest拥有后台脚本,数据库使用它来做备份。检查完整性约束、引用。
  • 用户表并不进行分片。Pinterest只是使用了一个大型的数据库,并在电子邮件和用户名上做了相关的一致性约束。如果插入重复用户,会返回失败。然后他们对分片的数据库做大量的写操作。

如何进行分片

  • 可以参考Cassandra的ring模型、Membase以及Twitter的Gizzard。
  • 坚信:节点间数据传输的越少,你的架构越稳定。
  • Cassandra存在数据平衡和所有权问题,因为节点们不知道哪个节点保存了另一部分数据。Pinterest认为应用程序需要决定数据该分配到哪个节点,那么将永远不会存在问题。
  • 预计5年内的增长,并且对其进行预分片思考。
  • 初期可以建立一些虚拟分片。8个物理服务器,每个512DB。所有的数据库都装满表格。
  • 为了高有效性,他们一直都运行着多主节点冗余模式。每个主节点都会分配给一个不同的可用性区域。在故障时,该主节点上的任务会分配给其它的主节点,并且重新部署一个主节点用以代替。
  • 当数据库上的负载加重时:
  • 先着眼节点的任务交付速度,可以清楚是否有问题发生,比如:新特性,缓存等带来的问题。
  • 如果属于单纯的负载增加,Pinterest会分割数据库,并告诉应用程序该在何处寻找新的节点。
  • 在分割数据库之前,Pinterest会给这些主节点加入一些从属节点。然后置换应用程序代码以匹配新的数据库,在过渡的几分钟之内,数据会同时写入到新旧节点,过渡结束后将切断节点之间的通道。

ID结构

  • 一共64位
  • 分片ID:16位
  • Type:10位—— Board、User或者其它对象类型
  • 本地ID——余下的位数用于表中ID,使用MySQL自动递增。
  • Twitter使用一个映射表来为物理主机映射ID,这将需要备份;鉴于Pinterest使用AWS和MySQL查询,这个过程大约需要3毫秒。Pinterest并没有让这个额外的中间层参与工作,而是将位置信息构建在ID里。
  • 用户被随机分配在分片中间。
  • 每个用户的所有数据(pin、board等)都存放在同一个分片中,这将带来巨大的好处,避免了跨分片的查询可以显著的增加查询速度。
  • 每个board都与用户并列,这样board可以通过一个数据库处理。
  • 分片ID足够65536个分片使用,但是开始Pinterest只使用了4096个,这允许他们轻易的进行横向扩展。一旦用户数据库被填满,他们只需要增加额外的分片,然后让新用户写入新的分片就可以了。

查找

  • 如果存在50个查找,举个例子,他们将ID分割且并行的运行查询,那么延时将达到最高。
  • 每个应用程序都有一个配置文件,它将给物理主机映射一个分片范围。
  • “sharddb001a”: : (1, 512)
  • “sharddb001b”: : (513, 1024)——主要备份主节点
  • 如果你想查找一个ID坐落在sharddb003a上的用户:
  • 将ID进行分解
  • 在分片映射中执行查找
  • 连接分片,在数据库中搜寻类型。并使用本地ID去寻找这个用户,然后返回序列化数据。

对象和映射

  • 所有数据都是对象(pin、board、user、comment)或者映射(用户由baord,pin有like)。
  • 针对对象,每个本地ID都映射成MySQL Blob。开始时Blob使用的是JSON格式,之后会给转换成序列化的Thrift。
  • 对于映射来说,这里有一个映射表。你可以为用户读取board,ID包含了是时间戳,这样就可以体现事件的顺序。
  • 同样还存在反向映射,多表对多表,用于查询有哪些用户喜欢某个pin这样的操作。
  • 模式的命名方案是:noun_verb_noun: user_likes_pins, pins_like_user。
  • 只能使用主键或者是索引查找(没有join)。
  • 数据不会向集群中那样跨数据的移动,举个例子:如果某个用户坐落在20分片上,所有他数据都会并列存储,永远不会移动。64位ID包含了分片ID,所以它不可能被移动。你可以移动物理数据到另一个数据库,但是它仍然与相同分片关联。
  • 所有的表都存放在分片上,没有特殊的分片,当然用于检测用户名冲突的巨型表除外。
  • 不需要改变模式,一个新的索引需要一个新的表。
  • 因为键对应的值是blob,所以你不需要破坏模式就可以添加字段。因为blob有不同的版本,所以应用程序将检测它的版本号并且将新记录转换成相应的格式,然后写入。所有的数据不需要立刻的做格式改变,可以在读的时候进行更新。
  • 巨大的胜利,因为改变表格需要在上面加几个小时甚至是几天的锁。如果你需要一个新的索引,你只需要建立一张新的表格,并填入内容;在不需要的时候,丢弃就好。

呈现一个用户文件界面

  1. 从URL中取得用户名,然后到单独的巨型数据库中查询用户的ID。
  2. 获取用户ID,并进行拆分
  3. 选择分片,并进入
  4. SELECT body from users WHERE id =
  5. SELECT board_id FROM user_has_boards WHERE user_id=
  6. SELECT body FROM boards WHERE id IN ()
  7. SELECT pin_id FROM board_has_pins WHERE board_id=
  8. SELECT body FROM pins WHERE id IN (pin_ids)
  9. 所有调用都在缓存中进行(Memcache或者Redis),所以在实践中并没有太多连接数据库的后端操作。

脚本相关

  1. 当你过渡到一个分片架构,你拥有两个不同的基础设施——没有进行分片的旧系统和进行分片的新系统。脚本成为了新旧系统之间数据传输的桥梁。
  2. 移动5亿的pin、16亿的follower行等。
  3. 不要轻视项目中的这一部分,Pinterest原认为只需要2个月就可以完成数据的安置,然而他们足足花了4至5个月时间,别忘了期间他们还冻结了一项特性。
  4. 应用程序必须同时对两个系统插入数据。
  5. 一旦确认所有的数据都在新系统中就位,就可以适当的增加负载来测试新后端。
  6. 建立一个脚本农场,雇佣更多的工程师去加速任务的完成。让他们做这些表格的转移工作。
  7. 设计一个Pyres副本,一个到GitHub Resque队列的Python的接口,这个队列建立在Redis之上。支持优先级和重试,使用Pyres取代Celery和RabbitMQ更是让他们受益良多。
  8. 处理中会产生大量的错误,用户可能会发现类似丢失board的错误;必须重复的运行任务,以保证在数据的处理过程中不会出现暂时性的错误。

开发相关

  • 开始尝试只给开发者开放系统的一部分——他们每个人都拥有自己的MySQL服务器等,但是事情改变的太快,以至于这个模式根本无法实行。
  • 转变成Facebook模式,每个人都可以访问所有东西,所以不得不非常小心。

未来的方向

  • 基于服务的架构
  • 当他们发现大量的数据库负载,他们开始布置大量的应用程序服务器和一些其它的服务器,所有这些服务器都连接至MySQL和Memcache。这意味着在Memcache上将存在3万的连接,这些连接将占用几个G的内存,同时还会产生大量的Memcache守护进程。
  • 为了解决这个问题,将这些工作转移到了一个服务架构。比如:使用一个follower服务,这个服务将专注处理follower查询。这将接下30台左右的主机去连接数据库和缓存,从而减少了连接的数量。
  • 对功能进行隔离,各司其职。让一个服务的开发者不能访问其它的服务,从而杜绝安全隐患。

学到的知识

  1. 为了应对未来的问题,让其保持简单。
  2. 让其变的有趣。只要应用程序还在使用,就会有很多的工程师加入,过于复杂的系统将会让工作失去乐趣。让架构保持简单就是大的胜利,新的工程师从入职的第一周起就可以对项目有所贡献。
  3. 当你把事物用至极限时,这些技术都会以各自不同的方式发生故障。
  4. 如果你的架构应对增长所带来的问题时,只需要简单的投入更多的主机,那么你的架构含金量十足。
  5. 集群管理算法本身就用于处理SPOF,如果存在漏洞的话可能就会影响到每个节点。
  6. 为了快速的增长,你需要为每次负载增加的数据进行均匀分配。
  7. 在节点间传输的数据越少,你的架构越稳定。这也是他们弃集群而选择分片的原因。
  8. 一个面向服务的架构规则。拆分功能,可以帮助减少连接、组织团队、组织支持以及提升安全性。
  9. 搞明白自己究竟需要什么。为了匹配愿景,不要怕丢弃某些技术,甚至是整个系统的重构。
  10. 不要害怕丢失一点数据。将用户数据放入内存,定期的进行持久化。失去的只是几个小时的数据,但是换来的却是更简单、更强健的系统!

原文链接: Scaling Pinterest – From 0 To 10s Of Billions Of Page Views A Month In Two Years (编译/仲浩 审校/王旭东)

from:http://www.csdn.net/article/2013-04-16/2814902-how-pinterest-scaling-0-to-billions-pv

Amazon's AWS

Where to buy 🚀 aged domains and backlinks 🔥 from Best-SEO-Domains | 0083-0608

原文链接:A Beginner’s Guide To Scaling To 11 Million+ Users On Amazon’s AWS

译者:杰微刊–汪建

 

一个系统从一个用户到多于1100万用户访问,你将如何对你的系统进行扩展?Amazon的web服务解决方案架构师乔尔?威廉姆斯就此话题给出了一个精彩的演讲:2015扩展你的第一个一千万用户。
如果你是一个拥有较丰富的AWS使用经验的用户,这个演讲将不太适合你,但如果你作为一个刚接触云、刚接触AWS的新用户,或者你还没有跟上Amazon源源不断对外发布的AWS新特性,它将是一个很好的入门资料。
正如大家所期望的,这个演讲讨论Amazon服务如何针对问题提出先进且主流的解决方案,Amazon平台总是令人印象深刻且拥有指导性。对于如何把所有产品组合在一起Amazon做了大量工作去提取出用户需要的是什么,并且确保Amazon对于每个用户的需求都拥有一个产品能满足这部分的需求。
演讲的一些有趣的要点:

1、一般刚开始时使用SQL而在必要时刻转向NoSQL。
2、一致的观点是通过引入组件去解耦系统,使用组件便于扩展并且组件故障不会影响到其他模块。组件便于使系统分层和构建微服务。
3、只把区别于已有任务的部分作为你的业务逻辑,不要重复发明轮子。
4、可伸缩性和冗余性不是两个互相独立的概念,你经常要将两个概念同时放在一起考虑。
5、没有提及成本,成为AWS解决方案被批评的一个主要方面。

 

基本情况
AWS覆盖全世界12个国家区域

1. 每个区域都对应着世界上的一个物理位置,每个位置都有弹性计算云提供多个可用区域(Availability Zones),这些区域包含北美、南美、欧洲、中东、非洲、亚太等地区。
2. 每个可用区域(AZ)实质上是单个数据中心,尽管它可由多个数据中心构造。
3. 每个可用区域都拥有很强的隔离性,他们各自拥有独立的电源和网络。
4. 可用区域之间只能通过低延迟网络互相连接,它们可以相距5或15英里,但网络的速度相当快以至于你的应用程序像在同一个数据中心。
5. 每个区域至少有2个可用区域,可用区域总共有32个。
6. 借助若干可用区域即可构建一个高可用的架构供你的应用使用。
7. 在即将到来的2016年将会增加至少9个可用区域和4个区域。

 

AWS在世界上拥有53个边缘位置
1. 这些边缘位置被用于Amazon的内容分发网络CDN、Route53、CloudFront以及Amazon的DNS管理服务器。
2. 边缘位置使用户可以在世界的任何角落低延迟地访问网页。
构建块服务
1. AWS已经使用多个可用区域构架了大量服务供使用,这些服务内部拥有高可用性和容错性。以下是可供使用的服务列表。
2. 你可以在你的应用中直接使用这些服务,它们是收费的,但使用它们你可以不必自己考虑高可用性。
3. 每个可用区域都提供很多服务,包括CloudFront, Route 53, S3, DynamoDB, 弹性负载均衡, EFS, Lambda, SQS, SNS, SES, SWF。
4. 即使这些服务只存在于一个单一的可用区域,通过使用这些服务任然可以构建一个高可用架构。
一个用户
在这种情况下,你是作为仅有的用户,你仅仅只想让web应用跑起来。
你的架构看起来像下面几点:

1. 运行在单独的实例上,可能是t2.micro型。实例类型包括了CPU、内存、存储和网络的不同组合,通过选择这些不同实例类型组成一个适合你的web应用的资源。
2. 在单独的实例上运行整个web栈,例如web应用程序、数据库以及各种管理系统等。
3. 使用Amazon的Route53作为DNS服务。
4. 在此实例上添加一个的弹性IP。
5. 在一段时间内运行的良好。
纵向扩展
1、你需要一个更大的容器放置你的应用,最简单的扩展方法是选择一个更大的实例类型,例如c4.8xlarge或者m3.2xlarge。
2、这种方法称为纵向扩展。
3、需要做的仅仅是选择一个新型实例取代原来的实例,应用跑起来即可以更加强大。
4、提供多种不同的硬件配置混搭选择,可以选择一个244G内存的系统(2TB的RAM即将到来),也可以选择40个CPU内核的系统,可以组成I/0密集型实例、CPU密集型实例以及高存储型实例。
5、Amazon的一些服务使用可配置的IOPS选项来保证性能,你可以使用小一点的实例去跑你的应用,对于需要扩展的服务独立使用Amazon的可扩展服务,例如DynamoDB。
6、纵向扩展有一个很大的问题:它不具备failover功能,同时也不具备冗余性。就像把所有鸡蛋都放在同一个篮子里,一旦实例发生故障你的web也会宕掉。
7、一个单独的实例最终能做到的也就这些,想要更加强大需要其他的措施。
10+用户
将单个主机分为多个主机
1. Web应用使用一台主机。
2. 数据库使用一台主机,你可以在上面跑任意数据库,只要负责数据库的管理。
3. 将主机分离成多个主机可以让web应用和数据库各自独立对自己进行扩展,例如在某种情况下可能你需要的数据库比web应用更大的规模。
或者你可以不自己搭建数据库转而使用Amazon的数据库服务
1. 你是一个DBA吗?你真的想要担心数据备份的问题吗?担心高可用?担心数据库补丁?担心操作系统?
2. 使用Amazon数据库服务有一大优势,你只要简单一点击即可完成多可用区域的数据库的安装,而且你不必担心这些可用区域之间的数据备份或其他类似的事情,这些数据库具备高可用性高可靠性。
正如你所想,Amazon有几种类型的完全托管数据库服务供出售:
1. Amazon RDS(Relational Database Service),可供选择的数据库类型相当多,包括Microsoft SQL Server, Oracle, MySQL, PostgreSQL, MariaDB, Amazon Aurora.
2. Amazon DynamoDB,一个NoSQL数据库。
3. Amazon Redshift,一个PB级的数据仓库系统。
更多Amazon 特性
1. 拥有自动扩展存储到64TB的能力,你不再需要限定你的数据存储。
2. 多大15个读副本。
3. 持续增量备份到S3。
4. 多达6路备份到3个可用区域,有助于处理故障。
5. MySQL兼容。
用SQL数据库取代NoSQL数据库
1. 建议使用SQL数据库。
2. SQL数据库相关技术完善。
3. 存在大量开源源码、社区、支持团队、书籍和工具。
4. 千万用户级别系统的还不足以拖垮SQL数据库,除非你的数据非常巨大。
5. 具有清晰的扩展模式。
什么时候你才需要使用NoSQL数据库
1. 如果你一年需要存储超过5TB的数据,或者你有一个令人难以置信的数据密集任务。
2. 你的应用具有超低延迟需求。
3. 你的应用需要一个非常高的吞吐量,需要在数据的读写I/O上进行优化。
4. 你的应用没有任何关系型数据。
100+用户
在web层进行主机分离。
使用Amazon RDS存储数据,它把数据库的所有工作都揽下了。
上面两点做好即可。

 

1000+用户
现在你构建的应用存在可用性问题,你的web应用将会宕掉如果你web服务的主机宕掉了。
你需要在另外一个可用区域上搭建另外一个web实例,由于可用区域之间拥有毫秒级别的低延迟,他们看起来就像互相挨着。
同样,你需要在另外一个可用区域上搭建一个RDS数据库slave,组成主备数据库,一旦主数据库发生故障你的web应用将会自动切换到slave备数据库。由于你的应用总是使用相同的端,failover不会带给应用任何改变。
在两个可用区域中分布着两个web主机实例,使用弹性负载均衡器(ELB)将用户访问分流到两个web主机实例。
弹性负载均衡器(ELB)
1. ELB是一个高可用的负载均衡器,它存在于所有的可用区域中,对于你的应用来说它是一个DNS服务,只需要把他放到Route53即可,它就会在你的web主机实例中进行负载分发。
2. ELB有健康检查机制,这个机制保证流量不会分发到宕掉的主机上。
3. 不用采取任何措施即可完成扩展,当它发现额外流量时它将在后台通过横向和纵向扩展,随着你的应用不断扩展,它也会自动不断扩展,而且这些都是系统自动完成的,你不必对ELB做任何管理。
10000到100000用户
前面例子中说到ELB后面挂载两个web主机实例,而实际上你可以在ELB后面挂载上千个主机实例,这就叫横向扩展。
添加更多的读副本到数据库中,或者添加到RDS中,但需要保持副本的同步。
通过转移一些流量到web层服务器减轻web应用的压力,例如从你的web应用中将静态内容抽离出来放到Amazon S3和Amazon CloudFront上,CloudFront是Amazon的CDN,它会将你的静态内容保存在全世界的53个边缘地区,通过这些措施提高性能和效率。
Amazon S3是一个对象仓库。
1. 它不像EBS,它不是搭载在EC2实例上的存储设备,它是一个对象存储而不是块存储。
2. 对于静态内容如JavaScript、css、图片、视频等存放在Amazon S3上再合适不过,这些内容没必要放到EC2实例上。
3. 高耐用性,11个9的可靠性。
4. 无限制的可扩展,只要你想可以往里面扔尽可能多的数据,用户在S3上存储了PB级别的数据。
5. 支持最大5TB的对象存储。
6. 支持加密,你可以使用Amazon的加密,或者你自己的加密,又或者第三方的加密服务。
Amazon CloudFront对你的内容提供缓存
1. 它将内容缓存在边缘地区以便供你的用户低延迟访问。
2. 如果没有CDN,将导致你的用户更高延迟地访问你的内容,你的服务器也会因为处理web层的请求而处于更高的负载。
3. 例如有个客户需要应对60Gbps的访问流量,CloudFront将一切都处理了,web层甚至都不知道有这么大的访问流量存在。
你还可以通过转移session状态减轻你的web层的负载
1. 将session状态保存到ElastiCache或DynamoDB。
2. 这个方法也让你的系统在未来可以自动扩展。
你也可以将数据库的一些数据缓存在ElastiCache减轻应用负载
数据库没有必要处理所有获取数据的请求,缓存服务可以处理这些请求从而让宝贵的数据库资源处理更加重要的操作。
Amazon DynamoDB——全托管的NoSQL数据库
1. 根据你自己想要的吞吐量,定制你想要的读写性能。
2. 支持高性能。
3. 具备分布式和容错性,它部署在多个可用区域中。
4. 它以kv结构存储,且支持JSON格式。
5. 支持最大400k大的文件。
Amazon Elasticache ——全托管的Memcached或Redis
1. 维护管理一个memcached集群并不会让你赚更多的钱,所以让Amazon来做。
2. Elasticache集群会自动帮你扩展,它是一个具备自我修复特性的基础设施,如果某些节点宕掉了其它的新节点即会自动启动。
你也可以转移动态内容到CloudFront减轻负载
众所周知CloudFront能处理静态内容,例如文件,但除此之外它还还能处理某些动态内容,这个话题不再进行深入的探讨,可以看看这个链接。
自动扩展
对于黑色星期五,假如你不用做任何扩展就足够处理这些峰值流量,那么你是在浪费钱。如果需求和计算能力相匹配自然是最好的,而这由自动扩展帮你实现,它会自动调整计算集群的大小。
作为用户,你可以决定集群的最小实例数和最大实例数,通过实例池中设置最小和最大实例数即可。
云监控是一种嵌入应用的管理服务
1. 云监控的事件触发扩展。
2. 你准备扩展CPU的数量吗?你准备优化延迟吗?准备扩展带宽吗?
3. 你也可以自定义一些指标到云监控上,如果你想要指定应用针对某些指标自动扩展,只需将这些指标放到云监控上,告诉根据云监控根据这些指标分别扩展哪些资源。
500000+用户
前面的配置可以自动扩展群组添加到web层,在两个可用区域里自动扩展群组,也可以在三个可用区域里扩展,在不同可用区域中的多实例模式不经可以确保可扩展性,同时也保证了可用性。
论题中的案例每个可用区域只有3个web层实例,其实它可以扩展成上千个实例,而你可以设置实例池中最小实例数为10最大实例数为1000。
ElastiCache用于承担数据库中热点数据的读负载。
DynamoDB用于Session数据的负载。
你需要增加监控、指标以及日志。
1. 主机级别指标,查看自动扩展的集群中的某一CPU参数,快速定位到问题的位置。
2. 整体级别指标,查看弹性负载均衡的指标判断整个实例集群的整体性能。
3. 日志分析,使用CloudWatch日志查看应用有什么问题,可以使用CloudTrail对这些日志进行分析管理。
4. 外部站点监控,使用第三方服务例如New Relic或Pingdom监控作为终端用户看到了什么情况。
你需要知道你的用户的反馈,他们是不是访问延迟很慢,他们在访问你的web应用时是不是出现了错误。
从你的系统结构中尽可能多地排出性能指标,这有助于自动扩展的决策,你可不希望你的系统CPU使用率才20%。
自动化运维
随着基础设施越来越大,它扩展到了上千个实例,我们有读副本,我们有水平横线扩展,对于这些我们需要一些自动化运维措施去对他们进行管理,我们可不希望对着每个实例一个一个单独地管理。
动化运维工具分为两个层级
1. DIY层,包括Amazon EC2和AWS CloudFormation。
2. 更高层次的服务,包括AWS Elastic Beanstalk和AWS OpsWorks。
AWS Elastic Beanstalk,为你的应用自动管理基础设施,很方便。
AWS OpsWorks,应用程序管理服务,用于部署和操作不同形态规模的应用程序,它还能做到持续集成。
AWS CloudFormation
1. 提供了最大的灵活性,它提供了你的应用栈的模板,它可以构建你的整个应用栈,或者仅仅是应用栈中的某个组件。
2. 如果你要更新你的应用栈你只要更新CloudFormation模板,它将更新你的整个应用。
3. 它拥有大量的控制,但缺乏便利性。
AWS CodeDeploy,部署你的程序到整个EC2实例集群
1. 可以部署一到上千个实例。
2. Code Deploy可以指向一个自动扩展配置。
3. 可连同Chef和Puppet一起使用。
解耦基础设施
使用SOA/微服务,从你的应用抽离出不同服务,就像前面你将web层与数据库层分离出来那样,再分别创建这些服务。
这些独立出来的服务就可以根据自己需要扩展,它给你系统的扩展带来了灵活性,同时也保证了高可用性。
SOA是Amazon搭建架构关键的组成部分。
松耦合解放了你
1. 你可以对某些服务单独地扩展和让它失效。
2. 如果一个工作节点从SQS拉取数据失败,没有没关系?没有,只要重启另外一个工作节点即可,所有操作都有可能发生故障,所以一定要搭建一个可以处理故障的架构,提供failover功能。
3. 将所有模块设置成黑盒。
4. 把交互设计成松耦合方式。
5. 优先考虑内置了冗余性和可扩展性的服务,而不是靠自己构建实现。
不要重复发明轮子
只需把你区别于已有任务的部分作为你的业务逻辑。
Amazon的很多服务本身具备容错能力,因为他们跨多个可用区域,例如:队列、邮件、转码、搜索、数据库、监控、性能指标采集、日志处理、计算等服务,没有必要自己搭建。
SQS:队列服务
1. Amazon提供的第一个服务。
2. 它是跨可用区域的所以拥有容错性。
3. 它具备可扩展性、安全性、简单性。
4. 队列可以帮助你的基础设施上的不同组件之间传递消息。
5. 以图片管理系统为例,图片收集系统和图片处理系统是两个不同的系统,他们各自都可以独立地扩展,他们之间具备松耦合特性,摄取照片然后扔进队列里面,图片处理系统可以拉取队列里面的图片再对其进行其他处理。
AWS Lambda,用于代码部署和服务管理。
1. 提供解耦你的应用程序的工具。
2. 在前面图片系统的例子中,Lambda可以响应S3的事件,就像S3中某文件被增加时Lambda相关函数会被自动触发去处理一些逻辑。
3. 已经在EC2上集成,供应用扩展。
百万级别用户
当用户数量达到百万级别时,这就要求前面提到的所有方案都要综合考虑。
1. 扩展多为可用区域。
2. 在所有层之间使用弹性负载均衡,不仅在web层使用,而且还要在应用层、数据层以及应用包含的其他所有层都必须使用弹性负载均衡。
3. 自动伸缩能力。
4. 面向服务的架构体系。
5. 巧妙使用S3和CloudFront部署一部分内容。
6. 在数据库前面引入缓存。
7. 将涉及状态的对象移除出Web层。
使用Amazon SES发送邮件。
使用CloudWatch监控。

 

千万级别用户
当我们的系统变得越来越大,我们会在数据层遇到一些问题,你可能会遇到竞争写主库的数据库问题,这也就意味着你最多只能发送这么多写流量到一台服务器上。
你如何解决此问题?
1. Federation,根据你的应用功能把数据库分成多个库。
2. Sharding,分表分片,使用多个服务器分片。
3. 把部分数据迁移到其他类型的数据库上,例如NoSQL、graph等。
Federation——根据应用功能切分成多个库
1. 例如,创建一个论坛数据库、一个用户数据库、一个产品数据库,你可能之前就是一个数据库包含这所有类型的数据,所以现在要将他们拆分开。
2. 按照功能分离出来的数据库可以各自独立进行扩展。
3. 缺点:不能做跨数据库查询。
Sharding——将数据分割到多主机上
1. 应用层变得更加复杂,扩展能力更强。
2. 例如,对于用户数据库,三分之一的用户被发送到一个分片上,三分之一发到另一个分片上,最后三分之一发到第三个分片。
将数据迁移到其他类型的数据库上
1. 考虑NoSQL数据库。
2. 如果你的数据不要求复杂的join操作,比如说排行榜,日志数据,临时数据,热表,元数据/查找表等等,满足这些情况可以考虑迁移到NoSQL数据库上。
3. 这意味着他们可以各自单独扩展。
11000000用户
扩展是一个迭代的过程,当你的系统变得越来越大,你总有更多的事情需要你解决。
调整你的应用架构。
更多的SOA特性和功能。
从多可用区域到多区域。
自定义解决方案去解决你的特定问题,当用户量到达十亿级别时自定义解决方案是必要的。
深入分析你的整个应用栈。
回顾
使用多可用区域的基础设施提升可靠性。
使用自带扩展能力的服务,比如ELB,S3,SQS,SNS,DynamoDB等等。
每一层级都建立冗余,可扩展性和冗余性不是两个分开单独的概念,经常需要同时考虑两者。
刚开始使用传统关系型数据库。
在你的基础设施的里面和外面都考虑缓冲数据。
在你的基础设施中使用自动化工具。
确保你的应用有良好的指标采样、系统监控、日志记录,确保收集你的用户访问你的应用过程中产生的问题。
将各个层分拆成独立的SOA服务,让这些服务能保持最大的独立性,可以各自进行扩展,及时发生故障也不波及其他。
一旦做了足够的准备及可使用自动扩展功能。
不重复发明轮子,尽量使用托管服务而不是自己构建,除非非要不可。
必要的情况下转向NoSQL数据库。

参考资料
On HackerNews / On Reddit

http://aws.amazon.com/documentation

http://aws.amazon.com/architecture

http://aws.amazon.com/start-ups

http://aws.amazon.com/free

From:http://www.jfh.com/jfperiodical/article/1242

 

Flume+Kafka+Storm+Redis实时分析系统基本架构

今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型。当然这个架构模型只是实时分析技术的一 个简单的入门级架构,实际生产环境中的大数据实时分析技术还涉及到很多细节的处理, 比如使用Storm的ACK机制保证数据都能被正确处理, 集群的高可用架构, 消费数据时如何处理重复数据或者丢失数据等问题,根据不同的业务场景,对数据的可靠性要求以及系统的复杂度的要求也会不同。这篇文章的目的只是带大家入个门,让大家对实时分析技术有一个简单的认识,并和大家一起做学习交流。
文章的最后还有Troubleshooting,分享了作者在部署本文示例程序过程中所遇到的各种问题和解决方案。

系统基本架构

整个实时分析系统的架构就是先由电商系统的订单服务器产生订单日志, 然后使用Flume去监听订单日志,并实时把每一条日志信息抓取下来并存进Kafka消息系统中, 接着由Storm系统消费Kafka中的消息,同时消费记录由Zookeeper集群管理,这样即使Kafka宕机重启后也能找到上次的消费记录,接着从上次宕机点继续从Kafka的Broker中进行消费。但是由于存在先消费后记录日志或者先记录后消费的非原子操作,如果出现刚好消费完一条消息并还没将信息记录到Zookeeper的时候就宕机的类似问题,或多或少都会存在少量数据丢失或重复消费的问题, 其中一个解决方案就是Kafka的Broker和Zookeeper都部署在同一台机子上。接下来就是使用用户定义好的Storm Topology去进行日志信息的分析并输出到Redis缓存数据库中(也可以进行持久化),最后用Web APP去读取Redis中分析后的订单信息并展示给用户。之所以在Flume和Storm中间加入一层Kafka消息系统,就是因为在高并发的条件下, 订单日志的数据会井喷式增长,如果Storm的消费速度(Storm的实时计算能力那是最快之一,但是也有例外, 而且据说现在Twitter的开源实时计算框架Heron比Storm还要快)慢于日志的产生速度,加上Flume自身的局限性,必然会导致大量数据滞后并丢失,所以加了Kafka消息系统作为数据缓冲区,而且Kafka是基于log File的消息系统,也就是说消息能够持久化在硬盘中,再加上其充分利用Linux的I/O特性,提供了可观的吞吐量。架构中使用Redis作为数据库也是因为在实时的环境下,Redis具有很高的读写速度。

业务背景
各大电商网站在合适的时间进行各种促销活动已是常态,在能为网站带来大量的流量和订单的同时,对于用户也有不小的让利,必然是大家伙儿喜闻乐见的。在促销活动期间,老板和运营希望能实时看到订单情况,老板开心,运营也能根据实时的订单数据调整运营策略,而让用户能实时看到网站的订单数据,也会勾起用户的购买欲。但是普通的离线计算系统已然不能满足在高并发环境下的实时计算要求,所以我们得使用专门实时计算系统,如:Storm, Heron, Spark Stream等,去满足类似的需求。
既然要分析订单数据,那必然在订单产生的时候要把订单信息记录在日志文件中。本文中,作者通过使用log4j2,以及结合自己之前开发电商系统的经验,写了一个订单日志生成模拟器,代码如下,能帮助大家随机产生订单日志。下面所展示的订单日志文件格式和数据就是我们本文中的分析目标,本文的案例中用来分析所有商家的订单总销售额并找出销售额钱20名的商家。

订单数据格式:
orderNumber: XX | orderDate: XX | paymentNumber: XX | paymentDate: XX | merchantName: XX | sku: [ skuName: XX skuNum: XX skuCode: XX skuPrice: XX totalSkuPrice: XX;skuName: XX skuNum: XX skuCode: XX skuPrice: XX totalSkuPrice: XX;] | price: [ totalPrice: XX discount: XX paymentPrice: XX ]

订单日志生成程序:
使用log4j2将日志信息写入文件中,每小时滚动一次日志文件

  1.       
  2.         
  3.           
  4.         
  5.         
  6.           filePattern=”/Users/guludada/Desktop/logs/app-%d{yyyy-MM-dd-HH}.log”>
  7.             
  8.             
  9.                 
  10.             
  11.         
  12.       
  13.       
  14.         
  15.           
  16.           
  17.         
  18.       


      
        
          
        
    	
        	
       		
        		
      		
                  		
      
      
        
          
          
        
      

生成器代码:

  1. package com.guludada.ordersInfo;
  2. import java.text.SimpleDateFormat;
  3. import java.util.Date;
  4. import java.util.Random;
  5. // Import log4j classes.
  6. import org.apache.logging.log4j.LogManager;
  7. import org.apache.logging.log4j.Logger;
  8. public class ordersInfoGenerator {
  9.     public enum paymentWays {
  10.         Wechat,Alipay,Paypal
  11.     }
  12.     public enum merchantNames {
  13.         优衣库,天猫,淘宝,咕噜大大,快乐宝贝,守望先峰,哈毒妇,Storm,Oracle,Java,CSDN,跑男,路易斯威登,
  14.         暴雪公司,Apple,Sumsam,Nissan,Benz,BMW,Maserati
  15.     }
  16.     public enum productNames {
  17.         黑色连衣裙, 灰色连衣裙, 棕色衬衫, 性感牛仔裤, 圆脚牛仔裤,塑身牛仔裤, 朋克卫衣,高腰阔腿休闲裤,人字拖鞋,
  18.         沙滩拖鞋
  19.     }
  20.     float[] skuPriceGroup = {299,399,699,899,1000,2000};
  21.     float[] discountGroup = {10,20,50,100};
  22.     float totalPrice = 0;
  23.     float discount = 0;
  24.     float paymentPrice = 0;
  25.     private static final Logger logger = LogManager.getLogger(ordersInfoGenerator.class);
  26.     private int logsNumber = 1000;
  27.     public void generate() {
  28.         for(int i = 0; i <= logsNumber; i++) {
  29.             logger.info(randomOrderInfo());
  30.         }
  31.     }
  32.     public String randomOrderInfo() {
  33.         SimpleDateFormat sdf = new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”);
  34.         Date date = new Date();
  35.         String orderNumber = randomNumbers(5) + date.getTime();
  36.         String orderDate = sdf.format(date);
  37.         String paymentNumber = randomPaymentWays() + “-” + randomNumbers(8);
  38.         String paymentDate = sdf.format(date);
  39.         String merchantName = randomMerchantNames();
  40.         String skuInfo = randomSkus();
  41.         String priceInfo = calculateOrderPrice();
  42.         return “orderNumber: ” + orderNumber + ” | orderDate: ” + orderDate + ” | paymentNumber: ” +
  43.             paymentNumber + ” | paymentDate: ” + paymentDate + ” | merchantName: ” + merchantName +
  44.             ” | sku: ” + skuInfo + ” | price: ” + priceInfo;
  45.     }
  46.     private String randomPaymentWays() {
  47.         paymentWays[] paymentWayGroup = paymentWays.values();
  48.         Random random = new Random();
  49.         return paymentWayGroup[random.nextInt(paymentWayGroup.length)].name();
  50.     }
  51.     private String randomMerchantNames() {
  52.         merchantNames[] merchantNameGroup = merchantNames.values();
  53.         Random random = new Random();
  54.         return merchantNameGroup[random.nextInt(merchantNameGroup.length)].name();
  55.     }
  56.     private String randomProductNames() {
  57.         productNames[] productNameGroup = productNames.values();
  58.         Random random = new Random();
  59.         return productNameGroup[random.nextInt(productNameGroup.length)].name();
  60.     }
  61.     private String randomSkus() {
  62.         Random random = new Random();
  63.         int skuCategoryNum = random.nextInt(3);
  64.         String skuInfo =”[“;
  65.         totalPrice = 0;
  66.         for(int i = 1; i <= 3; i++) {
  67.             int skuNum = random.nextInt(3)+1;
  68.             float skuPrice = skuPriceGroup[random.nextInt(skuPriceGroup.length)];
  69.             float totalSkuPrice = skuPrice * skuNum;
  70.             String skuName = randomProductNames();
  71.             String skuCode = randomCharactersAndNumbers(10);
  72.             skuInfo += ” skuName: ” + skuName + ” skuNum: ” + skuNum + ” skuCode: ” + skuCode
  73.                     + ” skuPrice: ” + skuPrice + ” totalSkuPrice: ” + totalSkuPrice + “;”;
  74.             totalPrice += totalSkuPrice;
  75.         }
  76.         skuInfo += ” ]”;
  77.         return skuInfo;
  78.     }
  79.     private String calculateOrderPrice() {
  80.         Random random = new Random();
  81.         discount = discountGroup[random.nextInt(discountGroup.length)];
  82.         paymentPrice = totalPrice – discount;
  83.         String priceInfo = “[ totalPrice: ” + totalPrice + ” discount: ” + discount + ” paymentPrice: ” + paymentPrice +” ]”;
  84.         return priceInfo;
  85.     }
  86.     private String randomCharactersAndNumbers(int length) {
  87.         String characters = “abcdefghijklmnopqrstuvwxyz0123456789”;
  88.         String randomCharacters = “”;
  89.                 Random random = new Random();
  90.                 for (int i = 0; i < length; i++) {
  91.               randomCharacters += characters.charAt(random.nextInt(characters.length()));
  92.                 }
  93.                 return randomCharacters;
  94.     }
  95.     private String randomNumbers(int length) {
  96.         String characters = “0123456789”;
  97.         String randomNumbers = “”;
  98.                 Random random = new Random();
  99.                 for (int i = 0; i < length; i++) {
  100.              randomNumbers += characters.charAt(random.nextInt(characters.length()));
  101.                 }
  102.                return randomNumbers;
  103.     }
  104.     public static void main(String[] args) {
  105.         ordersInfoGenerator generator = new ordersInfoGenerator();
  106.         generator.generate();
  107.     }
  108. }
package com.guludada.ordersInfo;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

// Import log4j classes.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;



public class ordersInfoGenerator {
	
	public enum paymentWays {
		Wechat,Alipay,Paypal
	}
	public enum merchantNames {
		优衣库,天猫,淘宝,咕噜大大,快乐宝贝,守望先峰,哈毒妇,Storm,Oracle,Java,CSDN,跑男,路易斯威登,
		暴雪公司,Apple,Sumsam,Nissan,Benz,BMW,Maserati
	}
	
	public enum productNames {
		黑色连衣裙, 灰色连衣裙, 棕色衬衫, 性感牛仔裤, 圆脚牛仔裤,塑身牛仔裤, 朋克卫衣,高腰阔腿休闲裤,人字拖鞋,
		沙滩拖鞋
	}
	
	float[] skuPriceGroup = {299,399,699,899,1000,2000};
	float[] discountGroup = {10,20,50,100};
	float totalPrice = 0;
	float discount = 0;
	float paymentPrice = 0;
	
	private static final Logger logger = LogManager.getLogger(ordersInfoGenerator.class);
	private int logsNumber = 1000;
	
	public void generate() {
				
		for(int i = 0; i <= logsNumber; i++) {			
			logger.info(randomOrderInfo());			
		}
	}
	
	public String randomOrderInfo() {
		
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");		
		Date date = new Date();		
		
		String orderNumber = randomNumbers(5) + date.getTime();
		
		String orderDate = sdf.format(date);
		
		String paymentNumber = randomPaymentWays() + "-" + randomNumbers(8);
		
		String paymentDate = sdf.format(date);
		
		String merchantName = randomMerchantNames();
		
		String skuInfo = randomSkus();
		
		String priceInfo = calculateOrderPrice();
		
		return "orderNumber: " + orderNumber + " | orderDate: " + orderDate + " | paymentNumber: " +
			paymentNumber + " | paymentDate: " + paymentDate + " | merchantName: " + merchantName + 
			" | sku: " + skuInfo + " | price: " + priceInfo;
	}
		
	private String randomPaymentWays() {
		
		paymentWays[] paymentWayGroup = paymentWays.values();
		Random random = new Random();
		return paymentWayGroup[random.nextInt(paymentWayGroup.length)].name();
	}
	
	private String randomMerchantNames() {
		
		merchantNames[] merchantNameGroup = merchantNames.values();
		Random random = new Random();
		return merchantNameGroup[random.nextInt(merchantNameGroup.length)].name();
	}
	
	private String randomProductNames() {
		
		productNames[] productNameGroup = productNames.values();
		Random random = new Random();
		return productNameGroup[random.nextInt(productNameGroup.length)].name();
	}
	
	
	private String randomSkus() {
		
		Random random = new Random();
		int skuCategoryNum = random.nextInt(3);
		
		String skuInfo ="[";
		
		totalPrice = 0;
		for(int i = 1; i <= 3; i++) {
			
			int skuNum = random.nextInt(3)+1;
			float skuPrice = skuPriceGroup[random.nextInt(skuPriceGroup.length)];
			float totalSkuPrice = skuPrice * skuNum;			
			String skuName = randomProductNames();
			String skuCode = randomCharactersAndNumbers(10);
			skuInfo += " skuName: " + skuName + " skuNum: " + skuNum + " skuCode: " + skuCode
					+ " skuPrice: " + skuPrice + " totalSkuPrice: " + totalSkuPrice + ";";		
			totalPrice += totalSkuPrice;
		}
		
		
		skuInfo += " ]";
		
		return skuInfo;
	}
	
	private String calculateOrderPrice() {
		
		Random random = new Random();
		discount = discountGroup[random.nextInt(discountGroup.length)];
		paymentPrice = totalPrice - discount;
		
		String priceInfo = "[ totalPrice: " + totalPrice + " discount: " + discount + " paymentPrice: " + paymentPrice +" ]";
		
		return priceInfo;
	}
	
	private String randomCharactersAndNumbers(int length) {
		
		String characters = "abcdefghijklmnopqrstuvwxyz0123456789";
		String randomCharacters = "";  
                Random random = new Random();  
                for (int i = 0; i < length; i++) {  
        	  randomCharacters += characters.charAt(random.nextInt(characters.length()));  
                }  
                return randomCharacters;  
	}
	
	private String randomNumbers(int length) {
		
		String characters = "0123456789";
		String randomNumbers = "";   
                Random random = new Random();  
                for (int i = 0; i < length; i++) {  
        	 randomNumbers += characters.charAt(random.nextInt(characters.length()));  
                }  
               return randomNumbers;		
	}
	
	public static void main(String[] args) {
		
		ordersInfoGenerator generator = new ordersInfoGenerator();
		generator.generate();
	}
}

收集日志数据
采集数据的方式有多种,一种是通过自己编写shell脚本或Java编程采集数据,但是工作量大,不方便维护,另一种就是直接使用第三方框架去进行日志的采集,一般第三方框架的健壮性,容错性和易用性都做得很好也易于维护。本文采用第三方框架Flume进行日志采集,Flume是一个分布式的高效的日志采集系统,它能把分布在不同服务器上的海量日志文件数据统一收集到一个集中的存储资源中,FlumeApache的一个顶级项目,与Kafka也有很好的兼容性。不过需要注意的是Flume并不是一个高可用的框架,这方面的优化得用户自己去维护。

Flume的agent是运行在JVM上的,所以各个服务器上的JVM环境必不可少。每一个Flume agent部署在一台服务器上,Flume会收集web server产生的日志数据,并封装成一个个的事件发送给Flume Agent的Source,Flume Agent Source会消费这些收集来的数据事件(Flume Event)并放在Flume Agent Channel,Flume Agent Sink会从Channel中收集这些采集过来的数据,要么存储在本地的文件系统中要么作为一个消费资源分给下一个装在分布式系统中其它服务器上的Flume Agent进行处理。Flume提供了点对点的高可用的保障,某个服务器上的Flume Agent Channel中的数据只有确保传输到了另一个服务器上的Flume Agent Channel里或者正确保存到了本地的文件存储系统中,才会被移除。

在本文中,Flume的Source我们选择的是Exec Source,因为是实时系统,直接通过tail 命令来监听日志文件,而在Kafka的Broker集群端的Flume我们选择Kafka Sink 来把数据下沉到Kafka消息系统中。

下图是来自Flume官网里的Flume拉取数据的架构图:

图片来源:http://flume.apache.org/FlumeUserGuide.html

订单日志产生端的Flume配置文件如下:

  1. agent.sources = origin
  2. agent.channels = memorychannel
  3. agent.sinks = target
  4. agent.sources.origin.type = exec
  5. agent.sources.origin.command = tail -F /export/data/trivial/app.log
  6. agent.sources.origin.channels = memorychannel
  7. agent.sources.origin.interceptors = i1
  8. agent.sources.origin.interceptors.i1.type = static
  9. agent.sources.origin.interceptors.i1.key = topic
  10. agent.sources.origin.interceptors.i1.value = ordersInfo
  11. agent.sinks.loggerSink.type = logger
  12. agent.sinks.loggerSink.channel = memorychannel
  13. agent.channels.memorychannel.type = memory
  14. agent.channels.memorychannel.capacity = 10000
  15. agent.sinks.target.type = avro
  16. agent.sinks.target.channel = memorychannel
  17. agent.sinks.target.hostname = 172.16.124.130
  18. agent.sinks.target.port = 4545
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = exec
agent.sources.origin.command = tail -F /export/data/trivial/app.log
agent.sources.origin.channels = memorychannel

agent.sources.origin.interceptors = i1
agent.sources.origin.interceptors.i1.type = static
agent.sources.origin.interceptors.i1.key = topic
agent.sources.origin.interceptors.i1.value = ordersInfo

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000

agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = 172.16.124.130
agent.sinks.target.port = 4545

Kafka消息系统端Flume配置文件

  1. agent.sources = origin
  2. agent.channels = memorychannel
  3. agent.sinks = target
  4. agent.sources.origin.type = avro
  5. agent.sources.origin.channels = memorychannel
  6. agent.sources.origin.bind = 0.0.0.0
  7. agent.sources.origin.port = 4545
  8. agent.sinks.loggerSink.type = logger
  9. agent.sinks.loggerSink.channel = memorychannel
  10. agent.channels.memorychannel.type = memory
  11. agent.channels.memorychannel.capacity = 5000000
  12. agent.channels.memorychannel.transactionCapacity = 1000000
  13. agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
  14. #agent.sinks.target.topic = bigdata
  15. agent.sinks.target.brokerList=localhost:9092
  16. agent.sinks.target.requiredAcks=1
  17. agent.sinks.target.batchSize=100
  18. agent.sinks.target.channel = memorychannel
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = 0.0.0.0
agent.sources.origin.port = 4545

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000

agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
#agent.sinks.target.topic = bigdata
agent.sinks.target.brokerList=localhost:9092
agent.sinks.target.requiredAcks=1
agent.sinks.target.batchSize=100
agent.sinks.target.channel = memorychannel

这里需要注意的是,在日志服务器端的Flume agent中我们配置了一个interceptors,这个是用来为Flume Event(Flume Event就是拉取到的一行行的日志信息)的头部添加key为“topic”的K-V键值对,这样这条抓取到的日志信息就会根据topic的值去到Kafka中指定的topic消息池中,当然还可以为Flume Event额外配置一个key为“Key”的键值对,Kafka Sink会根据key“Key”的值将这条日志信息下沉到不同的Kafka分片上,否则就是随机分配。在Kafka集群端的Flume配置里,有几个重要的参数需要注意,“topic”是指定抓取到的日志信息下沉到Kafka哪一个topic池中,如果之前Flume发送端为Flume Event添加了带有topic的头信息,则这里可以不用配置;brokerList就是配置Kafka集群的主机地址和端口;requireAcks=1是配置当下沉到Kafka的消息储存到特定partition的leader中成功后就返回确认消息,requireAcks=0是不需要确认消息成功写入Kafka中,requireAcks=-1是指不光需要确认消息被写入partition的leander中,还要确认完成该条消息的所有备份;batchSize配置每次下沉多少条消息,每次下沉的数量越多延迟也高。

Kafka消息系统
这一部分我们将谈谈Kafka的配置和使用,Kafka在我们的系统中实际上就相当于起到一个数据缓冲池的作用, 有点类似于ActiveQ的消息队列和Redis这样的缓存区的作用,但是更可靠,因为是基于log File的消息系统,数据不容易丢失,以及能记录数据的消费位置并且用户还可以自定义消息消费的起始位置,这就使得重复消费消息也可以得以实现,而且同时具有队列和发布订阅两种消息消费模式,十分灵活,并且与Storm的契合度很高,充分利用Linux系统的I/O提高读写速度等等。另一个要提的方面就是Kafka的Consumer是pull-based模型的,而Flume是push-based模型。push-based模型是尽可能大的消费数据,但是当生产者速度大于消费者时数据会被覆盖。而pull-based模型可以缓解这个压力,消费速度可以慢于生产速度,有空余时再拉取那些没拉取到的数据。

Kafka是一个分布式的高吞吐量的消息系统,同时兼有点对点和发布订阅两种消息消费模式Kafka主要由Producer,Consumer和Broker组成。Kafka中引入了一个叫“topic”的概念,用来管理不同种类的消息,不同类别的消息会记录在到其对应的topic池中,而这些进入到topic中的消息会被Kafka写入磁盘的log文件中进行持久化处理。Kafka会把消息写入磁盘的log file中进行持久化对于每一个topic里的消息log文件,Kafka都会对其进行分片处理,而每一个消息都会顺序写入中log分片中,并且被标上“offset”的标量来代表这条消息在这个分片中的顺序,并且这些写入的消息无论是内容还是顺序都是不可变的。所以Kafka和其它消息队列系统的一个区别就是它能做到分片中的消息是能顺序被消费的,但是要做到全局有序还是有局限性的,除非整个topic只有一个log分片。并且无论消息是否有被消费,这条消息会一直保存在log文件中,当留存时间足够长到配置文件中指定的retention的时间后,这条消息才会被删除以释放空间。对于每一个Kafka的Consumer,它们唯一要存的Kafka相关的元数据就是这个“offset”值,记录着Consumer在分片上消费到了哪一个位置。通常Kafka是使用Zookeeper来为每一个Consumer保存它们的offset信息,所以在启动Kafka之前需要有一个Zookeeper集群;而且Kafka默认采用的是先记录offset再读取数据的策略,这种策略会存在少量数据丢失的可能。不过用户可以灵活设置Consumer的“offset”的位置,在加上消息记录在log文件中,所以是可以重复消费消息的。log的分片和它们的备份分散保存在集群的服务器上,对于每一个partition,在集群上都会有一台这个partition存在服务器作为leader,而这个partitionpartition的其它备份所在的服务器做为follower,leader负责处理关于这个partition所有请求,而follower负责这个partition的其它备份的同步工作,当leader服务器宕机时,其中一个follower服务器就会被选举为新的leader。

一般的消息系统分为两种模式,一种是点对点的消费模式,也就是queuing模式,另一种是发布订阅模式,也就是publish-subscribe模式,而Kafka引入了一个Consumer Group的概念,使得其能兼有两种模式。在Kafka中,每一个consumer都会标明自己属于哪个consumer group,每个topic的消息都会分发给每一个subscribe了这个topic的所有consumer group中的一个consumer实例。所以当所有的consumers都在同一个consumer group中,那么就像queuing的消息系统,一个message一次只被一个consumer消费。如果每一个consumer都有不同consumer group,那么就像public-subscribe消息系统一样,一个消息分发给所有的consumer实例。对于普通的消息队列系统,可能存在多个consumer去同时消费message,虽然message是有序地分发出去的,但是由于网络延迟的时候到达不同的consumer的时间不是顺序的,这时就失去了顺序性,解决方案是只用一个consumer去消费message,但显然不太合适。而对于Kafka来说,一个partiton只分发给每一个consumer group中的一个consumer实例,也就是说这个partition只有一个consumer实例在消费,所以可以保证在一个partition内部数据的处理是有序的,不同之处就在于Kafka内部消息进行了分片处理,虽然看上去也是单consumer的做法,但是分片机制保证了并发消费。如果要做到全局有序,那么整个topic中的消息只有一个分片,并且每一个consumer group中只能有一个consumer实例。这实际上就是彻底牺牲了消息消费时的并发度。

 

Kafka的配置和部署十分简单
1. 首先启动Zookeeper集群,Kafka需要Zookeeper集群来帮助记录每一个Consumer的offset
2. 为集群上的每一台Kafka服务器单独配置配置文件,比如我们需要设置有两个节点的Kafka集群,那么节点1和节点2的最基本的配置如下:

  1. config/server-1.properties:
  2.     broker.id=1
  3.     listeners=PLAINTEXT://:9093
  4.     log.dir=export/data/kafka
  5.     zookeeper.connect=localhost:2181
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=export/data/kafka
    zookeeper.connect=localhost:2181
  1. config/server-2.properties:
  2.     broker.id=2
  3.     listeners=PLAINTEXT://:9093
  4.     log.dir=/export/data/kafka
  5.     zookeeper.connect=localhost:2181
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9093
    log.dir=/export/data/kafka
    zookeeper.connect=localhost:2181

broker.id是kafka集群上每一个节点的单独标识,不能重复;listeners可以理解为每一个节点上Kafka进程要监听的端口,使用默认的就行; log.dir是Kafka的log文件(记录消息的log file)存放目录; zookeeper.connect就是Zookeeper的URI地址和端口。
3. 配置完上面的配置文件后,只要分别在节点上
输入下面命令启动Kafka进程就可以使用了

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...


Storm实时计算框架

接下来开始介绍本篇文章要使用的实时计算框架StormStrom是一个非常快的实时计算框架,至于快到什么程度呢?官网首页给出的数据是每一个Storm集群上的节点每一秒能处理一百万条数据。相比Hadoop“Mapreduce”计算框架,Storm使用的是"Topology"Mapreduce程序在计算完成后最终会停下来,而Topology则是会永远运行下去除非你显式地使用“kill -9 XXX”命令停掉它。和大多数的集群系统一样,Storm集群也存在着Master节点和Worker节点,在Master节点上运行的一个守护进程叫“Nimbus”,类似于Hadoop“JobTracker”的功能,负责集群中计算程序的分发,任务的分发,监控任务和工作节点的运行情况等;Worker节点上运行的守护进程叫“Supervisor”,负责接收Nimbus分发的任务并运行,每一个Worker上都会运行着Topology程序的一部分,而一个Topology程序的运行就是由集群上多个Worker一起协同工作的。值得注意的是NimubsSupervisor之间的协调工作也是通过Zookeeper来管理的,NimbusSupervisor自己本身在集群上都是无状态的,它们的状态都保存在Zookeeper上,所以任何节点的宕机和动态扩容都不会影响整个集群的工作运行,并支持fast-fail机制。

Storm有一个很重要的对数据的抽象概念,叫做“Stream”,我们姑且称之为数据流,数据流Stream就是由之间没有任何关系的松散的一个一个的数据元组“tuples”所组成的序列。要在Storm上做实时计算,首先你得有一个计算程序,这就是“Topology”,一个Topology程序由“Spout”“Bolt”共同组成。Storm就是通过Topology程序将数据流Stream通过可靠(ACK机制)的分布式计算生成我们的目标数据流Stream,就比如说把婚恋网站上当日注册的所有用户信息数据流Stream通过Topology程序计算出月收入上万年龄在30岁以下的新的用户信息流Stream。在我们的文章中,Spout就是实现了特定接口的Java类,它相当于数据源,用于产生数据或者从外部接收数据;而Bolt就是实现了Storm Bolt接口的Java类,用于消费从Spout发送出来的数据流并实现用户自定义的数据处理逻辑;对于复杂的数据处理,可以定义多个连续的Bolt去协同处理。最后在程序中通过SpoutBolt生成Topology对象并提交到Storm集群上执行。

tuplesStorm的数据模型,,由值和其所对应的field所组成,比如说在SpoutBolt中定义了发出的元组的field为:(name,age,gender),那么从这个SpoutBolt中发出的数据流的每一个元组值就类似于(''咕噜大大",27,"中性")Storm中还有一个Stream Group的概念,它用来决定从Spout或或或Bolt组件中发出的tuples接下来应该传到哪一个组件中或者更准确地说在程序里设置某个组件应该接收来自哪一个组件的tuples; 并且在Storm中提供了多个用于数据流分组的机制,比如说shuffleGrouping,用来将当前组件产生的tuples随机分发到下一个组件中,或者 fieldsGrouping,根据tuplesfield值来决定当前组件产生的tuples应该分发到哪一个组件中。
 
另一部分需要了解的就是Stormtasksworkers的概念。每一个worker都是一个运行在物理机器上的JVM进程,每个worker中又运行着多个task线程,这些task线程可能是Spout任务也可能是Bolt任务,由Nimbus根据RoundRobin负载均衡策略来分配,而至于在整个Topology程序里要起几个Spout线程或Bolt线程,也就是tasks,由用户在程序中设置并发度来决定。

Storm集群的配置文件如下:
Storm的配置文件在项目的conf目录下,也就是:conf/storm.yaml

  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements.  See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership.  The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License.  You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. ########### These MUST be filled in for a storm configuration
  17. storm.zookeeper.servers:
  18.   - "ymhHadoop"
  19.   - "ymhHadoop2"
  20.   - "ymhHadoop3"
  21. storm.local.dir: "/export/data/storm/workdir"
  22. nimbus.host: "ymhHadoop"
  23. supervisor.slots.ports:
  24.   -6700
  25.   -6701
  26.   -6702
  27.   -6703
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
  - "ymhHadoop"
  - "ymhHadoop2"
  - "ymhHadoop3"    

storm.local.dir: "/export/data/storm/workdir"
 
nimbus.host: "ymhHadoop"

supervisor.slots.ports:
  -6700
  -6701
  -6702
  -6703 
 
storm.zookeeper.servers自然就是用来配置我们熟悉的Zookeeper集群中各个节点的URI地址和端口的
storm.local.dir 是用来配置storm节点相关文件的存储目录的,每一个storm集群的节点在本地服务器上都要有一个目录存储少量的和该节点有关的一些信息。记得要开发这个目录的读写权限哦
nimbus.host 自然就是用来指定nimbus服务器的URI
supervisor.slots.ports 这个是用来配置supervisor服务器启动的worker所监听的端口,每一个worker就是一个物理的JVM进程。上面这些是基本配置,并且要严格按照上面的格式来,少一个空格都会报错。

接下来就是将配置文件拷贝到集群的各个机器上,然后在分别在nimbussupervisor机器上通过$bin/storm nimbus $bin/storm supervisor命令来启动集群上的机子。最后在nimbus上通过$bin/storm UI 命令可以启动Storm提供的UI界面,功能十分强大,可以监控集群上各个节点的运行状态,提交Topology任务,监控Topology任务的运行情况等。这个UI界面可以通过http://{nimbus host}:8080的地址访问到。



Redis数据库

Redis是一个基于内存的多种数据结构的存储工具,经常有人说Redis是一个基于key-value数据结构的缓存数据库,这种说法必然是不准确的,Key-Value只是其中的一种数据结构的实现,Redis支持Stringshasheslistssetssorted sets等多种常见的数据结构,并提供了功能强大的范围查询,以及提供了INCRINCRBY,DECR,DECRBY等多种原子命令操作,保证在并发的环境下不会出现脏数据。虽然Redis是基于内存的数据库,但也提供了多种硬盘持久化策略,比如说RDB策略,用来将某个时间点的Redis的数据快照存储在硬盘中,或者是AOF策略,将每一个Redis操作命令都不可变的顺序记录在log文件中,恢复数据时就将log文件中的所有命令顺序执行一遍等等。Redis不光可以作为网站热点数据的缓存服务器,还可以用来做数据库,或者消息队列服务器的broker等。在本文中选择Redis作为订单分析结果的存储工具,一方面是其灵活的数据结构和强大的数据操作命令,另一方面就是在大数据的实时计算环境下,需要Redis这样的具备高速I/O的数据库。
 
在本文的例子中,作者使用Sorted Sets数据结构来存储各个商家的总订单销售额,Sorted Sets数据结构由Key, Scoreelement value 三部分组成,Set的数据结构保证同一个key中的元素值不会重复,而在Sorted Sets结构中是通过 Score来为元素值排序,这很自然地就能将各个商家的总订单销售额设置为Score,然后商家名称为element value,这样就能根据总订单销售额来为商家排序。在Storm程序中,我们通过Jedis API来调用Redis
$ZINCRBY KEY INCREMENT MEMBER
的命令来统计商家总销售额, ZINCRBY是一个原子命令,能保证在Storm的并发计算的环境下,正确地增加某个商家的Score的值,也就是它们的订单总销售额。而对于两个商家同名这种情况应该在业务系统中去避免而不应该由我们的数据分析层来处理。最后提一个小trips,就是如果所有商家的Score都设置成相同的分数,那么Redis就会默认使用商家名的字母字典序来排序。

Kafka+Storm+Redis的整合
当数据被Flume拉取进Kafka消息系统中,我们就可以使用Storm来进行消费,Redis来对结果进行存储。StormKafka有很好的兼容性,我们可以通过Kafka Spout来从Kafka中获取数据;在Bolt处理完数据后,通过Jedis API在程序中将数据存储在Redis数据库中。

下面就是Kafka Spout和创建Topology的程序代码:

BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");
zkHosts是用来指定Zookeeper集群的节点的URI和端口,而Zookeeper集群是用来记录SpoutKafka消息消费的offset位置

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
主要是用来将SpoutKafka拉取来的byte[]数组格式的数据转化为Stormtuples

  1. package com.guludada.ordersanalysis;
  2. import java.util.UUID;
  3. import backtype.storm.Config;
  4. import backtype.storm.LocalCluster;
  5. import backtype.storm.StormSubmitter;
  6. import backtype.storm.generated.AlreadyAliveException;
  7. import backtype.storm.generated.InvalidTopologyException;
  8. import backtype.storm.spout.SchemeAsMultiScheme;
  9. import backtype.storm.topology.TopologyBuilder;
  10. import backtype.storm.tuple.Fields;
  11. import storm.kafka.Broker;
  12. import storm.kafka.BrokerHosts;
  13. import storm.kafka.KafkaSpout;
  14. import storm.kafka.SpoutConfig;
  15. import storm.kafka.StaticHosts;
  16. import storm.kafka.StringScheme;
  17. import storm.kafka.ZkHosts;
  18. import storm.kafka.trident.GlobalPartitionInformation;
  19. public class ordersAnalysisTopology {
  20.     private static String topicName = "ordersInfo";
  21.     private static String zkRoot = "/stormKafka/"+topicName;
  22.     public static void main(String[] args) {
  23.         BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");
  24.         SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName,zkRoot,UUID.randomUUID().toString());
  25.         spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
  26.         KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
  27.         TopologyBuilder builder = new TopologyBuilder();
  28.         builder.setSpout("kafkaSpout",kafkaSpout);
  29.         builder.setBolt("merchantsSalesBolt", new merchantsSalesAnalysisBolt(), 2).shuffleGrouping("kafkaSpout");
  30.         Config conf = new Config();
  31.         conf.setDebug(true);
  32.         if(args != null && args.length > 0) {
  33.             conf.setNumWorkers(1);
  34.             try {
  35.                 StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
  36.             } catch (AlreadyAliveException e) {
  37.                 // TODO Auto-generated catch block
  38.                 e.printStackTrace();
  39.             } catch (InvalidTopologyException e) {
  40.                 // TODO Auto-generated catch block
  41.                 e.printStackTrace();
  42.             }
  43.         } else {
  44.             conf.setMaxSpoutPending(3);
  45.             LocalCluster cluster = new LocalCluster();
  46.             cluster.submitTopology("ordersAnalysis", conf, builder.createTopology());
  47.         }
  48.     }
  49. }
package com.guludada.ordersanalysis;

import java.util.UUID;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.Broker;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StaticHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.GlobalPartitionInformation;

public class ordersAnalysisTopology {
	
	private static String topicName = "ordersInfo";
	private static String zkRoot = "/stormKafka/"+topicName;
	
	public static void main(String[] args) {
		
		BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");

		
		SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName,zkRoot,UUID.randomUUID().toString());
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
		
		TopologyBuilder builder = new TopologyBuilder();        
		builder.setSpout("kafkaSpout",kafkaSpout);        
		builder.setBolt("merchantsSalesBolt", new merchantsSalesAnalysisBolt(), 2).shuffleGrouping("kafkaSpout");

		Config conf = new Config();
		conf.setDebug(true);
		
		if(args != null && args.length > 0) {
			conf.setNumWorkers(1);
			try {
				StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
			} catch (AlreadyAliveException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
		} else {
			
			conf.setMaxSpoutPending(3);
			
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("ordersAnalysis", conf, builder.createTopology());
			
			
		}

	}
}

下面是Bolt程序,主要是用来处理从Kafka拉取到的订单日志信息, 并计算出所有商家的总订单收入,然后使用Jedis API将计算结果存入到Redis数据库中。

 

  1. package com.guludada.domain;
  2. import java.util.ArrayList;
  3. import java.util.Date;
  4. public class ordersBean {
  5.     Date createTime = null;
  6.     String number = "";
  7.     String paymentNumber = "";
  8.     Date paymentDate = null;
  9.     String merchantName = "";
  10.     ArrayList skuGroup = null;
  11.     float totalPrice = 0;
  12.     float discount = 0;
  13.     float paymentPrice = 0;
  14.     public Date getCreateTime() {
  15.         return createTime;
  16.     }
  17.     public void setCreateTime(Date createTime) {
  18.         this.createTime = createTime;
  19.     }
  20.     public String getNumber() {
  21.         return number;
  22.     }
  23.     public void setNumber(String number) {
  24.         this.number = number;
  25.     }
  26.     public String getPaymentNumber() {
  27.         return paymentNumber;
  28.     }
  29.     public void setPaymentNumber(String paymentNumber) {
  30.         this.paymentNumber = paymentNumber;
  31.     }
  32.     public Date getPaymentDate() {
  33.         return paymentDate;
  34.     }
  35.     public void setPaymentDate(Date paymentDate) {
  36.         this.paymentDate = paymentDate;
  37.     }
  38.     public String getMerchantName() {
  39.         return merchantName;
  40.     }
  41.     public void setMerchantName(String merchantName) {
  42.         this.merchantName = merchantName;
  43.     }
  44.     public ArrayList getSkuGroup() {
  45.         return skuGroup;
  46.     }
  47.     public void setSkuGroup(ArrayList skuGroup) {
  48.         this.skuGroup = skuGroup;
  49.     }
  50.     public float getTotalPrice() {
  51.         return totalPrice;
  52.     }
  53.     public void setTotalPrice(float totalPrice) {
  54.         this.totalPrice = totalPrice;
  55.     }
  56.     public float getDiscount() {
  57.         return discount;
  58.     }
  59.     public void setDiscount(float discount) {
  60.         this.discount = discount;
  61.     }
  62.     public float getPaymentPrice() {
  63.         return paymentPrice;
  64.     }
  65.     public void setPaymentPrice(float paymentPrice) {
  66.         this.paymentPrice = paymentPrice;
  67.     }
  68. }
package com.guludada.domain;

import java.util.ArrayList;
import java.util.Date;

public class ordersBean {

	Date createTime = null;
	String number = "";
	String paymentNumber = "";
	Date paymentDate = null;
	String merchantName = "";
	ArrayList skuGroup = null;
	float totalPrice = 0;
	float discount = 0;
	float paymentPrice = 0;
	
	public Date getCreateTime() {
		return createTime;
	}
	public void setCreateTime(Date createTime) {
		this.createTime = createTime;
	}
	public String getNumber() {
		return number;
	}
	public void setNumber(String number) {
		this.number = number;
	}
	public String getPaymentNumber() {
		return paymentNumber;
	}
	public void setPaymentNumber(String paymentNumber) {
		this.paymentNumber = paymentNumber;
	}
	public Date getPaymentDate() {
		return paymentDate;
	}
	public void setPaymentDate(Date paymentDate) {
		this.paymentDate = paymentDate;
	}
	public String getMerchantName() {
		return merchantName;
	}
	public void setMerchantName(String merchantName) {
		this.merchantName = merchantName;
	}
	public ArrayList getSkuGroup() {
		return skuGroup;
	}
	public void setSkuGroup(ArrayList skuGroup) {
		this.skuGroup = skuGroup;
	}
	public float getTotalPrice() {
		return totalPrice;
	}
	public void setTotalPrice(float totalPrice) {
		this.totalPrice = totalPrice;
	}
	public float getDiscount() {
		return discount;
	}
	public void setDiscount(float discount) {
		this.discount = discount;
	}
	public float getPaymentPrice() {
		return paymentPrice;
	}
	public void setPaymentPrice(float paymentPrice) {
		this.paymentPrice = paymentPrice;
	}
	
	
}

本文例子中用不到skusbean,所以这里作者就没有写委屈偷懒一下下

  1. package com.guludada.domain;
  2. public class skusBean {
  3.       ………………
  4. }
package com.guludada.domain;

public class skusBean {
      ………………
}

logInfoHandler用来过滤订单的日志信息,并保存到ordersBean和skusBean中,方便Bolt获取日志数据的各项属性进行处理

  1. package com.guludada.common;
  2. import java.text.ParseException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.regex.Matcher;
  5. import java.util.regex.Pattern;
  6. import com.guludada.domain.ordersBean;
  7. public class logInfoHandler {
  8.     SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  9.     public ordersBean getOrdersBean(String orderInfo) {
  10.         ordersBean order = new ordersBean();
  11.         //从日志信息中过滤出订单信息
  12.         Pattern orderPattern = Pattern.compile("orderNumber:.+");
  13.         Matcher orderMatcher = orderPattern.matcher(orderInfo);
  14.         if(orderMatcher.find()) {
  15.             String orderInfoStr = orderMatcher.group(0);
  16.             String[] orderInfoGroup = orderInfoStr.trim().split("\\|");
  17.             //获取订单号
  18.             String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
  19.             order.setNumber(orderNum);
  20.             //获取创建时间
  21.             String orderCreateTime = orderInfoGroup[1].trim().split(" ")[1] + " " + orderInfoGroup[1].trim().split(" ")[2];
  22.             try {
  23.                 order.setCreateTime(sdf_final.parse(orderCreateTime));
  24.             } catch (ParseException e) {
  25.                 // TODO Auto-generated catch block
  26.                 e.printStackTrace();
  27.             }
  28.             //获取商家名称
  29.             String merchantName = (orderInfoGroup[4].split(":"))[1].trim();
  30.             order.setMerchantName(merchantName);
  31.             //获取订单总额
  32.             String orderPriceInfo = (orderInfoGroup[6].split("price:"))[1].trim();
  33.             String totalPrice = (orderPriceInfo.substring(2, orderPriceInfo.length()-3).trim().split(" "))[1];
  34.             order.setTotalPrice(Float.parseFloat(totalPrice));
  35.             return order;
  36.         } else {
  37.             return order;
  38.         }
  39.     }
  40. }
package com.guludada.common;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.guludada.domain.ordersBean;

public class logInfoHandler {
	
	SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	
	public ordersBean getOrdersBean(String orderInfo) {
		
		ordersBean order = new ordersBean();
		
		//从日志信息中过滤出订单信息
		Pattern orderPattern = Pattern.compile("orderNumber:.+");
		Matcher orderMatcher = orderPattern.matcher(orderInfo);
		if(orderMatcher.find()) {
			
			String orderInfoStr = orderMatcher.group(0);
			String[] orderInfoGroup = orderInfoStr.trim().split("\\|");
			
			//获取订单号
			String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
			order.setNumber(orderNum);
						
			//获取创建时间
			String orderCreateTime = orderInfoGroup[1].trim().split(" ")[1] + " " + orderInfoGroup[1].trim().split(" ")[2];
			try {
				order.setCreateTime(sdf_final.parse(orderCreateTime));
			} catch (ParseException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
			//获取商家名称
			String merchantName = (orderInfoGroup[4].split(":"))[1].trim();
			order.setMerchantName(merchantName);
			
			//获取订单总额
			String orderPriceInfo = (orderInfoGroup[6].split("price:"))[1].trim();
			String totalPrice = (orderPriceInfo.substring(2, orderPriceInfo.length()-3).trim().split(" "))[1];
			order.setTotalPrice(Float.parseFloat(totalPrice));
						
			return order;
						
		} else {
			return order;
		}
	}
}

 

  1. package com.guludada.ordersanalysis;
  2. import java.util.Map;
  3. import com.guludada.common.logInfoHandler;
  4. import com.guludada.domain.ordersBean;
  5. import backtype.storm.task.OutputCollector;
  6. import backtype.storm.task.TopologyContext;
  7. import backtype.storm.topology.OutputFieldsDeclarer;
  8. import backtype.storm.topology.base.BaseRichBolt;
  9. import backtype.storm.tuple.Tuple;
  10. import redis.clients.jedis.Jedis;
  11. import redis.clients.jedis.JedisPool;
  12. import redis.clients.jedis.JedisPoolConfig;
  13. public class merchantsSalesAnalysisBolt extends BaseRichBolt {
  14.     private OutputCollector _collector;
  15.     logInfoHandler loginfohandler;
  16.     JedisPool pool;
  17.     public void execute(Tuple tuple) {
  18.         String orderInfo = tuple.getString(0);
  19.         ordersBean order = loginfohandler.getOrdersBean(orderInfo);
  20.         //store the salesByMerchant infomation into Redis
  21.         Jedis jedis = pool.getResource();
  22.         jedis.zincrby("orderAna:topSalesByMerchant", order.getTotalPrice(), order.getMerchantName());
  23.     }
  24.     public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
  25.         this._collector = collector;
  26.         this.loginfohandler = new logInfoHandler();
  27.         this.pool = new JedisPool(new JedisPoolConfig(), "ymhHadoop",6379,2 * 60000,"12345");
  28.     }
  29.     public void declareOutputFields(OutputFieldsDeclarer arg0) {
  30.         // TODO Auto-generated method stub
  31.     }
  32. }
package com.guludada.ordersanalysis;

import java.util.Map;

import com.guludada.common.logInfoHandler;
import com.guludada.domain.ordersBean;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class merchantsSalesAnalysisBolt extends BaseRichBolt {
	
	private OutputCollector _collector;
	logInfoHandler loginfohandler;
	JedisPool pool;

	public void execute(Tuple tuple) {
		String orderInfo = tuple.getString(0);
		ordersBean order = loginfohandler.getOrdersBean(orderInfo);
		
		//store the salesByMerchant infomation into Redis
		Jedis jedis = pool.getResource();
		jedis.zincrby("orderAna:topSalesByMerchant", order.getTotalPrice(), order.getMerchantName());
	}

	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
		this._collector = collector;
		this.loginfohandler = new logInfoHandler();
		this.pool = new JedisPool(new JedisPoolConfig(), "ymhHadoop",6379,2 * 60000,"12345");
		
	}

	public void declareOutputFields(OutputFieldsDeclarer arg0) {
		// TODO Auto-generated method stub
		
	}

}

Topology项目的Maven配置文件

  1.   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  2.   4.0.0
  3.   com.guludada
  4.   Storm_OrdersAnalysis
  5.   war
  6.   0.0.1-SNAPSHOT
  7.   Storm_OrdersAnalysis Maven Webapp
  8.   http://maven.apache.org
  9.   
  10.     
  11.         org.apache.storm
  12.         storm-core
  13.         0.9.6
  14.         provided
  15.     
  16.     
  17.         org.apache.storm
  18.         storm-kafka
  19.         0.9.6
  20.     
  21.     
  22.         org.apache.kafka
  23.         kafka_2.10
  24.         0.9.0.1
  25.             
  26.                 
  27.                     org.apache.zookeeper
  28.                     zookeeper
  29.                 
  30.                 
  31.                     log4j
  32.                     log4j
  33.                 
  34.                 
  35.                     org.slf4j
  36.                     slf4j-log4j12
  37.                 
  38.             
  39.     
  40.     
  41.         redis.clients
  42.         jedis
  43.         2.8.1
  44.     
  45.   
  46.   
  47.     Storm_OrdersAnalysis
  48.     
  49.         
  50.             maven-assembly-plugin
  51.             
  52.                 
  53.                     jar-with-dependencies
  54.                 
  55.                 
  56.                    
  57.                      com.guludada.ordersanalysis.ordersAnalysisTopology
  58.                    
  59.                  
  60.              
  61.           
  62.       
  63.   

  4.0.0
  com.guludada
  Storm_OrdersAnalysis
  war
  0.0.1-SNAPSHOT
  Storm_OrdersAnalysis Maven Webapp
  http://maven.apache.org
  
    
		org.apache.storm
		storm-core
		0.9.6
		provided
	
	
        org.apache.storm
        storm-kafka
        0.9.6
    
    
    	org.apache.kafka
        kafka_2.10
        0.9.0.1
            
                
                    org.apache.zookeeper
                    zookeeper
                
                
                    log4j
                    log4j
                
                
                    org.slf4j
    				slf4j-log4j12
                
            
    
    
	    redis.clients
	    jedis
	    2.8.1
		
  
  
    Storm_OrdersAnalysis
    
		
			maven-assembly-plugin
			
				  
			    	jar-with-dependencies
			    
			    
			       
			         com.guludada.ordersanalysis.ordersAnalysisTopology
			       
			     
			 
		  
	  
  

maven配置文件中配置了一个官方推荐的maven-assembly-plugin插件,用来帮助用户方便地打包Topology程序的。只需要进入到项目的根路径,然后运行
$mvn assembly:assembly
命令就可以打包好Topologyjar包了。

最后我带大家梳理一下整个项目的部署流程
1.  启动Zookeeper
2. 启动Kafka
3. 启动Flume将程序拉取到Kafka
4. 启动Storm集群
5. 启动Redis服务端  通过命令
$ src/redis-server
6. 提交打包好的Topology程序到Storm集群中通过Storm UI 或者命令$storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3
7. 启动RedisCLI客户端查看结果通过命令
$ src/redis-cli --raw
$  zrange key 0 -1 withscores

如下图:

 

Troubleshooting
  1. 在使用maven同时导入storm-core, storm-kaka和kafka的依赖包的时候可能会出现jar包冲突导致无法初始化Log4jLoggerFactory,并无法启动Storm程序.解决方法也很简单,按照红字提示,把多余的jar包移除就行了,通过在maven的pom文件中kafka的依赖设置部分加入下面的设置org.slf4jslf4j-log4j12
  2. 第一次执行Storm建立Topology时,作者遇到了一个十分低级的问题,就是发现明明Kafka的topic里有数据,可是Storm程序怎么都无法读取到数据,后来才从下面的文章中明白了问题的所在 http://m.blog.csdn.net/article/details?id=18615761  原因就在于Topology第一次启动前还没有在zookeeper中的zkRoot创建offset信息,Storm取不到offset信息就会使用默认的offset,也就是log文件中从最后一个元素开始读取信息,所以之前在kafka中的数据都无法读出来。Storm启动后,再往broker中写数据,这些后写的数据就能正确被Storm处理。
  3. 当Storm的topology传到Nimbus的时候,或者说你的Storm程序刚开始启动的时候可能会报关于JedisPool是一个无法序列化的对象而导致的错误:java.lang.RuntimeException:java.io.NotSerializableException: redis.clients.jedis.JedisPool 解决方案就是将Bolt类中外部的JedisPool初始化代码放入Bolt的prepare()方法中,如本文的代码示例所示
  4. 在Storm启动并开始连接Redis的时候,会报出连接被拒绝,因为Redis运行在protect mode模式下的错误。这是因为Storm程序是远程连接Redis的服务器端,如果Redis服务器端没有设置密码的话是拒绝远程连接的。解决方法也十分简单,关闭protect mode模式(强烈不推荐),或者使用下面命令为Redis设置密码就可以了$config set requirepass 123
  5. 向Storm提交Topology以后, Supervisor端会一直报“Kill XXXX No Such process”的错误,多数原因是提交的topology没有正确被执行,而Storm的日记中不会显示topology程序里的错误。解决方法就是启动Storm UI, 通过这个Storm自带的UI界面查看topology的运行情况,并且程序中的错误也会在UI界面中显示出来,能方便地查看topology程序的错误。

    6.kafka使用的时候的小问题:
        当在一台机子上启动kafka producer客户端的时候,是无法在同一台机子上继续启动kafka的consumer客户端的,因为这两个进程可能占用的同一个端口,需要在另外一台机子上启动kafka consumer程序,这样就能看见正确的结果了

最后,感谢所有耐心看完这篇文章的人,楼主也深感自己的技术水平和语言表达还有很多需要提高的地方,希望能和大家一起交流学习共同进步,欢迎大家留下宝贵的意见和评论!还有再最后吐槽一下,CSDN的文章编辑器在我的MAC系统的火狐浏览器下十分十分十分十分难用,字体格式等根本不受控制,各种莫名其妙的BUG…………

from:http://blog.csdn.net/ymh198816/article/details/51998085

Flume+Spark+Hive+Spark SQL离线分析系统

前段时间把Scala和Spark一起学习了,所以借此机会在这里做个总结,顺便和大家一起分享一下目前最火的分布式计算技术Spark!当然Spark不光是可以做离线计算,还提供了许多功能强大的组件,比如说,Spark Streaming 组件做实时计算,和Kafka等消息系统也有很好的兼容性;Spark Sql,可以让用户通过标准SQL语句操作从不同的数据源中过来的结构化数据;还提供了种类丰富的MLlib库方便用户做机器学习等等。Spark是由Scala语言编写而成的,Scala是运行在JVM上的面向函数的编程语言,它的学习过程简直反人类,可读性就我个人来看,也不是能广为让大众接受的语言,但是它功能强大,熟练后能极大提高开发速度,对于实现同样的功能,所需要写的代码量比Java少得多得多,这都得益于Scala的语言特性。本文借鉴作者之前写的另一篇关于Hadoop离线计算的文章,继续使用那篇文章中点击流分析的案例,只不过MapReduce部分改为由Spark离线计算来完成,同时,你会发现做一模一样的日志清洗任务,相比上一篇文章,代码总数少了非常非常多,这都是Scala语言的功劳。本篇文章在Flume部分的内容和之前的Hadoop离线分析文章的内容基本一致,Hive部分新加了对Hive数据仓库的简单说明,同时还补充了对HDFS的说明和配置,并且新加了大量对Spark框架的详细介绍,文章的最后一如既往地添加了Troubleshooting段落,和大家分享作者在部署时遇到的各种问题,读者们可以有选择性的阅读。

PS:本文Spark说明部分的最后一段非常重要,作者总结了Spark在集群环境下不得忽略的一些特性,所有使用Spark的用户都应该要重点理解。或者读者们可以直接阅读官方文档加深理解:http://spark.apache.org/docs/latest/programming-guide.html

Spark离线分析系统架构图

这里写图片描述
整个离线分析的总体架构就是使用Flume从FTP服务器上采集日志文件,并存储在Hadoop HDFS文件系统上,再接着用Spark的RDDs操作函数清洗日志文件,最后使用Spark SQL配合HIVE构建数据仓库做离线分析。任务的调度使用Shell脚本完成,当然大家也可以尝试一些自动化的任务调度工具,比如说AZKABAN或者OOZIE等。
分析所使用的点击流日志文件主要来自Nginx的access.log日志文件,需要注意的是在这里并不是用Flume直接去生产环境上拉取nginx的日志文件,而是多设置了一层FTP服务器来缓冲所有的日志文件,然后再用Flume监听FTP服务器上指定的目录并拉取目录里的日志文件到HDFS服务器上(具体原因下面分析)。从生产环境推送日志文件到FTP服务器的操作可以通过Shell脚本配合Crontab定时器来实现。

网站点击流数据


图片来源:http://webdataanalysis.net/data-collection-and-preprocessing/weblog-to-clickstream/#comments

一般在WEB系统中,用户对站点的页面的访问浏览,点击行为等一系列的数据都会记录在日志中,每一条日志记录就代表着上图中的一个数据点;而点击流数据关注的就是所有这些点连起来后的一个完整的网站浏览行为记录,可以认为是一个用户对网站的浏览session。比如说用户从哪一个外站进入到当前的网站,用户接下来浏览了当前网站的哪些页面,点击了哪些图片链接按钮等一系列的行为记录,这一个整体的信息就称为是该用户的点击流记录。这篇文章中设计的离线分析系统就是收集WEB系统中产生的这些数据日志,并清洗日志内容存储分布式的HDFS文件存储系统上,接着使用离线分析工具HIVE去统计所有用户的点击流信息。
本系统中我们采用Nginx的access.log来做点击流分析的日志文件。access.log日志文件的格式如下:

样例数据格式:
124.42.13.230 – – [18/Sep/2013:06:57:50 +0000] “GET /shoppingMall?ver=1.2.1 HTTP/1.1” 200 7200 “http://www.baidu.com.cn” “Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)”

格式分析:
1. 访客ip地址:124.42.13.230
2. 访客用户信息: – –
3. 请求时间:[18/Sep/2013:06:57:50 +0000]
4. 请求方式:GET
5. 请求的url:/shoppingMall?ver=1.10.2
6. 请求所用协议:HTTP/1.1
7. 响应码:200
8. 返回的数据流量:7200
9. 访客的来源url:http://www.baidu.com.cn
10. 访客所用浏览器:Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)

HDFS

Apache Hadoop是用来支持海量数据分布式计算的软件框架,它具备高可靠性,高稳定性,动态扩容,运用简单的计算模型(MapReduce)在集群上进行分布式计算,并支持海量数据的存储。Apache Hadoop主要包含4个重要的模块,一个是 Hadoop Common,支持其它模块运行的通用组件;Hadoop Distributed File System(HDFS), 分布式文件存储系统;Hadoop Yarn,负责计算任务的调度和集群上资源的管理;Hadoop MapReduce,基于Hadoop Yarn的分布式计算框架。在本文的案例中,我们主要用到HDFS作为点击流数据存储,分布式计算框架我们将采用Spark RDDs Operations去替代MapReduce。

要配置Hadoop集群,首先需要配置Hadoop daemons, 它是所有其它Hadoop组件运行所必须的守护进程, 它的配置文件是

etc/hadoop/hadoop-env.sh

# set to the root of your Java installation
export JAVA_HOME=/usr/java/latest

Hadoop的运行需要Java开发环境的支持,一定要显示地标明集群上所有机器的JDK安装目录,即使你自己本机的环境已经配置好了JAVA_HOME,因为Hadoop是通过SSH来启动守护进程的,即便是NameNode启动自己本机的守护进程;如果不显示配置JDK安装目录,那么Hadoop在通过SSH启动守护进程时会找不到Java环境而报错。

在本文的案例中,我们只使用Hadoop HDFS组件,所以我们只需要配置HDFS的守护进程,NameNode daemons,SecondaryNameNode daemons以及DataNode daemons,它们的配置文件主要是core-site.xml和hdfs-site.xml:

etc/hadoop/core-site.xml



<configuration>
   <property>
      <name>fs.defaultFSname>
      <value>hdfs://ymhHadoop:9000value>
   property>
   <property>
       <name>hadoop.tmp.dirname>
       <value>/root/apps/hadoop/tmpvalue>
   property>
configuration>

fs.defaultFS属性是指定用来做NameNode的主机URI;而hadoop.tmp.dir是配置Hadoop依赖的一些系统运行时产生的文件的目录,默认是在/tmp/${username}目录下的,但是系统一重启这个目录下的文件就会被清空,所以我们重新指定它的目录

etc/hadoop/hdfs-site.xml




<configuration>
   <property>
      <name>dfs.replicationname>
      <value>1value>
   property>
    <property>
      <name>dfs.namenode.name.dirname>
      <value>/your/pathvalue>
   property>
   <property>
      <name>dfs.blocksizename>
      <value>268435456value>
   property>
   <property>
      <name>dfs.datanode.data.dirname>
      <value>/your/pathvalue>
   property>

configuration>

dfs.replication 是配置每一份在HDFS系统上的文件有几个备份;dfs.namenode.name.dir 是配置用户自定义的目录存储HDFS的业务日志和命名空间日志,也就是操作日志,集群发生故障时可以通过这份文件来恢复数据。dfs.blocksize,定义HDFS最大的文件分片是多大,默认256M,我们不需要改动;dfs.datanode.data.dir, 用来配置DataNode中的数据Blocks应该存储在哪个文件目录下。

最后把配置文件拷贝到集群的所有机子上,接下来就是启动HDFS集群,如果是第一次启动,记得一定要格式化整个HDFS文件系统

$HADOOP_PREFIX/bin/hdfs namenode -format 

接下来就是通过下面的命令分别启动NameNode和DataNode

$HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start namenode
$HADOOP_PREFIX/sbin/hadoop-daemons.sh --config $HADOOP_CONF_DIR --script hdfs start datanode

收集用户数据

网站会通过前端JS代码或服务器端的后台代码收集用户浏览数据并存储在网站服务器中。一般运维人员会在离线分析系统和真实生产环境之间部署FTP服务器,并将生产环境上的用户数据每天定时发送到FTP服务器上,离线分析系统就会从FTP服务上采集数据而不会影响到生产环境。
采集数据的方式有多种,一种是通过自己编写shell脚本或Java编程采集数据,但是工作量大,不方便维护,另一种就是直接使用第三方框架去进行日志的采集,一般第三方框架的健壮性,容错性和易用性都做得很好也易于维护。本文采用第三方框架Flume进行日志采集,Flume是一个分布式的高效的日志采集系统,它能把分布在不同服务器上的海量日志文件数据统一收集到一个集中的存储资源中,Flume是Apache的一个顶级项目,与Hadoop也有很好的兼容性。不过需要注意的是Flume并不是一个高可用的框架,这方面的优化得用户自己去维护。
Flume的agent是运行在JVM上的,所以各个服务器上的JVM环境必不可少。每一个Flume agent部署在一台服务器上,Flume会收集web server 产生的日志数据,并封装成一个个的事件发送给Flume Agent的Source,Flume Agent Source会消费这些收集来的数据事件并放在Flume Agent Channel,Flume Agent Sink会从Channel中收集这些采集过来的数据,要么存储在本地的文件系统中要么作为一个消费资源分发给下一个装在分布式系统中其它服务器上的Flume进行处理。Flume提供了点对点的高可用的保障,某个服务器上的Flume Agent Channel中的数据只有确保传输到了另一个服务器上的Flume Agent Channel里或者正确保存到了本地的文件存储系统中,才会被移除。
本系统中每一个FTP服务器以及Hadoop的name node服务器上都要部署一个Flume Agent;FTP的Flume Agent采集Web Server的日志并汇总到name node服务器上的Flume Agent,最后由hadoop name node服务器将所有的日志数据下沉到分布式的文件存储系统HDFS上面。
需要注意的是Flume的Source在本文的系统中选择的是Spooling Directory Source,而没有选择Exec Source,因为当Flume服务down掉的时候Spooling Directory Source能记录上一次读取到的位置,而Exec Source则没有,需要用户自己去处理,当重启Flume服务器的时候如果处理不好就会有重复数据的问题。当然Spooling Directory Source也是有缺点的,会对读取过的文件重命名,所以多架一层FTP服务器也是为了避免Flume“污染”生产环境。Spooling Directory Source另外一个比较大的缺点就是无法做到灵活监听某个文件夹底下所有子文件夹里的所有文件里新追加的内容。关于这些问题的解决方案也有很多,比如选择其它的日志采集工具,像logstash等。

FTP服务器上的Flume配置文件如下:

    agent.channels = memorychannel  
    agent.sinks = target  

    agent.sources.origin.type = spooldir  
    agent.sources.origin.spoolDir = /export/data/trivial/weblogs  
    agent.sources.origin.channels = memorychannel  
    agent.sources.origin.deserializer.maxLineLength = 2048  

    agent.sources.origin.interceptors = i2  
    agent.sources.origin.interceptors.i2.type = host  
    agent.sources.origin.interceptors.i2.hostHeader = hostname  

    agent.sinks.loggerSink.type = logger  
    agent.sinks.loggerSink.channel = memorychannel  

    agent.channels.memorychannel.type = memory  
    agent.channels.memorychannel.capacity = 10000  

    agent.sinks.target.type = avro  
    agent.sinks.target.channel = memorychannel  
    agent.sinks.target.hostname = 172.16.124.130  
    agent.sinks.target.port = 4545  

这里有几个参数需要说明,Flume Agent Source可以通过配置deserializer.maxLineLength这个属性来指定每个Event的大小,默认是每个Event是2048个byte。Flume Agent Channel的大小默认等于于本地服务器上JVM所获取到的内存的80%,用户可以通过byteCapacityBufferPercentage和byteCapacity两个参数去进行优化。
需要特别注意的是FTP上放入Flume监听的文件夹中的日志文件不能同名,不然Flume会报错并停止工作,最好的解决方案就是为每份日志文件拼上时间戳。

在Hadoop服务器上的配置文件如下:

    agent.sources = origin  
    agent.channels = memorychannel  
    agent.sinks = target  

    agent.sources.origin.type = avro  
    agent.sources.origin.channels = memorychannel  
    agent.sources.origin.bind = 0.0.0.0  
    agent.sources.origin.port = 4545  

    agent.sinks.loggerSink.type = logger  
    agent.sinks.loggerSink.channel = memorychannel  

    agent.channels.memorychannel.type = memory  
    agent.channels.memorychannel.capacity = 5000000  
    agent.channels.memorychannel.transactionCapacity = 1000000  

    agent.sinks.target.type = hdfs  
    agent.sinks.target.channel = memorychannel  
    agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S  
    agent.sinks.target.hdfs.filePrefix = data-%{hostname}  
    agent.sinks.target.hdfs.rollInterval = 60  
    agent.sinks.target.hdfs.rollSize = 1073741824  
    agent.sinks.target.hdfs.rollCount = 1000000  
    agent.sinks.target.hdfs.round = true  
    agent.sinks.target.hdfs.roundValue = 10  
    agent.sinks.target.hdfs.roundUnit = minute  
    agent.sinks.target.hdfs.useLocalTimeStamp = true  
    agent.sinks.target.hdfs.minBlockReplicas=1  
    agent.sinks.target.hdfs.writeFormat=Text  
    agent.sinks.target.hdfs.fileType=DataStream  

round, roundValue,roundUnit三个参数是用来配置每10分钟在hdfs里生成一个文件夹保存从FTP服务器上拉取下来的数据。用户分别在日志文件服务器及HDFS服务器端启动如下命令,便可以一直监听是否有新日志产生,然后拉取到HDFS文件系统中:

$ nohup bin/flume-ng agent -n $your_agent_name -c conf -f conf/$your_conf_name &

Spark

Spark是最近特别火的一个分布式计算框架,最主要原因就是快!和男人不一样,在大数据领域,一个框架会不会火,快是除了可靠性之外一个最重要的话语权,几乎所有新出的分布式框架或即将推出的新版本的MapReduce都在强调一点,我很快。Spark官网上给出的数据是Spark程序和中间数据运行在内存上时计算速度是Hadoop的100倍,即使在磁盘上也是比Hadoop快10倍。
每一个Spark程序都是提供了一个Driver进程来负责运行用户提供的程序,这个Driver进程会生成一个SparkContext,负责和Cluster Manager(可以是Spark自己提供的集群管理工具,也可以是Hadoop 的资源调度工具 Yarn)沟通,Cluster负责协调和调度集群上的Worker Node资源,当Driver获取到集群上Worker Node资源后,就会向Worker Node的Executor发送计算程序(通过Jar或者python文件),接着再向Exectutor发送计算任务去执行,Executor会启动多个线程并行运行计算任务,同时还会根据需求在Worker Node上缓存计算过程中的中间数据。需要注意的虽然Worker Node上可以启动多个物理JVM来运行不同Spark程序的Executor,但是不同的Spark程序之间不能进行通讯和数据交换。另一方面,对于Cluster Manager来说,不需要知道Spark Driver的底层,只要Spark Driver和Cluster Manager能互相通信并获取计算资源就可以协同工作,所以Spark Driver能较为方便地和各种资源调度框架整合,比如Yarn,Mesos等。
这里写图片描述
图片来源:http://spark.apache.org/docs/latest/cluster-overview.html

Spark就是通过Driver来发送用户的计算程序到集群的工作节点中,然后去并行计算数据,这其中有一个很重要的Spark专有的数据模型叫做RDD(Resilient
distributed dataset), 它代表着每一个计算阶段的数据集合,这些数据集合可以继续它所在的工作节点上,或者通过“shuffle”动作在集群中重新分发后,进行下一步的并行计算,形成新的RDD数据集。这些RDD有一个最重要的特点就是可以并行计算。RDD最开始有两种方式进行创建,一种是从Driver程序中的Scala Collections创建而来(或者其它语言的Collections),将它们转化成RDD然后在工作结点中并发处理,另一种就是从外部的分布式数据文件系统中创建RDD,如HDFS,HBASE或者任何实现了Hadoop InputFormat接口的对象。

对于Driver程序中的Collections数据,可以使用parallelize()方法将数据根据集群节点数进行切片(partitions),然后发送到集群中并发处理,一般一个节点一个切片一个task进行处理,用户也可以自定义数据的切片数。而对于外部数据源的数据,Spark可以从任何基于Hadoop框架的数据源创建RDD,一般一个文件块(blocks)创建一个RDD切片,然后在集群上并行计算。

在Spark中,对于RDDs的计算操作有两种类型,一种是Transformations,另一种是Actions。Transformations相当于Hadoop的Map组件,通过对RDDs的并发计算,然后返回新的RDDs对象;而actions则相当于Hadoop的Reduce组件,通过计算(我们这里说的计算就是function)汇总之前Transformation操作产生的RDDs对象,产生最终结果,然后返回到Driver程序中。特别需要说明的是,所有的Transformations操作都是延迟计算的(lazy), 它们一开始只会记录这个Transformations是用在哪一个RDDs上,并不会开始执行计算,除非遇到了需要返回最终结果到Driver程序中的Action操作,这时候Transformations才会开始真正意义上的计算。所以用户的Spark程序最后一步都需要一个Actions类型的操作,否则这个程序并不会触发任何计算。这么做的好处在于能提高Spark的运行效率,因为通过Transformations操作创建的RDDs对象最终只会在Actions类型的方法中用到,而且只会返回包含最终结果的RDDs到Driver中,而不是大量的中间结果。有时候,有些RDDs的计算结果会多次被重复调用,这就触发多次的重复计算,用户可以使用persist()或者cache()方法将部分RDDs的计算结果缓存在整个集群的内存中,这样当其它的RDDs需要之前的RDDs的计算结果时就可以直接从集群的内存中获得,提高运行效率。

在Spark中,另外一个需要了解的概念就是“Shuffle”,当遇到类似“reduceByKey”的Actions操作时,会把集群上所有分片的RDDs都读一遍,然后在集群之间相互拷贝并全部收集起来,统一计算这所有的RDDs,获得一个整体的结果而不再是单个分片的计算结果,接着再重新分发到集群中或者发送回Driver程序。在Shuffle过程中,Spark会产生两种类型的任务,一种是Map task,用于匹配本地分片需要shuffle的数据并将这些数据写入文件中,然后Reduce task就会读取这些文件并整合所有的数据。 所以说”Shuffle”过程会消耗许多本地磁盘的I/O资源,内存资源,网络I/O,附带还会产生许多的序列化过程。通常,repartition类型的操作,比如:repartitions和coalesce,ByKey类型的操作,比如:reduceByKey,groupByKey,join类型的操作,如:cogroup和join等,都会产生Shuffle过程。

接下来,来谈一谈Spark在集群环境下的一些特性,这部分内容非常非常重要,请大家一定要重点理解。首先,读者们一定要记住,Spark是通过Driver把用户打包提交的Spark程序序列化以后,分发到集群中的工作节点上去运行,对于计算结果的汇总是返回到Driver端,也就是说通常用户都是从Driver服务器上获取到最终的计算结果!在这个大前提下我们来探讨下面几个问题:
1. 关于如何正确地将函数传入RDD operation中,有两种推荐的方式,一种就是直接传函数体,另一种是在伴生对象中创建方法,然后通过类名.方法名的方式传入;如下面的代码所示

object DateHandler {
  def parseDate(s: String): String = { ... }
}

rdd.map(DateHandler.parseDate)

错误的传函数的方式如下:

Class MySpark {
 def parseDate(s: String): String = { ... }
 def rddOperation(rdd:RDD[String]):RDD[String] = {rdd.map(x => this.parseDate(x))}
}
…………
val myspark = new MySpark
myspark.rddOperation(sc.rdd)
这样子的传递方式会把整个mySpark对象序列化后传到集群中,会造成不必要的内存开支。
因为向map中传入的“this.parseDate(x)”是一个对象实例和它里面的函数。

当在RDD operation中访问类中的变量时,也会造成传递整个对象的开销,比如:

Class MySpark {
 val myVariable
 def rddOperation(rdd:RDD[String]):RDD[String] = {rdd.map(x => x + myVariable)}
}
这样也相当于x => this.x + myVariable,又关联了这个对象实例,
解决方法就是把这个类的变量传入方法内部做局部变量,
就会从访问对象中的变量变为访问局部变量值
def rddOperation(rdd:RDD[String]):RDD[String] = {val _variable = this.myVariable;rdd.map(x => x + _variable)}

2.第二个特别需要注意的问题就是在RDD operations中去更改一个全局变量,
在集群环境中也是很容易出现错误的,注意下面的代码:

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

这段代码最终返回的结果还是0。这是因为这段代码连同counter是序列化后分发到集群上所有的节点机器上,不同的节点上拥有各自独立的counter,并不会是原先Driver上counter的引用,并且统计的值也不一样,最后统计结果也不会返回给Driver去重新赋值。Driver主机上的counter还是它原来的值,不会发生任何变化。如果需要在RDD operations中操作全局变量,就需要使用accumulator()方法,这是一个线程安全的方法,能在并发环境下原子性地改变全局变量的值。

3.对于集群环境下的Spark,第三个重要的是如何去合理地打印RDDs中的值。如果只是使用rdd.foreach(println()) 或者 rdd.map(println())是行不通的,一定要记住,程序会被分送到集群的工作节点上各自运行,println方法调用的也是工作节点上的输入输出接口,而用户获取数据和计算结果都是在Driver主机上的,所以是无法看到这些打印的结果。解决方法之一就是打印前将所有数据先返回Driver,如rdd.collect().foreach(println),但是这可能会让Driver瞬间耗光内存,因为collect操作将集群上的所有数据全部一次性返回给Driver。较为合理的操作为使用take() 方法先获取部分数据,然后再打印,如:rdd.take(100).foreach(println)。
4. 另外需要补充说明的是foreach(func)这个Action操作,它的作用是对集群上每一个datasets元素执行传入的func方法,这个func方法是在各个工作节点上分别执行的。虽然foreach是action操作,但是它并不是先全部将数据返回给Driver然后再在Driver上执行func方法,它返回的给Driver的Unit,这点要特别注意。所以foreach(func)操作里传入的func函数对Driver中的全局变量的操作或者打印数据等操作对于Driver来说都是无效的,这个func函数只运行在工作节点上。
5. 最后要提的是Spark的共享变量,其中一个共享变量就是使用accumulator方法封装的变量,而另一个共享变量就是广播变量(Broadcast Variables)。在谈广播变量之前,大家需要了解一个概念叫“stage”,每次进行shuffle操作之前的所有RDDs的操作都属于同一个stage。所以每次在shuffle操作时,上一个stage计算的结果都会被Spark封装成广播变量,并通过一定的高效算法将这些计算结果在集群上的每个节点里都缓存上一份,并且是read-only的,这样当下一个stage的任务再次需要之前stage的计算结果时就不用再重新计算了。用户可以自定义广播变量,一般是在某个stage的datasets需要被后续多个stage的任务重复使用的情况下设置会比较有意义。

日志清洗

当Flume从日志服务器上获取到Nginx访问日志并拉取到HDFS系统后,我们接下来要做的就是使用Spark进行日志清洗。
首先是启动Spark集群,Spark目前主要有三种集群部署方式,一种是Spark自带Standalone模式做为cluster manager,另外两种分别是Yarn和Mesos作为cluster manager。在Yarn的部署方式下,又细分了两种提交Spark程序的模式,一种是cluster模式,Driver程序直接运行在Application Master上,并直接由Yarn管理,当程序完成初始化工作后相关的客户端进程就会退出;另一种是client模式,提交程序后,Driver一直运行在客户端进程中并和Yarn的Application Master通信获取工作节点资源。在Standalone的部署方式下,也同样是细分了cluster模式和client模式的Spark程序提交方式,cluster模式下Driver是运行在工作节点的进程中,一旦完成提交程序的任务,相关的客户端进程就会退出;而client模式中,Driver会一直运行在客户端进程中并一直向console输出运行信息。本文案例中,使用Standalone模式部署Spark集群,同时我们选择手动部署的方式来启动Spark集群:

//启动 master 节点 启动完后可以通过 localhost:8080 访问Spark自带的UI界面
./sbin/start-master.sh

//启动 Worker 节点 
./sbin/start-slave.sh spark://HOST:PORT

//然后通过spark-submit script 提交Spark程序
//默认是使用client模式运行,也可以手动设置成 cluster模式
//--deploy-mode cluster
$bin/spark-submit --class com.guludada.Spark_ClickStream.VisitsInfo --master spark://ymhHadoop:7077 --executor-memory 1G --total-executor-cores 2 /export/data/spark/sparkclickstream.jar

下面是清洗日志的Spark代码,主要是过滤掉无效的访问日志信息:

package com.guludada.Spark_ClickStream

import scala.io.Source
import java.text.SimpleDateFormat;
import java.util.Locale;
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.util.Date;

class WebLogClean extends Serializable {

  def weblogParser(logLine:String):String =  {

      //过滤掉信息不全或者格式不正确的日志信息
      val isStandardLogInfo = logLine.split(" ").length >= 12;

      if(isStandardLogInfo) {

        //过滤掉多余的符号
        val newLogLine:String = logLine.replace("- - ", "").replaceFirst("""\[""", "").replace(" +0000]", "");
        //将日志格式替换成正常的格式
        val logInfoGroup:Array[String] = newLogLine.split(" ");
        val oldDateFormat = logInfoGroup(1);
        //如果访问时间不存在,也是一个不正确的日志信息
        if(oldDateFormat == "-") return ""
        val newDateFormat = WebLogClean.sdf_standard.format(WebLogClean.sdf_origin.parse(oldDateFormat)) 
        return newLogLine.replace(oldDateFormat, newDateFormat)

      } else {

        return ""

      }
  }
}

object WebLogClean {

   val sdf_origin = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.ENGLISH);
   val sdf_standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
   val sdf_hdfsfolder = new SimpleDateFormat("yy-MM-dd");

   def main(args: Array[String]) {

    val curDate = new Date(); 
    val weblogclean = new WebLogClean
    val logFile = "hdfs://ymhHadoop:9000/flume/events/"+WebLogClean.sdf_hdfsfolder.format(curDate)+"/*" // Should be some file on your system
    val conf = new SparkConf().setAppName("WebLogCleaner").setMaster("local")
    val sc = new SparkContext(conf)
    val logFileSource = sc.textFile(logFile,1).cache()

    val logLinesMapRDD = logFileSource.map(x => weblogclean.weblogParser(x)).filter(line => line != "");
    logLinesMapRDD.saveAsTextFile("hdfs://ymhHadoop:9000/spark_clickstream/cleaned_log/"+WebLogClean.sdf_hdfsfolder.format(curDate)) 

  }

}

经过清洗后的日志格式如下:
这里写图片描述

接着为每一条访问记录拼上sessionID

package com.guludada.Spark_ClickStream

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.UUID;
import java.util.Date;

class WebLogSession {

}

object WebLogSession {

   val sdf_standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
   val sdf_hdfsfolder = new SimpleDateFormat("yy-MM-dd");

   //自定义的将日志信息按日志创建的时间升序排序
   def dateComparator(elementA:String ,elementB:String):Boolean = {     
     WebLogSession.sdf_standard.parse(elementA.split(" ")(1)).getTime < WebLogSession.sdf_standard.parse(elementB.split(" ")(1)).getTime
   }

   import scala.collection.mutable.ListBuffer
   def distinctLogInfoBySession(logInfoGroup:List[String]):List[String] = {

       val logInfoBySession:ListBuffer[String] = new ListBuffer[String]
       var lastRequestTime:Long = 0;
       var lastSessionID:String = "";

       for(logInfo <- logInfoGroup) {

         //某IP的用户第一次访问网站的记录做为该用户的第一个session日志
         if(lastRequestTime == 0) {

           lastSessionID = UUID.randomUUID().toString();
           //将该次访问日志记录拼上sessionID并放进按session分类的日志信息数组中
           logInfoBySession += lastSessionID + " " +logInfo
           //记录该次访问日志的时间,并用户和下一条访问记录比较,看时间间隔是否超过30分钟,是的话就代表新Session开始
           lastRequestTime = sdf_standard.parse(logInfo.split(" ")(1)).getTime

         } else {

           //当前日志记录和上一次的访问时间相比超过30分钟,所以认为是一个新的Session,重新生成sessionID
           if(sdf_standard.parse(logInfo.split(" ")(1)).getTime - lastRequestTime >= 30 * 60 * 1000) {
               //和上一条访问记录相比,时间间隔超过了30分钟,所以当做一次新的session,并重新生成sessionID
               lastSessionID = UUID.randomUUID().toString();
               logInfoBySession += lastSessionID + " " +logInfo
               //记录该次访问日志的时间,做为一个新session开始的时间,并继续和下一条访问记录比较,看时间间隔是否又超过30分钟
               lastRequestTime = sdf_standard.parse(logInfo.split(" ")(1)).getTime

           } else { //当前日志记录和上一次的访问时间相比没有超过30分钟,所以认为是同一个Session,继续沿用之前的sessionID

               logInfoBySession += lastSessionID + " " +logInfo
           }           
         }         
       }
       return logInfoBySession.toList
   }

   def main(args: Array[String]) {



      val curDate = new Date(); 
      val logFile = "hdfs://ymhHadoop:9000/spark_clickstream/cleaned_log/"+WebLogSession.sdf_hdfsfolder.format(curDate) // Should be some file on your system
      val conf = new SparkConf().setAppName("WebLogSession").setMaster("local")
      val sc = new SparkContext(conf)
      val logFileSource = sc.textFile(logFile, 1).cache()

      //将log信息变为(IP,log信息)的tuple格式,也就是按IP地址将log分组
      val logLinesKVMapRDD = logFileSource.map(line => (line.split(" ")(0),line)).groupByKey();
      //对每个(IP[String],log信息[Iterator])中的日志按时间的升序排序
      //(其实这一步没有必要,本来Nginx的日志信息就是按访问先后顺序记录的,这一步只是为了演示如何在Scala语境下进行自定义排序) 
      //排完序后(IP[String],log信息[Iterator])的格式变为log信息[Iterator]
      val sortedLogRDD = logLinesKVMapRDD.map(_._2.toList.sortWith((A,B) => WebLogSession.dateComparator(A,B)))

      //将每一个IP的日志信息按30分钟的session分类并拼上session信息
      val logInfoBySessionRDD = sortedLogRDD.map(WebLogSession.distinctLogInfoBySession(_))
      //将List中的日志信息拆分成单条日志信息输出
      val logInfoWithSessionRDD =  logInfoBySessionRDD.flatMap(line => line).saveAsTextFile("hdfs://ymhHadoop:9000/spark_clickstream/session_log/"+WebLogSession.sdf_hdfsfolder.format(curDate))

   } 
}

拼接上sessionID的日志如下所示:
这里写图片描述

最后一步就是根据SessionID来整理用户的浏览信息,代码如下:

package com.guludada.Spark_ClickStream

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.Date;

class VisitsInfo {

}

object VisitsInfo {

  val sdf_standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
  val sdf_hdfsfolder = new SimpleDateFormat("yy-MM-dd");

   //自定义的将日志信息按日志创建的时间升序排序
   def dateComparator(elementA:String ,elementB:String):Boolean = {     
     WebLogSession.sdf_standard.parse(elementA.split(" ")(2)).getTime < WebLogSession.sdf_standard.parse(elementB.split(" ")(2)).getTime
   }

   import scala.collection.mutable.ListBuffer
   def getVisitsInfo(logInfoGroup:List[String]):String = {

     //获取用户在该次session里所访问的页面总数
     //先用map函数将某次session里的所有访问记录变成(url,logInfo)元组的形式,然后再用groupBy函数按url分组,最后统计共有几个组
    val visitPageNum = logInfoGroup.map(log => (log.split(" ")(4),log)).groupBy(x => x._1).count(p => true)

    //获取该次session的ID
    val sessionID = logInfoGroup(0).split(" ")(0)

    //获取该次session的开始时间
    val startTime = logInfoGroup(0).split(" ")(2)

    //获取该次session的结束时间
    val endTime = logInfoGroup(logInfoGroup.length-1).split(" ")(2)

    //获取该次session第一次访问的url
    val entryPage = logInfoGroup(0).split(" ")(4)

    //获取该次session最后一次访问的url
    val leavePage = logInfoGroup(logInfoGroup.length-1).split(" ")(4)

    //获取该次session的用户IP
    val IP = logInfoGroup(0).split(" ")(1)

    //获取该次session的用户从哪个网站过来
    val referal = logInfoGroup(0).split(" ")(8)

     return sessionID + " " + startTime + " " + endTime + " " + entryPage + " " + leavePage + " " + visitPageNum + " " + IP + " " + referal;

   }

   def main(args: Array[String]) {

      val curDate = new Date();      
      val logFile = "hdfs://ymhHadoop:9000/spark_clickstream/session_log/"+WebLogSession.sdf_hdfsfolder.format(curDate) // Should be some file on your system
      val conf = new SparkConf().setAppName("VisitsInfo").setMaster("local")
      val sc = new SparkContext(conf)
      val logFileSource = sc.textFile(logFile,1).cache()

      //将log信息变为(session,log信息)的tuple格式,也就是按session将log分组
      val logLinesKVMapRDD = logFileSource.map(line => (line.split(" ")(0),line)).groupByKey();
      //对每个(session[String],log信息[Iterator])中的日志按时间的升序排序
      //排完序后(session[String],log信息[Iterator])的格式变为log信息[Iterator]
      val sortedLogRDD = logLinesKVMapRDD.map(_._2.toList.sortWith((A,B) => VisitsInfo.dateComparator(A,B)))

      //统计每一个单独的Session的相关信息
      sortedLogRDD.map(VisitsInfo.getVisitsInfo(_)).saveAsTextFile("hdfs://ymhHadoop:9000/spark_clickstream/visits_log/"+WebLogSession.sdf_hdfsfolder.format(curDate))

   }
}

最后整理出来的日志信息的格式和示例图:
SessionID 访问时间 离开时间 第一次访问页面 最后一次访问的页面 访问的页面总数 IP Referal
Session1 2016-05-30 15:17:00 2016-05-30 15:19:00 /blog/me /blog/others 5 192.168.12.130 www.baidu.com
Session2 2016-05-30 14:17:00 2016-05-30 15:19:38 /home /profile 10 192.168.12.140 www.178.com
Session3 2016-05-30 12:17:00 2016-05-30 15:40:00 /products /detail 6 192.168.12.150 www.78dm.com

这里写图片描述

Hive

Hive是一个数据仓库,让用户可以使用SQL语言操作分布式存储系统中的数据。在客户端,用户可以使用如何关系型数据库一样的建表SQL语句来创建数据仓库的数据表,并将HDFS中的数据导入到数据表中,接着就可以使用Hive SQL语句非常方便地对HDFS中的数据做一些增删改查的操作;在底层,当用户输入Hive Sql语句后,Hive会将SQL语句发送到它的Driver进程中的语义分析器进行分析,然后根据Hive SQL的语义转化为对应的Hadoop MapReduce程序来对HDFS中数据来进行操作;同时,Hive还将表的表名,列名,分区,属性,以及表中的数据的路径等元数据信息都存储在外部的数据库中,如:Mysql或者自带的Derby数据库等。
Hive中主要由以下几种数据模型组成:
1. Databases,相当于命名空间的作用,用来避免同名的表,视图,列名的冲突,就相当于管理同一类别的一组表的库。具体的表现为HDFS中/user/hive/warehouse/中的一个目录。
2. Tables,是具有同一模式的数据的抽象,简单点来说就是传统关系型数据库中的表。具体的表现形式为Databases下的子目录,里面存储着表中的数据块文件,而这些文件是从经过MapReduce清洗后的贴源数据文件块拷贝过来的,也就是使用Hive SQL 中的Load语句,Load语句就是将原先HDFS系统中的某个路径里的数据拷贝到/user/hive/warehouse/路径里的过程,然后通过Mysql中存储的元数据信息将这些数据和Hive的表映射起来。
3. Partitions,创建表时,用户可以指定以某个Key值来为表中的数据分片。从Tables的层面来讲,Partition就是表中新加的一个虚拟字段,用来为数据分类,在HDFS文件系统中的体现就是这个表的数据分片都按Key来划分并进入到不同的目录中,但是Hive不会保证属于某个Key的内容就一定会进入到某个分片中,因为Hive无法感知,所以需要用户在插入数据时自己要将数据根据key值划分到所对应的数据分片中,这样在以后才能提高查询效率。
4. Buckets(Clusters),是指每一个分片上的数据根据表中某个列的hash值组织在一起,也就是进入到同一个桶中,这样能提升数据查询的效率。分桶最大的意义在于增加join的效率。比如 select user.id, user.name,admin.tele from user join admin on user.id=admin.id, 已经根据id将数据分进不同的桶里,两个数据表join的时候,只要把hash结果相同的桶直接相连就行,提高join的效率。一般两张表的分桶数量要一致,才能达到join的最高效率,如果是倍数关系,也会提高join的效率但没有一致数量的分桶效率高,如果不是倍数关系分桶又不一致,那么效率和没分桶没什么区别。

Spark SQL

在作者之前的Hadoop文章里,使用MapReduce清洗完日志文件后,在Hive的客户端中使用Hive SQL去构建对应的数据仓库并对数据进行分析。和之前不同的是,在本篇文章中, 作者使用的是Spark SQL去对Hive数据仓库进行操作。因为文章篇幅有限,下面只对Spark SQL进行一个简单的介绍,更多具体的内容读者们可以去阅读官方文档。

Spark SQL是Spark项目中专门用来处理结构化数据的一个模块,用户可以通过SQL,DataFrames API,DataSets API和Spark SQL进行交互。Spark SQL可以通过标准的SQL语句对各种数据源中的数据进行操作,如Json,Parquet等,也可以通过Hive SQL操作Hive中的数据;DataFrames是一组以列名组织的数据结构,相当于关系型数据库中的表,DataFrames可以从结构化的数据文件中创建而来,如Json,Parquet等,也可以从Hive中的表,外部数据库,RDDs等创建出来;Datasets是Spark1.6后新加入的API,类似于RDDs,可以使用Transformations和Actions API 操作数据,同时提供了很多运行上的优化,并且用Encoder来替代Java Serialization接口进行序列化相关的操作。

DataFrames可以通过RDDs转化而来,其中一种转化方式就是通过case class来定义DataFrames中的列结构,也可以说是表结构,然后将RDDs中的数据转化为case class对象,接着通过反射机制获取到case class对表结构的定义并转化成DataFrames对象。转化成DF对象后,用户可以方便地使用DataFrames提供的“domain-specific”操作语言来操作里面的数据,亦或是将DataFrames对象注册成其对应的表,然后通过标准SQL语句来操作里面的数据。总之,Spark SQL提供了多样化的数据结构和操作方法让我们能以SQL语句方便地对数据进行操作,减少运维和开发成本,十分方便和强大!

而在本案例里,我们将使用星型模型来构建数据仓库的ODS(OperationalData Store)层。
Visits数据分析
页面具体访问记录Visits的事实表和维度表结构
这里写图片描述

接下来启动spark shell,然后使用Spark SQL去操作Hive数据仓库

$bin/spark-shell --jars lib/mysql-connector-java-5.0.5.jar

在spark shell顺序执行如下命令操作Hive数据仓库,在此过程中,大家会发现执行速度比在Hive客户端中快很多,原因就在于使用Spark SQL去操作Hive,其底层使用的是Spark RDDs去操作HDFS中的数据,而不再是原来的Hadoop MapReduce。

//创建HiveContext对象,并且该对象继承了SqlContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

//在数据仓库中创建Visits信息的贴源数据表:
sqlContext.sql("create table visitsinfo_spark(session string,startdate string,enddate string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(startdate) into 4 buckets row format delimited fields terminated by ' '")

//将HDFS中的数据导入到HIVE的Visits信息贴源数据表中
sqlContext.sql("load data inpath '/spark_clickstream/visits_log/16-07-18' overwrite into table visitsinfo_spark partition(inputDate='2016-07-27')")

这里写图片描述

//  根据具体的业务分析逻辑创建ODS层的Visits事实表,并从visitsinfo_spark的贴源表中导入数据
sqlContext.sql("create table ods_visits_spark(session string,entrytime string,leavetime string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(entrytime) into 4 buckets row format delimited fields terminated by ' '")

sqlContext.sql("insert into table ods_visits_spark partition(inputDate='2016-07-27') select vi.session,vi.startdate,vi.enddate,vi.entrypage,vi.leavepage,vi.viewpagenum,vi.ip,vi.referal from visitsinfo_spark as vi where vi.inputDate='2016-07-27'")

//创建Visits事实表的时间维度表并从当天的事实表里导入数据
sqlContext.sql("create table ods_dim_visits_time_spark(time string,year string,month string,day string,hour string,minutes string,seconds string) partitioned by(inputDate String) clustered by(year,month,day) sorted by(time) into 4 buckets row format delimited fields terminated by ' '")

// 将“访问时间”和“离开时间”两列的值合并后再放入时间维度表中,减少数据的冗余
sqlContext.sql("insert overwrite table ods_dim_visits_time_spark partition(inputDate='2016-07-27') select distinct ov.timeparam, substring(ov.timeparam,0,4),substring(ov.timeparam,6,2),substring(ov.timeparam,9,2),substring(ov.timeparam,12,2),substring(ov.timeparam,15,2),substring(ov.timeparam,18,2) from (select ov1.entrytime as timeparam from ods_visits_spark as ov1 union select ov2.leavetime as timeparam from ods_visits_spark as ov2) as ov")

这里写图片描述

//创建visits事实表的URL维度表并从当天的事实表里导入数据
sqlContext.sql("create table ods_dim_visits_url_spark(pageurl string,host string,path string,query string) partitioned by(inputDate string) clustered by(pageurl) sorted by(pageurl) into 4 buckets row format delimited fields terminated by ' '")

//将每个session的进入页面和离开页面的URL合并后存入到URL维度表中
sqlContext.sql("insert into table ods_dim_visits_url_spark partition(inputDate='2016-07-27') select distinct ov.pageurl,b.host,b.path,b.query from (select ov1.entrypage as pageurl from ods_visits_spark as ov1 union select ov2.leavepage as pageurl from ods_visits_spark as ov2 ) as ov lateral view parse_url_tuple(concat('https://localhost',ov.pageurl),'HOST','PATH','QUERY') b as host,path,query")

//将每个session从哪个外站进入当前网站的信息存入到URL维度表中
sqlContext.sql("insert into table ods_dim_visits_url_spark partition(inputDate='2016-07-27') select distinct ov.referal,b.host,b.path,b.query from ods_visits_spark as ov lateral view parse_url_tuple(substr(ov.referal,2,length(ov.referal)-2),'HOST','PATH','QUERY') b as host,path,query")

这里写图片描述

//查询访问网站页面最多的前20个session的信息
sqlContext.sql("select * from ods_visits_spark as ov sort by viewpagenum desc").show()

这里写图片描述

Troubleshooting

使用Flume拉取文件到HDFS中会遇到将文件分散成多个1KB-5KB的小文件的问题

需要注意的是如果遇到Flume会将拉取过来的文件分成很多份1KB-5KB的小文件存储到HDFS上,那么很可能是HDFS Sink的配置不正确,导致系统使用了默认配置。spooldir类型的source是将指定目录中的文件的每一行封装成一个event放入到channel中,默认每一行最大读取1024个字符。在HDFS Sink端主要是通过rollInterval(默认30秒), rollSize(默认1KB), rollCount(默认10个event)3个属性来决定写进HDFS的分片文件的大小。rollInterval表示经过多少秒后就将当前.tmp文件(写入的是从channel中过来的events)下沉到HDFS文件系统中,rollSize表示一旦.tmp文件达到一定的size后,就下沉到HDFS文件系统中,rollCount表示.tmp文件一旦写入了指定数量的events就下沉到HDFS文件系统中。

使用Flume拉取到HDFS中的文件格式错乱

这是因为HDFS Sink的配置中,hdfs.writeFormat属性默认为“Writable”会将原先的文件的内容序列化成HDFS的格式,应该手动设置成hdfs.writeFormat=“text”; 并且hdfs.fileType默认是“SequenceFile”类型的,是将所有event拼成一行,应该该手动设置成hdfs.fileType=“DataStream”,这样就可以是一行一个event,与原文件格式保持一致

启动Spark任务的时候会报任务无法序列化的错误

这里写图片描述
而这个错误的主要原因是Driver向worker通过RPC通信发送的任务无法序列化,很有可能就是用户在使用transformations或actions方法的时候,向这个方法中传入的函数里包含不可序列化的对象,如上面的程序中 logFileSource.map(x => weblogclean.weblogParser(x)) 向map中传入的函数包含不可序列化的对象weblogclean,所以要将该对象的相关类变为可序列化的类,通过extends Serializable的方法解决

在分布式环境下如何设置每个用户的SessionID

可以使用UUID,UUID是分布式环境下唯一的元素识别码,它由日期和时间,时钟序列,机器识别码(一般为网卡MAC地址)三部分组成。这样就保证了每个用户的SessionID的唯一性。

使用maven编译Spark程序时报错

在使用maven编译Spark程序时会报错,[ERROR] error: error while loading CharSequence, class file ‘/Library/Java/JavaVirtualMachines/jdk1.8.0_77.jdk/Contents/Home/jre/lib/rt.jar(java/lang/CharSequence.class)’ is broken
如图:
这里写图片描述
主要原因是Scala 2.10 和 JDK1.8的版本冲突问题,解决方案只能是将JDK降到1.7去编译

要在Spark中使用HiveContext,配置完后启动spark-shell报错

要在Spark中使用HiveContext,将所需的Hive配置文件拷贝到Spark项目的conf目录下,并且把连接数据库的Driver包也放到了Spark项目中的lib目录下,然后启动spark-shell报错,主要还是找不到CLASSPATH中的数据库连接驱动包,如下图:
这里写图片描述
这里写图片描述
目前作者想到的解决方案比较笨拙:就是启动spark-shell的时候显示地告诉驱动jar包的位置

$bin/spark-shell --jars lib/mysql-connector-java-5.0.5.jar

from:http://blog.csdn.net/ymh198816/article/details/52014315