Flume+Kafka+Storm+Redis实时分析系统基本架构

今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型。当然这个架构模型只是实时分析技术的一 个简单的入门级架构,实际生产环境中的大数据实时分析技术还涉及到很多细节的处理, 比如使用Storm的ACK机制保证数据都能被正确处理, 集群的高可用架构, 消费数据时如何处理重复数据或者丢失数据等问题,根据不同的业务场景,对数据的可靠性要求以及系统的复杂度的要求也会不同。这篇文章的目的只是带大家入个门,让大家对实时分析技术有一个简单的认识,并和大家一起做学习交流。
文章的最后还有Troubleshooting,分享了作者在部署本文示例程序过程中所遇到的各种问题和解决方案。

系统基本架构

整个实时分析系统的架构就是先由电商系统的订单服务器产生订单日志, 然后使用Flume去监听订单日志,并实时把每一条日志信息抓取下来并存进Kafka消息系统中, 接着由Storm系统消费Kafka中的消息,同时消费记录由Zookeeper集群管理,这样即使Kafka宕机重启后也能找到上次的消费记录,接着从上次宕机点继续从Kafka的Broker中进行消费。但是由于存在先消费后记录日志或者先记录后消费的非原子操作,如果出现刚好消费完一条消息并还没将信息记录到Zookeeper的时候就宕机的类似问题,或多或少都会存在少量数据丢失或重复消费的问题, 其中一个解决方案就是Kafka的Broker和Zookeeper都部署在同一台机子上。接下来就是使用用户定义好的Storm Topology去进行日志信息的分析并输出到Redis缓存数据库中(也可以进行持久化),最后用Web APP去读取Redis中分析后的订单信息并展示给用户。之所以在Flume和Storm中间加入一层Kafka消息系统,就是因为在高并发的条件下, 订单日志的数据会井喷式增长,如果Storm的消费速度(Storm的实时计算能力那是最快之一,但是也有例外, 而且据说现在Twitter的开源实时计算框架Heron比Storm还要快)慢于日志的产生速度,加上Flume自身的局限性,必然会导致大量数据滞后并丢失,所以加了Kafka消息系统作为数据缓冲区,而且Kafka是基于log File的消息系统,也就是说消息能够持久化在硬盘中,再加上其充分利用Linux的I/O特性,提供了可观的吞吐量。架构中使用Redis作为数据库也是因为在实时的环境下,Redis具有很高的读写速度。

业务背景
各大电商网站在合适的时间进行各种促销活动已是常态,在能为网站带来大量的流量和订单的同时,对于用户也有不小的让利,必然是大家伙儿喜闻乐见的。在促销活动期间,老板和运营希望能实时看到订单情况,老板开心,运营也能根据实时的订单数据调整运营策略,而让用户能实时看到网站的订单数据,也会勾起用户的购买欲。但是普通的离线计算系统已然不能满足在高并发环境下的实时计算要求,所以我们得使用专门实时计算系统,如:Storm, Heron, Spark Stream等,去满足类似的需求。
既然要分析订单数据,那必然在订单产生的时候要把订单信息记录在日志文件中。本文中,作者通过使用log4j2,以及结合自己之前开发电商系统的经验,写了一个订单日志生成模拟器,代码如下,能帮助大家随机产生订单日志。下面所展示的订单日志文件格式和数据就是我们本文中的分析目标,本文的案例中用来分析所有商家的订单总销售额并找出销售额钱20名的商家。

订单数据格式:
orderNumber: XX | orderDate: XX | paymentNumber: XX | paymentDate: XX | merchantName: XX | sku: [ skuName: XX skuNum: XX skuCode: XX skuPrice: XX totalSkuPrice: XX;skuName: XX skuNum: XX skuCode: XX skuPrice: XX totalSkuPrice: XX;] | price: [ totalPrice: XX discount: XX paymentPrice: XX ]

订单日志生成程序:
使用log4j2将日志信息写入文件中,每小时滚动一次日志文件

  1. <?xml version=”1.0″ encoding=”UTF-8″?>
  2. <Configuration status=”INFO”>
  3.       <Appenders>
  4.         <Console name=”myConsole” target=”SYSTEM_OUT”>
  5.           <PatternLayout pattern=”%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} – %msg%n”/>
  6.         </Console>
  7.         <RollingFile name=”myFile” fileName=”/Users/guludada/Desktop/logs/app.log”
  8.           filePattern=”/Users/guludada/Desktop/logs/app-%d{yyyy-MM-dd-HH}.log”>
  9.             <PatternLayout pattern=”%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} – %msg%n”/>
  10.             <Policies>
  11.                 <TimeBasedTriggeringPolicy />
  12.             </Policies>
  13.         </RollingFile>
  14.       </Appenders>
  15.       <Loggers>
  16.         <Root level=”Info”>
  17.           <AppenderRef ref=”myConsole”/>
  18.           <AppenderRef ref=”myFile”/>
  19.         </Root>
  20.       </Loggers>
  21. </Configuration>
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
      <Appenders>
        <Console name="myConsole" target="SYSTEM_OUT">
          <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    	<RollingFile name="myFile" fileName="/Users/guludada/Desktop/logs/app.log"
          filePattern="/Users/guludada/Desktop/logs/app-%d{yyyy-MM-dd-HH}.log">
        	<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
       		<Policies>
        		<TimeBasedTriggeringPolicy />
      		</Policies>
        </RollingFile>          		
      </Appenders>
      <Loggers>
        <Root level="Info">
          <AppenderRef ref="myConsole"/>
          <AppenderRef ref="myFile"/>
        </Root>
      </Loggers>
</Configuration>

