All posts by dotte

谷歌大神Jeff Dean:大规模深度学习最新进展

在AlphaGo与李世石比赛期间,谷歌天才工程师Jeff Dean在Google Campus汉城校区做了一次关于智能计算机系统的大规模深度学习(Large-Scale Deep Learning for Intelligent Computer Systems)的演讲。本文是对他这次演讲的总结。完整演讲视频(如下):

如果你无法理解信息里包含的内容,那么就会很难将其组织起来。

自从AlphaGo与李世石的比赛——这是约翰·亨利对战蒸汽锤的现代版本——吸引了全世界,再次滋生了对「人工智能毁灭世界」的恐惧感,似乎此时一睹Jeff的演讲是绝佳时刻。如果你认为AlphaGo现在很好,就等待它的beta版本吧。

Jeff当然提到了谷歌的著名语录:组织这个世界的信息,使信息唾手可得并变得有用。

过去,我们可能会将「组织」和收集、清除、存储、索引、报告和搜索数据联系起来。所有这些都是谷歌早期精通的业务。而这些任务完成后,谷歌已经开始进行下一项挑战了。

现在,组织意味着理解。

此次演讲的一些重点:

真正的神经网络由几亿个参数组成。谷歌现在所拥有的技能在于如何建造并快速训练这些大型模型来处理大量数据集,并用它们去解决实际问题,之后快速将这些模型部署到不同平台上的大量产品中(手机、传感器、云等等)。

神经网络在90年代没有得到快速发展是由于缺乏足够的计算能力和大型的数据集。你能看到谷歌对算法的天然热爱是如何与他们的大量基础设施结合到一起的,也能看到不断扩大的数据集如何为谷歌的人工智能创造了完美的推动。

谷歌和其他公司的一个关键区别就在于,当他们在2011年启动谷歌大脑计划时,他们并没有将他们的研究独立成该公司一个单独的研究部门,成为象牙塔一般的存在。而是项目团队和其他团队紧密合作,比如安卓、Gmail 和photo等部门,以确实改进它们的特性,解决困难的问题。这对每一家公司来说都是非常珍贵的一刻。通过和你的人一起工作将研究进行实际应用。

这一想法十分强大:他们知道他们能够获取完整的子系统,有些可能是机器学习到的,用更加通用的端对端的机器学习块进行替换。通常当你有很多复杂的子系统时,总会有很多复杂的代码将这些系统拼接起来。如果能够用数据和非常简单的算法将这一切进行替换的话就再好不过了。

机器学习很快将会变得更好。引用Jeff的话说:机器学习领域的发展非常快。一篇论文发布出来,一周内全球众多研究团体会下载这篇论文,阅读、解析论文,验证论文的内容,然后把自己对论文的延展发布到arXiv.org上。这与计算机学的其他领域不同,他们首先需要提交文件,而后六个月会议讨论决定是否接收,再过三个月会议上才会有结果。这就耗费了一年时间。电子论文能把这个时间压缩到一周是非常惊人的。

技术能够非常神奇的结合起来。谷歌翻译团队写了一个APP,能够使用计算机视觉在取景器上识别文本。在翻译完文本后,可以把翻译后的内容自动添加到图片上。另外一个例子是写图片字幕。把图片识别和一序列一序列的神经网络结合起来。可以想象,这些模块化的内容在未来将何等紧密的结合起来。

有强大功能的模型要小到足以在智能手机上运行。科技想要想取代智力必须走到这一步。它不能依靠网络连接外部的「云大脑」。既然TensorFlow模型能够在手机上运行,那这一点是有可能实现的。

如果你还没有思考深度神经网络如何解决数据理解问题,那你就要开始思考了。这条起始线从现在开始,但它的实现是非常明了的,我们看到了很多难题在深度学习网络面前都迎刃而解。

Jeff 发表的讲话都非常的棒,这次毫不例外。内容非常直接有趣,有深度,还非常容易理解。如果你想了解深度学习或了解Googel打算做什么,这些内容就值得一看了。

理解意味着什么?

当一个人看到街道景象时,他能轻而易举地挑选出图片上的文本,了解到有的商店卖纪念品,有家店价格特别低等信息。但直到现在,计算机依然不能从图片中提取出这些信息。

170007kiymh23gx8zg28i7.jpg

如果计算机想要从图片中了解现实世界,它需要能够从中挑选出有趣的信息点,阅读文本并理解它。

在未来,小型移动设备将主宰着计算机交互。这些设备都需要不同类型的界面。需要真的能够理解并生成对话。

我们在搜索引擎中输入:[汽车零部件]。旧的谷歌版本会因为关键词匹配给你第一条结果,但更好的结果其实是第二个结果。真正的理解是这个问题深层次的意义是什么,并非字眼的表面意义。这才是构建好的搜索与语言理解产品所需要的。

170008plfww7fk2woqw75o.jpg

谷歌深度神经网络小历史

谷歌大脑计划于2011年启动,聚焦于真正推动神经网络科学能达到的最先进的技术。

神经网络已经存在很多年了,出现于19世纪60年代至70年代之间,在80年代晚期和90年代早期红极一时,然后逐渐暗淡。主要因为两个问题:1)缺乏必备的计算能力去训练大量的模型,这意味着神经网络不能应用于包含大量有趣的数据集的大型问题。2)缺乏大量的有趣的数据集。

谷歌开始只有几个产品团队工作。随着这些团队发布一些很好的、能解决以前不能解决的问题的产品。名声渐起,很快,更多的团队加入其中帮助解决问题。

谷歌需要利用深度学习技术的产品/领域:安卓,Apps,药物发现,谷歌邮箱,图像理解,地图,自然语言,图片,机器人,语音翻译,等等。

深度学习能应用于如此完全不同的项目的原因是他们涉及相同的基石,这些基石可用于不同的领域:语音、文本、搜索查询、图像、视频、标签、实体(一种特定的软件模块)、文字、音频特性。你可以输入一种类型的信息,决定你想要输出信息类型,收集训练数据集指示出你想要计算的功能。然后,你可以放手不管了。

这些模型十分奏效,因为你输入的是非常原始的数据。你不必给出数据大量的有趣特点,模型的力量足以让它自动地通过观察许多许多例子决定数据集的有趣之处。

你可以学习常见的表征,这种学习很可能是跨领域的。例如,一辆『汽车』可以指图像中与真实相同的汽车。

他们已经学到他们可以聚集一大堆的子系统,其中一些可能是由机器学习的,然后用更通用的端对端的机器学习块代替它。通常当你有很多复杂的子系统时,往往有大量复杂的代码将这些子系统缝结在一起。如果你能用数据和简单的算法代替所有复杂代码,那就太好了。

什么是单个深度神经网络?

神经网络从数据中学习真正复杂的函数。从一端输入内容转换成另一端的输出内容。

这一函数不像计算x2,而是真正复杂的函数。当你输入原始像素,比如一只猫是,输出结果就会是事物的类别。

170050i80a2do1o3zy80yg.png

深度学习中的「深度」是指神经网络的层的数量。

对于深度,一个好的属性是系统是由简单的可训练的数学函数的集合构成的。

深度神经网络与大量机器学习方式是兼容的。

例如,你输入猫的图片,输出的是一张人为标注为猫的图像,这叫作监督式学习。你可以给系统列举大量的监督式样例,并且将学习结合一个函数,这个函数与在监督式例子所描述的是相似的。

你也可以进行非监督式训练,你只得到图像而不知道图像里面的什么。然后系统可以依靠在众多图片中出现的模式学会挑选。所以,即使不知道图像叫作什么,它也可以在所有这些有猫的图形辨别出共同的事物来。

这也和更多像强化学习这样的外来技术是兼容的。强化学习是非常重要的技术,它正在被AlphaGo使用。

什么是深度学习?

神经网络模型可以说是基于我们所认识的大脑运作的方式,它并不是对神经元真正工作的详细模拟,而是一个简单抽象的神经元版本。

170008hwjxwajxw66wawwm.jpg

一个神经元能够接收许多输入信息,真实的神经元会将不同的优势(strengths)与不同的输入相联系。人工智能网络试着学习为所有那些边缘,亦即与这些不同输入关联的优势进行加权。

真实的神经元吸收一些输入与优势的组合,并决定是否发出一个脉冲。人工神经元不仅仅会发出脉冲,还会发出一个实数值。这些神经元计算的函数是输入的加权求和乘以非线性函数的权重。