生成器代码:

  1. package com.guludada.ordersInfo;
  2. import java.text.SimpleDateFormat;
  3. import java.util.Date;
  4. import java.util.Random;
  5. // Import log4j classes.
  6. import org.apache.logging.log4j.LogManager;
  7. import org.apache.logging.log4j.Logger;
  8. public class ordersInfoGenerator {
  9.     public enum paymentWays {
  10.         Wechat,Alipay,Paypal
  11.     }
  12.     public enum merchantNames {
  13.         优衣库,天猫,淘宝,咕噜大大,快乐宝贝,守望先峰,哈毒妇,Storm,Oracle,Java,CSDN,跑男,路易斯威登,
  14.         暴雪公司,Apple,Sumsam,Nissan,Benz,BMW,Maserati
  15.     }
  16.     public enum productNames {
  17.         黑色连衣裙, 灰色连衣裙, 棕色衬衫, 性感牛仔裤, 圆脚牛仔裤,塑身牛仔裤, 朋克卫衣,高腰阔腿休闲裤,人字拖鞋,
  18.         沙滩拖鞋
  19.     }
  20.     float[] skuPriceGroup = {299,399,699,899,1000,2000};
  21.     float[] discountGroup = {10,20,50,100};
  22.     float totalPrice = 0;
  23.     float discount = 0;
  24.     float paymentPrice = 0;
  25.     private static final Logger logger = LogManager.getLogger(ordersInfoGenerator.class);
  26.     private int logsNumber = 1000;
  27.     public void generate() {
  28.         for(int i = 0; i <= logsNumber; i++) {
  29.             logger.info(randomOrderInfo());
  30.         }
  31.     }
  32.     public String randomOrderInfo() {
  33.         SimpleDateFormat sdf = new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”);
  34.         Date date = new Date();
  35.         String orderNumber = randomNumbers(5) + date.getTime();
  36.         String orderDate = sdf.format(date);
  37.         String paymentNumber = randomPaymentWays() + “-” + randomNumbers(8);
  38.         String paymentDate = sdf.format(date);
  39.         String merchantName = randomMerchantNames();
  40.         String skuInfo = randomSkus();
  41.         String priceInfo = calculateOrderPrice();
  42.         return “orderNumber: ” + orderNumber + ” | orderDate: ” + orderDate + ” | paymentNumber: ” +
  43.             paymentNumber + ” | paymentDate: ” + paymentDate + ” | merchantName: ” + merchantName +
  44.             ” | sku: ” + skuInfo + ” | price: ” + priceInfo;
  45.     }
  46.     private String randomPaymentWays() {
  47.         paymentWays[] paymentWayGroup = paymentWays.values();
  48.         Random random = new Random();
  49.         return paymentWayGroup[random.nextInt(paymentWayGroup.length)].name();
  50.     }
  51.     private String randomMerchantNames() {
  52.         merchantNames[] merchantNameGroup = merchantNames.values();
  53.         Random random = new Random();
  54.         return merchantNameGroup[random.nextInt(merchantNameGroup.length)].name();
  55.     }
  56.     private String randomProductNames() {
  57.         productNames[] productNameGroup = productNames.values();
  58.         Random random = new Random();
  59.         return productNameGroup[random.nextInt(productNameGroup.length)].name();
  60.     }
  61.     private String randomSkus() {
  62.         Random random = new Random();
  63.         int skuCategoryNum = random.nextInt(3);
  64.         String skuInfo =”[“;
  65.         totalPrice = 0;
  66.         for(int i = 1; i <= 3; i++) {
  67.             int skuNum = random.nextInt(3)+1;
  68.             float skuPrice = skuPriceGroup[random.nextInt(skuPriceGroup.length)];
  69.             float totalSkuPrice = skuPrice * skuNum;
  70.             String skuName = randomProductNames();
  71.             String skuCode = randomCharactersAndNumbers(10);
  72.             skuInfo += ” skuName: ” + skuName + ” skuNum: ” + skuNum + ” skuCode: ” + skuCode
  73.                     + ” skuPrice: ” + skuPrice + ” totalSkuPrice: ” + totalSkuPrice + “;”;
  74.             totalPrice += totalSkuPrice;
  75.         }
  76.         skuInfo += ” ]”;
  77.         return skuInfo;
  78.     }
  79.     private String calculateOrderPrice() {
  80.         Random random = new Random();
  81.         discount = discountGroup[random.nextInt(discountGroup.length)];
  82.         paymentPrice = totalPrice – discount;
  83.         String priceInfo = “[ totalPrice: ” + totalPrice + ” discount: ” + discount + ” paymentPrice: ” + paymentPrice +” ]”;
  84.         return priceInfo;
  85.     }
  86.     private String randomCharactersAndNumbers(int length) {
  87.         String characters = “abcdefghijklmnopqrstuvwxyz0123456789”;
  88.         String randomCharacters = “”;
  89.                 Random random = new Random();
  90.                 for (int i = 0; i < length; i++) {
  91.               randomCharacters += characters.charAt(random.nextInt(characters.length()));
  92.                 }
  93.                 return randomCharacters;
  94.     }
  95.     private String randomNumbers(int length) {
  96.         String characters = “0123456789”;
  97.         String randomNumbers = “”;
  98.                 Random random = new Random();
  99.                 for (int i = 0; i < length; i++) {
  100.              randomNumbers += characters.charAt(random.nextInt(characters.length()));
  101.                 }
  102.                return randomNumbers;
  103.     }
  104.     public static void main(String[] args) {
  105.         ordersInfoGenerator generator = new ordersInfoGenerator();
  106.         generator.generate();
  107.     }
  108. }
package com.guludada.ordersInfo;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

// Import log4j classes.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;



public class ordersInfoGenerator {
	
	public enum paymentWays {
		Wechat,Alipay,Paypal
	}
	public enum merchantNames {
		优衣库,天猫,淘宝,咕噜大大,快乐宝贝,守望先峰,哈毒妇,Storm,Oracle,Java,CSDN,跑男,路易斯威登,
		暴雪公司,Apple,Sumsam,Nissan,Benz,BMW,Maserati
	}
	
	public enum productNames {
		黑色连衣裙, 灰色连衣裙, 棕色衬衫, 性感牛仔裤, 圆脚牛仔裤,塑身牛仔裤, 朋克卫衣,高腰阔腿休闲裤,人字拖鞋,
		沙滩拖鞋
	}
	
	float[] skuPriceGroup = {299,399,699,899,1000,2000};
	float[] discountGroup = {10,20,50,100};
	float totalPrice = 0;
	float discount = 0;
	float paymentPrice = 0;
	
	private static final Logger logger = LogManager.getLogger(ordersInfoGenerator.class);
	private int logsNumber = 1000;
	
	public void generate() {
				
		for(int i = 0; i <= logsNumber; i++) {			
			logger.info(randomOrderInfo());			
		}
	}
	
	public String randomOrderInfo() {
		
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");		
		Date date = new Date();		
		
		String orderNumber = randomNumbers(5) + date.getTime();
		
		String orderDate = sdf.format(date);
		
		String paymentNumber = randomPaymentWays() + "-" + randomNumbers(8);
		
		String paymentDate = sdf.format(date);
		
		String merchantName = randomMerchantNames();
		
		String skuInfo = randomSkus();
		
		String priceInfo = calculateOrderPrice();
		
		return "orderNumber: " + orderNumber + " | orderDate: " + orderDate + " | paymentNumber: " +
			paymentNumber + " | paymentDate: " + paymentDate + " | merchantName: " + merchantName + 
			" | sku: " + skuInfo + " | price: " + priceInfo;
	}
		
	private String randomPaymentWays() {
		
		paymentWays[] paymentWayGroup = paymentWays.values();
		Random random = new Random();
		return paymentWayGroup[random.nextInt(paymentWayGroup.length)].name();
	}
	
	private String randomMerchantNames() {
		
		merchantNames[] merchantNameGroup = merchantNames.values();
		Random random = new Random();
		return merchantNameGroup[random.nextInt(merchantNameGroup.length)].name();
	}
	
	private String randomProductNames() {
		
		productNames[] productNameGroup = productNames.values();
		Random random = new Random();
		return productNameGroup[random.nextInt(productNameGroup.length)].name();
	}
	
	
	private String randomSkus() {
		
		Random random = new Random();
		int skuCategoryNum = random.nextInt(3);
		
		String skuInfo ="[";
		
		totalPrice = 0;
		for(int i = 1; i <= 3; i++) {
			
			int skuNum = random.nextInt(3)+1;
			float skuPrice = skuPriceGroup[random.nextInt(skuPriceGroup.length)];
			float totalSkuPrice = skuPrice * skuNum;			
			String skuName = randomProductNames();
			String skuCode = randomCharactersAndNumbers(10);
			skuInfo += " skuName: " + skuName + " skuNum: " + skuNum + " skuCode: " + skuCode
					+ " skuPrice: " + skuPrice + " totalSkuPrice: " + totalSkuPrice + ";";		
			totalPrice += totalSkuPrice;
		}
		
		
		skuInfo += " ]";
		
		return skuInfo;
	}
	