现今通常所用的非线性函数是ReLU(max(0,x))。在上世纪九十年代,大部分非线性函数都是更加平滑 (https://www.quora.com/What-is-special-about-rectifier-neural-units-used-in-NN-learning)的 sigmoid或tanh函数。当神经元不放电的时候会取真正的零值,而不是非常接近零的数值的优秀特性,从而帮助优化系统。

例如,如果神经元有着三个输入X1,X2,X3,分别有着0.21,0.3,0.7的权重,那么计算函数将为:y = max(0, -.0.21*x1 + 0.3*x2 + 0.7*x3)。

在识别图片里是一只猫还是一只狗的过程中,图像会经过多层级处理,基于它们的输入神经元可以决定是否发射脉冲。

170008mv90e91mit1zwtm1.jpg

最底层的神经元只处理一小部分像素,更高层的神经元则会处理下层神经元的输出并决定是否发射脉冲。

模型会如此向上直至最后一层处理完毕,举个例子,这是一只猫。在这种情况下它错了,这是一只狗(尽管我也认为那是一只猫,那是一只在篮子里的狗吗?)。

输出错误的信号会反馈回系统中,接着其余模型会做出调整以让它在下一次处理图片时更有可能给出正确的答案。

调整整个模型所有的边缘权重以增大获得正确结果的可能性,这就是神经网络的目标。人们在所有的样本都如此处理,这样在大部分的样本中都会得到正确的输出。

学习算法非常简单。循环计算步骤如下:

随机选择一个训练样本「(输入,标签)」。例如,一张猫的图片,以及预期输出「猫」。

用「输入」运行神经网络,并观察它的结果。

调整边缘权重,让输出更接近与标签」。

该如何调整边缘权重以让输出接近标签呢?

反向传播法:这里是一篇针对此的推荐文章:Calculus on Computational Graphs: Backpropagation (http://colah.github.io/posts/2015-08-Backprop/)。

当神经网顶层选择的是猫而不是狗的时候,通过微积分链式法则来调整权重参数使得网络可以做更准确的预测。

170008znz6akknu8k6v7as.jpg

你需要和权重的箭头保持同一方向,让它更有可能认为这是一只狗。不要跳一大步,因为这可是一个复杂坎坷的表面。小步前进会让结果在下一次更有可能变成狗。通过大量迭代以及对样本的观察,结果就越有可能变成狗。

通过链式法则你可以理解底层的参数变化会如何影响输出。这意味着神经网络网络的变化如同涟漪般波及至输入,调整整个模型,并增大它说出狗的可能性。

真的神经网络由数以亿计参数组成,因此你正在一个亿维空间内做调整,并试着理解那是怎样影响网络输出结果的。

神经网络的很多优秀特性

神经网络可以运用到多个不同领域,用来解决不同的问题:

文本:英语和其他语言包含数万亿的单词。现有很多对应的文字资料,包含句与句对应的一种源语言文字与其翻译版的另一种语言文字。

视觉数据:数十亿的图像和视频。

声音:每天会产生几万小时的音频数据;

用户行为:不同的应用程序都在产生数据,无论你在搜索引擎敲下的字符还是在邮箱里标记的垃圾邮件,这些用户行为里可以不断被学习,并用来给你「定制」智能系统。

知识图谱:数十亿打标签的RDF triple数据。

你给的数据越多,其反馈的结果越好,你也会让这个模型更大。

如果你投入更多的数据却不去扩大你的模型,会进入一个模型能力的饱和状态,此时,模型学习到的只是关于你的数据集最显而易见的事实。

通过增加模型的规模,模型不仅可以记住一些明显的特征,还会记住一些只是偶然在数据集中出现的细微特征。

打造更大的模型需要更多数据和更强大的计算能力。谷歌一直在做的就是如何规模化计算量并投入到这些问题的解决中,从而训练更大的模型。

深度学习给谷歌带来哪些影响?

语音识别

语音识别团队第一个和谷歌大脑团队合作部署神经网络。在谷歌大脑团队帮助下,部署上线了一个新的、基于神经网络的语音模型,不再使用之前的隐马尔科夫模型。

声学模型的问题是从150毫秒的语音里预测其中10毫秒的声音是什么。类似与「ba」还是「ka」。接着你有了这些预测的完整序列,然后将它们和语言模型对接起来,以理解用户在说什么。

这个模型将识别错误率降低了30%,意义非常重大。此后语音团队继续在构建更加复杂的模型,并结合更好的神经网络降低错误率。现在你在手机上说话,语音识别已经比三到五年前好太多了。

Image 挑战赛

大约六年前, ImageNet的数据库公开,大约有100万图像数据,这个巨大的图像数据库对于推进计算机视觉的发展意义重大。

图像被分为1000个不同种类,每个种类大约1000张照片;

大约有1000张不同的豹子照片、1000张不同的汽车、滑板车照片等等;

其中有个复杂的因素:并非所有的标签都是正确的;

比赛的目标是概括出照片的新的类型。对于一张新照片,你能判断出来上面是猎豹还是樱桃吗?

在神经网络运用到比赛之前,这项比赛的错误率为26℅。2014年,谷歌赢得比赛时的错误率为6.66%。2015年的时候,获胜团队的错误率降低到3.46%。

这是一个巨大而且有深度的模型。每个盒子都布满了完整层级的神经元,它们正在进行卷积运算,关于这方面的详细情况,可以查看这篇论文《Going Deeper with Convolutions》

170008vm9rb9h9j5jqabzi.jpg

一个名叫 Andrej Karpathy 的人也参与了比赛,他的错误率是5.1%,他后来还写了篇文章《What I learned from competing against a ConvNet on ImageNet.》

神经网络模型擅长什么?

神经网络模型非常擅长识别精细程度的差别。比如,计算机擅长辨别人类不善于分辨的犬种。人类可能看到一朵花就只知道那是一朵花,计算机可以分辨那是一朵「芙蓉」或是一朵「大丽花」。

神经网络模型擅长归纳。比如不同种类的饭菜,尽管看起来不一样,但都会被标记为「饭菜」。

当计算机出错时,错误的原因是合理的。比如一只蛞蝓看起来很像一条蛇。

谷歌照片搜索

检查照片的像素并理解图像中的内容,这是个很强大的能力。

Google Photos 团队在没有标记它们的情况下部署了这一能力。你可以在没有标记图片的情况下搜索到雕像、尤达、图画、水等图片。

街景影像

在街景影像中,你希望可以阅读到所有的文本。这是更为精细更为具体的视觉任务。

首先需要能够找到图像中的文本。模型基本上都是被训练用来预测像素热图的:哪些像素包含文本,哪些不包含。训练数据是绘制于文本像素周围的多边形。

因为训练数据包含不同的字符集,它可以找到多种不同语言的文本。它可以识别大字体和小字体,离镜头近的和离得很远的文字,以及不同颜色的文本。

这是一个训练相对简单的模型。这是一个试图预测每个像素是否包含文本的传统的网络。

谷歌搜索排名的RankBrain

RankBrain于2015年推出,是谷歌第三重要的搜索排名因素。了解更多:谷歌将其利润丰厚的网络搜索交给人工智能机器。

搜索排名是不同的,因为你想要能够理解该模型,你想理解为什么它会做出特定的决策。

这是搜索排名团队犹豫在搜索排名中使用神经网络的一个原因。当系统出错时,他们希望了解什么会这样。

调试工具已被制造出来,而且模型也能被充分地理解,以克服这种异议。

一般来说你不想手动调整参数。你尝试理解为什么模型会做出那样的预测并搞清楚是否与训练数据相关,是与问题不匹配吗?你可能在一个分布式数据上进行训练,然后将其应用于另一个。通过搜索查询的分布,模型每天都能获得一点改变。因为事件在改变,模型也一直在改变。你必须了解你的分布是否是稳定的,比如在语音识别中,人们的声音并不会发生太大改变。查询和文档内容经常在改变,所以你必须确保你的模型是新鲜的。更一般地,我们需要打造更好的用于理解这些神经网络内部状况的工具,搞清楚是什么得出了预测。

序列至序列(Sequence-to-Sequence)映射模型

世界上许多问题都可归入到一个序列映射到另一个序列的框架中。谷歌的Sutskever、Vinyals 和 Le 在这个主题上写了一篇开关性的论文:使用神经网络的序列到序列学习 (http://papers.nips.cc/paper/5346-sequence-to-sequence-learning-with-neural-networks.pdf)。

特别地,他们研究了语言翻译,将英语翻译成法语中的问题。翻译事实上只是将英语句子序列映射到法语句子序列。

神经网络非常擅长学习非常复杂的功能,所以这个模型学习了映射英语句子到法语句子的功能。

170009nfzopeod6jkhp6pr.jpg

一种语言的一个句子通过EOS(end of sentence)信号一次输入一个词。当模型看到EOS 开始产出其它语言对应的句子时,模型就得到了训练。训练数据是具有同样含义的不同语言中的配对句子。它只是试图该函数建模。

模型会在每一步发出你的词汇中所有词条输入的概率分布。在推理而不是训练时间,你需要做一点搜索。如果你必须最大化每个词的概率,你并不一定会得到最可能的句子。直到找到最大可能的句子,联合概率的搜索才完成。

该系统是现在公共翻译服务中最先进的。其它翻译系统是一堆手写的代码或这个翻译问题的子块的机器学习模型,而非完全的端到端学习系统。

人们对这一模型的兴趣在暴增,因为很多问题都可被映射到序列到序列的方法。

智能回复(Smart Reply)

Smart Reply是序列到序列在产品中的一个应用案例。在手机上,你希望快速回复邮件,而打字又让人痛苦。

和 Gmail 团队合作,他们开发了一个能预测一条信息可能的回复的系统。

第一步是训练一个小模型以预测一条信息是否是可以快速回复的信息。如果是,就会激活一个更大的计算上更昂贵的模型;该模型将该信息作为一个序列,并尝试预测回复的单词序列。

比如,对于一封询问感恩节邀请的电子邮件,可预测到的回复有三个:把我们算上;我们会去;抱歉我们去不了。

Inbox 应用中惊人数量的回复都是通过 Smart Reply 生成的。

图片说明

生成一张图片说明时,你会试着让机器尽可能写出类似人类基于图片会做出的说明。

采用已经开发出来的图片模型,以及已经研发出来的Sequence-to-Sequence模型,把它们插在一起。图片模型被用作输入。

它被训练用来生成说明。训练数据集拥有五种不同的人给出的五种不同说明的图片。10万到20万的图片需要写70万句的说明。

一张婴儿怀抱泰迪熊的图片,电脑这么写的:一个抱着填充玩具动物孩子的特写;一个婴儿在泰迪熊旁边睡着了。

还没有达到人类理解水平,但机器出错时,结果可能会有趣。

综合视觉+翻译

技术能够综合起来。翻译团队编写了使用了在取景器中识别文本的计算机视觉APP。翻译文本,然后给图片叠加翻译文本(让人印象非常深刻,约37;29)。

模型足够小,整个计算都在设备上运行。

迭代(turnaround)时间和对研究的影响

在一天内完成单个CPU花费6周才能完成的训练

谷歌真的关心能够快速迭代研究。它的想法是快速的训练模型。理解什么运行良好,什么运行欠佳,找出下一组要运行的实验。

一个模型应该在在几分钟几小时内就能可训练,而不是几天甚至几个礼拜。让每个做这类研究的人更加富有生产力。

如何快速训练模型?

模型的并行性

一个神经网络有许多内在的并行性。

所有不同的个体神经元几乎都是彼此独立的,当你计算它们时,特别是,加入你有Local Receptive Fields,这是一个神经元从其下方少量神经元那里接受输入的地方。

能够跨越不同GPU卡上的不同机器对工作进行划分,只有跨越边界的数据才需要交流。

170010ll6vm9qjd2q96doh.jpg

数据的并行性

当你对模型的参数集进行优化时,不应该在中央服务的一台机器上进行,这样你就有不同的模型副本,通过它们之间的合作来进行参数优化。

在训练中理解不同的随机数据片段。每一个副本都会获得模型中当前的参数集,通过对相当规模数据的理解来判断出梯度,找出需要对参数所作的调整,并且将调整值发回至中央参数集服务器。参数服务器会对参数进行调整。不断重复这个过程。

这会在多个副本之间完成。有时他们会使用500台机器来生成500个模型副本,以便迅速实现参数的优化和处理数据。

这个过程可以异步进行,每个数据分任务在各自独自的循环运算中,获取参数,计算梯度并将它们传回,不会受到其他彼此的控制和同步。结果是,按照50-100的副本规模进行练习,对许多模型来说是可行的。

Q&A

如果不是诸如谷歌这样的大公司,无法获取海量数据集,你会怎么做?从一个运行良好的模型开始,用公共数据集进行训练。公共数据集普遍可以获取。然后用更加适合你问题的数据进行训练。当你从一个类似并且公开可获取的数据组开始时,针对你的特殊问题,可能只需要1,000或者10,000标签实例。ImageNet就是这种处理可行的好例子。

身为一个工程师,你所犯过的最大错误是什么?没有在BigTable里放入分布式事务处理能力。如果你想要更新多条数据,你不得不运作你自己的事务处理流程。没有放入事务处理能力是因为会增加系统设计的复杂度。回想起来,很对团队想要有那种能力,他们各自独立(在上层)去添加这个能力,也获得了不同程度成功。我们应该在核心系统实现事务处理能力。它在内部应用场景也会很有用。Spanner系统增加了事务处理搞定了这个问题。

英文链接:Jeff Dean on Large-Scale Deep Learning at Google

机器学习驱动编程:新世界的新编程

如果今天要重新建立Google,这家公司的大部分系统将会是通过学习得来的,而非编写代码而来。Google的25,000名开发者中原本有大概10%精通机器学习,现在这一比例可能会达到100% — Jeff Dean

和天气一样,大家都会抱怨编程,但谁也没有为此做点什么。这种情况正在发生变化,就像突如其来的暴风雨一样,这些变化也来自一个出乎意料的方向:机器学习/深度学习。

我知道很多人对深度学习都听厌了。谁不是呢?但是编程技术很长时间来都处在一成不变的情况下,是时候做点什么改变这一情况了。

各种有关编程的“战斗”还在继续,但解决不了任何问题。函数还是对象,这种语言还是那种语言,这朵公有云还是那朵公有云或者这朵私有云亦或那朵“填补了空白”的云,REST还是Unrest,这种字节级编码还是另一种编码,这个框架还是那个框架,这种方法论还是那种方法论,裸机还是容器或者虚拟机亦或Unikernel,单层还是微服务或者NanoService,最终一致还是事务型,可变(Mutable)还是不可变(Immutable),DevOps还是NoOps或者SysOps,横向扩展还是纵向扩展,中心化还是去中心化,单线程还是大规模并行,同步还是异步。诸如此类永无止境。

年复一年,天天如此。我们只是创造了调用函数的不同方法,但最终的代码还需要我们人类来编写。更强大的方法应该是让机器编写我们需要的函数,这就是机器学习大展拳脚的领域了,可以为我们编写函数。这种年复一年天天如此的无聊活动还是交给机器学习吧。

机器学习驱动的编程

我最初是和Jeff Dean聊过之后开始接触用深度学习方式编程的。当时我针对聊天内容写了篇文章:《Jeff Dean针对Google大规模深度学习的看法》。强烈建议阅读本文了解深度学习技术。围绕本文的主题,本次讨论的重点在于深度学习技术如何有效地取代人工编写的程序代码:

Google的经验是对于大量子系统,其中一些甚至是通过机器学习方式获得的,使用更为通用的端到端机器学习系统取代它们。通常当你有大量复杂的子系统时,必须通过大量复杂代码将它们连接在一起。Google的目标是让大家使用数据和非常简单的算法取代这一切。

机器学习将成为软件工程的敏捷工具

Google研究总监Peter Norvig针对这个话题进行过详细的讨论:用深度学习与可理解性对抗软件工程和验证。他的大致想法如下:

  • 软件。你可以将软件看作为函数构建的规范,而实现相应的函数就可以满足规范的要求。
  • 机器学习。以(x,y)对为例,你会猜测某一函数会使用x作为参数并生成y作为结果,这样就能很好地概括出x这个新值。
  • 深度学习。依然是以(x,y)对为例,可以通过学习知道你所组建的表征(Representation)具备不同级别的抽象,而非简单直接的输入和输出。这样也可以很好地概括出x这个新值。

软件工程师并不需要介入这个循环,只需要将数据流入机器学习组件并流出所需的表征。

表征看起来并不像代码,而是类似这样:

这种全新类型的程序并非人类可以理解的函数分解(Functional decomposition),看起来更像是一堆参数。

在机器学习驱动的编程(MLDP)世界里,依然可以有人类的介入,不过这些人再也不叫“程序员”了,他们更像是数据科学家。

可以通过范例习得程序的部分内容吗?可以。

  • 有一个包含超过 2000 行代码的开源拼写检查软件的例子,这个软件的效果始终不怎么好。但只要通过包含 17 行代码的朴素贝叶斯分类器就能以同样性能实现这个软件的功能,但代码量减少了100倍,效果也变得更好。
  • AlphaGo是另一个例子,该程序的结构是手工编写的,所有参数则是通过学习得到的。

只通过范例可以习得完整的程序吗?简单的程序可以,但大型传统程序目前还不行。

  • 在Jeff Dean的讲话中,他介绍了一些有关使用神经网络进行序列学习的顺序的细节。借助这种方法可以从零开始构建最尖端的机器翻译程序,这将是一种端到端学习获得的完整系统,无需手工编写大量代码或机器学习的模型即可解决一系列细节问题。
  • 另一个例子是学习如何玩Atari出品的游戏,对于大部分游戏机器可以玩得和人一样好,甚至比人玩得更好。如果按照传统方式对不同组件进行长期的规划,效果肯定不会像现在这么好。
  • 神经图灵机(Neural Turing Machine)也是一次学习如何用程序编写更复杂程序的尝试,但Peter认为这种技术不会走的太远。

正如你所期待的,Norvig先生的这次讲话非常棒,很有远见,值得大家一看。

Google正在以开源代码的方式从GitHub收集数据

深度学习需要数据,如果你想要创建一个能通过范例学习如何编程的AI,肯定需要准备大量程序/范例作为学习素材。

目前Google和GitHub已经开始合作让开源数据更可用。他们已经从GitHub Archive将超过3TB的数据集上传至BigQuery,其中包含超过280万个开源GitHub代码库中包含的活动数据,以及超过1.45亿次提交,和超过20亿不同的文件路径。

这些都可以当作很棒的训练数据。

就真的完美无缺吗?

当然不是。神经网络如同“黑匣子”般的本质使其难以与其他技术配合使用,这就像试图将人类潜意识行为与意识行为背后的原因结合在一起一样。

Jeff Dean还提到了Google搜索评级团队在搜索评级研究工作中运用神经网络技术时的犹豫。对于搜索评级,他们想要了解整个模型,了解做出某一决策的原因。当系统出现错误后他们还想了解为什么会出现这样的错误。

为了解决这样的问题,需要创建配套的调试工具,而相关工具必须具备足够的可理解性。

这样的做法会有效的。针对搜索结果提供的机器学习技术RankBrain发布于2015年,现在已成为第三大最重要的搜索评级指标(指标共有100项)。详情可访问:Google将利润丰厚的网页搜索技术交给AI计算机处理

Peter还通过一篇论文《机器学习:技术债的高利率信用卡》进一步介绍了相关问题的更多细节。缺乏明确的抽象层,就算有Bug你也不知道到底在哪。更改任何细节都要改变全局,很难进行纠正,纠正后的结果更是难以预测。反馈环路,机器学习系统生成数据后会将这些数据重新送入系统,导致反馈环路。诱惑性损害(Attractive Nuisance),一旦一个系统使得所有人都想使用,这样的系统很可能根本无法在不同上下文中使用。非稳态(Non-stationarity),随着时间流逝数据会产生变化,因此需要指出到底需要使用哪些数据,但这个问题根本没有明确答案。配置依赖性,数据来自哪里?数据准确吗?其他系统中出现的改动是否会导致这些数据产生变化,以至于今天看到的结果和昨天不一样?训练用数据和生产用数据是否不同?系统加载时是否丢弃了某些数据?缺乏工具,标准化软件开发有很多优秀的工具可以使用,但机器学习编程是全新的,暂时还没有工具可用。

MLDP是未来吗?

这并不是一篇“天呐,世界末日到了”这样的文章,而是一篇类似于“有些事你可能没听说过,世界又将为此产生变化,这多酷啊”的文章。

这种技术依然处于非常早期的阶段。对程序的部分内容进行学习目前已经是可行的,而对大规模复杂程序进行学习目前还不实用。但是当我们看到标题类似于Google如何将自己重塑为一家“机器学习为先”的公司这样的文章,可能还不明白这到底真正意味着什么。这种技术不仅仅是为了开发类似AlphaGo这样的系统,最终结果远比这个还要深刻。

相关文章

查看英文原文Machine Learning Driven Programming: A New Programming for a New World

from:http://www.infoq.com/cn/articles/machine-learning-programing

Pinterest谈实战经验:如何在两年内实现零到数百亿的月访问

Pinterest一直保持着指数增长,每一个半月都会翻一翻。在两年内,他们实现了从0到数百亿的月PV;从开始的两个创始人加一个工程师增长到现在超过40个工程师,从一个小型的MySQL服务器增长到180个Web Enigne、240个API Enigne、88个MySQL DB(cc2.8xlarge,每个DB都会配置一个从属节点)、110个Redis Instance以及200个Mmecache Instance。

在一个名为 《Scaling Pinterest》 的主题演讲上,Pinterest的Yashwanth NelapatiMarty Weiner为我们讲述了这个戏剧性的过程。当然扩展到当下规模,Pinterest在众多选择中不可避免的走了许多的弯路,而Todd Hoff认为其中最宝贵的经验该归结于以下两点:

  1. 如果你的架构应对增长所带来的问题时,只需要简单的投入更多的主机,那么你的架构含金量十足。
  2. 当你把事物用至极限时,这些技术都会以各自不同的方式发生故障,这导致他们对工具的选择有着特殊的偏好:成熟、简单、优秀、知名、被更多的用户喜爱、更好的支持、稳定且杰出的表现、通常情况下无故障以及免费。使用这些标准,他们选择了MySQL、Solr、Memcache、Redis、Cassandra,同时还抛弃了MongoDB。

同样这两个点是有关联的,符合第二个原则的工具就可以通过投入更多的主机进行扩展。即使负载的增加,项目也不会出现很多故障。即使真的出现难以解决的问题,至少有一个社区去寻找问题解决的方案。一旦你选择过于复杂和挑剔的工具,在扩展的道路上将充满荆棘。

需要注意的是所有他们选择的工具都依靠增加分片来进行扩展,而非通过集群。讲话中还阐述了为什么分片优于集群以及如何进行分片,这些想法可能是之前你闻所未闻的。

下面就看一下Pinterest扩展的阶段性时间轴:

项目背景

  • Pins是由其它零零碎碎信息集合成的图片,显示了对客户重要的信息,并且链接到它所在的位置。
  • Pinterest是一个社交网络,你可以follow(关注)其他人以及board。
  • 数据库:Pinterest的用户拥有board,而每个board都包含pin;follow及repin人际关系、验证信息。

1. 2010年3月发布——寻找真我的时代

在那时候,你甚至不知道需要建立一个什么样的产品。你有想法,所以你快速的迭代以及演变。而最终你将得到一些很小的MySQL查询,而这些查询在现实生活中你从未进行过。

Pinterest初期阶段的一些数字:

  • 2个创始人
  • 1个工程师
  • Rackspace
  • 1个小的网络引擎
  • 1个小的MySQL数据库
  • 2011年11月

仍然是小规模,产品通过用户反馈进行演变后的数字是:

  • Amazon EC2 + S3 + CloudFront
  • 1 NGinX, 4 Web Engines (用于冗余,不全是负载)
  • 1 MySQL DB + 1 Read Slave (用于主节点故障情况)
  • 1 Task Queue + 2 Task Processors
  • 1 MongoDB (用于计数)
  • 2 Engineers

2. 贯穿2011年——实验的时代

迈上疯狂增长的脚步,基本上每1个半月翻一翻。

  • 当你增长的如此之快,每一天每一星期你可能都需要打破或者抛弃一些东西。
  • 在这个时候,他们阅读大量的论文,这些论文都阐述着只需要添加一台主机问题就会得以解决。他们着手添加许多技术,随后又不得不放弃。
  • 于是出现了一些很奇怪的结果
  • Amazon EC2 + S3 + CloudFront
  • 2NGinX, 16 Web Engines + 2 API Engines
  • 5 Functionally Sharged MySQL DB + 9 read slaves
  • 4 Cassandra Nodes
  • 15 Membase Nodes (3 separate clusters)
  • 8 Memcache Nodes
  • 10 Redis Nodes
  • 3 Task Routers + 4 Task Processors
  • 4 Elastic Search Nodes
  • 3 Mongo Clusters
  • 3个工程师
  • 5个主数据库技术,只为了独立其中的数据。
  • 增长太快以至于MySQL疲于奔命,所有其它的技术也达到了极限。
  • 当你把事物用至极限时,这些技术都会以各自不同的方式出错。
  • 开始抛弃一些技术,并且自我反省究竟需要些什么,基本上重做了所有的架构。

3. 2012年2月——成熟的时代

  • 在重做了所有的架构后,系统呈现了如下状态
  • Amazon EC2 + S3 + Akamai, ELB
  • 90 Web Engines + 50 API Engines
  • 66 MySQL DBs (m1.xlarge) +,每个数据库都配备了从属节点
  • 59 Redis Instances
  • 51 Memcache Instances
  • 1 Redis Task Manager + 25 Task Processors
  • Sharded Solr
  • 6个工程师
  • 现在采用的技术是被分片的MySQL、Redis、Memcache和Solr,有点在于这些技术都很简单很成熟。
  • 网络传输增长仍然保持着以往的速度,而iPhone传输开始走高。

4. 2012年10月12日 —— 收获的季节

大约是1月份的4倍

  • 现在的数据是:
  • Amazon EC2 + S3 + Edge Cast,Akamai, Level 3
  • 180 Web Engines + 240 API Engines
  • 88 MySQL DBs (cc2.8xlarge) ,同样每个数据库都有一个从属节点
  • 110 Redis Instances
  • 200 Memcache Instances
  • 4 Redis Task Manager + 80 Task Processors
  • Sharded Solr
  • 40个工程师(仍在增长)
  • 需要注意的是,如今的架构已趋近完美,应对增长只需要投入更多的主机。
  • 当下已开始转移至SSD

下面一览该演讲中的干货,决策的制定:

为什么会选择EC2和S3

  1. 相当好的可靠性,即使数据中心发生故障。多租户会增加风险,但是也不是太坏。
  2. 良好的报告和支持。它们(EC2和S3)有着良好的架构,并且知道问题所在。
  3. 完善的周边设施,特别是在你需要快速增长时。你可以从APP Engine处获得maged cache、负载均衡、MapReduce、数据库管理以及其它你不想自己动手编写的组件,这可以加速你应用程序的部署,而在你工程师空闲时,你可以着手编写你需要的一切。
  4. 新的实例可以在几秒内就绪,这就是云的力量;特别是在只有两个工程师的初期,不需要去担心容量规划,更不需要花两个星期去建立自己的Memcache,你可以在数分钟内添加10个Memcached。
  5. 缺点:有限的选择。直到最近,才可以选择使用SSD,同时无法获得太大的内存配置。
  6. 优点:你不需要给大量的主机进行不同的配置。

为什么会选择MySQL

  1. 非常成熟。
  2. 非常稳定。不会宕机,并且不会丢失数据。
  3. 在招聘上具有优势,市场上有大把的人才。
  4. 在请求呈直线上升时,仍能将相应时间控制在一定的范围内,有些数据库技术在面对请求的飙升时表现并不是很好。
  5. 非常好的周边软件支持——XtraBackup、Innotop、Maatkit。
  6. 可以从类似Percona这样的公司得到优秀的技术支持。
  7. 开源(免费)——这一点非常重要,特别是在资金缺乏的初期

为什么使用Memcache

  • 非常成熟。
  • 非常简单。可以当成是一个socket哈希表
  • 杰出稳定的表现
  • 知名并为大量用户喜爱
  • 永不崩溃
  • 开源

为什么选择Redis

  • 虽然还不够成熟,但是非常简单及优秀
  • 提供了大量的数据结构类型
  • 提供多种的选择进行持久化和备份:你可以备份而非持久化,选择备份的话你还可以选择多久备份一次;同样你还可以选择使用什么方式进行持久化,比如MySQL等。
  • Home feed被储存在Redis上,每3个小时保存一次;然而并不是3个小时持久化一次,只是简单的每3个小时备份一次。
  • 如果你存储数据的主机发生故障,丢失的也只是备份周期内的数据。虽然不是完全可靠,但是非常简单。避免了复杂的持久化及复制,这样的架构简单且便宜。
  • 知名并为大量用户喜爱
  • 稳定且杰出的表现
  • 很少出故障。有一些专有的故障模型,你需要学会解决。这也是成熟的优势,只需要学习就可以解决。
  • 开源

Solr

  1. 只需要几分钟的安装时间,就可以投入使用
  2. 不能扩展到多于一台的机器上(最新版本并非如此)
  3. 尝试弹性搜索,但是以Pinterest的规模来说,可能会因为零碎文件和查询太多而产生问题。
  4. 选择使用Websolr,但是Pinterest拥有搜索团队,将来可能会开发自己的版本。

集群vs.分片

  • 在迅速扩展的过程中,Pinterest认识到每次负载的增加,都需要均匀的传播他们的数据。
  • 针对问题先确定解决方案的范围,他们选择的范围是集群和分片之间的一系列解决方案。

集群——所有的操作都是通过自动化

  • 比如:Cassandra、MemBase、HBase
  • 结论:没有安全感,将来可能会比较成熟,但是当下这个解决方案中还存在太多的复杂性和故障点。
  • 特性:
  • 数据自动分布
  • 节点间转移数据
  • 需要平衡分配
  • 节点间的相互通信,需要做很多措施用于防止干扰、无效传递及协商。
  • 优点:
  • 自动扩展你的数据存储,最起码论文中是这么说的。
  • 便于安装
  • 数据上的空间分布及机房共置。你可以在不同区域建立数据中心,数据库会帮你打理好一切。
  • 高有效性
  • 负载平衡
  • 不存在单点故障
  • 缺点:
  • 仍然不成熟。
  • 本质上说还很复杂。一大堆的节点必须对称协议,这一点非常难以解决。
  • 缺少社区支持。社区的讨论因为产品方向的不同而不能统一,而在每个正营中也缺乏强有力的支持。
  • 缺乏领域内资深工程师,可能大多数的工程师都还未使用过Cassandra。
  • 困难、没有安全感的机制更新。这可能是因为这些技术都使用API并且只在自己的领域内通行,这导致了复杂的升级路径。
  • 集群管理算法本身就用于处理SPOF(单点故障),如果存在漏洞的话可能就会影响到每个节点。
  • 集群管理器代码非常复杂,并且需要在所有节点上重复,这就可能存在以下的故障模式:
  • 数据平衡失控。当给集群中添加新的主机时,可能因为数据的拷贝而导致集群性能下降。那么你该做什么?这里不存在去发现问题所在的工具。没有社区可以用来求助,同样你也被困住了,这也是Pinterest回到MySQL的原因。
  • 跨节点的数据损坏。如果这里存在一个漏洞,这个漏洞可能会影响节点间的日志系统和压缩等其它组件?你的读延时增加,所有的数据都会陷入麻烦以及丢失。
  • 错误负载平衡很难被修复,这个现象十分普遍。如果你有10个节点,并且你注意到所有的负载都被堆积到一个节点上。虽然可以手动处理,但是之后系统还会将负载都加之一个节点之上。
  • 数据所有权问题,主次节点转换时的数据丢失。集群方案是非常智能的,它们会在特定的情况下完成节点权利的转换,而主次节点切换的过程中可能会导致数据的部分丢失,而丢失部分数据可能比丢失全部还糟糕,因为你不可能知道你究竟丢失了哪一部分。

分片——所有事情都是手动的

  • 结论:它是获胜者。Todd Hoff还认为他们的分片架构可能与Flickr架构类似。
  • 特性:
  • 分片可以让你摆脱集群方案中所有不想要的特性。
  • 数据需要手动的分配。
  • 数据不会移动。Pinterest永远都不会在节点间移动,尽管有些人这么做,这让他们在一定范围内站的更高。
  • 通过分割数据的方式分配负载。
  • 节点并没有互相通信,使用一些主节点控制程序的运行。
  • 优点:
  • 可以分割你的数据库以提高性能。
  • 空间分布及放置数据
  • 高有效性
  • 负载平衡
  • 放置数据的算法非常简单。主要原因是,用于处理单点故障的代码只有区区的半页,而不是一个复杂的集群管理器。并且经过短暂的测试就知道它是否能够正常工作。
  • ID生成非常简单
  • 缺点:
  • 不可以执行大多数的join。
  • 失去所有事务的能力。在一个数据库上的插入可能会成功,而在另一个上会失败。
  • 许多约束必须放到应用程序层。
  • 模式的转变需要从长计议。
  • 报告需要在所有分片上执行查询,然后需要手动的进行聚合。
  • Join在应用程序层执行。
  • 应用程序必须容忍以上所有问题。

什么时候进行分片

  1. 如果你的项目拥有PB级的数据,那么你需要立刻对其进行分片。
  2. Pin表格拥有百万行索引,索引大小已经溢出内存并被存入了磁盘。
  3. Pinterest使用了最大的表格,并将它们(这些索引)放入自己的数据库。
  4. 然后果断的超过了单数据库容量。
  5. 接着Pinterest必须进行分片。

分片的过渡

  • 过渡从一个特性的冻结开始。
  • 确认分片该达到什么样的效果——希望尽少的执行查询以及最少数量的数据库去呈现一个页面。
  • 剔除所有的MySQL join,将要做join的表格加载到一个单独的分片去做查询。
  • 添加大量的缓存,基本上每个查询都需要被缓存。
  • 这个步骤看起来像:
  • 1 DB + Foreign Keys + Joins
  • 1 DB + Denormalized + Cache
  • 1 DB + Read Slaves + Cache
  • Several functionally sharded DBs+Read Slaves+Cache
  • ID sharded DBs + Backup slaves + cache
  • 早期的只读从属节点一直都存在问题,因为存在slave lag。读任务分配给了从属节点,然而主节点并没有做任何的备份记录,这样就像一条记录丢失。之后Pinterest使用缓存解决了这个问题。
  • Pinterest拥有后台脚本,数据库使用它来做备份。检查完整性约束、引用。
  • 用户表并不进行分片。Pinterest只是使用了一个大型的数据库,并在电子邮件和用户名上做了相关的一致性约束。如果插入重复用户,会返回失败。然后他们对分片的数据库做大量的写操作。

如何进行分片

  • 可以参考Cassandra的ring模型、Membase以及Twitter的Gizzard。
  • 坚信:节点间数据传输的越少,你的架构越稳定。
  • Cassandra存在数据平衡和所有权问题,因为节点们不知道哪个节点保存了另一部分数据。Pinterest认为应用程序需要决定数据该分配到哪个节点,那么将永远不会存在问题。
  • 预计5年内的增长,并且对其进行预分片思考。
  • 初期可以建立一些虚拟分片。8个物理服务器,每个512DB。所有的数据库都装满表格。
  • 为了高有效性,他们一直都运行着多主节点冗余模式。每个主节点都会分配给一个不同的可用性区域。在故障时,该主节点上的任务会分配给其它的主节点,并且重新部署一个主节点用以代替。
  • 当数据库上的负载加重时:
  • 先着眼节点的任务交付速度,可以清楚是否有问题发生,比如:新特性,缓存等带来的问题。
  • 如果属于单纯的负载增加,Pinterest会分割数据库,并告诉应用程序该在何处寻找新的节点。
  • 在分割数据库之前,Pinterest会给这些主节点加入一些从属节点。然后置换应用程序代码以匹配新的数据库,在过渡的几分钟之内,数据会同时写入到新旧节点,过渡结束后将切断节点之间的通道。

ID结构

  • 一共64位
  • 分片ID:16位
  • Type:10位—— Board、User或者其它对象类型
  • 本地ID——余下的位数用于表中ID,使用MySQL自动递增。
  • Twitter使用一个映射表来为物理主机映射ID,这将需要备份;鉴于Pinterest使用AWS和MySQL查询,这个过程大约需要3毫秒。Pinterest并没有让这个额外的中间层参与工作,而是将位置信息构建在ID里。
  • 用户被随机分配在分片中间。
  • 每个用户的所有数据(pin、board等)都存放在同一个分片中,这将带来巨大的好处,避免了跨分片的查询可以显著的增加查询速度。
  • 每个board都与用户并列,这样board可以通过一个数据库处理。
  • 分片ID足够65536个分片使用,但是开始Pinterest只使用了4096个,这允许他们轻易的进行横向扩展。一旦用户数据库被填满,他们只需要增加额外的分片,然后让新用户写入新的分片就可以了。

查找

  • 如果存在50个查找,举个例子,他们将ID分割且并行的运行查询,那么延时将达到最高。
  • 每个应用程序都有一个配置文件,它将给物理主机映射一个分片范围。
  • “sharddb001a”: : (1, 512)
  • “sharddb001b”: : (513, 1024)——主要备份主节点
  • 如果你想查找一个ID坐落在sharddb003a上的用户:
  • 将ID进行分解
  • 在分片映射中执行查找
  • 连接分片,在数据库中搜寻类型。并使用本地ID去寻找这个用户,然后返回序列化数据。

对象和映射

  • 所有数据都是对象(pin、board、user、comment)或者映射(用户由baord,pin有like)。
  • 针对对象,每个本地ID都映射成MySQL Blob。开始时Blob使用的是JSON格式,之后会给转换成序列化的Thrift。
  • 对于映射来说,这里有一个映射表。你可以为用户读取board,ID包含了是时间戳,这样就可以体现事件的顺序。
  • 同样还存在反向映射,多表对多表,用于查询有哪些用户喜欢某个pin这样的操作。
  • 模式的命名方案是:noun_verb_noun: user_likes_pins, pins_like_user。
  • 只能使用主键或者是索引查找(没有join)。
  • 数据不会向集群中那样跨数据的移动,举个例子:如果某个用户坐落在20分片上,所有他数据都会并列存储,永远不会移动。64位ID包含了分片ID,所以它不可能被移动。你可以移动物理数据到另一个数据库,但是它仍然与相同分片关联。
  • 所有的表都存放在分片上,没有特殊的分片,当然用于检测用户名冲突的巨型表除外。
  • 不需要改变模式,一个新的索引需要一个新的表。
  • 因为键对应的值是blob,所以你不需要破坏模式就可以添加字段。因为blob有不同的版本,所以应用程序将检测它的版本号并且将新记录转换成相应的格式,然后写入。所有的数据不需要立刻的做格式改变,可以在读的时候进行更新。
  • 巨大的胜利,因为改变表格需要在上面加几个小时甚至是几天的锁。如果你需要一个新的索引,你只需要建立一张新的表格,并填入内容;在不需要的时候,丢弃就好。

呈现一个用户文件界面

  1. 从URL中取得用户名,然后到单独的巨型数据库中查询用户的ID。
  2. 获取用户ID,并进行拆分
  3. 选择分片,并进入
  4. SELECT body from users WHERE id =
  5. SELECT board_id FROM user_has_boards WHERE user_id=
  6. SELECT body FROM boards WHERE id IN ()
  7. SELECT pin_id FROM board_has_pins WHERE board_id=
  8. SELECT body FROM pins WHERE id IN (pin_ids)
  9. 所有调用都在缓存中进行(Memcache或者Redis),所以在实践中并没有太多连接数据库的后端操作。

脚本相关

  1. 当你过渡到一个分片架构,你拥有两个不同的基础设施——没有进行分片的旧系统和进行分片的新系统。脚本成为了新旧系统之间数据传输的桥梁。
  2. 移动5亿的pin、16亿的follower行等。
  3. 不要轻视项目中的这一部分,Pinterest原认为只需要2个月就可以完成数据的安置,然而他们足足花了4至5个月时间,别忘了期间他们还冻结了一项特性。
  4. 应用程序必须同时对两个系统插入数据。
  5. 一旦确认所有的数据都在新系统中就位,就可以适当的增加负载来测试新后端。
  6. 建立一个脚本农场,雇佣更多的工程师去加速任务的完成。让他们做这些表格的转移工作。
  7. 设计一个Pyres副本,一个到GitHub Resque队列的Python的接口,这个队列建立在Redis之上。支持优先级和重试,使用Pyres取代Celery和RabbitMQ更是让他们受益良多。
  8. 处理中会产生大量的错误,用户可能会发现类似丢失board的错误;必须重复的运行任务,以保证在数据的处理过程中不会出现暂时性的错误。

开发相关

  • 开始尝试只给开发者开放系统的一部分——他们每个人都拥有自己的MySQL服务器等,但是事情改变的太快,以至于这个模式根本无法实行。
  • 转变成Facebook模式,每个人都可以访问所有东西,所以不得不非常小心。

未来的方向

  • 基于服务的架构
  • 当他们发现大量的数据库负载,他们开始布置大量的应用程序服务器和一些其它的服务器,所有这些服务器都连接至MySQL和Memcache。这意味着在Memcache上将存在3万的连接,这些连接将占用几个G的内存,同时还会产生大量的Memcache守护进程。
  • 为了解决这个问题,将这些工作转移到了一个服务架构。比如:使用一个follower服务,这个服务将专注处理follower查询。这将接下30台左右的主机去连接数据库和缓存,从而减少了连接的数量。
  • 对功能进行隔离,各司其职。让一个服务的开发者不能访问其它的服务,从而杜绝安全隐患。

学到的知识

  1. 为了应对未来的问题,让其保持简单。
  2. 让其变的有趣。只要应用程序还在使用,就会有很多的工程师加入,过于复杂的系统将会让工作失去乐趣。让架构保持简单就是大的胜利,新的工程师从入职的第一周起就可以对项目有所贡献。
  3. 当你把事物用至极限时,这些技术都会以各自不同的方式发生故障。
  4. 如果你的架构应对增长所带来的问题时,只需要简单的投入更多的主机,那么你的架构含金量十足。
  5. 集群管理算法本身就用于处理SPOF,如果存在漏洞的话可能就会影响到每个节点。
  6. 为了快速的增长,你需要为每次负载增加的数据进行均匀分配。
  7. 在节点间传输的数据越少,你的架构越稳定。这也是他们弃集群而选择分片的原因。
  8. 一个面向服务的架构规则。拆分功能,可以帮助减少连接、组织团队、组织支持以及提升安全性。
  9. 搞明白自己究竟需要什么。为了匹配愿景,不要怕丢弃某些技术,甚至是整个系统的重构。
  10. 不要害怕丢失一点数据。将用户数据放入内存,定期的进行持久化。失去的只是几个小时的数据,但是换来的却是更简单、更强健的系统!

原文链接: Scaling Pinterest – From 0 To 10s Of Billions Of Page Views A Month In Two Years (编译/仲浩 审校/王旭东)

from:http://www.csdn.net/article/2013-04-16/2814902-how-pinterest-scaling-0-to-billions-pv

Amazon’s AWS

原文链接:A Beginner’s Guide To Scaling To 11 Million+ Users On Amazon’s AWS

译者:杰微刊–汪建

 

一个系统从一个用户到多于1100万用户访问,你将如何对你的系统进行扩展?Amazon的web服务解决方案架构师乔尔?威廉姆斯就此话题给出了一个精彩的演讲:2015扩展你的第一个一千万用户。
如果你是一个拥有较丰富的AWS使用经验的用户,这个演讲将不太适合你,但如果你作为一个刚接触云、刚接触AWS的新用户,或者你还没有跟上Amazon源源不断对外发布的AWS新特性,它将是一个很好的入门资料。
正如大家所期望的,这个演讲讨论Amazon服务如何针对问题提出先进且主流的解决方案,Amazon平台总是令人印象深刻且拥有指导性。对于如何把所有产品组合在一起Amazon做了大量工作去提取出用户需要的是什么,并且确保Amazon对于每个用户的需求都拥有一个产品能满足这部分的需求。
演讲的一些有趣的要点:

1、一般刚开始时使用SQL而在必要时刻转向NoSQL。
2、一致的观点是通过引入组件去解耦系统,使用组件便于扩展并且组件故障不会影响到其他模块。组件便于使系统分层和构建微服务。
3、只把区别于已有任务的部分作为你的业务逻辑,不要重复发明轮子。
4、可伸缩性和冗余性不是两个互相独立的概念,你经常要将两个概念同时放在一起考虑。
5、没有提及成本,成为AWS解决方案被批评的一个主要方面。

 

基本情况
AWS覆盖全世界12个国家区域

1. 每个区域都对应着世界上的一个物理位置,每个位置都有弹性计算云提供多个可用区域(Availability Zones),这些区域包含北美、南美、欧洲、中东、非洲、亚太等地区。
2. 每个可用区域(AZ)实质上是单个数据中心,尽管它可由多个数据中心构造。
3. 每个可用区域都拥有很强的隔离性,他们各自拥有独立的电源和网络。
4. 可用区域之间只能通过低延迟网络互相连接,它们可以相距5或15英里,但网络的速度相当快以至于你的应用程序像在同一个数据中心。
5. 每个区域至少有2个可用区域,可用区域总共有32个。
6. 借助若干可用区域即可构建一个高可用的架构供你的应用使用。
7. 在即将到来的2016年将会增加至少9个可用区域和4个区域。

 

AWS在世界上拥有53个边缘位置
1. 这些边缘位置被用于Amazon的内容分发网络CDN、Route53、CloudFront以及Amazon的DNS管理服务器。
2. 边缘位置使用户可以在世界的任何角落低延迟地访问网页。
构建块服务
1. AWS已经使用多个可用区域构架了大量服务供使用,这些服务内部拥有高可用性和容错性。以下是可供使用的服务列表。
2. 你可以在你的应用中直接使用这些服务,它们是收费的,但使用它们你可以不必自己考虑高可用性。
3. 每个可用区域都提供很多服务,包括CloudFront, Route 53, S3, DynamoDB, 弹性负载均衡, EFS, Lambda, SQS, SNS, SES, SWF。
4. 即使这些服务只存在于一个单一的可用区域,通过使用这些服务任然可以构建一个高可用架构。
一个用户
在这种情况下,你是作为仅有的用户,你仅仅只想让web应用跑起来。
你的架构看起来像下面几点:

1. 运行在单独的实例上,可能是t2.micro型。实例类型包括了CPU、内存、存储和网络的不同组合,通过选择这些不同实例类型组成一个适合你的web应用的资源。
2. 在单独的实例上运行整个web栈,例如web应用程序、数据库以及各种管理系统等。
3. 使用Amazon的Route53作为DNS服务。
4. 在此实例上添加一个的弹性IP。
5. 在一段时间内运行的良好。
纵向扩展
1、你需要一个更大的容器放置你的应用,最简单的扩展方法是选择一个更大的实例类型,例如c4.8xlarge或者m3.2xlarge。
2、这种方法称为纵向扩展。
3、需要做的仅仅是选择一个新型实例取代原来的实例,应用跑起来即可以更加强大。
4、提供多种不同的硬件配置混搭选择,可以选择一个244G内存的系统(2TB的RAM即将到来),也可以选择40个CPU内核的系统,可以组成I/0密集型实例、CPU密集型实例以及高存储型实例。
5、Amazon的一些服务使用可配置的IOPS选项来保证性能,你可以使用小一点的实例去跑你的应用,对于需要扩展的服务独立使用Amazon的可扩展服务,例如DynamoDB。
6、纵向扩展有一个很大的问题:它不具备failover功能,同时也不具备冗余性。就像把所有鸡蛋都放在同一个篮子里,一旦实例发生故障你的web也会宕掉。
7、一个单独的实例最终能做到的也就这些,想要更加强大需要其他的措施。
10+用户
将单个主机分为多个主机
1. Web应用使用一台主机。
2. 数据库使用一台主机,你可以在上面跑任意数据库,只要负责数据库的管理。
3. 将主机分离成多个主机可以让web应用和数据库各自独立对自己进行扩展,例如在某种情况下可能你需要的数据库比web应用更大的规模。
或者你可以不自己搭建数据库转而使用Amazon的数据库服务
1. 你是一个DBA吗?你真的想要担心数据备份的问题吗?担心高可用?担心数据库补丁?担心操作系统?
2. 使用Amazon数据库服务有一大优势,你只要简单一点击即可完成多可用区域的数据库的安装,而且你不必担心这些可用区域之间的数据备份或其他类似的事情,这些数据库具备高可用性高可靠性。
正如你所想,Amazon有几种类型的完全托管数据库服务供出售:
1. Amazon RDS(Relational Database Service),可供选择的数据库类型相当多,包括Microsoft SQL Server, Oracle, MySQL, PostgreSQL, MariaDB, Amazon Aurora.
2. Amazon DynamoDB,一个NoSQL数据库。
3. Amazon Redshift,一个PB级的数据仓库系统。
更多Amazon 特性
1. 拥有自动扩展存储到64TB的能力,你不再需要限定你的数据存储。
2. 多大15个读副本。
3. 持续增量备份到S3。
4. 多达6路备份到3个可用区域,有助于处理故障。
5. MySQL兼容。
用SQL数据库取代NoSQL数据库
1. 建议使用SQL数据库。
2. SQL数据库相关技术完善。
3. 存在大量开源源码、社区、支持团队、书籍和工具。
4. 千万用户级别系统的还不足以拖垮SQL数据库,除非你的数据非常巨大。
5. 具有清晰的扩展模式。
什么时候你才需要使用NoSQL数据库
1. 如果你一年需要存储超过5TB的数据,或者你有一个令人难以置信的数据密集任务。
2. 你的应用具有超低延迟需求。
3. 你的应用需要一个非常高的吞吐量,需要在数据的读写I/O上进行优化。
4. 你的应用没有任何关系型数据。
100+用户
在web层进行主机分离。
使用Amazon RDS存储数据,它把数据库的所有工作都揽下了。
上面两点做好即可。

 

1000+用户
现在你构建的应用存在可用性问题,你的web应用将会宕掉如果你web服务的主机宕掉了。
你需要在另外一个可用区域上搭建另外一个web实例,由于可用区域之间拥有毫秒级别的低延迟,他们看起来就像互相挨着。
同样,你需要在另外一个可用区域上搭建一个RDS数据库slave,组成主备数据库,一旦主数据库发生故障你的web应用将会自动切换到slave备数据库。由于你的应用总是使用相同的端,failover不会带给应用任何改变。
在两个可用区域中分布着两个web主机实例,使用弹性负载均衡器(ELB)将用户访问分流到两个web主机实例。
弹性负载均衡器(ELB)
1. ELB是一个高可用的负载均衡器,它存在于所有的可用区域中,对于你的应用来说它是一个DNS服务,只需要把他放到Route53即可,它就会在你的web主机实例中进行负载分发。
2. ELB有健康检查机制,这个机制保证流量不会分发到宕掉的主机上。
3. 不用采取任何措施即可完成扩展,当它发现额外流量时它将在后台通过横向和纵向扩展,随着你的应用不断扩展,它也会自动不断扩展,而且这些都是系统自动完成的,你不必对ELB做任何管理。
10000到100000用户
前面例子中说到ELB后面挂载两个web主机实例,而实际上你可以在ELB后面挂载上千个主机实例,这就叫横向扩展。
添加更多的读副本到数据库中,或者添加到RDS中,但需要保持副本的同步。
通过转移一些流量到web层服务器减轻web应用的压力,例如从你的web应用中将静态内容抽离出来放到Amazon S3和Amazon CloudFront上,CloudFront是Amazon的CDN,它会将你的静态内容保存在全世界的53个边缘地区,通过这些措施提高性能和效率。
Amazon S3是一个对象仓库。
1. 它不像EBS,它不是搭载在EC2实例上的存储设备,它是一个对象存储而不是块存储。
2. 对于静态内容如JavaScript、css、图片、视频等存放在Amazon S3上再合适不过,这些内容没必要放到EC2实例上。
3. 高耐用性,11个9的可靠性。
4. 无限制的可扩展,只要你想可以往里面扔尽可能多的数据,用户在S3上存储了PB级别的数据。
5. 支持最大5TB的对象存储。
6. 支持加密,你可以使用Amazon的加密,或者你自己的加密,又或者第三方的加密服务。
Amazon CloudFront对你的内容提供缓存
1. 它将内容缓存在边缘地区以便供你的用户低延迟访问。
2. 如果没有CDN,将导致你的用户更高延迟地访问你的内容,你的服务器也会因为处理web层的请求而处于更高的负载。
3. 例如有个客户需要应对60Gbps的访问流量,CloudFront将一切都处理了,web层甚至都不知道有这么大的访问流量存在。
你还可以通过转移session状态减轻你的web层的负载
1. 将session状态保存到ElastiCache或DynamoDB。
2. 这个方法也让你的系统在未来可以自动扩展。
你也可以将数据库的一些数据缓存在ElastiCache减轻应用负载
数据库没有必要处理所有获取数据的请求,缓存服务可以处理这些请求从而让宝贵的数据库资源处理更加重要的操作。
Amazon DynamoDB——全托管的NoSQL数据库
1. 根据你自己想要的吞吐量,定制你想要的读写性能。
2. 支持高性能。
3. 具备分布式和容错性,它部署在多个可用区域中。
4. 它以kv结构存储,且支持JSON格式。
5. 支持最大400k大的文件。
Amazon Elasticache ——全托管的Memcached或Redis
1. 维护管理一个memcached集群并不会让你赚更多的钱,所以让Amazon来做。
2. Elasticache集群会自动帮你扩展,它是一个具备自我修复特性的基础设施,如果某些节点宕掉了其它的新节点即会自动启动。
你也可以转移动态内容到CloudFront减轻负载
众所周知CloudFront能处理静态内容,例如文件,但除此之外它还还能处理某些动态内容,这个话题不再进行深入的探讨,可以看看这个链接。
自动扩展
对于黑色星期五,假如你不用做任何扩展就足够处理这些峰值流量,那么你是在浪费钱。如果需求和计算能力相匹配自然是最好的,而这由自动扩展帮你实现,它会自动调整计算集群的大小。
作为用户,你可以决定集群的最小实例数和最大实例数,通过实例池中设置最小和最大实例数即可。
云监控是一种嵌入应用的管理服务
1. 云监控的事件触发扩展。
2. 你准备扩展CPU的数量吗?你准备优化延迟吗?准备扩展带宽吗?
3. 你也可以自定义一些指标到云监控上,如果你想要指定应用针对某些指标自动扩展,只需将这些指标放到云监控上,告诉根据云监控根据这些指标分别扩展哪些资源。
500000+用户
前面的配置可以自动扩展群组添加到web层,在两个可用区域里自动扩展群组,也可以在三个可用区域里扩展,在不同可用区域中的多实例模式不经可以确保可扩展性,同时也保证了可用性。
论题中的案例每个可用区域只有3个web层实例,其实它可以扩展成上千个实例,而你可以设置实例池中最小实例数为10最大实例数为1000。
ElastiCache用于承担数据库中热点数据的读负载。
DynamoDB用于Session数据的负载。
你需要增加监控、指标以及日志。
1. 主机级别指标,查看自动扩展的集群中的某一CPU参数,快速定位到问题的位置。
2. 整体级别指标,查看弹性负载均衡的指标判断整个实例集群的整体性能。
3. 日志分析,使用CloudWatch日志查看应用有什么问题,可以使用CloudTrail对这些日志进行分析管理。
4. 外部站点监控,使用第三方服务例如New Relic或Pingdom监控作为终端用户看到了什么情况。
你需要知道你的用户的反馈,他们是不是访问延迟很慢,他们在访问你的web应用时是不是出现了错误。
从你的系统结构中尽可能多地排出性能指标,这有助于自动扩展的决策,你可不希望你的系统CPU使用率才20%。
自动化运维
随着基础设施越来越大,它扩展到了上千个实例,我们有读副本,我们有水平横线扩展,对于这些我们需要一些自动化运维措施去对他们进行管理,我们可不希望对着每个实例一个一个单独地管理。
动化运维工具分为两个层级
1. DIY层,包括Amazon EC2和AWS CloudFormation。
2. 更高层次的服务,包括AWS Elastic Beanstalk和AWS OpsWorks。
AWS Elastic Beanstalk,为你的应用自动管理基础设施,很方便。
AWS OpsWorks,应用程序管理服务,用于部署和操作不同形态规模的应用程序,它还能做到持续集成。
AWS CloudFormation
1. 提供了最大的灵活性,它提供了你的应用栈的模板,它可以构建你的整个应用栈,或者仅仅是应用栈中的某个组件。
2. 如果你要更新你的应用栈你只要更新CloudFormation模板,它将更新你的整个应用。
3. 它拥有大量的控制,但缺乏便利性。
AWS CodeDeploy,部署你的程序到整个EC2实例集群
1. 可以部署一到上千个实例。
2. Code Deploy可以指向一个自动扩展配置。
3. 可连同Chef和Puppet一起使用。
解耦基础设施
使用SOA/微服务,从你的应用抽离出不同服务,就像前面你将web层与数据库层分离出来那样,再分别创建这些服务。
这些独立出来的服务就可以根据自己需要扩展,它给你系统的扩展带来了灵活性,同时也保证了高可用性。
SOA是Amazon搭建架构关键的组成部分。
松耦合解放了你
1. 你可以对某些服务单独地扩展和让它失效。
2. 如果一个工作节点从SQS拉取数据失败,没有没关系?没有,只要重启另外一个工作节点即可,所有操作都有可能发生故障,所以一定要搭建一个可以处理故障的架构,提供failover功能。
3. 将所有模块设置成黑盒。
4. 把交互设计成松耦合方式。
5. 优先考虑内置了冗余性和可扩展性的服务,而不是靠自己构建实现。
不要重复发明轮子
只需把你区别于已有任务的部分作为你的业务逻辑。
Amazon的很多服务本身具备容错能力,因为他们跨多个可用区域,例如:队列、邮件、转码、搜索、数据库、监控、性能指标采集、日志处理、计算等服务,没有必要自己搭建。
SQS:队列服务
1. Amazon提供的第一个服务。
2. 它是跨可用区域的所以拥有容错性。
3. 它具备可扩展性、安全性、简单性。
4. 队列可以帮助你的基础设施上的不同组件之间传递消息。
5. 以图片管理系统为例,图片收集系统和图片处理系统是两个不同的系统,他们各自都可以独立地扩展,他们之间具备松耦合特性,摄取照片然后扔进队列里面,图片处理系统可以拉取队列里面的图片再对其进行其他处理。
AWS Lambda,用于代码部署和服务管理。
1. 提供解耦你的应用程序的工具。
2. 在前面图片系统的例子中,Lambda可以响应S3的事件,就像S3中某文件被增加时Lambda相关函数会被自动触发去处理一些逻辑。
3. 已经在EC2上集成,供应用扩展。
百万级别用户
当用户数量达到百万级别时,这就要求前面提到的所有方案都要综合考虑。
1. 扩展多为可用区域。
2. 在所有层之间使用弹性负载均衡,不仅在web层使用,而且还要在应用层、数据层以及应用包含的其他所有层都必须使用弹性负载均衡。
3. 自动伸缩能力。
4. 面向服务的架构体系。
5. 巧妙使用S3和CloudFront部署一部分内容。
6. 在数据库前面引入缓存。
7. 将涉及状态的对象移除出Web层。
使用Amazon SES发送邮件。
使用CloudWatch监控。

 

千万级别用户
当我们的系统变得越来越大,我们会在数据层遇到一些问题,你可能会遇到竞争写主库的数据库问题,这也就意味着你最多只能发送这么多写流量到一台服务器上。
你如何解决此问题?
1. Federation,根据你的应用功能把数据库分成多个库。
2. Sharding,分表分片,使用多个服务器分片。
3. 把部分数据迁移到其他类型的数据库上,例如NoSQL、graph等。
Federation——根据应用功能切分成多个库
1. 例如,创建一个论坛数据库、一个用户数据库、一个产品数据库,你可能之前就是一个数据库包含这所有类型的数据,所以现在要将他们拆分开。
2. 按照功能分离出来的数据库可以各自独立进行扩展。
3. 缺点:不能做跨数据库查询。
Sharding——将数据分割到多主机上
1. 应用层变得更加复杂,扩展能力更强。
2. 例如,对于用户数据库,三分之一的用户被发送到一个分片上,三分之一发到另一个分片上,最后三分之一发到第三个分片。
将数据迁移到其他类型的数据库上
1. 考虑NoSQL数据库。
2. 如果你的数据不要求复杂的join操作,比如说排行榜,日志数据,临时数据,热表,元数据/查找表等等,满足这些情况可以考虑迁移到NoSQL数据库上。
3. 这意味着他们可以各自单独扩展。
11000000用户
扩展是一个迭代的过程,当你的系统变得越来越大,你总有更多的事情需要你解决。
调整你的应用架构。
更多的SOA特性和功能。
从多可用区域到多区域。
自定义解决方案去解决你的特定问题,当用户量到达十亿级别时自定义解决方案是必要的。
深入分析你的整个应用栈。
回顾
使用多可用区域的基础设施提升可靠性。
使用自带扩展能力的服务,比如ELB,S3,SQS,SNS,DynamoDB等等。
每一层级都建立冗余,可扩展性和冗余性不是两个分开单独的概念,经常需要同时考虑两者。
刚开始使用传统关系型数据库。
在你的基础设施的里面和外面都考虑缓冲数据。
在你的基础设施中使用自动化工具。
确保你的应用有良好的指标采样、系统监控、日志记录,确保收集你的用户访问你的应用过程中产生的问题。
将各个层分拆成独立的SOA服务,让这些服务能保持最大的独立性,可以各自进行扩展,及时发生故障也不波及其他。
一旦做了足够的准备及可使用自动扩展功能。
不重复发明轮子,尽量使用托管服务而不是自己构建,除非非要不可。
必要的情况下转向NoSQL数据库。

参考资料
On HackerNews / On Reddit

http://aws.amazon.com/documentation

http://aws.amazon.com/architecture

http://aws.amazon.com/start-ups

http://aws.amazon.com/free

From:http://www.jfh.com/jfperiodical/article/1242

 

Flume+Kafka+Storm+Redis实时分析系统基本架构

今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型。当然这个架构模型只是实时分析技术的一 个简单的入门级架构,实际生产环境中的大数据实时分析技术还涉及到很多细节的处理, 比如使用Storm的ACK机制保证数据都能被正确处理, 集群的高可用架构, 消费数据时如何处理重复数据或者丢失数据等问题,根据不同的业务场景,对数据的可靠性要求以及系统的复杂度的要求也会不同。这篇文章的目的只是带大家入个门,让大家对实时分析技术有一个简单的认识,并和大家一起做学习交流。
文章的最后还有Troubleshooting,分享了作者在部署本文示例程序过程中所遇到的各种问题和解决方案。

系统基本架构

整个实时分析系统的架构就是先由电商系统的订单服务器产生订单日志, 然后使用Flume去监听订单日志,并实时把每一条日志信息抓取下来并存进Kafka消息系统中, 接着由Storm系统消费Kafka中的消息,同时消费记录由Zookeeper集群管理,这样即使Kafka宕机重启后也能找到上次的消费记录,接着从上次宕机点继续从Kafka的Broker中进行消费。但是由于存在先消费后记录日志或者先记录后消费的非原子操作,如果出现刚好消费完一条消息并还没将信息记录到Zookeeper的时候就宕机的类似问题,或多或少都会存在少量数据丢失或重复消费的问题, 其中一个解决方案就是Kafka的Broker和Zookeeper都部署在同一台机子上。接下来就是使用用户定义好的Storm Topology去进行日志信息的分析并输出到Redis缓存数据库中(也可以进行持久化),最后用Web APP去读取Redis中分析后的订单信息并展示给用户。之所以在Flume和Storm中间加入一层Kafka消息系统,就是因为在高并发的条件下, 订单日志的数据会井喷式增长,如果Storm的消费速度(Storm的实时计算能力那是最快之一,但是也有例外, 而且据说现在Twitter的开源实时计算框架Heron比Storm还要快)慢于日志的产生速度,加上Flume自身的局限性,必然会导致大量数据滞后并丢失,所以加了Kafka消息系统作为数据缓冲区,而且Kafka是基于log File的消息系统,也就是说消息能够持久化在硬盘中,再加上其充分利用Linux的I/O特性,提供了可观的吞吐量。架构中使用Redis作为数据库也是因为在实时的环境下,Redis具有很高的读写速度。

业务背景
各大电商网站在合适的时间进行各种促销活动已是常态,在能为网站带来大量的流量和订单的同时,对于用户也有不小的让利,必然是大家伙儿喜闻乐见的。在促销活动期间,老板和运营希望能实时看到订单情况,老板开心,运营也能根据实时的订单数据调整运营策略,而让用户能实时看到网站的订单数据,也会勾起用户的购买欲。但是普通的离线计算系统已然不能满足在高并发环境下的实时计算要求,所以我们得使用专门实时计算系统,如:Storm, Heron, Spark Stream等,去满足类似的需求。
既然要分析订单数据,那必然在订单产生的时候要把订单信息记录在日志文件中。本文中,作者通过使用log4j2,以及结合自己之前开发电商系统的经验,写了一个订单日志生成模拟器,代码如下,能帮助大家随机产生订单日志。下面所展示的订单日志文件格式和数据就是我们本文中的分析目标,本文的案例中用来分析所有商家的订单总销售额并找出销售额钱20名的商家。

订单数据格式:
orderNumber: XX | orderDate: XX | paymentNumber: XX | paymentDate: XX | merchantName: XX | sku: [ skuName: XX skuNum: XX skuCode: XX skuPrice: XX totalSkuPrice: XX;skuName: XX skuNum: XX skuCode: XX skuPrice: XX totalSkuPrice: XX;] | price: [ totalPrice: XX discount: XX paymentPrice: XX ]

订单日志生成程序:
使用log4j2将日志信息写入文件中,每小时滚动一次日志文件

  1.       
  2.         
  3.           
  4.         
  5.         
  6.           filePattern=”/Users/guludada/Desktop/logs/app-%d{yyyy-MM-dd-HH}.log”>
  7.             
  8.             
  9.                 
  10.             
  11.         
  12.       
  13.       
  14.         
  15.           
  16.           
  17.         
  18.       


      
        
          
        
    	
        	
       		
        		
      		
                  		
      
      
        
          
          
        
      

生成器代码:

  1. package com.guludada.ordersInfo;
  2. import java.text.SimpleDateFormat;
  3. import java.util.Date;
  4. import java.util.Random;
  5. // Import log4j classes.
  6. import org.apache.logging.log4j.LogManager;
  7. import org.apache.logging.log4j.Logger;
  8. public class ordersInfoGenerator {
  9.     public enum paymentWays {
  10.         Wechat,Alipay,Paypal
  11.     }
  12.     public enum merchantNames {
  13.         优衣库,天猫,淘宝,咕噜大大,快乐宝贝,守望先峰,哈毒妇,Storm,Oracle,Java,CSDN,跑男,路易斯威登,
  14.         暴雪公司,Apple,Sumsam,Nissan,Benz,BMW,Maserati
  15.     }
  16.     public enum productNames {
  17.         黑色连衣裙, 灰色连衣裙, 棕色衬衫, 性感牛仔裤, 圆脚牛仔裤,塑身牛仔裤, 朋克卫衣,高腰阔腿休闲裤,人字拖鞋,
  18.         沙滩拖鞋
  19.     }
  20.     float[] skuPriceGroup = {299,399,699,899,1000,2000};
  21.     float[] discountGroup = {10,20,50,100};
  22.     float totalPrice = 0;
  23.     float discount = 0;
  24.     float paymentPrice = 0;
  25.     private static final Logger logger = LogManager.getLogger(ordersInfoGenerator.class);
  26.     private int logsNumber = 1000;
  27.     public void generate() {
  28.         for(int i = 0; i <= logsNumber; i++) {
  29.             logger.info(randomOrderInfo());
  30.         }
  31.     }
  32.     public String randomOrderInfo() {
  33.         SimpleDateFormat sdf = new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”);
  34.         Date date = new Date();
  35.         String orderNumber = randomNumbers(5) + date.getTime();
  36.         String orderDate = sdf.format(date);
  37.         String paymentNumber = randomPaymentWays() + “-” + randomNumbers(8);
  38.         String paymentDate = sdf.format(date);
  39.         String merchantName = randomMerchantNames();
  40.         String skuInfo = randomSkus();
  41.         String priceInfo = calculateOrderPrice();
  42.         return “orderNumber: ” + orderNumber + ” | orderDate: ” + orderDate + ” | paymentNumber: ” +
  43.             paymentNumber + ” | paymentDate: ” + paymentDate + ” | merchantName: ” + merchantName +
  44.             ” | sku: ” + skuInfo + ” | price: ” + priceInfo;
  45.     }
  46.     private String randomPaymentWays() {
  47.         paymentWays[] paymentWayGroup = paymentWays.values();
  48.         Random random = new Random();
  49.         return paymentWayGroup[random.nextInt(paymentWayGroup.length)].name();
  50.     }
  51.     private String randomMerchantNames() {
  52.         merchantNames[] merchantNameGroup = merchantNames.values();
  53.         Random random = new Random();
  54.         return merchantNameGroup[random.nextInt(merchantNameGroup.length)].name();
  55.     }
  56.     private String randomProductNames() {
  57.         productNames[] productNameGroup = productNames.values();
  58.         Random random = new Random();
  59.         return productNameGroup[random.nextInt(productNameGroup.length)].name();
  60.     }
  61.     private String randomSkus() {
  62.         Random random = new Random();
  63.         int skuCategoryNum = random.nextInt(3);
  64.         String skuInfo =”[“;
  65.         totalPrice = 0;
  66.         for(int i = 1; i <= 3; i++) {
  67.             int skuNum = random.nextInt(3)+1;
  68.             float skuPrice = skuPriceGroup[random.nextInt(skuPriceGroup.length)];
  69.             float totalSkuPrice = skuPrice * skuNum;
  70.             String skuName = randomProductNames();
  71.             String skuCode = randomCharactersAndNumbers(10);
  72.             skuInfo += ” skuName: ” + skuName + ” skuNum: ” + skuNum + ” skuCode: ” + skuCode
  73.                     + ” skuPrice: ” + skuPrice + ” totalSkuPrice: ” + totalSkuPrice + “;”;
  74.             totalPrice += totalSkuPrice;
  75.         }
  76.         skuInfo += ” ]”;
  77.         return skuInfo;
  78.     }
  79.     private String calculateOrderPrice() {
  80.         Random random = new Random();
  81.         discount = discountGroup[random.nextInt(discountGroup.length)];
  82.         paymentPrice = totalPrice – discount;
  83.         String priceInfo = “[ totalPrice: ” + totalPrice + ” discount: ” + discount + ” paymentPrice: ” + paymentPrice +” ]”;
  84.         return priceInfo;
  85.     }
  86.     private String randomCharactersAndNumbers(int length) {
  87.         String characters = “abcdefghijklmnopqrstuvwxyz0123456789”;
  88.         String randomCharacters = “”;
  89.                 Random random = new Random();
  90.                 for (int i = 0; i < length; i++) {
  91.               randomCharacters += characters.charAt(random.nextInt(characters.length()));
  92.                 }
  93.                 return randomCharacters;
  94.     }
  95.     private String randomNumbers(int length) {
  96.         String characters = “0123456789”;
  97.         String randomNumbers = “”;
  98.                 Random random = new Random();
  99.                 for (int i = 0; i < length; i++) {
  100.              randomNumbers += characters.charAt(random.nextInt(characters.length()));
  101.                 }
  102.                return randomNumbers;
  103.     }
  104.     public static void main(String[] args) {
  105.         ordersInfoGenerator generator = new ordersInfoGenerator();
  106.         generator.generate();
  107.     }
  108. }
package com.guludada.ordersInfo;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

// Import log4j classes.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;



public class ordersInfoGenerator {
	
	public enum paymentWays {
		Wechat,Alipay,Paypal
	}
	public enum merchantNames {
		优衣库,天猫,淘宝,咕噜大大,快乐宝贝,守望先峰,哈毒妇,Storm,Oracle,Java,CSDN,跑男,路易斯威登,
		暴雪公司,Apple,Sumsam,Nissan,Benz,BMW,Maserati
	}
	
	public enum productNames {
		黑色连衣裙, 灰色连衣裙, 棕色衬衫, 性感牛仔裤, 圆脚牛仔裤,塑身牛仔裤, 朋克卫衣,高腰阔腿休闲裤,人字拖鞋,
		沙滩拖鞋
	}
	
	float[] skuPriceGroup = {299,399,699,899,1000,2000};
	float[] discountGroup = {10,20,50,100};
	float totalPrice = 0;
	float discount = 0;
	float paymentPrice = 0;
	
	private static final Logger logger = LogManager.getLogger(ordersInfoGenerator.class);
	private int logsNumber = 1000;
	
	public void generate() {
				
		for(int i = 0; i <= logsNumber; i++) {			
			logger.info(randomOrderInfo());			
		}
	}
	
	public String randomOrderInfo() {
		
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");		
		Date date = new Date();		
		
		String orderNumber = randomNumbers(5) + date.getTime();
		
		String orderDate = sdf.format(date);
		
		String paymentNumber = randomPaymentWays() + "-" + randomNumbers(8);
		
		String paymentDate = sdf.format(date);
		
		String merchantName = randomMerchantNames();
		
		String skuInfo = randomSkus();
		
		String priceInfo = calculateOrderPrice();
		
		return "orderNumber: " + orderNumber + " | orderDate: " + orderDate + " | paymentNumber: " +
			paymentNumber + " | paymentDate: " + paymentDate + " | merchantName: " + merchantName + 
			" | sku: " + skuInfo + " | price: " + priceInfo;
	}
		
	private String randomPaymentWays() {
		
		paymentWays[] paymentWayGroup = paymentWays.values();
		Random random = new Random();
		return paymentWayGroup[random.nextInt(paymentWayGroup.length)].name();
	}
	
	private String randomMerchantNames() {
		
		merchantNames[] merchantNameGroup = merchantNames.values();
		Random random = new Random();
		return merchantNameGroup[random.nextInt(merchantNameGroup.length)].name();
	}
	
	private String randomProductNames() {
		
		productNames[] productNameGroup = productNames.values();
		Random random = new Random();
		return productNameGroup[random.nextInt(productNameGroup.length)].name();
	}
	
	
	private String randomSkus() {
		
		Random random = new Random();
		int skuCategoryNum = random.nextInt(3);
		
		String skuInfo ="[";
		
		totalPrice = 0;
		for(int i = 1; i <= 3; i++) {
			
			int skuNum = random.nextInt(3)+1;
			float skuPrice = skuPriceGroup[random.nextInt(skuPriceGroup.length)];
			float totalSkuPrice = skuPrice * skuNum;			
			String skuName = randomProductNames();
			String skuCode = randomCharactersAndNumbers(10);
			skuInfo += " skuName: " + skuName + " skuNum: " + skuNum + " skuCode: " + skuCode
					+ " skuPrice: " + skuPrice + " totalSkuPrice: " + totalSkuPrice + ";";		
			totalPrice += totalSkuPrice;
		}
		
		
		skuInfo += " ]";
		
		return skuInfo;
	}
	
	private String calculateOrderPrice() {
		
		Random random = new Random();
		discount = discountGroup[random.nextInt(discountGroup.length)];
		paymentPrice = totalPrice - discount;
		
		String priceInfo = "[ totalPrice: " + totalPrice + " discount: " + discount + " paymentPrice: " + paymentPrice +" ]";
		
		return priceInfo;
	}
	
	private String randomCharactersAndNumbers(int length) {
		
		String characters = "abcdefghijklmnopqrstuvwxyz0123456789";
		String randomCharacters = "";  
                Random random = new Random();  
                for (int i = 0; i < length; i++) {  
        	  randomCharacters += characters.charAt(random.nextInt(characters.length()));  
                }  
                return randomCharacters;  
	}
	
	private String randomNumbers(int length) {
		
		String characters = "0123456789";
		String randomNumbers = "";   
                Random random = new Random();  
                for (int i = 0; i < length; i++) {  
        	 randomNumbers += characters.charAt(random.nextInt(characters.length()));  
                }  
               return randomNumbers;		
	}
	
	public static void main(String[] args) {
		
		ordersInfoGenerator generator = new ordersInfoGenerator();
		generator.generate();
	}
}

收集日志数据
采集数据的方式有多种,一种是通过自己编写shell脚本或Java编程采集数据,但是工作量大,不方便维护,另一种就是直接使用第三方框架去进行日志的采集,一般第三方框架的健壮性,容错性和易用性都做得很好也易于维护。本文采用第三方框架Flume进行日志采集,Flume是一个分布式的高效的日志采集系统,它能把分布在不同服务器上的海量日志文件数据统一收集到一个集中的存储资源中,FlumeApache的一个顶级项目,与Kafka也有很好的兼容性。不过需要注意的是Flume并不是一个高可用的框架,这方面的优化得用户自己去维护。

Flume的agent是运行在JVM上的,所以各个服务器上的JVM环境必不可少。每一个Flume agent部署在一台服务器上,Flume会收集web server产生的日志数据,并封装成一个个的事件发送给Flume Agent的Source,Flume Agent Source会消费这些收集来的数据事件(Flume Event)并放在Flume Agent Channel,Flume Agent Sink会从Channel中收集这些采集过来的数据,要么存储在本地的文件系统中要么作为一个消费资源分给下一个装在分布式系统中其它服务器上的Flume Agent进行处理。Flume提供了点对点的高可用的保障,某个服务器上的Flume Agent Channel中的数据只有确保传输到了另一个服务器上的Flume Agent Channel里或者正确保存到了本地的文件存储系统中,才会被移除。

在本文中,Flume的Source我们选择的是Exec Source,因为是实时系统,直接通过tail 命令来监听日志文件,而在Kafka的Broker集群端的Flume我们选择Kafka Sink 来把数据下沉到Kafka消息系统中。

下图是来自Flume官网里的Flume拉取数据的架构图:

图片来源:http://flume.apache.org/FlumeUserGuide.html

订单日志产生端的Flume配置文件如下:

  1. agent.sources = origin
  2. agent.channels = memorychannel
  3. agent.sinks = target
  4. agent.sources.origin.type = exec
  5. agent.sources.origin.command = tail -F /export/data/trivial/app.log
  6. agent.sources.origin.channels = memorychannel
  7. agent.sources.origin.interceptors = i1
  8. agent.sources.origin.interceptors.i1.type = static
  9. agent.sources.origin.interceptors.i1.key = topic
  10. agent.sources.origin.interceptors.i1.value = ordersInfo
  11. agent.sinks.loggerSink.type = logger
  12. agent.sinks.loggerSink.channel = memorychannel
  13. agent.channels.memorychannel.type = memory
  14. agent.channels.memorychannel.capacity = 10000
  15. agent.sinks.target.type = avro
  16. agent.sinks.target.channel = memorychannel
  17. agent.sinks.target.hostname = 172.16.124.130
  18. agent.sinks.target.port = 4545
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = exec
agent.sources.origin.command = tail -F /export/data/trivial/app.log
agent.sources.origin.channels = memorychannel

agent.sources.origin.interceptors = i1
agent.sources.origin.interceptors.i1.type = static
agent.sources.origin.interceptors.i1.key = topic
agent.sources.origin.interceptors.i1.value = ordersInfo

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000

agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = 172.16.124.130
agent.sinks.target.port = 4545

Kafka消息系统端Flume配置文件

  1. agent.sources = origin
  2. agent.channels = memorychannel
  3. agent.sinks = target
  4. agent.sources.origin.type = avro
  5. agent.sources.origin.channels = memorychannel
  6. agent.sources.origin.bind = 0.0.0.0
  7. agent.sources.origin.port = 4545
  8. agent.sinks.loggerSink.type = logger
  9. agent.sinks.loggerSink.channel = memorychannel
  10. agent.channels.memorychannel.type = memory
  11. agent.channels.memorychannel.capacity = 5000000
  12. agent.channels.memorychannel.transactionCapacity = 1000000
  13. agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
  14. #agent.sinks.target.topic = bigdata
  15. agent.sinks.target.brokerList=localhost:9092
  16. agent.sinks.target.requiredAcks=1
  17. agent.sinks.target.batchSize=100
  18. agent.sinks.target.channel = memorychannel
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = 0.0.0.0
agent.sources.origin.port = 4545

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000

agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
#agent.sinks.target.topic = bigdata
agent.sinks.target.brokerList=localhost:9092
agent.sinks.target.requiredAcks=1
agent.sinks.target.batchSize=100
agent.sinks.target.channel = memorychannel

这里需要注意的是,在日志服务器端的Flume agent中我们配置了一个interceptors,这个是用来为Flume Event(Flume Event就是拉取到的一行行的日志信息)的头部添加key为“topic”的K-V键值对,这样这条抓取到的日志信息就会根据topic的值去到Kafka中指定的topic消息池中,当然还可以为Flume Event额外配置一个key为“Key”的键值对,Kafka Sink会根据key“Key”的值将这条日志信息下沉到不同的Kafka分片上,否则就是随机分配。在Kafka集群端的Flume配置里,有几个重要的参数需要注意,“topic”是指定抓取到的日志信息下沉到Kafka哪一个topic池中,如果之前Flume发送端为Flume Event添加了带有topic的头信息,则这里可以不用配置;brokerList就是配置Kafka集群的主机地址和端口;requireAcks=1是配置当下沉到Kafka的消息储存到特定partition的leader中成功后就返回确认消息,requireAcks=0是不需要确认消息成功写入Kafka中,requireAcks=-1是指不光需要确认消息被写入partition的leander中,还要确认完成该条消息的所有备份;batchSize配置每次下沉多少条消息,每次下沉的数量越多延迟也高。

Kafka消息系统
这一部分我们将谈谈Kafka的配置和使用,Kafka在我们的系统中实际上就相当于起到一个数据缓冲池的作用, 有点类似于ActiveQ的消息队列和Redis这样的缓存区的作用,但是更可靠,因为是基于log File的消息系统,数据不容易丢失,以及能记录数据的消费位置并且用户还可以自定义消息消费的起始位置,这就使得重复消费消息也可以得以实现,而且同时具有队列和发布订阅两种消息消费模式,十分灵活,并且与Storm的契合度很高,充分利用Linux系统的I/O提高读写速度等等。另一个要提的方面就是Kafka的Consumer是pull-based模型的,而Flume是push-based模型。push-based模型是尽可能大的消费数据,但是当生产者速度大于消费者时数据会被覆盖。而pull-based模型可以缓解这个压力,消费速度可以慢于生产速度,有空余时再拉取那些没拉取到的数据。

Kafka是一个分布式的高吞吐量的消息系统,同时兼有点对点和发布订阅两种消息消费模式Kafka主要由Producer,Consumer和Broker组成。Kafka中引入了一个叫“topic”的概念,用来管理不同种类的消息,不同类别的消息会记录在到其对应的topic池中,而这些进入到topic中的消息会被Kafka写入磁盘的log文件中进行持久化处理。Kafka会把消息写入磁盘的log file中进行持久化对于每一个topic里的消息log文件,Kafka都会对其进行分片处理,而每一个消息都会顺序写入中log分片中,并且被标上“offset”的标量来代表这条消息在这个分片中的顺序,并且这些写入的消息无论是内容还是顺序都是不可变的。所以Kafka和其它消息队列系统的一个区别就是它能做到分片中的消息是能顺序被消费的,但是要做到全局有序还是有局限性的,除非整个topic只有一个log分片。并且无论消息是否有被消费,这条消息会一直保存在log文件中,当留存时间足够长到配置文件中指定的retention的时间后,这条消息才会被删除以释放空间。对于每一个Kafka的Consumer,它们唯一要存的Kafka相关的元数据就是这个“offset”值,记录着Consumer在分片上消费到了哪一个位置。通常Kafka是使用Zookeeper来为每一个Consumer保存它们的offset信息,所以在启动Kafka之前需要有一个Zookeeper集群;而且Kafka默认采用的是先记录offset再读取数据的策略,这种策略会存在少量数据丢失的可能。不过用户可以灵活设置Consumer的“offset”的位置,在加上消息记录在log文件中,所以是可以重复消费消息的。log的分片和它们的备份分散保存在集群的服务器上,对于每一个partition,在集群上都会有一台这个partition存在服务器作为leader,而这个partitionpartition的其它备份所在的服务器做为follower,leader负责处理关于这个partition所有请求,而follower负责这个partition的其它备份的同步工作,当leader服务器宕机时,其中一个follower服务器就会被选举为新的leader。

一般的消息系统分为两种模式,一种是点对点的消费模式,也就是queuing模式,另一种是发布订阅模式,也就是publish-subscribe模式,而Kafka引入了一个Consumer Group的概念,使得其能兼有两种模式。在Kafka中,每一个consumer都会标明自己属于哪个consumer group,每个topic的消息都会分发给每一个subscribe了这个topic的所有consumer group中的一个consumer实例。所以当所有的consumers都在同一个consumer group中,那么就像queuing的消息系统,一个message一次只被一个consumer消费。如果每一个consumer都有不同consumer group,那么就像public-subscribe消息系统一样,一个消息分发给所有的consumer实例。对于普通的消息队列系统,可能存在多个consumer去同时消费message,虽然message是有序地分发出去的,但是由于网络延迟的时候到达不同的consumer的时间不是顺序的,这时就失去了顺序性,解决方案是只用一个consumer去消费message,但显然不太合适。而对于Kafka来说,一个partiton只分发给每一个consumer group中的一个consumer实例,也就是说这个partition只有一个consumer实例在消费,所以可以保证在一个partition内部数据的处理是有序的,不同之处就在于Kafka内部消息进行了分片处理,虽然看上去也是单consumer的做法,但是分片机制保证了并发消费。如果要做到全局有序,那么整个topic中的消息只有一个分片,并且每一个consumer group中只能有一个consumer实例。这实际上就是彻底牺牲了消息消费时的并发度。

 

Kafka的配置和部署十分简单
1. 首先启动Zookeeper集群,Kafka需要Zookeeper集群来帮助记录每一个Consumer的offset
2. 为集群上的每一台Kafka服务器单独配置配置文件,比如我们需要设置有两个节点的Kafka集群,那么节点1和节点2的最基本的配置如下:

  1. config/server-1.properties:
  2.     broker.id=1
  3.     listeners=PLAINTEXT://:9093
  4.     log.dir=export/data/kafka
  5.     zookeeper.connect=localhost:2181
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=export/data/kafka
    zookeeper.connect=localhost:2181
  1. config/server-2.properties:
  2.     broker.id=2
  3.     listeners=PLAINTEXT://:9093
  4.     log.dir=/export/data/kafka
  5.     zookeeper.connect=localhost:2181
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9093
    log.dir=/export/data/kafka
    zookeeper.connect=localhost:2181

broker.id是kafka集群上每一个节点的单独标识,不能重复;listeners可以理解为每一个节点上Kafka进程要监听的端口,使用默认的就行; log.dir是Kafka的log文件(记录消息的log file)存放目录; zookeeper.connect就是Zookeeper的URI地址和端口。
3. 配置完上面的配置文件后,只要分别在节点上
输入下面命令启动Kafka进程就可以使用了

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...


Storm实时计算框架

接下来开始介绍本篇文章要使用的实时计算框架StormStrom是一个非常快的实时计算框架,至于快到什么程度呢?官网首页给出的数据是每一个Storm集群上的节点每一秒能处理一百万条数据。相比Hadoop“Mapreduce”计算框架,Storm使用的是"Topology"Mapreduce程序在计算完成后最终会停下来,而Topology则是会永远运行下去除非你显式地使用“kill -9 XXX”命令停掉它。和大多数的集群系统一样,Storm集群也存在着Master节点和Worker节点,在Master节点上运行的一个守护进程叫“Nimbus”,类似于Hadoop“JobTracker”的功能,负责集群中计算程序的分发,任务的分发,监控任务和工作节点的运行情况等;Worker节点上运行的守护进程叫“Supervisor”,负责接收Nimbus分发的任务并运行,每一个Worker上都会运行着Topology程序的一部分,而一个Topology程序的运行就是由集群上多个Worker一起协同工作的。值得注意的是NimubsSupervisor之间的协调工作也是通过Zookeeper来管理的,NimbusSupervisor自己本身在集群上都是无状态的,它们的状态都保存在Zookeeper上,所以任何节点的宕机和动态扩容都不会影响整个集群的工作运行,并支持fast-fail机制。

Storm有一个很重要的对数据的抽象概念,叫做“Stream”,我们姑且称之为数据流,数据流Stream就是由之间没有任何关系的松散的一个一个的数据元组“tuples”所组成的序列。要在Storm上做实时计算,首先你得有一个计算程序,这就是“Topology”,一个Topology程序由“Spout”“Bolt”共同组成。Storm就是通过Topology程序将数据流Stream通过可靠(ACK机制)的分布式计算生成我们的目标数据流Stream,就比如说把婚恋网站上当日注册的所有用户信息数据流Stream通过Topology程序计算出月收入上万年龄在30岁以下的新的用户信息流Stream。在我们的文章中,Spout就是实现了特定接口的Java类,它相当于数据源,用于产生数据或者从外部接收数据;而Bolt就是实现了Storm Bolt接口的Java类,用于消费从Spout发送出来的数据流并实现用户自定义的数据处理逻辑;对于复杂的数据处理,可以定义多个连续的Bolt去协同处理。最后在程序中通过SpoutBolt生成Topology对象并提交到Storm集群上执行。

tuplesStorm的数据模型,,由值和其所对应的field所组成,比如说在SpoutBolt中定义了发出的元组的field为:(name,age,gender),那么从这个SpoutBolt中发出的数据流的每一个元组值就类似于(''咕噜大大",27,"中性")Storm中还有一个Stream Group的概念,它用来决定从Spout或或或Bolt组件中发出的tuples接下来应该传到哪一个组件中或者更准确地说在程序里设置某个组件应该接收来自哪一个组件的tuples; 并且在Storm中提供了多个用于数据流分组的机制,比如说shuffleGrouping,用来将当前组件产生的tuples随机分发到下一个组件中,或者 fieldsGrouping,根据tuplesfield值来决定当前组件产生的tuples应该分发到哪一个组件中。
 
另一部分需要了解的就是Stormtasksworkers的概念。每一个worker都是一个运行在物理机器上的JVM进程,每个worker中又运行着多个task线程,这些task线程可能是Spout任务也可能是Bolt任务,由Nimbus根据RoundRobin负载均衡策略来分配,而至于在整个Topology程序里要起几个Spout线程或Bolt线程,也就是tasks,由用户在程序中设置并发度来决定。

Storm集群的配置文件如下:
Storm的配置文件在项目的conf目录下,也就是:conf/storm.yaml

  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements.  See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership.  The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License.  You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. ########### These MUST be filled in for a storm configuration
  17. storm.zookeeper.servers:
  18.   - "ymhHadoop"
  19.   - "ymhHadoop2"
  20.   - "ymhHadoop3"
  21. storm.local.dir: "/export/data/storm/workdir"
  22. nimbus.host: "ymhHadoop"
  23. supervisor.slots.ports:
  24.   -6700
  25.   -6701
  26.   -6702
  27.   -6703
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
  - "ymhHadoop"
  - "ymhHadoop2"
  - "ymhHadoop3"    

storm.local.dir: "/export/data/storm/workdir"
 
nimbus.host: "ymhHadoop"

supervisor.slots.ports:
  -6700
  -6701
  -6702
  -6703 
 
storm.zookeeper.servers自然就是用来配置我们熟悉的Zookeeper集群中各个节点的URI地址和端口的
storm.local.dir 是用来配置storm节点相关文件的存储目录的,每一个storm集群的节点在本地服务器上都要有一个目录存储少量的和该节点有关的一些信息。记得要开发这个目录的读写权限哦
nimbus.host 自然就是用来指定nimbus服务器的URI
supervisor.slots.ports 这个是用来配置supervisor服务器启动的worker所监听的端口,每一个worker就是一个物理的JVM进程。上面这些是基本配置,并且要严格按照上面的格式来,少一个空格都会报错。

接下来就是将配置文件拷贝到集群的各个机器上,然后在分别在nimbussupervisor机器上通过$bin/storm nimbus $bin/storm supervisor命令来启动集群上的机子。最后在nimbus上通过$bin/storm UI 命令可以启动Storm提供的UI界面,功能十分强大,可以监控集群上各个节点的运行状态,提交Topology任务,监控Topology任务的运行情况等。这个UI界面可以通过http://{nimbus host}:8080的地址访问到。



Redis数据库

Redis是一个基于内存的多种数据结构的存储工具,经常有人说Redis是一个基于key-value数据结构的缓存数据库,这种说法必然是不准确的,Key-Value只是其中的一种数据结构的实现,Redis支持Stringshasheslistssetssorted sets等多种常见的数据结构,并提供了功能强大的范围查询,以及提供了INCRINCRBY,DECR,DECRBY等多种原子命令操作,保证在并发的环境下不会出现脏数据。虽然Redis是基于内存的数据库,但也提供了多种硬盘持久化策略,比如说RDB策略,用来将某个时间点的Redis的数据快照存储在硬盘中,或者是AOF策略,将每一个Redis操作命令都不可变的顺序记录在log文件中,恢复数据时就将log文件中的所有命令顺序执行一遍等等。Redis不光可以作为网站热点数据的缓存服务器,还可以用来做数据库,或者消息队列服务器的broker等。在本文中选择Redis作为订单分析结果的存储工具,一方面是其灵活的数据结构和强大的数据操作命令,另一方面就是在大数据的实时计算环境下,需要Redis这样的具备高速I/O的数据库。
 
在本文的例子中,作者使用Sorted Sets数据结构来存储各个商家的总订单销售额,Sorted Sets数据结构由Key, Scoreelement value 三部分组成,Set的数据结构保证同一个key中的元素值不会重复,而在Sorted Sets结构中是通过 Score来为元素值排序,这很自然地就能将各个商家的总订单销售额设置为Score,然后商家名称为element value,这样就能根据总订单销售额来为商家排序。在Storm程序中,我们通过Jedis API来调用Redis
$ZINCRBY KEY INCREMENT MEMBER
的命令来统计商家总销售额, ZINCRBY是一个原子命令,能保证在Storm的并发计算的环境下,正确地增加某个商家的Score的值,也就是它们的订单总销售额。而对于两个商家同名这种情况应该在业务系统中去避免而不应该由我们的数据分析层来处理。最后提一个小trips,就是如果所有商家的Score都设置成相同的分数,那么Redis就会默认使用商家名的字母字典序来排序。

Kafka+Storm+Redis的整合
当数据被Flume拉取进Kafka消息系统中,我们就可以使用Storm来进行消费,Redis来对结果进行存储。StormKafka有很好的兼容性,我们可以通过Kafka Spout来从Kafka中获取数据;在Bolt处理完数据后,通过Jedis API在程序中将数据存储在Redis数据库中。

下面就是Kafka Spout和创建Topology的程序代码:

BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");
zkHosts是用来指定Zookeeper集群的节点的URI和端口,而Zookeeper集群是用来记录SpoutKafka消息消费的offset位置

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
主要是用来将SpoutKafka拉取来的byte[]数组格式的数据转化为Stormtuples

  1. package com.guludada.ordersanalysis;
  2. import java.util.UUID;
  3. import backtype.storm.Config;
  4. import backtype.storm.LocalCluster;
  5. import backtype.storm.StormSubmitter;
  6. import backtype.storm.generated.AlreadyAliveException;
  7. import backtype.storm.generated.InvalidTopologyException;
  8. import backtype.storm.spout.SchemeAsMultiScheme;
  9. import backtype.storm.topology.TopologyBuilder;
  10. import backtype.storm.tuple.Fields;
  11. import storm.kafka.Broker;
  12. import storm.kafka.BrokerHosts;
  13. import storm.kafka.KafkaSpout;
  14. import storm.kafka.SpoutConfig;
  15. import storm.kafka.StaticHosts;
  16. import storm.kafka.StringScheme;
  17. import storm.kafka.ZkHosts;
  18. import storm.kafka.trident.GlobalPartitionInformation;
  19. public class ordersAnalysisTopology {
  20.     private static String topicName = "ordersInfo";
  21.     private static String zkRoot = "/stormKafka/"+topicName;
  22.     public static void main(String[] args) {
  23.         BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");
  24.         SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName,zkRoot,UUID.randomUUID().toString());
  25.         spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
  26.         KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
  27.         TopologyBuilder builder = new TopologyBuilder();
  28.         builder.setSpout("kafkaSpout",kafkaSpout);
  29.         builder.setBolt("merchantsSalesBolt", new merchantsSalesAnalysisBolt(), 2).shuffleGrouping("kafkaSpout");
  30.         Config conf = new Config();
  31.         conf.setDebug(true);
  32.         if(args != null && args.length > 0) {
  33.             conf.setNumWorkers(1);
  34.             try {
  35.                 StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
  36.             } catch (AlreadyAliveException e) {
  37.                 // TODO Auto-generated catch block
  38.                 e.printStackTrace();
  39.             } catch (InvalidTopologyException e) {
  40.                 // TODO Auto-generated catch block
  41.                 e.printStackTrace();
  42.             }
  43.         } else {
  44.             conf.setMaxSpoutPending(3);
  45.             LocalCluster cluster = new LocalCluster();
  46.             cluster.submitTopology("ordersAnalysis", conf, builder.createTopology());
  47.         }
  48.     }
  49. }
package com.guludada.ordersanalysis;

import java.util.UUID;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.Broker;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StaticHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.GlobalPartitionInformation;

public class ordersAnalysisTopology {
	
	private static String topicName = "ordersInfo";
	private static String zkRoot = "/stormKafka/"+topicName;
	
	public static void main(String[] args) {
		
		BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");

		
		SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName,zkRoot,UUID.randomUUID().toString());
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
		
		TopologyBuilder builder = new TopologyBuilder();        
		builder.setSpout("kafkaSpout",kafkaSpout);        
		builder.setBolt("merchantsSalesBolt", new merchantsSalesAnalysisBolt(), 2).shuffleGrouping("kafkaSpout");

		Config conf = new Config();
		conf.setDebug(true);
		
		if(args != null && args.length > 0) {
			conf.setNumWorkers(1);
			try {
				StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
			} catch (AlreadyAliveException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
		} else {
			
			conf.setMaxSpoutPending(3);
			
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("ordersAnalysis", conf, builder.createTopology());
			
			
		}

	}
}

下面是Bolt程序,主要是用来处理从Kafka拉取到的订单日志信息, 并计算出所有商家的总订单收入,然后使用Jedis API将计算结果存入到Redis数据库中。

 

  1. package com.guludada.domain;
  2. import java.util.ArrayList;
  3. import java.util.Date;
  4. public class ordersBean {
  5.     Date createTime = null;
  6.     String number = "";
  7.     String paymentNumber = "";
  8.     Date paymentDate = null;
  9.     String merchantName = "";
  10.     ArrayList skuGroup = null;
  11.     float totalPrice = 0;
  12.     float discount = 0;
  13.     float paymentPrice = 0;
  14.     public Date getCreateTime() {
  15.         return createTime;
  16.     }
  17.     public void setCreateTime(Date createTime) {
  18.         this.createTime = createTime;
  19.     }
  20.     public String getNumber() {
  21.         return number;
  22.     }
  23.     public void setNumber(String number) {
  24.         this.number = number;
  25.     }
  26.     public String getPaymentNumber() {
  27.         return paymentNumber;
  28.     }
  29.     public void setPaymentNumber(String paymentNumber) {
  30.         this.paymentNumber = paymentNumber;
  31.     }
  32.     public Date getPaymentDate() {
  33.         return paymentDate;
  34.     }
  35.     public void setPaymentDate(Date paymentDate) {
  36.         this.paymentDate = paymentDate;
  37.     }
  38.     public String getMerchantName() {
  39.         return merchantName;
  40.     }
  41.     public void setMerchantName(String merchantName) {
  42.         this.merchantName = merchantName;
  43.     }
  44.     public ArrayList getSkuGroup() {
  45.         return skuGroup;
  46.     }
  47.     public void setSkuGroup(ArrayList skuGroup) {
  48.         this.skuGroup = skuGroup;
  49.     }
  50.     public float getTotalPrice() {
  51.         return totalPrice;
  52.     }
  53.     public void setTotalPrice(float totalPrice) {
  54.         this.totalPrice = totalPrice;
  55.     }
  56.     public float getDiscount() {
  57.         return discount;
  58.     }
  59.     public void setDiscount(float discount) {
  60.         this.discount = discount;
  61.     }
  62.     public float getPaymentPrice() {
  63.         return paymentPrice;
  64.     }
  65.     public void setPaymentPrice(float paymentPrice) {
  66.         this.paymentPrice = paymentPrice;
  67.     }
  68. }
package com.guludada.domain;

import java.util.ArrayList;
import java.util.Date;

public class ordersBean {

	Date createTime = null;
	String number = "";
	String paymentNumber = "";
	Date paymentDate = null;
	String merchantName = "";
	ArrayList skuGroup = null;
	float totalPrice = 0;
	float discount = 0;
	float paymentPrice = 0;
	
	public Date getCreateTime() {
		return createTime;
	}
	public void setCreateTime(Date createTime) {
		this.createTime = createTime;
	}
	public String getNumber() {
		return number;
	}
	public void setNumber(String number) {
		this.number = number;
	}
	public String getPaymentNumber() {
		return paymentNumber;
	}
	public void setPaymentNumber(String paymentNumber) {
		this.paymentNumber = paymentNumber;
	}
	public Date getPaymentDate() {
		return paymentDate;
	}
	public void setPaymentDate(Date paymentDate) {
		this.paymentDate = paymentDate;
	}
	public String getMerchantName() {
		return merchantName;
	}
	public void setMerchantName(String merchantName) {
		this.merchantName = merchantName;
	}
	public ArrayList getSkuGroup() {
		return skuGroup;
	}
	public void setSkuGroup(ArrayList skuGroup) {
		this.skuGroup = skuGroup;
	}
	public float getTotalPrice() {
		return totalPrice;
	}
	public void setTotalPrice(float totalPrice) {
		this.totalPrice = totalPrice;
	}
	public float getDiscount() {
		return discount;
	}
	public void setDiscount(float discount) {
		this.discount = discount;
	}
	public float getPaymentPrice() {
		return paymentPrice;
	}
	public void setPaymentPrice(float paymentPrice) {
		this.paymentPrice = paymentPrice;
	}
	
	
}

本文例子中用不到skusbean,所以这里作者就没有写委屈偷懒一下下

  1. package com.guludada.domain;
  2. public class skusBean {
  3.       ………………
  4. }
package com.guludada.domain;

public class skusBean {
      ………………
}

logInfoHandler用来过滤订单的日志信息,并保存到ordersBean和skusBean中,方便Bolt获取日志数据的各项属性进行处理

  1. package com.guludada.common;
  2. import java.text.ParseException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.regex.Matcher;
  5. import java.util.regex.Pattern;
  6. import com.guludada.domain.ordersBean;
  7. public class logInfoHandler {
  8.     SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  9.     public ordersBean getOrdersBean(String orderInfo) {
  10.         ordersBean order = new ordersBean();
  11.         //从日志信息中过滤出订单信息
  12.         Pattern orderPattern = Pattern.compile("orderNumber:.+");
  13.         Matcher orderMatcher = orderPattern.matcher(orderInfo);
  14.         if(orderMatcher.find()) {
  15.             String orderInfoStr = orderMatcher.group(0);
  16.             String[] orderInfoGroup = orderInfoStr.trim().split("\\|");
  17.             //获取订单号
  18.             String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
  19.             order.setNumber(orderNum);
  20.             //获取创建时间
  21.             String orderCreateTime = orderInfoGroup[1].trim().split(" ")[1] + " " + orderInfoGroup[1].trim().split(" ")[2];
  22.             try {
  23.                 order.setCreateTime(sdf_final.parse(orderCreateTime));
  24.             } catch (ParseException e) {
  25.                 // TODO Auto-generated catch block
  26.                 e.printStackTrace();
  27.             }
  28.             //获取商家名称
  29.             String merchantName = (orderInfoGroup[4].split(":"))[1].trim();
  30.             order.setMerchantName(merchantName);
  31.             //获取订单总额
  32.             String orderPriceInfo = (orderInfoGroup[6].split("price:"))[1].trim();
  33.             String totalPrice = (orderPriceInfo.substring(2, orderPriceInfo.length()-3).trim().split(" "))[1];
  34.             order.setTotalPrice(Float.parseFloat(totalPrice));
  35.             return order;
  36.         } else {
  37.             return order;
  38.         }
  39.     }
  40. }
package com.guludada.common;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.guludada.domain.ordersBean;

public class logInfoHandler {
	
	SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	
	public ordersBean getOrdersBean(String orderInfo) {
		
		ordersBean order = new ordersBean();
		
		//从日志信息中过滤出订单信息
		Pattern orderPattern = Pattern.compile("orderNumber:.+");
		Matcher orderMatcher = orderPattern.matcher(orderInfo);
		if(orderMatcher.find()) {
			
			String orderInfoStr = orderMatcher.group(0);
			String[] orderInfoGroup = orderInfoStr.trim().split("\\|");
			
			//获取订单号
			String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
			order.setNumber(orderNum);
						
			//获取创建时间
			String orderCreateTime = orderInfoGroup[1].trim().split(" ")[1] + " " + orderInfoGroup[1].trim().split(" ")[2];
			try {
				order.setCreateTime(sdf_final.parse(orderCreateTime));
			} catch (ParseException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
			//获取商家名称
			String merchantName = (orderInfoGroup[4].split(":"))[1].trim();
			order.setMerchantName(merchantName);
			
			//获取订单总额
			String orderPriceInfo = (orderInfoGroup[6].split("price:"))[1].trim();
			String totalPrice = (orderPriceInfo.substring(2, orderPriceInfo.length()-3).trim().split(" "))[1];
			order.setTotalPrice(Float.parseFloat(totalPrice));
						
			return order;
						
		} else {
			return order;
		}
	}
}

 

  1. package com.guludada.ordersanalysis;
  2. import java.util.Map;
  3. import com.guludada.common.logInfoHandler;
  4. import com.guludada.domain.ordersBean;
  5. import backtype.storm.task.OutputCollector;
  6. import backtype.storm.task.TopologyContext;
  7. import backtype.storm.topology.OutputFieldsDeclarer;
  8. import backtype.storm.topology.base.BaseRichBolt;
  9. import backtype.storm.tuple.Tuple;
  10. import redis.clients.jedis.Jedis;
  11. import redis.clients.jedis.JedisPool;
  12. import redis.clients.jedis.JedisPoolConfig;
  13. public class merchantsSalesAnalysisBolt extends BaseRichBolt {
  14.     private OutputCollector _collector;
  15.     logInfoHandler loginfohandler;
  16.     JedisPool pool;
  17.     public void execute(Tuple tuple) {
  18.         String orderInfo = tuple.getString(0);
  19.         ordersBean order = loginfohandler.getOrdersBean(orderInfo);
  20.         //store the salesByMerchant infomation into Redis
  21.         Jedis jedis = pool.getResource();
  22.         jedis.zincrby("orderAna:topSalesByMerchant", order.getTotalPrice(), order.getMerchantName());
  23.     }
  24.     public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
  25.         this._collector = collector;
  26.         this.loginfohandler = new logInfoHandler();
  27.         this.pool = new JedisPool(new JedisPoolConfig(), "ymhHadoop",6379,2 * 60000,"12345");
  28.     }
  29.     public void declareOutputFields(OutputFieldsDeclarer arg0) {
  30.         // TODO Auto-generated method stub
  31.     }
  32. }
package com.guludada.ordersanalysis;

import java.util.Map;

import com.guludada.common.logInfoHandler;
import com.guludada.domain.ordersBean;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class merchantsSalesAnalysisBolt extends BaseRichBolt {
	
	private OutputCollector _collector;
	logInfoHandler loginfohandler;
	JedisPool pool;

	public void execute(Tuple tuple) {
		String orderInfo = tuple.getString(0);
		ordersBean order = loginfohandler.getOrdersBean(orderInfo);
		
		//store the salesByMerchant infomation into Redis
		Jedis jedis = pool.getResource();
		jedis.zincrby("orderAna:topSalesByMerchant", order.getTotalPrice(), order.getMerchantName());
	}

	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
		this._collector = collector;
		this.loginfohandler = new logInfoHandler();
		this.pool = new JedisPool(new JedisPoolConfig(), "ymhHadoop",6379,2 * 60000,"12345");
		
	}

	public void declareOutputFields(OutputFieldsDeclarer arg0) {
		// TODO Auto-generated method stub
		
	}

}

Topology项目的Maven配置文件

  1.   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  2.   4.0.0
  3.   com.guludada
  4.   Storm_OrdersAnalysis
  5.   war
  6.   0.0.1-SNAPSHOT
  7.   Storm_OrdersAnalysis Maven Webapp
  8.   http://maven.apache.org
  9.   
  10.     
  11.         org.apache.storm
  12.         storm-core
  13.         0.9.6
  14.         provided
  15.     
  16.     
  17.         org.apache.storm
  18.         storm-kafka
  19.         0.9.6
  20.     
  21.     
  22.         org.apache.kafka
  23.         kafka_2.10
  24.         0.9.0.1
  25.             
  26.                 
  27.                     org.apache.zookeeper
  28.                     zookeeper
  29.                 
  30.                 
  31.                     log4j
  32.                     log4j
  33.                 
  34.                 
  35.                     org.slf4j
  36.                     slf4j-log4j12
  37.                 
  38.             
  39.     
  40.     
  41.         redis.clients
  42.         jedis
  43.         2.8.1
  44.     
  45.   
  46.   
  47.     Storm_OrdersAnalysis
  48.     
  49.         
  50.             maven-assembly-plugin
  51.             
  52.                 
  53.                     jar-with-dependencies
  54.                 
  55.                 
  56.                    
  57.                      com.guludada.ordersanalysis.ordersAnalysisTopology
  58.                    
  59.                  
  60.              
  61.           
  62.       
  63.   

  4.0.0
  com.guludada
  Storm_OrdersAnalysis
  war
  0.0.1-SNAPSHOT
  Storm_OrdersAnalysis Maven Webapp
  http://maven.apache.org
  
    
		org.apache.storm
		storm-core
		0.9.6
		provided
	
	
        org.apache.storm
        storm-kafka
        0.9.6
    
    
    	org.apache.kafka
        kafka_2.10
        0.9.0.1
            
                
                    org.apache.zookeeper
                    zookeeper
                
                
                    log4j
                    log4j
                
                
                    org.slf4j
    				slf4j-log4j12
                
            
    
    
	    redis.clients
	    jedis
	    2.8.1
		
  
  
    Storm_OrdersAnalysis
    
		
			maven-assembly-plugin
			
				  
			    	jar-with-dependencies
			    
			    
			       
			         com.guludada.ordersanalysis.ordersAnalysisTopology
			       
			     
			 
		  
	  
  

maven配置文件中配置了一个官方推荐的maven-assembly-plugin插件,用来帮助用户方便地打包Topology程序的。只需要进入到项目的根路径,然后运行
$mvn assembly:assembly
命令就可以打包好Topologyjar包了。

最后我带大家梳理一下整个项目的部署流程
1.  启动Zookeeper
2. 启动Kafka
3. 启动Flume将程序拉取到Kafka
4. 启动Storm集群
5. 启动Redis服务端  通过命令
$ src/redis-server
6. 提交打包好的Topology程序到Storm集群中通过Storm UI 或者命令$storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3
7. 启动RedisCLI客户端查看结果通过命令
$ src/redis-cli --raw
$  zrange key 0 -1 withscores

如下图:

 

Troubleshooting
  1. 在使用maven同时导入storm-core, storm-kaka和kafka的依赖包的时候可能会出现jar包冲突导致无法初始化Log4jLoggerFactory,并无法启动Storm程序.解决方法也很简单,按照红字提示,把多余的jar包移除就行了,通过在maven的pom文件中kafka的依赖设置部分加入下面的设置org.slf4jslf4j-log4j12
  2. 第一次执行Storm建立Topology时,作者遇到了一个十分低级的问题,就是发现明明Kafka的topic里有数据,可是Storm程序怎么都无法读取到数据,后来才从下面的文章中明白了问题的所在 http://m.blog.csdn.net/article/details?id=18615761  原因就在于Topology第一次启动前还没有在zookeeper中的zkRoot创建offset信息,Storm取不到offset信息就会使用默认的offset,也就是log文件中从最后一个元素开始读取信息,所以之前在kafka中的数据都无法读出来。Storm启动后,再往broker中写数据,这些后写的数据就能正确被Storm处理。
  3. 当Storm的topology传到Nimbus的时候,或者说你的Storm程序刚开始启动的时候可能会报关于JedisPool是一个无法序列化的对象而导致的错误:java.lang.RuntimeException:java.io.NotSerializableException: redis.clients.jedis.JedisPool 解决方案就是将Bolt类中外部的JedisPool初始化代码放入Bolt的prepare()方法中,如本文的代码示例所示
  4. 在Storm启动并开始连接Redis的时候,会报出连接被拒绝,因为Redis运行在protect mode模式下的错误。这是因为Storm程序是远程连接Redis的服务器端,如果Redis服务器端没有设置密码的话是拒绝远程连接的。解决方法也十分简单,关闭protect mode模式(强烈不推荐),或者使用下面命令为Redis设置密码就可以了$config set requirepass 123
  5. 向Storm提交Topology以后, Supervisor端会一直报“Kill XXXX No Such process”的错误,多数原因是提交的topology没有正确被执行,而Storm的日记中不会显示topology程序里的错误。解决方法就是启动Storm UI, 通过这个Storm自带的UI界面查看topology的运行情况,并且程序中的错误也会在UI界面中显示出来,能方便地查看topology程序的错误。

    6.kafka使用的时候的小问题:
        当在一台机子上启动kafka producer客户端的时候,是无法在同一台机子上继续启动kafka的consumer客户端的,因为这两个进程可能占用的同一个端口,需要在另外一台机子上启动kafka consumer程序,这样就能看见正确的结果了

最后,感谢所有耐心看完这篇文章的人,楼主也深感自己的技术水平和语言表达还有很多需要提高的地方,希望能和大家一起交流学习共同进步,欢迎大家留下宝贵的意见和评论!还有再最后吐槽一下,CSDN的文章编辑器在我的MAC系统的火狐浏览器下十分十分十分十分难用,字体格式等根本不受控制,各种莫名其妙的BUG…………

from:http://blog.csdn.net/ymh198816/article/details/51998085