	private String calculateOrderPrice() {
		
		Random random = new Random();
		discount = discountGroup[random.nextInt(discountGroup.length)];
		paymentPrice = totalPrice - discount;
		
		String priceInfo = "[ totalPrice: " + totalPrice + " discount: " + discount + " paymentPrice: " + paymentPrice +" ]";
		
		return priceInfo;
	}
	
	private String randomCharactersAndNumbers(int length) {
		
		String characters = "abcdefghijklmnopqrstuvwxyz0123456789";
		String randomCharacters = "";  
                Random random = new Random();  
                for (int i = 0; i < length; i++) {  
        	  randomCharacters += characters.charAt(random.nextInt(characters.length()));  
                }  
                return randomCharacters;  
	}
	
	private String randomNumbers(int length) {
		
		String characters = "0123456789";
		String randomNumbers = "";   
                Random random = new Random();  
                for (int i = 0; i < length; i++) {  
        	 randomNumbers += characters.charAt(random.nextInt(characters.length()));  
                }  
               return randomNumbers;		
	}
	
	public static void main(String[] args) {
		
		ordersInfoGenerator generator = new ordersInfoGenerator();
		generator.generate();
	}
}

收集日志数据
采集数据的方式有多种,一种是通过自己编写shell脚本或Java编程采集数据,但是工作量大,不方便维护,另一种就是直接使用第三方框架去进行日志的采集,一般第三方框架的健壮性,容错性和易用性都做得很好也易于维护。本文采用第三方框架Flume进行日志采集,Flume是一个分布式的高效的日志采集系统,它能把分布在不同服务器上的海量日志文件数据统一收集到一个集中的存储资源中,FlumeApache的一个顶级项目,与Kafka也有很好的兼容性。不过需要注意的是Flume并不是一个高可用的框架,这方面的优化得用户自己去维护。

Flume的agent是运行在JVM上的,所以各个服务器上的JVM环境必不可少。每一个Flume agent部署在一台服务器上,Flume会收集web server产生的日志数据,并封装成一个个的事件发送给Flume Agent的Source,Flume Agent Source会消费这些收集来的数据事件(Flume Event)并放在Flume Agent Channel,Flume Agent Sink会从Channel中收集这些采集过来的数据,要么存储在本地的文件系统中要么作为一个消费资源分给下一个装在分布式系统中其它服务器上的Flume Agent进行处理。Flume提供了点对点的高可用的保障,某个服务器上的Flume Agent Channel中的数据只有确保传输到了另一个服务器上的Flume Agent Channel里或者正确保存到了本地的文件存储系统中,才会被移除。

在本文中,Flume的Source我们选择的是Exec Source,因为是实时系统,直接通过tail 命令来监听日志文件,而在Kafka的Broker集群端的Flume我们选择Kafka Sink 来把数据下沉到Kafka消息系统中。

下图是来自Flume官网里的Flume拉取数据的架构图:

图片来源:http://flume.apache.org/FlumeUserGuide.html

订单日志产生端的Flume配置文件如下:

  1. agent.sources = origin
  2. agent.channels = memorychannel
  3. agent.sinks = target
  4. agent.sources.origin.type = exec
  5. agent.sources.origin.command = tail -F /export/data/trivial/app.log
  6. agent.sources.origin.channels = memorychannel
  7. agent.sources.origin.interceptors = i1
  8. agent.sources.origin.interceptors.i1.type = static
  9. agent.sources.origin.interceptors.i1.key = topic
  10. agent.sources.origin.interceptors.i1.value = ordersInfo
  11. agent.sinks.loggerSink.type = logger
  12. agent.sinks.loggerSink.channel = memorychannel
  13. agent.channels.memorychannel.type = memory
  14. agent.channels.memorychannel.capacity = 10000
  15. agent.sinks.target.type = avro
  16. agent.sinks.target.channel = memorychannel
  17. agent.sinks.target.hostname = 172.16.124.130
  18. agent.sinks.target.port = 4545
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = exec
agent.sources.origin.command = tail -F /export/data/trivial/app.log
agent.sources.origin.channels = memorychannel

agent.sources.origin.interceptors = i1
agent.sources.origin.interceptors.i1.type = static
agent.sources.origin.interceptors.i1.key = topic
agent.sources.origin.interceptors.i1.value = ordersInfo

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000

agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = 172.16.124.130
agent.sinks.target.port = 4545

Kafka消息系统端Flume配置文件

  1. agent.sources = origin
  2. agent.channels = memorychannel
  3. agent.sinks = target
  4. agent.sources.origin.type = avro
  5. agent.sources.origin.channels = memorychannel
  6. agent.sources.origin.bind = 0.0.0.0
  7. agent.sources.origin.port = 4545
  8. agent.sinks.loggerSink.type = logger
  9. agent.sinks.loggerSink.channel = memorychannel
  10. agent.channels.memorychannel.type = memory
  11. agent.channels.memorychannel.capacity = 5000000
  12. agent.channels.memorychannel.transactionCapacity = 1000000
  13. agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
  14. #agent.sinks.target.topic = bigdata
  15. agent.sinks.target.brokerList=localhost:9092
  16. agent.sinks.target.requiredAcks=1
  17. agent.sinks.target.batchSize=100
  18. agent.sinks.target.channel = memorychannel
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = 0.0.0.0
agent.sources.origin.port = 4545

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000

agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
#agent.sinks.target.topic = bigdata
agent.sinks.target.brokerList=localhost:9092
agent.sinks.target.requiredAcks=1
agent.sinks.target.batchSize=100
agent.sinks.target.channel = memorychannel

这里需要注意的是,在日志服务器端的Flume agent中我们配置了一个interceptors,这个是用来为Flume Event(Flume Event就是拉取到的一行行的日志信息)的头部添加key为“topic”的K-V键值对,这样这条抓取到的日志信息就会根据topic的值去到Kafka中指定的topic消息池中,当然还可以为Flume Event额外配置一个key为“Key”的键值对,Kafka Sink会根据key“Key”的值将这条日志信息下沉到不同的Kafka分片上,否则就是随机分配。在Kafka集群端的Flume配置里,有几个重要的参数需要注意,“topic”是指定抓取到的日志信息下沉到Kafka哪一个topic池中,如果之前Flume发送端为Flume Event添加了带有topic的头信息,则这里可以不用配置;brokerList就是配置Kafka集群的主机地址和端口;requireAcks=1是配置当下沉到Kafka的消息储存到特定partition的leader中成功后就返回确认消息,requireAcks=0是不需要确认消息成功写入Kafka中,requireAcks=-1是指不光需要确认消息被写入partition的leander中,还要确认完成该条消息的所有备份;batchSize配置每次下沉多少条消息,每次下沉的数量越多延迟也高。

Kafka消息系统
这一部分我们将谈谈Kafka的配置和使用,Kafka在我们的系统中实际上就相当于起到一个数据缓冲池的作用, 有点类似于ActiveQ的消息队列和Redis这样的缓存区的作用,但是更可靠,因为是基于log File的消息系统,数据不容易丢失,以及能记录数据的消费位置并且用户还可以自定义消息消费的起始位置,这就使得重复消费消息也可以得以实现,而且同时具有队列和发布订阅两种消息消费模式,十分灵活,并且与Storm的契合度很高,充分利用Linux系统的I/O提高读写速度等等。另一个要提的方面就是Kafka的Consumer是pull-based模型的,而Flume是push-based模型。push-based模型是尽可能大的消费数据,但是当生产者速度大于消费者时数据会被覆盖。而pull-based模型可以缓解这个压力,消费速度可以慢于生产速度,有空余时再拉取那些没拉取到的数据。

Kafka是一个分布式的高吞吐量的消息系统,同时兼有点对点和发布订阅两种消息消费模式Kafka主要由Producer,Consumer和Broker组成。Kafka中引入了一个叫“topic”的概念,用来管理不同种类的消息,不同类别的消息会记录在到其对应的topic池中,而这些进入到topic中的消息会被Kafka写入磁盘的log文件中进行持久化处理。Kafka会把消息写入磁盘的log file中进行持久化对于每一个topic里的消息log文件,Kafka都会对其进行分片处理,而每一个消息都会顺序写入中log分片中,并且被标上“offset”的标量来代表这条消息在这个分片中的顺序,并且这些写入的消息无论是内容还是顺序都是不可变的。所以Kafka和其它消息队列系统的一个区别就是它能做到分片中的消息是能顺序被消费的,但是要做到全局有序还是有局限性的,除非整个topic只有一个log分片。并且无论消息是否有被消费,这条消息会一直保存在log文件中,当留存时间足够长到配置文件中指定的retention的时间后,这条消息才会被删除以释放空间。对于每一个Kafka的Consumer,它们唯一要存的Kafka相关的元数据就是这个“offset”值,记录着Consumer在分片上消费到了哪一个位置。通常Kafka是使用Zookeeper来为每一个Consumer保存它们的offset信息,所以在启动Kafka之前需要有一个Zookeeper集群;而且Kafka默认采用的是先记录offset再读取数据的策略,这种策略会存在少量数据丢失的可能。不过用户可以灵活设置Consumer的“offset”的位置,在加上消息记录在log文件中,所以是可以重复消费消息的。log的分片和它们的备份分散保存在集群的服务器上,对于每一个partition,在集群上都会有一台这个partition存在服务器作为leader,而这个partitionpartition的其它备份所在的服务器做为follower,leader负责处理关于这个partition所有请求,而follower负责这个partition的其它备份的同步工作,当leader服务器宕机时,其中一个follower服务器就会被选举为新的leader。

一般的消息系统分为两种模式,一种是点对点的消费模式,也就是queuing模式,另一种是发布订阅模式,也就是publish-subscribe模式,而Kafka引入了一个Consumer Group的概念,使得其能兼有两种模式。在Kafka中,每一个consumer都会标明自己属于哪个consumer group,每个topic的消息都会分发给每一个subscribe了这个topic的所有consumer group中的一个consumer实例。所以当所有的consumers都在同一个consumer group中,那么就像queuing的消息系统,一个message一次只被一个consumer消费。如果每一个consumer都有不同consumer group,那么就像public-subscribe消息系统一样,一个消息分发给所有的consumer实例。对于普通的消息队列系统,可能存在多个consumer去同时消费message,虽然message是有序地分发出去的,但是由于网络延迟的时候到达不同的consumer的时间不是顺序的,这时就失去了顺序性,解决方案是只用一个consumer去消费message,但显然不太合适。而对于Kafka来说,一个partiton只分发给每一个consumer group中的一个consumer实例,也就是说这个partition只有一个consumer实例在消费,所以可以保证在一个partition内部数据的处理是有序的,不同之处就在于Kafka内部消息进行了分片处理,虽然看上去也是单consumer的做法,但是分片机制保证了并发消费。如果要做到全局有序,那么整个topic中的消息只有一个分片,并且每一个consumer group中只能有一个consumer实例。这实际上就是彻底牺牲了消息消费时的并发度。

 

Kafka的配置和部署十分简单
1. 首先启动Zookeeper集群,Kafka需要Zookeeper集群来帮助记录每一个Consumer的offset
2. 为集群上的每一台Kafka服务器单独配置配置文件,比如我们需要设置有两个节点的Kafka集群,那么节点1和节点2的最基本的配置如下:

  1. config/server-1.properties:
  2.     broker.id=1
  3.     listeners=PLAINTEXT://:9093
  4.     log.dir=export/data/kafka
  5.     zookeeper.connect=localhost:2181
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=export/data/kafka
    zookeeper.connect=localhost:2181
  1. config/server-2.properties:
  2.     broker.id=2
  3.     listeners=PLAINTEXT://:9093
  4.     log.dir=/export/data/kafka
  5.     zookeeper.connect=localhost:2181
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9093
    log.dir=/export/data/kafka
    zookeeper.connect=localhost:2181

broker.id是kafka集群上每一个节点的单独标识,不能重复;listeners可以理解为每一个节点上Kafka进程要监听的端口,使用默认的就行; log.dir是Kafka的log文件(记录消息的log file)存放目录; zookeeper.connect就是Zookeeper的URI地址和端口。
3. 配置完上面的配置文件后,只要分别在节点上
输入下面命令启动Kafka进程就可以使用了

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...


Storm实时计算框架

接下来开始介绍本篇文章要使用的实时计算框架StormStrom是一个非常快的实时计算框架,至于快到什么程度呢?官网首页给出的数据是每一个Storm集群上的节点每一秒能处理一百万条数据。相比Hadoop“Mapreduce”计算框架,Storm使用的是“Topology”Mapreduce程序在计算完成后最终会停下来,而Topology则是会永远运行下去除非你显式地使用“kill -9 XXX”命令停掉它。和大多数的集群系统一样,Storm集群也存在着Master节点和Worker节点,在Master节点上运行的一个守护进程叫“Nimbus”,类似于Hadoop“JobTracker”的功能,负责集群中计算程序的分发,任务的分发,监控任务和工作节点的运行情况等;Worker节点上运行的守护进程叫“Supervisor”,负责接收Nimbus分发的任务并运行,每一个Worker上都会运行着Topology程序的一部分,而一个Topology程序的运行就是由集群上多个Worker一起协同工作的。值得注意的是NimubsSupervisor之间的协调工作也是通过Zookeeper来管理的,NimbusSupervisor自己本身在集群上都是无状态的,它们的状态都保存在Zookeeper上,所以任何节点的宕机和动态扩容都不会影响整个集群的工作运行,并支持fast-fail机制。

Storm有一个很重要的对数据的抽象概念,叫做“Stream”,我们姑且称之为数据流,数据流Stream就是由之间没有任何关系的松散的一个一个的数据元组“tuples”所组成的序列。要在Storm上做实时计算,首先你得有一个计算程序,这就是“Topology”,一个Topology程序由“Spout”“Bolt”共同组成。Storm就是通过Topology程序将数据流Stream通过可靠(ACK机制)的分布式计算生成我们的目标数据流Stream,就比如说把婚恋网站上当日注册的所有用户信息数据流Stream通过Topology程序计算出月收入上万年龄在30岁以下的新的用户信息流Stream。在我们的文章中,Spout就是实现了特定接口的Java类,它相当于数据源,用于产生数据或者从外部接收数据;而Bolt就是实现了Storm Bolt接口的Java类,用于消费从Spout发送出来的数据流并实现用户自定义的数据处理逻辑;对于复杂的数据处理,可以定义多个连续的Bolt去协同处理。最后在程序中通过SpoutBolt生成Topology对象并提交到Storm集群上执行。

tuplesStorm的数据模型,,由值和其所对应的field所组成,比如说在SpoutBolt中定义了发出的元组的field为:(name,age,gender),那么从这个SpoutBolt中发出的数据流的每一个元组值就类似于(”咕噜大大“,27,”中性“)Storm中还有一个Stream Group的概念,它用来决定从Spout或或或Bolt组件中发出的tuples接下来应该传到哪一个组件中或者更准确地说在程序里设置某个组件应该接收来自哪一个组件的tuples; 并且在Storm中提供了多个用于数据流分组的机制,比如说shuffleGrouping,用来将当前组件产生的tuples随机分发到下一个组件中,或者 fieldsGrouping,根据tuplesfield值来决定当前组件产生的tuples应该分发到哪一个组件中。
 
另一部分需要了解的就是Stormtasksworkers的概念。每一个worker都是一个运行在物理机器上的JVM进程,每个worker中又运行着多个task线程,这些task线程可能是Spout任务也可能是Bolt任务,由Nimbus根据RoundRobin负载均衡策略来分配,而至于在整个Topology程序里要起几个Spout线程或Bolt线程,也就是tasks,由用户在程序中设置并发度来决定。

Storm集群的配置文件如下:
Storm的配置文件在项目的conf目录下,也就是:conf/storm.yaml

  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements.  See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership.  The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # “License”); you may not use this file except in compliance
  7. # with the License.  You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an “AS IS” BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. ########### These MUST be filled in for a storm configuration
  17. storm.zookeeper.servers:
  18.   – “ymhHadoop”
  19.   – “ymhHadoop2”
  20.   – “ymhHadoop3”
  21. storm.local.dir: “/export/data/storm/workdir”
  22. nimbus.host: “ymhHadoop”
  23. supervisor.slots.ports:
  24.   -6700
  25.   -6701
  26.   -6702
  27.   -6703
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
  - "ymhHadoop"
  - "ymhHadoop2"
  - "ymhHadoop3"    

storm.local.dir: "/export/data/storm/workdir"
 
nimbus.host: "ymhHadoop"

supervisor.slots.ports:
  -6700
  -6701
  -6702
  -6703 
 
storm.zookeeper.servers自然就是用来配置我们熟悉的Zookeeper集群中各个节点的URI地址和端口的
storm.local.dir 是用来配置storm节点相关文件的存储目录的,每一个storm集群的节点在本地服务器上都要有一个目录存储少量的和该节点有关的一些信息。记得要开发这个目录的读写权限哦
nimbus.host 自然就是用来指定nimbus服务器的URI
supervisor.slots.ports 这个是用来配置supervisor服务器启动的worker所监听的端口,每一个worker就是一个物理的JVM进程。上面这些是基本配置,并且要严格按照上面的格式来,少一个空格都会报错。

接下来就是将配置文件拷贝到集群的各个机器上,然后在分别在nimbussupervisor机器上通过$bin/storm nimbus $bin/storm supervisor命令来启动集群上的机子。最后在nimbus上通过$bin/storm UI 命令可以启动Storm提供的UI界面,功能十分强大,可以监控集群上各个节点的运行状态,提交Topology任务,监控Topology任务的运行情况等。这个UI界面可以通过http://{nimbus host}:8080的地址访问到。



Redis数据库

Redis是一个基于内存的多种数据结构的存储工具,经常有人说Redis是一个基于key-value数据结构的缓存数据库,这种说法必然是不准确的,Key-Value只是其中的一种数据结构的实现,Redis支持Stringshasheslistssetssorted sets等多种常见的数据结构,并提供了功能强大的范围查询,以及提供了INCRINCRBY,DECR,DECRBY等多种原子命令操作,保证在并发的环境下不会出现脏数据。虽然Redis是基于内存的数据库,但也提供了多种硬盘持久化策略,比如说RDB策略,用来将某个时间点的Redis的数据快照存储在硬盘中,或者是AOF策略,将每一个Redis操作命令都不可变的顺序记录在log文件中,恢复数据时就将log文件中的所有命令顺序执行一遍等等。Redis不光可以作为网站热点数据的缓存服务器,还可以用来做数据库,或者消息队列服务器的broker等。在本文中选择Redis作为订单分析结果的存储工具,一方面是其灵活的数据结构和强大的数据操作命令,另一方面就是在大数据的实时计算环境下,需要Redis这样的具备高速I/O的数据库。
 
在本文的例子中,作者使用Sorted Sets数据结构来存储各个商家的总订单销售额,Sorted Sets数据结构由Key, Scoreelement value 三部分组成,Set的数据结构保证同一个key中的元素值不会重复,而在Sorted Sets结构中是通过 Score来为元素值排序,这很自然地就能将各个商家的总订单销售额设置为Score,然后商家名称为element value,这样就能根据总订单销售额来为商家排序。在Storm程序中,我们通过Jedis API来调用Redis
$ZINCRBY KEY INCREMENT MEMBER
的命令来统计商家总销售额, ZINCRBY是一个原子命令,能保证在Storm的并发计算的环境下,正确地增加某个商家的Score的值,也就是它们的订单总销售额。而对于两个商家同名这种情况应该在业务系统中去避免而不应该由我们的数据分析层来处理。最后提一个小trips,就是如果所有商家的Score都设置成相同的分数,那么Redis就会默认使用商家名的字母字典序来排序。

Kafka+Storm+Redis的整合
当数据被Flume拉取进Kafka消息系统中,我们就可以使用Storm来进行消费,Redis来对结果进行存储。StormKafka有很好的兼容性,我们可以通过Kafka Spout来从Kafka中获取数据;在Bolt处理完数据后,通过Jedis API在程序中将数据存储在Redis数据库中。

下面就是Kafka Spout和创建Topology的程序代码:

BrokerHosts hosts = new ZkHosts(“ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181”);
zkHosts是用来指定Zookeeper集群的节点的URI和端口,而Zookeeper集群是用来记录SpoutKafka消息消费的offset位置

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
主要是用来将SpoutKafka拉取来的byte[]数组格式的数据转化为Stormtuples

  1. package com.guludada.ordersanalysis;
  2. import java.util.UUID;
  3. import backtype.storm.Config;
  4. import backtype.storm.LocalCluster;
  5. import backtype.storm.StormSubmitter;
  6. import backtype.storm.generated.AlreadyAliveException;
  7. import backtype.storm.generated.InvalidTopologyException;
  8. import backtype.storm.spout.SchemeAsMultiScheme;
  9. import backtype.storm.topology.TopologyBuilder;
  10. import backtype.storm.tuple.Fields;
  11. import storm.kafka.Broker;
  12. import storm.kafka.BrokerHosts;
  13. import storm.kafka.KafkaSpout;
  14. import storm.kafka.SpoutConfig;
  15. import storm.kafka.StaticHosts;
  16. import storm.kafka.StringScheme;
  17. import storm.kafka.ZkHosts;
  18. import storm.kafka.trident.GlobalPartitionInformation;
  19. public class ordersAnalysisTopology {
  20.     private static String topicName = “ordersInfo”;
  21.     private static String zkRoot = “/stormKafka/”+topicName;
  22.     public static void main(String[] args) {
  23.         BrokerHosts hosts = new ZkHosts(“ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181”);
  24.         SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName,zkRoot,UUID.randomUUID().toString());
  25.         spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
  26.         KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
  27.         TopologyBuilder builder = new TopologyBuilder();
  28.         builder.setSpout(“kafkaSpout”,kafkaSpout);
  29.         builder.setBolt(“merchantsSalesBolt”, new merchantsSalesAnalysisBolt(), 2).shuffleGrouping(“kafkaSpout”);
  30.         Config conf = new Config();
  31.         conf.setDebug(true);
  32.         if(args != null && args.length > 0) {
  33.             conf.setNumWorkers(1);
  34.             try {
  35.                 StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
  36.             } catch (AlreadyAliveException e) {
  37.                 // TODO Auto-generated catch block
  38.                 e.printStackTrace();
  39.             } catch (InvalidTopologyException e) {
  40.                 // TODO Auto-generated catch block
  41.                 e.printStackTrace();
  42.             }
  43.         } else {
  44.             conf.setMaxSpoutPending(3);
  45.             LocalCluster cluster = new LocalCluster();
  46.             cluster.submitTopology(“ordersAnalysis”, conf, builder.createTopology());
  47.         }
  48.     }
  49. }
package com.guludada.ordersanalysis;

import java.util.UUID;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.Broker;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StaticHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.GlobalPartitionInformation;

public class ordersAnalysisTopology {
	
	private static String topicName = "ordersInfo";
	private static String zkRoot = "/stormKafka/"+topicName;
	
	public static void main(String[] args) {
		
		BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");

		
		SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName,zkRoot,UUID.randomUUID().toString());
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
		
		TopologyBuilder builder = new TopologyBuilder();        
		builder.setSpout("kafkaSpout",kafkaSpout);        
		builder.setBolt("merchantsSalesBolt", new merchantsSalesAnalysisBolt(), 2).shuffleGrouping("kafkaSpout");

		Config conf = new Config();
		conf.setDebug(true);
		
		if(args != null && args.length > 0) {
			conf.setNumWorkers(1);
			try {
				StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
			} catch (AlreadyAliveException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
		} else {
			
			conf.setMaxSpoutPending(3);
			
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("ordersAnalysis", conf, builder.createTopology());
			
			
		}

	}
}

下面是Bolt程序,主要是用来处理从Kafka拉取到的订单日志信息, 并计算出所有商家的总订单收入,然后使用Jedis API将计算结果存入到Redis数据库中。

 

  1. package com.guludada.domain;
  2. import java.util.ArrayList;
  3. import java.util.Date;
  4. public class ordersBean {
  5.     Date createTime = null;
  6.     String number = “”;
  7.     String paymentNumber = “”;
  8.     Date paymentDate = null;
  9.     String merchantName = “”;
  10.     ArrayList<skusBean> skuGroup = null;
  11.     float totalPrice = 0;
  12.     float discount = 0;
  13.     float paymentPrice = 0;
  14.     public Date getCreateTime() {
  15.         return createTime;
  16.     }
  17.     public void setCreateTime(Date createTime) {
  18.         this.createTime = createTime;
  19.     }
  20.     public String getNumber() {
  21.         return number;
  22.     }
  23.     public void setNumber(String number) {
  24.         this.number = number;
  25.     }
  26.     public String getPaymentNumber() {
  27.         return paymentNumber;
  28.     }
  29.     public void setPaymentNumber(String paymentNumber) {
  30.         this.paymentNumber = paymentNumber;
  31.     }
  32.     public Date getPaymentDate() {
  33.         return paymentDate;
  34.     }
  35.     public void setPaymentDate(Date paymentDate) {
  36.         this.paymentDate = paymentDate;
  37.     }
  38.     public String getMerchantName() {
  39.         return merchantName;
  40.     }
  41.     public void setMerchantName(String merchantName) {
  42.         this.merchantName = merchantName;
  43.     }
  44.     public ArrayList<skusBean> getSkuGroup() {
  45.         return skuGroup;
  46.     }
  47.     public void setSkuGroup(ArrayList<skusBean> skuGroup) {
  48.         this.skuGroup = skuGroup;
  49.     }
  50.     public float getTotalPrice() {
  51.         return totalPrice;
  52.     }
  53.     public void setTotalPrice(float totalPrice) {
  54.         this.totalPrice = totalPrice;
  55.     }
  56.     public float getDiscount() {
  57.         return discount;
  58.     }
  59.     public void setDiscount(float discount) {
  60.         this.discount = discount;
  61.     }
  62.     public float getPaymentPrice() {
  63.         return paymentPrice;
  64.     }
  65.     public void setPaymentPrice(float paymentPrice) {
  66.         this.paymentPrice = paymentPrice;
  67.     }
  68. }
package com.guludada.domain;

import java.util.ArrayList;
import java.util.Date;

public class ordersBean {

	Date createTime = null;
	String number = "";
	String paymentNumber = "";
	Date paymentDate = null;
	String merchantName = "";
	ArrayList<skusBean> skuGroup = null;
	float totalPrice = 0;
	float discount = 0;
	float paymentPrice = 0;
	
	public Date getCreateTime() {
		return createTime;
	}
	public void setCreateTime(Date createTime) {
		this.createTime = createTime;
	}
	public String getNumber() {
		return number;
	}
	public void setNumber(String number) {
		this.number = number;
	}
	public String getPaymentNumber() {
		return paymentNumber;
	}
	public void setPaymentNumber(String paymentNumber) {
		this.paymentNumber = paymentNumber;
	}
	public Date getPaymentDate() {
		return paymentDate;
	}
	public void setPaymentDate(Date paymentDate) {
		this.paymentDate = paymentDate;
	}
	public String getMerchantName() {
		return merchantName;
	}
	public void setMerchantName(String merchantName) {
		this.merchantName = merchantName;
	}
	public ArrayList<skusBean> getSkuGroup() {
		return skuGroup;
	}
	public void setSkuGroup(ArrayList<skusBean> skuGroup) {
		this.skuGroup = skuGroup;
	}
	public float getTotalPrice() {
		return totalPrice;
	}
	public void setTotalPrice(float totalPrice) {
		this.totalPrice = totalPrice;
	}
	public float getDiscount() {
		return discount;
	}
	public void setDiscount(float discount) {
		this.discount = discount;
	}
	public float getPaymentPrice() {
		return paymentPrice;
	}
	public void setPaymentPrice(float paymentPrice) {
		this.paymentPrice = paymentPrice;
	}
	
	
}

本文例子中用不到skusbean,所以这里作者就没有写委屈偷懒一下下

  1. package com.guludada.domain;
  2. public class skusBean {
  3.       ………………
  4. }
package com.guludada.domain;

public class skusBean {
      ………………
}

logInfoHandler用来过滤订单的日志信息,并保存到ordersBean和skusBean中,方便Bolt获取日志数据的各项属性进行处理

  1. package com.guludada.common;
  2. import java.text.ParseException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.regex.Matcher;
  5. import java.util.regex.Pattern;
  6. import com.guludada.domain.ordersBean;
  7. public class logInfoHandler {
  8.     SimpleDateFormat sdf_final = new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”);
  9.     public ordersBean getOrdersBean(String orderInfo) {
  10.         ordersBean order = new ordersBean();
  11.         //从日志信息中过滤出订单信息
  12.         Pattern orderPattern = Pattern.compile(“orderNumber:.+”);
  13.         Matcher orderMatcher = orderPattern.matcher(orderInfo);
  14.         if(orderMatcher.find()) {
  15.             String orderInfoStr = orderMatcher.group(0);
  16.             String[] orderInfoGroup = orderInfoStr.trim().split(“\\|”);
  17.             //获取订单号
  18.             String orderNum = (orderInfoGroup[0].split(“:”))[1].trim();
  19.             order.setNumber(orderNum);
  20.             //获取创建时间
  21.             String orderCreateTime = orderInfoGroup[1].trim().split(” “)[1] + ” ” + orderInfoGroup[1].trim().split(” “)[2];
  22.             try {
  23.                 order.setCreateTime(sdf_final.parse(orderCreateTime));
  24.             } catch (ParseException e) {
  25.                 // TODO Auto-generated catch block
  26.                 e.printStackTrace();
  27.             }
  28.             //获取商家名称
  29.             String merchantName = (orderInfoGroup[4].split(“:”))[1].trim();
  30.             order.setMerchantName(merchantName);
  31.             //获取订单总额
  32.             String orderPriceInfo = (orderInfoGroup[6].split(“price:”))[1].trim();
  33.             String totalPrice = (orderPriceInfo.substring(2, orderPriceInfo.length()-3).trim().split(” “))[1];
  34.             order.setTotalPrice(Float.parseFloat(totalPrice));
  35.             return order;
  36.         } else {
  37.             return order;
  38.         }
  39.     }
  40. }
package com.guludada.common;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.guludada.domain.ordersBean;

public class logInfoHandler {
	
	SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	
	public ordersBean getOrdersBean(String orderInfo) {
		
		ordersBean order = new ordersBean();
		
		//从日志信息中过滤出订单信息
		Pattern orderPattern = Pattern.compile("orderNumber:.+");
		Matcher orderMatcher = orderPattern.matcher(orderInfo);
		if(orderMatcher.find()) {
			
			String orderInfoStr = orderMatcher.group(0);
			String[] orderInfoGroup = orderInfoStr.trim().split("\\|");
			
			//获取订单号
			String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
			order.setNumber(orderNum);
						
			//获取创建时间
			String orderCreateTime = orderInfoGroup[1].trim().split(" ")[1] + " " + orderInfoGroup[1].trim().split(" ")[2];
			try {
				order.setCreateTime(sdf_final.parse(orderCreateTime));
			} catch (ParseException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
			//获取商家名称
			String merchantName = (orderInfoGroup[4].split(":"))[1].trim();
			order.setMerchantName(merchantName);
			
			//获取订单总额
			String orderPriceInfo = (orderInfoGroup[6].split("price:"))[1].trim();
			String totalPrice = (orderPriceInfo.substring(2, orderPriceInfo.length()-3).trim().split(" "))[1];
			order.setTotalPrice(Float.parseFloat(totalPrice));
						
			return order;
						
		} else {
			return order;
		}
	}
}

 

  1. package com.guludada.ordersanalysis;
  2. import java.util.Map;
  3. import com.guludada.common.logInfoHandler;
  4. import com.guludada.domain.ordersBean;
  5. import backtype.storm.task.OutputCollector;
  6. import backtype.storm.task.TopologyContext;
  7. import backtype.storm.topology.OutputFieldsDeclarer;
  8. import backtype.storm.topology.base.BaseRichBolt;
  9. import backtype.storm.tuple.Tuple;
  10. import redis.clients.jedis.Jedis;
  11. import redis.clients.jedis.JedisPool;
  12. import redis.clients.jedis.JedisPoolConfig;
  13. public class merchantsSalesAnalysisBolt extends BaseRichBolt {
  14.     private OutputCollector _collector;
  15.     logInfoHandler loginfohandler;
  16.     JedisPool pool;
  17.     public void execute(Tuple tuple) {
  18.         String orderInfo = tuple.getString(0);
  19.         ordersBean order = loginfohandler.getOrdersBean(orderInfo);
  20.         //store the salesByMerchant infomation into Redis
  21.         Jedis jedis = pool.getResource();
  22.         jedis.zincrby(“orderAna:topSalesByMerchant”, order.getTotalPrice(), order.getMerchantName());
  23.     }
  24.     public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
  25.         this._collector = collector;
  26.         this.loginfohandler = new logInfoHandler();
  27.         this.pool = new JedisPool(new JedisPoolConfig(), “ymhHadoop”,6379,2 * 60000,”12345″);
  28.     }
  29.     public void declareOutputFields(OutputFieldsDeclarer arg0) {
  30.         // TODO Auto-generated method stub
  31.     }
  32. }
package com.guludada.ordersanalysis;

import java.util.Map;

import com.guludada.common.logInfoHandler;
import com.guludada.domain.ordersBean;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class merchantsSalesAnalysisBolt extends BaseRichBolt {
	
	private OutputCollector _collector;
	logInfoHandler loginfohandler;
	JedisPool pool;

	public void execute(Tuple tuple) {
		String orderInfo = tuple.getString(0);
		ordersBean order = loginfohandler.getOrdersBean(orderInfo);
		
		//store the salesByMerchant infomation into Redis
		Jedis jedis = pool.getResource();
		jedis.zincrby("orderAna:topSalesByMerchant", order.getTotalPrice(), order.getMerchantName());
	}

	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
		this._collector = collector;
		this.loginfohandler = new logInfoHandler();
		this.pool = new JedisPool(new JedisPoolConfig(), "ymhHadoop",6379,2 * 60000,"12345");
		
	}

	public void declareOutputFields(OutputFieldsDeclarer arg0) {
		// TODO Auto-generated method stub
		
	}

}

Topology项目的Maven配置文件

  1. <project xmlns=”http://maven.apache.org/POM/4.0.0″ xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”
  2.   xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd”>
  3.   <modelVersion>4.0.0</modelVersion>
  4.   <groupId>com.guludada</groupId>
  5.   <artifactId>Storm_OrdersAnalysis</artifactId>
  6.   <packaging>war</packaging>
  7.   <version>0.0.1-SNAPSHOT</version>
  8.   <name>Storm_OrdersAnalysis Maven Webapp</name>
  9.   <url>http://maven.apache.org</url>
  10.   <dependencies>
  11.     <dependency>
  12.         <groupId>org.apache.storm</groupId>
  13.         <artifactId>storm-core</artifactId>
  14.         <version>0.9.6</version>
  15.         <scope>provided</scope>
  16.     </dependency>
  17.     <dependency>
  18.         <groupId>org.apache.storm</groupId>
  19.         <artifactId>storm-kafka</artifactId>
  20.         <version>0.9.6</version>
  21.     </dependency>
  22.     <dependency>
  23.         <groupId>org.apache.kafka</groupId>
  24.         <artifactId>kafka_2.10</artifactId>
  25.         <version>0.9.0.1</version>
  26.             <exclusions>
  27.                 <exclusion>
  28.                     <groupId>org.apache.zookeeper</groupId>
  29.                     <artifactId>zookeeper</artifactId>
  30.                 </exclusion>
  31.                 <exclusion>
  32.                     <groupId>log4j</groupId>
  33.                     <artifactId>log4j</artifactId>
  34.                 </exclusion>
  35.                 <exclusion>
  36.                     <groupId>org.slf4j</groupId>
  37.                     <artifactId>slf4j-log4j12</artifactId>
  38.                 </exclusion>
  39.             </exclusions>
  40.     </dependency>
  41.     <dependency>
  42.         <groupId>redis.clients</groupId>
  43.         <artifactId>jedis</artifactId>
  44.         <version>2.8.1</version>
  45.     </dependency>
  46.   </dependencies>
  47.   <build>
  48.     <finalName>Storm_OrdersAnalysis</finalName>
  49.     <plugins>
  50.         <plugin>
  51.             <artifactId>maven-assembly-plugin</artifactId>
  52.             <configuration>
  53.                 <descriptorRefs>
  54.                     <descriptorRef>jar-with-dependencies</descriptorRef>
  55.                 </descriptorRefs>
  56.                 <archive>
  57.                    <manifest>
  58.                      <mainClass>com.guludada.ordersanalysis.ordersAnalysisTopology</mainClass>
  59.                    </manifest>
  60.                  </archive>
  61.              </configuration>
  62.           </plugin>
  63.       </plugins>
  64.   </build>
  65. </project>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.guludada</groupId>
  <artifactId>Storm_OrdersAnalysis</artifactId>
  <packaging>war</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>Storm_OrdersAnalysis Maven Webapp</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-core</artifactId>
		<version>0.9.6</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.9.6</version>
    </dependency>
    <dependency>
    	<groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
    				<artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
    </dependency>
    <dependency>
	    <groupId>redis.clients</groupId>
	    <artifactId>jedis</artifactId>
	    <version>2.8.1</version>
	</dependency>	
  </dependencies>
  <build>
    <finalName>Storm_OrdersAnalysis</finalName>
    <plugins>
		<plugin>
			<artifactId>maven-assembly-plugin</artifactId>
			<configuration>
				<descriptorRefs>  
			    	<descriptorRef>jar-with-dependencies</descriptorRef>
			    </descriptorRefs>
			    <archive>
			       <manifest>
			         <mainClass>com.guludada.ordersanalysis.ordersAnalysisTopology</mainClass>
			       </manifest>
			     </archive>
			 </configuration>
		  </plugin>
	  </plugins>
  </build>
</project>

maven配置文件中配置了一个官方推荐的maven-assembly-plugin插件,用来帮助用户方便地打包Topology程序的。只需要进入到项目的根路径,然后运行
$mvn assembly:assembly
命令就可以打包好Topologyjar包了。

最后我带大家梳理一下整个项目的部署流程
1.  启动Zookeeper
2. 启动Kafka
3. 启动Flume将程序拉取到Kafka
4. 启动Storm集群
5. 启动Redis服务端  通过命令
$ src/redis-server
6. 提交打包好的Topology程序到Storm集群中通过Storm UI 或者命令$storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3
7. 启动RedisCLI客户端查看结果通过命令
$ src/redis-cli –raw
$  zrange key 0 -1 withscores

如下图:

 

Troubleshooting
  1. 在使用maven同时导入storm-core, storm-kaka和kafka的依赖包的时候可能会出现jar包冲突导致无法初始化Log4jLoggerFactory,并无法启动Storm程序.解决方法也很简单,按照红字提示,把多余的jar包移除就行了,通过在maven的pom文件中kafka的依赖设置部分加入下面的设置<exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
  2. 第一次执行Storm建立Topology时,作者遇到了一个十分低级的问题,就是发现明明Kafka的topic里有数据,可是Storm程序怎么都无法读取到数据,后来才从下面的文章中明白了问题的所在 http://m.blog.csdn.net/article/details?id=18615761  原因就在于Topology第一次启动前还没有在zookeeper中的zkRoot创建offset信息,Storm取不到offset信息就会使用默认的offset,也就是log文件中从最后一个元素开始读取信息,所以之前在kafka中的数据都无法读出来。Storm启动后,再往broker中写数据,这些后写的数据就能正确被Storm处理。
  3. 当Storm的topology传到Nimbus的时候,或者说你的Storm程序刚开始启动的时候可能会报关于JedisPool是一个无法序列化的对象而导致的错误:java.lang.RuntimeException:java.io.NotSerializableException: redis.clients.jedis.JedisPool 解决方案就是将Bolt类中外部的JedisPool初始化代码放入Bolt的prepare()方法中,如本文的代码示例所示
  4. 在Storm启动并开始连接Redis的时候,会报出连接被拒绝,因为Redis运行在protect mode模式下的错误。这是因为Storm程序是远程连接Redis的服务器端,如果Redis服务器端没有设置密码的话是拒绝远程连接的。解决方法也十分简单,关闭protect mode模式(强烈不推荐),或者使用下面命令为Redis设置密码就可以了$config set requirepass 123
  5. 向Storm提交Topology以后, Supervisor端会一直报“Kill XXXX No Such process”的错误,多数原因是提交的topology没有正确被执行,而Storm的日记中不会显示topology程序里的错误。解决方法就是启动Storm UI, 通过这个Storm自带的UI界面查看topology的运行情况,并且程序中的错误也会在UI界面中显示出来,能方便地查看topology程序的错误。

    6.kafka使用的时候的小问题:
        当在一台机子上启动kafka producer客户端的时候,是无法在同一台机子上继续启动kafka的consumer客户端的,因为这两个进程可能占用的同一个端口,需要在另外一台机子上启动kafka consumer程序,这样就能看见正确的结果了

最后,感谢所有耐心看完这篇文章的人,楼主也深感自己的技术水平和语言表达还有很多需要提高的地方,希望能和大家一起交流学习共同进步,欢迎大家留下宝贵的意见和评论!还有再最后吐槽一下,CSDN的文章编辑器在我的MAC系统的火狐浏览器下十分十分十分十分难用,字体格式等根本不受控制,各种莫名其妙的BUG…………

from:http://blog.csdn.net/ymh198816/article/details/51998085

发表评论

电子邮件地址不会被公开。 必填项已用*标注