All posts by dotte

Flume+Spark+Hive+Spark SQL离线分析系统

前段时间把Scala和Spark一起学习了,所以借此机会在这里做个总结,顺便和大家一起分享一下目前最火的分布式计算技术Spark!当然Spark不光是可以做离线计算,还提供了许多功能强大的组件,比如说,Spark Streaming 组件做实时计算,和Kafka等消息系统也有很好的兼容性;Spark Sql,可以让用户通过标准SQL语句操作从不同的数据源中过来的结构化数据;还提供了种类丰富的MLlib库方便用户做机器学习等等。Spark是由Scala语言编写而成的,Scala是运行在JVM上的面向函数的编程语言,它的学习过程简直反人类,可读性就我个人来看,也不是能广为让大众接受的语言,但是它功能强大,熟练后能极大提高开发速度,对于实现同样的功能,所需要写的代码量比Java少得多得多,这都得益于Scala的语言特性。本文借鉴作者之前写的另一篇关于Hadoop离线计算的文章,继续使用那篇文章中点击流分析的案例,只不过MapReduce部分改为由Spark离线计算来完成,同时,你会发现做一模一样的日志清洗任务,相比上一篇文章,代码总数少了非常非常多,这都是Scala语言的功劳。本篇文章在Flume部分的内容和之前的Hadoop离线分析文章的内容基本一致,Hive部分新加了对Hive数据仓库的简单说明,同时还补充了对HDFS的说明和配置,并且新加了大量对Spark框架的详细介绍,文章的最后一如既往地添加了Troubleshooting段落,和大家分享作者在部署时遇到的各种问题,读者们可以有选择性的阅读。

PS:本文Spark说明部分的最后一段非常重要,作者总结了Spark在集群环境下不得忽略的一些特性,所有使用Spark的用户都应该要重点理解。或者读者们可以直接阅读官方文档加深理解:http://spark.apache.org/docs/latest/programming-guide.html

Spark离线分析系统架构图

这里写图片描述
整个离线分析的总体架构就是使用Flume从FTP服务器上采集日志文件,并存储在Hadoop HDFS文件系统上,再接着用Spark的RDDs操作函数清洗日志文件,最后使用Spark SQL配合HIVE构建数据仓库做离线分析。任务的调度使用Shell脚本完成,当然大家也可以尝试一些自动化的任务调度工具,比如说AZKABAN或者OOZIE等。
分析所使用的点击流日志文件主要来自Nginx的access.log日志文件,需要注意的是在这里并不是用Flume直接去生产环境上拉取nginx的日志文件,而是多设置了一层FTP服务器来缓冲所有的日志文件,然后再用Flume监听FTP服务器上指定的目录并拉取目录里的日志文件到HDFS服务器上(具体原因下面分析)。从生产环境推送日志文件到FTP服务器的操作可以通过Shell脚本配合Crontab定时器来实现。

网站点击流数据


图片来源:http://webdataanalysis.net/data-collection-and-preprocessing/weblog-to-clickstream/#comments

一般在WEB系统中,用户对站点的页面的访问浏览,点击行为等一系列的数据都会记录在日志中,每一条日志记录就代表着上图中的一个数据点;而点击流数据关注的就是所有这些点连起来后的一个完整的网站浏览行为记录,可以认为是一个用户对网站的浏览session。比如说用户从哪一个外站进入到当前的网站,用户接下来浏览了当前网站的哪些页面,点击了哪些图片链接按钮等一系列的行为记录,这一个整体的信息就称为是该用户的点击流记录。这篇文章中设计的离线分析系统就是收集WEB系统中产生的这些数据日志,并清洗日志内容存储分布式的HDFS文件存储系统上,接着使用离线分析工具HIVE去统计所有用户的点击流信息。
本系统中我们采用Nginx的access.log来做点击流分析的日志文件。access.log日志文件的格式如下:

样例数据格式:
124.42.13.230 – – [18/Sep/2013:06:57:50 +0000] “GET /shoppingMall?ver=1.2.1 HTTP/1.1” 200 7200 “http://www.baidu.com.cn” “Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)”

格式分析:
1. 访客ip地址:124.42.13.230
2. 访客用户信息: – –
3. 请求时间:[18/Sep/2013:06:57:50 +0000]
4. 请求方式:GET
5. 请求的url:/shoppingMall?ver=1.10.2
6. 请求所用协议:HTTP/1.1
7. 响应码:200
8. 返回的数据流量:7200
9. 访客的来源url:http://www.baidu.com.cn
10. 访客所用浏览器:Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)

HDFS

Apache Hadoop是用来支持海量数据分布式计算的软件框架,它具备高可靠性,高稳定性,动态扩容,运用简单的计算模型(MapReduce)在集群上进行分布式计算,并支持海量数据的存储。Apache Hadoop主要包含4个重要的模块,一个是 Hadoop Common,支持其它模块运行的通用组件;Hadoop Distributed File System(HDFS), 分布式文件存储系统;Hadoop Yarn,负责计算任务的调度和集群上资源的管理;Hadoop MapReduce,基于Hadoop Yarn的分布式计算框架。在本文的案例中,我们主要用到HDFS作为点击流数据存储,分布式计算框架我们将采用Spark RDDs Operations去替代MapReduce。

要配置Hadoop集群,首先需要配置Hadoop daemons, 它是所有其它Hadoop组件运行所必须的守护进程, 它的配置文件是

etc/hadoop/hadoop-env.sh

# set to the root of your Java installation
export JAVA_HOME=/usr/java/latest

Hadoop的运行需要Java开发环境的支持,一定要显示地标明集群上所有机器的JDK安装目录,即使你自己本机的环境已经配置好了JAVA_HOME,因为Hadoop是通过SSH来启动守护进程的,即便是NameNode启动自己本机的守护进程;如果不显示配置JDK安装目录,那么Hadoop在通过SSH启动守护进程时会找不到Java环境而报错。

在本文的案例中,我们只使用Hadoop HDFS组件,所以我们只需要配置HDFS的守护进程,NameNode daemons,SecondaryNameNode daemons以及DataNode daemons,它们的配置文件主要是core-site.xml和hdfs-site.xml:

etc/hadoop/core-site.xml



<configuration>
   <property>
      <name>fs.defaultFSname>
      <value>hdfs://ymhHadoop:9000value>
   property>
   <property>
       <name>hadoop.tmp.dirname>
       <value>/root/apps/hadoop/tmpvalue>
   property>
configuration>

fs.defaultFS属性是指定用来做NameNode的主机URI;而hadoop.tmp.dir是配置Hadoop依赖的一些系统运行时产生的文件的目录,默认是在/tmp/${username}目录下的,但是系统一重启这个目录下的文件就会被清空,所以我们重新指定它的目录

etc/hadoop/hdfs-site.xml




<configuration>
   <property>
      <name>dfs.replicationname>
      <value>1value>
   property>
    <property>
      <name>dfs.namenode.name.dirname>
      <value>/your/pathvalue>
   property>
   <property>
      <name>dfs.blocksizename>
      <value>268435456value>
   property>
   <property>
      <name>dfs.datanode.data.dirname>
      <value>/your/pathvalue>
   property>

configuration>

dfs.replication 是配置每一份在HDFS系统上的文件有几个备份;dfs.namenode.name.dir 是配置用户自定义的目录存储HDFS的业务日志和命名空间日志,也就是操作日志,集群发生故障时可以通过这份文件来恢复数据。dfs.blocksize,定义HDFS最大的文件分片是多大,默认256M,我们不需要改动;dfs.datanode.data.dir, 用来配置DataNode中的数据Blocks应该存储在哪个文件目录下。

最后把配置文件拷贝到集群的所有机子上,接下来就是启动HDFS集群,如果是第一次启动,记得一定要格式化整个HDFS文件系统

$HADOOP_PREFIX/bin/hdfs namenode -format 

接下来就是通过下面的命令分别启动NameNode和DataNode

$HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start namenode
$HADOOP_PREFIX/sbin/hadoop-daemons.sh --config $HADOOP_CONF_DIR --script hdfs start datanode

收集用户数据

网站会通过前端JS代码或服务器端的后台代码收集用户浏览数据并存储在网站服务器中。一般运维人员会在离线分析系统和真实生产环境之间部署FTP服务器,并将生产环境上的用户数据每天定时发送到FTP服务器上,离线分析系统就会从FTP服务上采集数据而不会影响到生产环境。
采集数据的方式有多种,一种是通过自己编写shell脚本或Java编程采集数据,但是工作量大,不方便维护,另一种就是直接使用第三方框架去进行日志的采集,一般第三方框架的健壮性,容错性和易用性都做得很好也易于维护。本文采用第三方框架Flume进行日志采集,Flume是一个分布式的高效的日志采集系统,它能把分布在不同服务器上的海量日志文件数据统一收集到一个集中的存储资源中,Flume是Apache的一个顶级项目,与Hadoop也有很好的兼容性。不过需要注意的是Flume并不是一个高可用的框架,这方面的优化得用户自己去维护。
Flume的agent是运行在JVM上的,所以各个服务器上的JVM环境必不可少。每一个Flume agent部署在一台服务器上,Flume会收集web server 产生的日志数据,并封装成一个个的事件发送给Flume Agent的Source,Flume Agent Source会消费这些收集来的数据事件并放在Flume Agent Channel,Flume Agent Sink会从Channel中收集这些采集过来的数据,要么存储在本地的文件系统中要么作为一个消费资源分发给下一个装在分布式系统中其它服务器上的Flume进行处理。Flume提供了点对点的高可用的保障,某个服务器上的Flume Agent Channel中的数据只有确保传输到了另一个服务器上的Flume Agent Channel里或者正确保存到了本地的文件存储系统中,才会被移除。
本系统中每一个FTP服务器以及Hadoop的name node服务器上都要部署一个Flume Agent;FTP的Flume Agent采集Web Server的日志并汇总到name node服务器上的Flume Agent,最后由hadoop name node服务器将所有的日志数据下沉到分布式的文件存储系统HDFS上面。
需要注意的是Flume的Source在本文的系统中选择的是Spooling Directory Source,而没有选择Exec Source,因为当Flume服务down掉的时候Spooling Directory Source能记录上一次读取到的位置,而Exec Source则没有,需要用户自己去处理,当重启Flume服务器的时候如果处理不好就会有重复数据的问题。当然Spooling Directory Source也是有缺点的,会对读取过的文件重命名,所以多架一层FTP服务器也是为了避免Flume“污染”生产环境。Spooling Directory Source另外一个比较大的缺点就是无法做到灵活监听某个文件夹底下所有子文件夹里的所有文件里新追加的内容。关于这些问题的解决方案也有很多,比如选择其它的日志采集工具,像logstash等。

FTP服务器上的Flume配置文件如下:

    agent.channels = memorychannel  
    agent.sinks = target  

    agent.sources.origin.type = spooldir  
    agent.sources.origin.spoolDir = /export/data/trivial/weblogs  
    agent.sources.origin.channels = memorychannel  
    agent.sources.origin.deserializer.maxLineLength = 2048  

    agent.sources.origin.interceptors = i2  
    agent.sources.origin.interceptors.i2.type = host  
    agent.sources.origin.interceptors.i2.hostHeader = hostname  

    agent.sinks.loggerSink.type = logger  
    agent.sinks.loggerSink.channel = memorychannel  

    agent.channels.memorychannel.type = memory  
    agent.channels.memorychannel.capacity = 10000  

    agent.sinks.target.type = avro  
    agent.sinks.target.channel = memorychannel  
    agent.sinks.target.hostname = 172.16.124.130  
    agent.sinks.target.port = 4545  

这里有几个参数需要说明,Flume Agent Source可以通过配置deserializer.maxLineLength这个属性来指定每个Event的大小,默认是每个Event是2048个byte。Flume Agent Channel的大小默认等于于本地服务器上JVM所获取到的内存的80%,用户可以通过byteCapacityBufferPercentage和byteCapacity两个参数去进行优化。
需要特别注意的是FTP上放入Flume监听的文件夹中的日志文件不能同名,不然Flume会报错并停止工作,最好的解决方案就是为每份日志文件拼上时间戳。

在Hadoop服务器上的配置文件如下:

    agent.sources = origin  
    agent.channels = memorychannel  
    agent.sinks = target  

    agent.sources.origin.type = avro  
    agent.sources.origin.channels = memorychannel  
    agent.sources.origin.bind = 0.0.0.0  
    agent.sources.origin.port = 4545  

    agent.sinks.loggerSink.type = logger  
    agent.sinks.loggerSink.channel = memorychannel  

    agent.channels.memorychannel.type = memory  
    agent.channels.memorychannel.capacity = 5000000  
    agent.channels.memorychannel.transactionCapacity = 1000000  

    agent.sinks.target.type = hdfs  
    agent.sinks.target.channel = memorychannel  
    agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S  
    agent.sinks.target.hdfs.filePrefix = data-%{hostname}  
    agent.sinks.target.hdfs.rollInterval = 60  
    agent.sinks.target.hdfs.rollSize = 1073741824  
    agent.sinks.target.hdfs.rollCount = 1000000  
    agent.sinks.target.hdfs.round = true  
    agent.sinks.target.hdfs.roundValue = 10  
    agent.sinks.target.hdfs.roundUnit = minute  
    agent.sinks.target.hdfs.useLocalTimeStamp = true  
    agent.sinks.target.hdfs.minBlockReplicas=1  
    agent.sinks.target.hdfs.writeFormat=Text  
    agent.sinks.target.hdfs.fileType=DataStream  

round, roundValue,roundUnit三个参数是用来配置每10分钟在hdfs里生成一个文件夹保存从FTP服务器上拉取下来的数据。用户分别在日志文件服务器及HDFS服务器端启动如下命令,便可以一直监听是否有新日志产生,然后拉取到HDFS文件系统中:

$ nohup bin/flume-ng agent -n $your_agent_name -c conf -f conf/$your_conf_name &

Spark

Spark是最近特别火的一个分布式计算框架,最主要原因就是快!和男人不一样,在大数据领域,一个框架会不会火,快是除了可靠性之外一个最重要的话语权,几乎所有新出的分布式框架或即将推出的新版本的MapReduce都在强调一点,我很快。Spark官网上给出的数据是Spark程序和中间数据运行在内存上时计算速度是Hadoop的100倍,即使在磁盘上也是比Hadoop快10倍。
每一个Spark程序都是提供了一个Driver进程来负责运行用户提供的程序,这个Driver进程会生成一个SparkContext,负责和Cluster Manager(可以是Spark自己提供的集群管理工具,也可以是Hadoop 的资源调度工具 Yarn)沟通,Cluster负责协调和调度集群上的Worker Node资源,当Driver获取到集群上Worker Node资源后,就会向Worker Node的Executor发送计算程序(通过Jar或者python文件),接着再向Exectutor发送计算任务去执行,Executor会启动多个线程并行运行计算任务,同时还会根据需求在Worker Node上缓存计算过程中的中间数据。需要注意的虽然Worker Node上可以启动多个物理JVM来运行不同Spark程序的Executor,但是不同的Spark程序之间不能进行通讯和数据交换。另一方面,对于Cluster Manager来说,不需要知道Spark Driver的底层,只要Spark Driver和Cluster Manager能互相通信并获取计算资源就可以协同工作,所以Spark Driver能较为方便地和各种资源调度框架整合,比如Yarn,Mesos等。
这里写图片描述
图片来源:http://spark.apache.org/docs/latest/cluster-overview.html

Spark就是通过Driver来发送用户的计算程序到集群的工作节点中,然后去并行计算数据,这其中有一个很重要的Spark专有的数据模型叫做RDD(Resilient
distributed dataset), 它代表着每一个计算阶段的数据集合,这些数据集合可以继续它所在的工作节点上,或者通过“shuffle”动作在集群中重新分发后,进行下一步的并行计算,形成新的RDD数据集。这些RDD有一个最重要的特点就是可以并行计算。RDD最开始有两种方式进行创建,一种是从Driver程序中的Scala Collections创建而来(或者其它语言的Collections),将它们转化成RDD然后在工作结点中并发处理,另一种就是从外部的分布式数据文件系统中创建RDD,如HDFS,HBASE或者任何实现了Hadoop InputFormat接口的对象。

对于Driver程序中的Collections数据,可以使用parallelize()方法将数据根据集群节点数进行切片(partitions),然后发送到集群中并发处理,一般一个节点一个切片一个task进行处理,用户也可以自定义数据的切片数。而对于外部数据源的数据,Spark可以从任何基于Hadoop框架的数据源创建RDD,一般一个文件块(blocks)创建一个RDD切片,然后在集群上并行计算。

在Spark中,对于RDDs的计算操作有两种类型,一种是Transformations,另一种是Actions。Transformations相当于Hadoop的Map组件,通过对RDDs的并发计算,然后返回新的RDDs对象;而actions则相当于Hadoop的Reduce组件,通过计算(我们这里说的计算就是function)汇总之前Transformation操作产生的RDDs对象,产生最终结果,然后返回到Driver程序中。特别需要说明的是,所有的Transformations操作都是延迟计算的(lazy), 它们一开始只会记录这个Transformations是用在哪一个RDDs上,并不会开始执行计算,除非遇到了需要返回最终结果到Driver程序中的Action操作,这时候Transformations才会开始真正意义上的计算。所以用户的Spark程序最后一步都需要一个Actions类型的操作,否则这个程序并不会触发任何计算。这么做的好处在于能提高Spark的运行效率,因为通过Transformations操作创建的RDDs对象最终只会在Actions类型的方法中用到,而且只会返回包含最终结果的RDDs到Driver中,而不是大量的中间结果。有时候,有些RDDs的计算结果会多次被重复调用,这就触发多次的重复计算,用户可以使用persist()或者cache()方法将部分RDDs的计算结果缓存在整个集群的内存中,这样当其它的RDDs需要之前的RDDs的计算结果时就可以直接从集群的内存中获得,提高运行效率。

在Spark中,另外一个需要了解的概念就是“Shuffle”,当遇到类似“reduceByKey”的Actions操作时,会把集群上所有分片的RDDs都读一遍,然后在集群之间相互拷贝并全部收集起来,统一计算这所有的RDDs,获得一个整体的结果而不再是单个分片的计算结果,接着再重新分发到集群中或者发送回Driver程序。在Shuffle过程中,Spark会产生两种类型的任务,一种是Map task,用于匹配本地分片需要shuffle的数据并将这些数据写入文件中,然后Reduce task就会读取这些文件并整合所有的数据。 所以说”Shuffle”过程会消耗许多本地磁盘的I/O资源,内存资源,网络I/O,附带还会产生许多的序列化过程。通常,repartition类型的操作,比如:repartitions和coalesce,ByKey类型的操作,比如:reduceByKey,groupByKey,join类型的操作,如:cogroup和join等,都会产生Shuffle过程。

接下来,来谈一谈Spark在集群环境下的一些特性,这部分内容非常非常重要,请大家一定要重点理解。首先,读者们一定要记住,Spark是通过Driver把用户打包提交的Spark程序序列化以后,分发到集群中的工作节点上去运行,对于计算结果的汇总是返回到Driver端,也就是说通常用户都是从Driver服务器上获取到最终的计算结果!在这个大前提下我们来探讨下面几个问题:
1. 关于如何正确地将函数传入RDD operation中,有两种推荐的方式,一种就是直接传函数体,另一种是在伴生对象中创建方法,然后通过类名.方法名的方式传入;如下面的代码所示

object DateHandler {
  def parseDate(s: String): String = { ... }
}

rdd.map(DateHandler.parseDate)

错误的传函数的方式如下:

Class MySpark {
 def parseDate(s: String): String = { ... }
 def rddOperation(rdd:RDD[String]):RDD[String] = {rdd.map(x => this.parseDate(x))}
}
…………
val myspark = new MySpark
myspark.rddOperation(sc.rdd)
这样子的传递方式会把整个mySpark对象序列化后传到集群中,会造成不必要的内存开支。
因为向map中传入的“this.parseDate(x)”是一个对象实例和它里面的函数。

当在RDD operation中访问类中的变量时,也会造成传递整个对象的开销,比如:

Class MySpark {
 val myVariable
 def rddOperation(rdd:RDD[String]):RDD[String] = {rdd.map(x => x + myVariable)}
}
这样也相当于x => this.x + myVariable,又关联了这个对象实例,
解决方法就是把这个类的变量传入方法内部做局部变量,
就会从访问对象中的变量变为访问局部变量值
def rddOperation(rdd:RDD[String]):RDD[String] = {val _variable = this.myVariable;rdd.map(x => x + _variable)}

2.第二个特别需要注意的问题就是在RDD operations中去更改一个全局变量,
在集群环境中也是很容易出现错误的,注意下面的代码:

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

这段代码最终返回的结果还是0。这是因为这段代码连同counter是序列化后分发到集群上所有的节点机器上,不同的节点上拥有各自独立的counter,并不会是原先Driver上counter的引用,并且统计的值也不一样,最后统计结果也不会返回给Driver去重新赋值。Driver主机上的counter还是它原来的值,不会发生任何变化。如果需要在RDD operations中操作全局变量,就需要使用accumulator()方法,这是一个线程安全的方法,能在并发环境下原子性地改变全局变量的值。

3.对于集群环境下的Spark,第三个重要的是如何去合理地打印RDDs中的值。如果只是使用rdd.foreach(println()) 或者 rdd.map(println())是行不通的,一定要记住,程序会被分送到集群的工作节点上各自运行,println方法调用的也是工作节点上的输入输出接口,而用户获取数据和计算结果都是在Driver主机上的,所以是无法看到这些打印的结果。解决方法之一就是打印前将所有数据先返回Driver,如rdd.collect().foreach(println),但是这可能会让Driver瞬间耗光内存,因为collect操作将集群上的所有数据全部一次性返回给Driver。较为合理的操作为使用take() 方法先获取部分数据,然后再打印,如:rdd.take(100).foreach(println)。
4. 另外需要补充说明的是foreach(func)这个Action操作,它的作用是对集群上每一个datasets元素执行传入的func方法,这个func方法是在各个工作节点上分别执行的。虽然foreach是action操作,但是它并不是先全部将数据返回给Driver然后再在Driver上执行func方法,它返回的给Driver的Unit,这点要特别注意。所以foreach(func)操作里传入的func函数对Driver中的全局变量的操作或者打印数据等操作对于Driver来说都是无效的,这个func函数只运行在工作节点上。
5. 最后要提的是Spark的共享变量,其中一个共享变量就是使用accumulator方法封装的变量,而另一个共享变量就是广播变量(Broadcast Variables)。在谈广播变量之前,大家需要了解一个概念叫“stage”,每次进行shuffle操作之前的所有RDDs的操作都属于同一个stage。所以每次在shuffle操作时,上一个stage计算的结果都会被Spark封装成广播变量,并通过一定的高效算法将这些计算结果在集群上的每个节点里都缓存上一份,并且是read-only的,这样当下一个stage的任务再次需要之前stage的计算结果时就不用再重新计算了。用户可以自定义广播变量,一般是在某个stage的datasets需要被后续多个stage的任务重复使用的情况下设置会比较有意义。

日志清洗

当Flume从日志服务器上获取到Nginx访问日志并拉取到HDFS系统后,我们接下来要做的就是使用Spark进行日志清洗。
首先是启动Spark集群,Spark目前主要有三种集群部署方式,一种是Spark自带Standalone模式做为cluster manager,另外两种分别是Yarn和Mesos作为cluster manager。在Yarn的部署方式下,又细分了两种提交Spark程序的模式,一种是cluster模式,Driver程序直接运行在Application Master上,并直接由Yarn管理,当程序完成初始化工作后相关的客户端进程就会退出;另一种是client模式,提交程序后,Driver一直运行在客户端进程中并和Yarn的Application Master通信获取工作节点资源。在Standalone的部署方式下,也同样是细分了cluster模式和client模式的Spark程序提交方式,cluster模式下Driver是运行在工作节点的进程中,一旦完成提交程序的任务,相关的客户端进程就会退出;而client模式中,Driver会一直运行在客户端进程中并一直向console输出运行信息。本文案例中,使用Standalone模式部署Spark集群,同时我们选择手动部署的方式来启动Spark集群:

//启动 master 节点 启动完后可以通过 localhost:8080 访问Spark自带的UI界面
./sbin/start-master.sh

//启动 Worker 节点 
./sbin/start-slave.sh spark://HOST:PORT

//然后通过spark-submit script 提交Spark程序
//默认是使用client模式运行,也可以手动设置成 cluster模式
//--deploy-mode cluster
$bin/spark-submit --class com.guludada.Spark_ClickStream.VisitsInfo --master spark://ymhHadoop:7077 --executor-memory 1G --total-executor-cores 2 /export/data/spark/sparkclickstream.jar

下面是清洗日志的Spark代码,主要是过滤掉无效的访问日志信息:

package com.guludada.Spark_ClickStream

import scala.io.Source
import java.text.SimpleDateFormat;
import java.util.Locale;
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.util.Date;

class WebLogClean extends Serializable {

  def weblogParser(logLine:String):String =  {

      //过滤掉信息不全或者格式不正确的日志信息
      val isStandardLogInfo = logLine.split(" ").length >= 12;

      if(isStandardLogInfo) {

        //过滤掉多余的符号
        val newLogLine:String = logLine.replace("- - ", "").replaceFirst("""\[""", "").replace(" +0000]", "");
        //将日志格式替换成正常的格式
        val logInfoGroup:Array[String] = newLogLine.split(" ");
        val oldDateFormat = logInfoGroup(1);
        //如果访问时间不存在,也是一个不正确的日志信息
        if(oldDateFormat == "-") return ""
        val newDateFormat = WebLogClean.sdf_standard.format(WebLogClean.sdf_origin.parse(oldDateFormat)) 
        return newLogLine.replace(oldDateFormat, newDateFormat)

      } else {

        return ""

      }
  }
}

object WebLogClean {

   val sdf_origin = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.ENGLISH);
   val sdf_standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
   val sdf_hdfsfolder = new SimpleDateFormat("yy-MM-dd");

   def main(args: Array[String]) {

    val curDate = new Date(); 
    val weblogclean = new WebLogClean
    val logFile = "hdfs://ymhHadoop:9000/flume/events/"+WebLogClean.sdf_hdfsfolder.format(curDate)+"/*" // Should be some file on your system
    val conf = new SparkConf().setAppName("WebLogCleaner").setMaster("local")
    val sc = new SparkContext(conf)
    val logFileSource = sc.textFile(logFile,1).cache()

    val logLinesMapRDD = logFileSource.map(x => weblogclean.weblogParser(x)).filter(line => line != "");
    logLinesMapRDD.saveAsTextFile("hdfs://ymhHadoop:9000/spark_clickstream/cleaned_log/"+WebLogClean.sdf_hdfsfolder.format(curDate)) 

  }

}

经过清洗后的日志格式如下:
这里写图片描述

接着为每一条访问记录拼上sessionID

package com.guludada.Spark_ClickStream

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.UUID;
import java.util.Date;

class WebLogSession {

}

object WebLogSession {

   val sdf_standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
   val sdf_hdfsfolder = new SimpleDateFormat("yy-MM-dd");

   //自定义的将日志信息按日志创建的时间升序排序
   def dateComparator(elementA:String ,elementB:String):Boolean = {     
     WebLogSession.sdf_standard.parse(elementA.split(" ")(1)).getTime < WebLogSession.sdf_standard.parse(elementB.split(" ")(1)).getTime
   }

   import scala.collection.mutable.ListBuffer
   def distinctLogInfoBySession(logInfoGroup:List[String]):List[String] = {

       val logInfoBySession:ListBuffer[String] = new ListBuffer[String]
       var lastRequestTime:Long = 0;
       var lastSessionID:String = "";

       for(logInfo <- logInfoGroup) {

         //某IP的用户第一次访问网站的记录做为该用户的第一个session日志
         if(lastRequestTime == 0) {

           lastSessionID = UUID.randomUUID().toString();
           //将该次访问日志记录拼上sessionID并放进按session分类的日志信息数组中
           logInfoBySession += lastSessionID + " " +logInfo
           //记录该次访问日志的时间,并用户和下一条访问记录比较,看时间间隔是否超过30分钟,是的话就代表新Session开始
           lastRequestTime = sdf_standard.parse(logInfo.split(" ")(1)).getTime

         } else {

           //当前日志记录和上一次的访问时间相比超过30分钟,所以认为是一个新的Session,重新生成sessionID
           if(sdf_standard.parse(logInfo.split(" ")(1)).getTime - lastRequestTime >= 30 * 60 * 1000) {
               //和上一条访问记录相比,时间间隔超过了30分钟,所以当做一次新的session,并重新生成sessionID
               lastSessionID = UUID.randomUUID().toString();
               logInfoBySession += lastSessionID + " " +logInfo
               //记录该次访问日志的时间,做为一个新session开始的时间,并继续和下一条访问记录比较,看时间间隔是否又超过30分钟
               lastRequestTime = sdf_standard.parse(logInfo.split(" ")(1)).getTime

           } else { //当前日志记录和上一次的访问时间相比没有超过30分钟,所以认为是同一个Session,继续沿用之前的sessionID

               logInfoBySession += lastSessionID + " " +logInfo
           }           
         }         
       }
       return logInfoBySession.toList
   }

   def main(args: Array[String]) {



      val curDate = new Date(); 
      val logFile = "hdfs://ymhHadoop:9000/spark_clickstream/cleaned_log/"+WebLogSession.sdf_hdfsfolder.format(curDate) // Should be some file on your system
      val conf = new SparkConf().setAppName("WebLogSession").setMaster("local")
      val sc = new SparkContext(conf)
      val logFileSource = sc.textFile(logFile, 1).cache()

      //将log信息变为(IP,log信息)的tuple格式,也就是按IP地址将log分组
      val logLinesKVMapRDD = logFileSource.map(line => (line.split(" ")(0),line)).groupByKey();
      //对每个(IP[String],log信息[Iterator])中的日志按时间的升序排序
      //(其实这一步没有必要,本来Nginx的日志信息就是按访问先后顺序记录的,这一步只是为了演示如何在Scala语境下进行自定义排序) 
      //排完序后(IP[String],log信息[Iterator])的格式变为log信息[Iterator]
      val sortedLogRDD = logLinesKVMapRDD.map(_._2.toList.sortWith((A,B) => WebLogSession.dateComparator(A,B)))

      //将每一个IP的日志信息按30分钟的session分类并拼上session信息
      val logInfoBySessionRDD = sortedLogRDD.map(WebLogSession.distinctLogInfoBySession(_))
      //将List中的日志信息拆分成单条日志信息输出
      val logInfoWithSessionRDD =  logInfoBySessionRDD.flatMap(line => line).saveAsTextFile("hdfs://ymhHadoop:9000/spark_clickstream/session_log/"+WebLogSession.sdf_hdfsfolder.format(curDate))

   } 
}

拼接上sessionID的日志如下所示:
这里写图片描述

最后一步就是根据SessionID来整理用户的浏览信息,代码如下:

package com.guludada.Spark_ClickStream

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.Date;

class VisitsInfo {

}

object VisitsInfo {

  val sdf_standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
  val sdf_hdfsfolder = new SimpleDateFormat("yy-MM-dd");

   //自定义的将日志信息按日志创建的时间升序排序
   def dateComparator(elementA:String ,elementB:String):Boolean = {     
     WebLogSession.sdf_standard.parse(elementA.split(" ")(2)).getTime < WebLogSession.sdf_standard.parse(elementB.split(" ")(2)).getTime
   }

   import scala.collection.mutable.ListBuffer
   def getVisitsInfo(logInfoGroup:List[String]):String = {

     //获取用户在该次session里所访问的页面总数
     //先用map函数将某次session里的所有访问记录变成(url,logInfo)元组的形式,然后再用groupBy函数按url分组,最后统计共有几个组
    val visitPageNum = logInfoGroup.map(log => (log.split(" ")(4),log)).groupBy(x => x._1).count(p => true)

    //获取该次session的ID
    val sessionID = logInfoGroup(0).split(" ")(0)

    //获取该次session的开始时间
    val startTime = logInfoGroup(0).split(" ")(2)

    //获取该次session的结束时间
    val endTime = logInfoGroup(logInfoGroup.length-1).split(" ")(2)

    //获取该次session第一次访问的url
    val entryPage = logInfoGroup(0).split(" ")(4)

    //获取该次session最后一次访问的url
    val leavePage = logInfoGroup(logInfoGroup.length-1).split(" ")(4)

    //获取该次session的用户IP
    val IP = logInfoGroup(0).split(" ")(1)

    //获取该次session的用户从哪个网站过来
    val referal = logInfoGroup(0).split(" ")(8)

     return sessionID + " " + startTime + " " + endTime + " " + entryPage + " " + leavePage + " " + visitPageNum + " " + IP + " " + referal;

   }

   def main(args: Array[String]) {

      val curDate = new Date();      
      val logFile = "hdfs://ymhHadoop:9000/spark_clickstream/session_log/"+WebLogSession.sdf_hdfsfolder.format(curDate) // Should be some file on your system
      val conf = new SparkConf().setAppName("VisitsInfo").setMaster("local")
      val sc = new SparkContext(conf)
      val logFileSource = sc.textFile(logFile,1).cache()

      //将log信息变为(session,log信息)的tuple格式,也就是按session将log分组
      val logLinesKVMapRDD = logFileSource.map(line => (line.split(" ")(0),line)).groupByKey();
      //对每个(session[String],log信息[Iterator])中的日志按时间的升序排序
      //排完序后(session[String],log信息[Iterator])的格式变为log信息[Iterator]
      val sortedLogRDD = logLinesKVMapRDD.map(_._2.toList.sortWith((A,B) => VisitsInfo.dateComparator(A,B)))

      //统计每一个单独的Session的相关信息
      sortedLogRDD.map(VisitsInfo.getVisitsInfo(_)).saveAsTextFile("hdfs://ymhHadoop:9000/spark_clickstream/visits_log/"+WebLogSession.sdf_hdfsfolder.format(curDate))

   }
}

最后整理出来的日志信息的格式和示例图:
SessionID 访问时间 离开时间 第一次访问页面 最后一次访问的页面 访问的页面总数 IP Referal
Session1 2016-05-30 15:17:00 2016-05-30 15:19:00 /blog/me /blog/others 5 192.168.12.130 www.baidu.com
Session2 2016-05-30 14:17:00 2016-05-30 15:19:38 /home /profile 10 192.168.12.140 www.178.com
Session3 2016-05-30 12:17:00 2016-05-30 15:40:00 /products /detail 6 192.168.12.150 www.78dm.com

这里写图片描述

Hive

Hive是一个数据仓库,让用户可以使用SQL语言操作分布式存储系统中的数据。在客户端,用户可以使用如何关系型数据库一样的建表SQL语句来创建数据仓库的数据表,并将HDFS中的数据导入到数据表中,接着就可以使用Hive SQL语句非常方便地对HDFS中的数据做一些增删改查的操作;在底层,当用户输入Hive Sql语句后,Hive会将SQL语句发送到它的Driver进程中的语义分析器进行分析,然后根据Hive SQL的语义转化为对应的Hadoop MapReduce程序来对HDFS中数据来进行操作;同时,Hive还将表的表名,列名,分区,属性,以及表中的数据的路径等元数据信息都存储在外部的数据库中,如:Mysql或者自带的Derby数据库等。
Hive中主要由以下几种数据模型组成:
1. Databases,相当于命名空间的作用,用来避免同名的表,视图,列名的冲突,就相当于管理同一类别的一组表的库。具体的表现为HDFS中/user/hive/warehouse/中的一个目录。
2. Tables,是具有同一模式的数据的抽象,简单点来说就是传统关系型数据库中的表。具体的表现形式为Databases下的子目录,里面存储着表中的数据块文件,而这些文件是从经过MapReduce清洗后的贴源数据文件块拷贝过来的,也就是使用Hive SQL 中的Load语句,Load语句就是将原先HDFS系统中的某个路径里的数据拷贝到/user/hive/warehouse/路径里的过程,然后通过Mysql中存储的元数据信息将这些数据和Hive的表映射起来。
3. Partitions,创建表时,用户可以指定以某个Key值来为表中的数据分片。从Tables的层面来讲,Partition就是表中新加的一个虚拟字段,用来为数据分类,在HDFS文件系统中的体现就是这个表的数据分片都按Key来划分并进入到不同的目录中,但是Hive不会保证属于某个Key的内容就一定会进入到某个分片中,因为Hive无法感知,所以需要用户在插入数据时自己要将数据根据key值划分到所对应的数据分片中,这样在以后才能提高查询效率。
4. Buckets(Clusters),是指每一个分片上的数据根据表中某个列的hash值组织在一起,也就是进入到同一个桶中,这样能提升数据查询的效率。分桶最大的意义在于增加join的效率。比如 select user.id, user.name,admin.tele from user join admin on user.id=admin.id, 已经根据id将数据分进不同的桶里,两个数据表join的时候,只要把hash结果相同的桶直接相连就行,提高join的效率。一般两张表的分桶数量要一致,才能达到join的最高效率,如果是倍数关系,也会提高join的效率但没有一致数量的分桶效率高,如果不是倍数关系分桶又不一致,那么效率和没分桶没什么区别。

Spark SQL

在作者之前的Hadoop文章里,使用MapReduce清洗完日志文件后,在Hive的客户端中使用Hive SQL去构建对应的数据仓库并对数据进行分析。和之前不同的是,在本篇文章中, 作者使用的是Spark SQL去对Hive数据仓库进行操作。因为文章篇幅有限,下面只对Spark SQL进行一个简单的介绍,更多具体的内容读者们可以去阅读官方文档。

Spark SQL是Spark项目中专门用来处理结构化数据的一个模块,用户可以通过SQL,DataFrames API,DataSets API和Spark SQL进行交互。Spark SQL可以通过标准的SQL语句对各种数据源中的数据进行操作,如Json,Parquet等,也可以通过Hive SQL操作Hive中的数据;DataFrames是一组以列名组织的数据结构,相当于关系型数据库中的表,DataFrames可以从结构化的数据文件中创建而来,如Json,Parquet等,也可以从Hive中的表,外部数据库,RDDs等创建出来;Datasets是Spark1.6后新加入的API,类似于RDDs,可以使用Transformations和Actions API 操作数据,同时提供了很多运行上的优化,并且用Encoder来替代Java Serialization接口进行序列化相关的操作。

DataFrames可以通过RDDs转化而来,其中一种转化方式就是通过case class来定义DataFrames中的列结构,也可以说是表结构,然后将RDDs中的数据转化为case class对象,接着通过反射机制获取到case class对表结构的定义并转化成DataFrames对象。转化成DF对象后,用户可以方便地使用DataFrames提供的“domain-specific”操作语言来操作里面的数据,亦或是将DataFrames对象注册成其对应的表,然后通过标准SQL语句来操作里面的数据。总之,Spark SQL提供了多样化的数据结构和操作方法让我们能以SQL语句方便地对数据进行操作,减少运维和开发成本,十分方便和强大!

而在本案例里,我们将使用星型模型来构建数据仓库的ODS(OperationalData Store)层。
Visits数据分析
页面具体访问记录Visits的事实表和维度表结构
这里写图片描述

接下来启动spark shell,然后使用Spark SQL去操作Hive数据仓库

$bin/spark-shell --jars lib/mysql-connector-java-5.0.5.jar

在spark shell顺序执行如下命令操作Hive数据仓库,在此过程中,大家会发现执行速度比在Hive客户端中快很多,原因就在于使用Spark SQL去操作Hive,其底层使用的是Spark RDDs去操作HDFS中的数据,而不再是原来的Hadoop MapReduce。

//创建HiveContext对象,并且该对象继承了SqlContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

//在数据仓库中创建Visits信息的贴源数据表:
sqlContext.sql("create table visitsinfo_spark(session string,startdate string,enddate string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(startdate) into 4 buckets row format delimited fields terminated by ' '")

//将HDFS中的数据导入到HIVE的Visits信息贴源数据表中
sqlContext.sql("load data inpath '/spark_clickstream/visits_log/16-07-18' overwrite into table visitsinfo_spark partition(inputDate='2016-07-27')")

这里写图片描述

//  根据具体的业务分析逻辑创建ODS层的Visits事实表,并从visitsinfo_spark的贴源表中导入数据
sqlContext.sql("create table ods_visits_spark(session string,entrytime string,leavetime string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(entrytime) into 4 buckets row format delimited fields terminated by ' '")

sqlContext.sql("insert into table ods_visits_spark partition(inputDate='2016-07-27') select vi.session,vi.startdate,vi.enddate,vi.entrypage,vi.leavepage,vi.viewpagenum,vi.ip,vi.referal from visitsinfo_spark as vi where vi.inputDate='2016-07-27'")

//创建Visits事实表的时间维度表并从当天的事实表里导入数据
sqlContext.sql("create table ods_dim_visits_time_spark(time string,year string,month string,day string,hour string,minutes string,seconds string) partitioned by(inputDate String) clustered by(year,month,day) sorted by(time) into 4 buckets row format delimited fields terminated by ' '")

// 将“访问时间”和“离开时间”两列的值合并后再放入时间维度表中,减少数据的冗余
sqlContext.sql("insert overwrite table ods_dim_visits_time_spark partition(inputDate='2016-07-27') select distinct ov.timeparam, substring(ov.timeparam,0,4),substring(ov.timeparam,6,2),substring(ov.timeparam,9,2),substring(ov.timeparam,12,2),substring(ov.timeparam,15,2),substring(ov.timeparam,18,2) from (select ov1.entrytime as timeparam from ods_visits_spark as ov1 union select ov2.leavetime as timeparam from ods_visits_spark as ov2) as ov")

这里写图片描述

//创建visits事实表的URL维度表并从当天的事实表里导入数据
sqlContext.sql("create table ods_dim_visits_url_spark(pageurl string,host string,path string,query string) partitioned by(inputDate string) clustered by(pageurl) sorted by(pageurl) into 4 buckets row format delimited fields terminated by ' '")

//将每个session的进入页面和离开页面的URL合并后存入到URL维度表中
sqlContext.sql("insert into table ods_dim_visits_url_spark partition(inputDate='2016-07-27') select distinct ov.pageurl,b.host,b.path,b.query from (select ov1.entrypage as pageurl from ods_visits_spark as ov1 union select ov2.leavepage as pageurl from ods_visits_spark as ov2 ) as ov lateral view parse_url_tuple(concat('https://localhost',ov.pageurl),'HOST','PATH','QUERY') b as host,path,query")

//将每个session从哪个外站进入当前网站的信息存入到URL维度表中
sqlContext.sql("insert into table ods_dim_visits_url_spark partition(inputDate='2016-07-27') select distinct ov.referal,b.host,b.path,b.query from ods_visits_spark as ov lateral view parse_url_tuple(substr(ov.referal,2,length(ov.referal)-2),'HOST','PATH','QUERY') b as host,path,query")

这里写图片描述

//查询访问网站页面最多的前20个session的信息
sqlContext.sql("select * from ods_visits_spark as ov sort by viewpagenum desc").show()

这里写图片描述

Troubleshooting

使用Flume拉取文件到HDFS中会遇到将文件分散成多个1KB-5KB的小文件的问题

需要注意的是如果遇到Flume会将拉取过来的文件分成很多份1KB-5KB的小文件存储到HDFS上,那么很可能是HDFS Sink的配置不正确,导致系统使用了默认配置。spooldir类型的source是将指定目录中的文件的每一行封装成一个event放入到channel中,默认每一行最大读取1024个字符。在HDFS Sink端主要是通过rollInterval(默认30秒), rollSize(默认1KB), rollCount(默认10个event)3个属性来决定写进HDFS的分片文件的大小。rollInterval表示经过多少秒后就将当前.tmp文件(写入的是从channel中过来的events)下沉到HDFS文件系统中,rollSize表示一旦.tmp文件达到一定的size后,就下沉到HDFS文件系统中,rollCount表示.tmp文件一旦写入了指定数量的events就下沉到HDFS文件系统中。

使用Flume拉取到HDFS中的文件格式错乱

这是因为HDFS Sink的配置中,hdfs.writeFormat属性默认为“Writable”会将原先的文件的内容序列化成HDFS的格式,应该手动设置成hdfs.writeFormat=“text”; 并且hdfs.fileType默认是“SequenceFile”类型的,是将所有event拼成一行,应该该手动设置成hdfs.fileType=“DataStream”,这样就可以是一行一个event,与原文件格式保持一致

启动Spark任务的时候会报任务无法序列化的错误

这里写图片描述
而这个错误的主要原因是Driver向worker通过RPC通信发送的任务无法序列化,很有可能就是用户在使用transformations或actions方法的时候,向这个方法中传入的函数里包含不可序列化的对象,如上面的程序中 logFileSource.map(x => weblogclean.weblogParser(x)) 向map中传入的函数包含不可序列化的对象weblogclean,所以要将该对象的相关类变为可序列化的类,通过extends Serializable的方法解决

在分布式环境下如何设置每个用户的SessionID

可以使用UUID,UUID是分布式环境下唯一的元素识别码,它由日期和时间,时钟序列,机器识别码(一般为网卡MAC地址)三部分组成。这样就保证了每个用户的SessionID的唯一性。

使用maven编译Spark程序时报错

在使用maven编译Spark程序时会报错,[ERROR] error: error while loading CharSequence, class file ‘/Library/Java/JavaVirtualMachines/jdk1.8.0_77.jdk/Contents/Home/jre/lib/rt.jar(java/lang/CharSequence.class)’ is broken
如图:
这里写图片描述
主要原因是Scala 2.10 和 JDK1.8的版本冲突问题,解决方案只能是将JDK降到1.7去编译

要在Spark中使用HiveContext,配置完后启动spark-shell报错

要在Spark中使用HiveContext,将所需的Hive配置文件拷贝到Spark项目的conf目录下,并且把连接数据库的Driver包也放到了Spark项目中的lib目录下,然后启动spark-shell报错,主要还是找不到CLASSPATH中的数据库连接驱动包,如下图:
这里写图片描述
这里写图片描述
目前作者想到的解决方案比较笨拙:就是启动spark-shell的时候显示地告诉驱动jar包的位置

$bin/spark-shell --jars lib/mysql-connector-java-5.0.5.jar

from:http://blog.csdn.net/ymh198816/article/details/52014315

BigData

作者:Xiaoyu Ma
链接:https://www.zhihu.com/question/27974418/answer/38965760
来源:知乎
著作权归作者所有,转载请联系作者获得授权。

大数据本身是个很宽泛的概念,Hadoop生态圈(或者泛生态圈)基本上都是为了处理超过单机尺度的数据处理而诞生的。你可以把它比作一个厨房所以需要的各种工具。锅碗瓢盆,各有各的用处,互相之间又有重合。你可以用汤锅直接当碗吃饭喝汤,你可以用小刀或者刨子去皮。但是每个工具有自己的特性,虽然奇怪的组合也能工作,但是未必是最佳选择。大数据,首先你要能存的下大数据。
传统的文件系统是单机的,不能横跨不同的机器。HDFS(Hadoop Distributed FileSystem)的设计本质上是为了大量的数据能横跨成百上千台机器,但是你看到的是一个文件系统而不是很多文件系统。比如你说我要获取/hdfs/tmp/file1的数据,你引用的是一个文件路径,但是实际的数据存放在很多不同的机器上。你作为用户,不需要知道这些,就好比在单机上你不关心文件分散在什么磁道什么扇区一样。HDFS为你管理这些数据。

存的下数据之后,你就开始考虑怎么处理数据。虽然HDFS可以为你整体管理不同机器上的数据,但是这些数据太大了。一台机器读取成T上P的数据(很大的数据哦,比如整个东京热有史以来所有高清电影的大小甚至更大),一台机器慢慢跑也许需要好几天甚至好几周。对于很多公司来说,单机处理是不可忍受的,比如微博要更新24小时热博,它必须在24小时之内跑完这些处理。那么我如果要用很多台机器处理,我就面临了如何分配工作,如果一台机器挂了如何重新启动相应的任务,机器之间如何互相通信交换数据以完成复杂的计算等等。这就是MapReduce / Tez / Spark的功能。MapReduce是第一代计算引擎,Tez和Spark是第二代。MapReduce的设计,采用了很简化的计算模型,只有Map和Reduce两个计算过程(中间用Shuffle串联),用这个模型,已经可以处理大数据领域很大一部分问题了。
那什么是Map什么是Reduce?
考虑如果你要统计一个巨大的文本文件存储在类似HDFS上,你想要知道这个文本里各个词的出现频率。你启动了一个MapReduce程序。Map阶段,几百台机器同时读取这个文件的各个部分,分别把各自读到的部分分别统计出词频,产生类似
(hello, 12100次),(world,15214次)等等这样的Pair(我这里把Map和Combine放在一起说以便简化);这几百台机器各自都产生了如上的集合,然后又有几百台机器启动Reduce处理。Reducer机器A将从Mapper机器收到所有以A开头的统计结果,机器B将收到B开头的词汇统计结果(当然实际上不会真的以字母开头做依据,而是用函数产生Hash值以避免数据串化。因为类似X开头的词肯定比其他要少得多,而你不希望数据处理各个机器的工作量相差悬殊)。然后这些Reducer将再次汇总,(hello,12100)+(hello,12311)+(hello,345881)= (hello,370292)。每个Reducer都如上处理,你就得到了整个文件的词频结果。
这看似是个很简单的模型,但很多算法都可以用这个模型描述了。
Map+Reduce的简单模型很黄很暴力,虽然好用,但是很笨重。第二代的Tez和Spark除了内存Cache之类的新feature,本质上来说,是让Map/Reduce模型更通用,让Map和Reduce之间的界限更模糊,数据交换更灵活,更少的磁盘读写,以便更方便地描述复杂算法,取得更高的吞吐量。

有了MapReduce,Tez和Spark之后,程序员发现,MapReduce的程序写起来真麻烦。他们希望简化这个过程。这就好比你有了汇编语言,虽然你几乎什么都能干了,但是你还是觉得繁琐。你希望有个更高层更抽象的语言层来描述算法和数据处理流程。于是就有了Pig和Hive。Pig是接近脚本方式去描述MapReduce,Hive则用的是SQL。它们把脚本和SQL语言翻译成MapReduce程序,丢给计算引擎去计算,而你就从繁琐的MapReduce程序中解脱出来,用更简单更直观的语言去写程序了。

有了Hive之后,人们发现SQL对比Java有巨大的优势。一个是它太容易写了。刚才词频的东西,用SQL描述就只有一两行,MapReduce写起来大约要几十上百行。而更重要的是,非计算机背景的用户终于感受到了爱:我也会写SQL!于是数据分析人员终于从乞求工程师帮忙的窘境解脱出来,工程师也从写奇怪的一次性的处理程序中解脱出来。大家都开心了。Hive逐渐成长成了大数据仓库的核心组件。甚至很多公司的流水线作业集完全是用SQL描述,因为易写易改,一看就懂,容易维护。

自从数据分析人员开始用Hive分析数据之后,它们发现,Hive在MapReduce上跑,真鸡巴慢!流水线作业集也许没啥关系,比如24小时更新的推荐,反正24小时内跑完就算了。但是数据分析,人们总是希望能跑更快一些。比如我希望看过去一个小时内多少人在充气娃娃页面驻足,分别停留了多久,对于一个巨型网站海量数据下,这个处理过程也许要花几十分钟甚至很多小时。而这个分析也许只是你万里长征的第一步,你还要看多少人浏览了跳蛋多少人看了拉赫曼尼诺夫的CD,以便跟老板汇报,我们的用户是猥琐男闷骚女更多还是文艺青年/少女更多。你无法忍受等待的折磨,只能跟帅帅的工程师蝈蝈说,快,快,再快一点!
于是Impala,Presto,Drill诞生了(当然还有无数非著名的交互SQL引擎,就不一一列举了)。三个系统的核心理念是,MapReduce引擎太慢,因为它太通用,太强壮,太保守,我们SQL需要更轻量,更激进地获取资源,更专门地对SQL做优化,而且不需要那么多容错性保证(因为系统出错了大不了重新启动任务,如果整个处理时间更短的话,比如几分钟之内)。这些系统让用户更快速地处理SQL任务,牺牲了通用性稳定性等特性。如果说MapReduce是大砍刀,砍啥都不怕,那上面三个就是剔骨刀,灵巧锋利,但是不能搞太大太硬的东西。

这些系统,说实话,一直没有达到人们期望的流行度。因为这时候又两个异类被造出来了。他们是Hive on Tez / Spark和SparkSQL。它们的设计理念是,MapReduce慢,但是如果我用新一代通用计算引擎Tez或者Spark来跑SQL,那我就能跑的更快。而且用户不需要维护两套系统。这就好比如果你厨房小,人又懒,对吃的精细程度要求有限,那你可以买个电饭煲,能蒸能煲能烧,省了好多厨具。

上面的介绍,基本就是一个数据仓库的构架了。底层HDFS,上面跑MapReduce/Tez/Spark,在上面跑Hive,Pig。或者HDFS上直接跑Impala,Drill,Presto。这解决了中低速数据处理的要求。

那如果我要更高速的处理呢?
如果我是一个类似微博的公司,我希望显示不是24小时热博,我想看一个不断变化的热播榜,更新延迟在一分钟之内,上面的手段都将无法胜任。于是又一种计算模型被开发出来,这就是Streaming(流)计算。Storm是最流行的流计算平台。流计算的思路是,如果要达到更实时的更新,我何不在数据流进来的时候就处理了?比如还是词频统计的例子,我的数据流是一个一个的词,我就让他们一边流过我就一边开始统计了。流计算很牛逼,基本无延迟,但是它的短处是,不灵活,你想要统计的东西必须预先知道,毕竟数据流过就没了,你没算的东西就无法补算了。因此它是个很好的东西,但是无法替代上面数据仓库和批处理系统。

还有一个有些独立的模块是KV Store,比如Cassandra,HBase,MongoDB以及很多很多很多很多其他的(多到无法想象)。所以KV Store就是说,我有一堆键值,我能很快速滴获取与这个Key绑定的数据。比如我用身份证号,能取到你的身份数据。这个动作用MapReduce也能完成,但是很可能要扫描整个数据集。而KV Store专用来处理这个操作,所有存和取都专门为此优化了。从几个P的数据中查找一个身份证号,也许只要零点几秒。这让大数据公司的一些专门操作被大大优化了。比如我网页上有个根据订单号查找订单内容的页面,而整个网站的订单数量无法单机数据库存储,我就会考虑用KV Store来存。KV Store的理念是,基本无法处理复杂的计算,大多没法JOIN,也许没法聚合,没有强一致性保证(不同数据分布在不同机器上,你每次读取也许会读到不同的结果,也无法处理类似银行转账那样的强一致性要求的操作)。但是丫就是快。极快。
每个不同的KV Store设计都有不同取舍,有些更快,有些容量更高,有些可以支持更复杂的操作。必有一款适合你。

除此之外,还有一些更特制的系统/组件,比如Mahout是分布式机器学习库,Protobuf是数据交换的编码和库,ZooKeeper是高一致性的分布存取协同系统,等等。

有了这么多乱七八糟的工具,都在同一个集群上运转,大家需要互相尊重有序工作。所以另外一个重要组件是,调度系统。现在最流行的是Yarn。你可以把他看作中央管理,好比你妈在厨房监工,哎,你妹妹切菜切完了,你可以把刀拿去杀鸡了。只要大家都服从你妈分配,那大家都能愉快滴烧菜。

你可以认为,大数据生态圈就是一个厨房工具生态圈。为了做不同的菜,中国菜,日本菜,法国菜,你需要各种不同的工具。而且客人的需求正在复杂化,你的厨具不断被发明,也没有一个万用的厨具可以处理所有情况,因此它会变的越来越复杂。

赚钱的逻辑是什么?

作者:肥肥猫
链接:https://www.zhihu.com/question/28893681/answer/74951218
来源:知乎
著作权归作者所有,转载请联系作者获得授权。

对真正的有钱人来说,并不存在所谓“什么方法才能赚钱”的问题,而只有,“要怎样解决某一个问题”以及“怎样调动自己能调动的一切资源”这两个问题。“看钱不是钱而是工具”,就是一种能力。

许多人商海沉浮数十年,即便倾家荡产依然还能东山再起,靠的也不是具体的门路,而是比一般人高出一个层次的思维和意识。就像《1942》里的地主在流亡路上和长工说的,“我知道怎么从一个穷人变成财主,给我十年,你大爷我还是东家”。

想在中国赚大钱,有一些基本的规律和共通的观念,按照重要性排列大致有:

1.政治敏锐

从70年代末开始到80年代初,搞投机倒把的个体户是第一批发财者,而这些人往往是被主流国营经济拒之门外,被逼无奈才出去闯的。

到了80年代中后期,发财的人又从体制外回到体制内。依托村或者集体社,利用自身在原有社体制内的声望或者职务(村长,支部书记),集资办起了加工厂的人成为第二批暴富者,这一波机会和体制转型有着千丝万缕的联系,以苏南模式(华西村)最为典型。

90年代早期的价格双轨制改革和90年代后期的国退民进。不知道有多少人利用国有资本变卖的机遇,抓住了廉价买卖产权的机会。2000年开始到2005年。加入世贸又给出口加工业带来历史性的机遇,货代、纺织品出口都养肥了大把没有文化的沿海小老板。2005年的时候随便开个什么矿,之后的商品大牛市会让你的钱多到用麻袋装。

至于商品房改革、股权分置改革、资本市场的发展更是和每个家庭的财富息息相关。以前老有人说看新闻联播致富这个梗,其实是有一定道理的。

大的暴富机会,中国在过去20年出现了不下十次,最大的机遇几乎全都和政治格局变化息息相关。其基本特点是一波流。一波三五年让你赚个饱,但这波赚完还想继续赚,几乎不可能。煤矿、进出口加工,这才几年而已,现在已经在生存线上挣扎。

2. 从无到有

在中国,要预测社会未来的走向,一个最简单、最基本的思路就是:

“中国在走美国的老路”。

这也就是所谓的CTC(“Copy to China”)的基础。可以说是中国最近20年所有新兴行业的一条发展主线。这条主线之清晰、之连贯,和前面所说的靠政治趋势赚钱的一波流的短暂易逝形成了鲜明对比。从90年代末期到2000年初的中国第一次互联网泡沫开始,一直到最近这3年的第二次互联网泡沫,几乎所有成功的,体量在百亿美元以上的新兴企业,其产品最初的雏形,都是一个“中国版的美国XX”。

解决稀缺是一切生意的根本宗旨。赚钱这回事,其根本意义,是“为他人解决问题”,而所谓利润,无非是为他人(社会)解决某一问题的酬劳。而从无到有,从0到1的事情,从满足稀缺性需求的角度,要远比从1到10的事情值得去做。从经济学角度来讲,最紧缺的东西才是最有价值的,故而得到的回报也不可同日而语。BAT这些搬运工起家的巨头,解决的都是从无到有的局面,之后那些跟风的早已在格局上输了,只能吃几口残羹冷炙而已。

3.ROE思维

如果从财务上来解释这一点,那就是ROE(股本回报率)= 利润率*周转率*财务杠杆比率。

注意这三者是乘数关系,所以利润为负的情况下,你的失败也会被这个放大器给放大。但即便如此,所有赚了大钱的人,几乎没有不用到杠杆的,敢下注本来就是企业家精神的一部分。

我在之前的回答里说过,穷人是手里有多少资源才敢做大事情,富人是先想到要做多大事才开始考虑要如何筹措资源。熟练运用财务杠杆的前提,就是敢于去动“本不属于自己”的资源的意识,或者说的再直白一些,敢去借别人手里的资源来为我所用。

巴菲特扣除杠杆的收益率不过13%左右。被自己有限的资源所限制,说明自己无法掌控资源的流向,依然只是资源的奴隶。

再谈一下周转率。即使是低利润,只要有高周转率一样可以在短期内获得发展。比如很多人排队的绿茶和外婆家,菜价便宜但翻台率高。当利润很微的时候,提高周转率依然可以提升ROE。

卖珠宝和卖烤串哪个赚钱呢?很难定论。珠宝的单品利润算下来可能比较高,但周转率太低。反之,烤串利润率低但周转率要大大增高。假设同样的资金投入卖珠宝和卖烤串,并且假设珠宝3个月一个资金回笼的周期(从生产到物流到销售),烤串一天一个周期(从生产到销售),那么在假设两者都可以销售出去的情况下,也许卖烤串比卖珠宝更赚。

此外,中间型行业和智力型行业通常有望达到更高的ROE,主要是因为资产较轻的关系。

4.了解自己

没错,研究自己比研究他人,研究金钱更重要。我们最大的问题,就是对自己的了解远远没有自己以为的多。

能赚钱不代表自己就能从钱中获得享受,对金钱的态度其实是一件特别个人化的事情。大部分人也许可以做到正确衡量各种资源的时间价值、却往往对自己的时间成本毫不在意,对自己的收益偏好毫无概念。

这种能力需要建立在深刻了解自己的身体和心理的基础之上;同样的资金、时间,由于掌握信息的差异,对自己来说,用在哪里最值得,用在哪里对当下、对未来的效用最大,都是需要反复摸索和实践的。穷人往往把“钱”看的太“值钱”,把“钱”以外的资源,例如自己的时间,看得太不值钱。

很多人喜欢把赚钱、理财、投资混为一谈,其实这是三件性质完全不同的事情,所需要点亮的技能树也不尽相同。就我的观察,会赚钱的人不一定会理财;会理财和会投资也是完全两码事,会投资的人你若让他凭自己做生意,很可能亏的一塌糊涂。自己到底最适合做哪一样,也需要摸索。

很多人对创业有着各种不切实际的幻想,明明是个不够自律、厌恶繁琐的人,非要去做生产型企业,殊不知这可以算是最繁琐的一种商业模式,从原料供应,采购,生产,库存,物流,销售,每一个环节都不能出错,毛利润率又低,必须钻到每一个细节中去扣成本,可以说一步走错,满盘皆输。对自己不了解,花下去的人生成本是无法挽回的。

5.防骗意识

这里的“骗子”是广义的,泛指一切出于零和动机打你主意的、想从你这里榨取资源的人。防骗意识的培养,应该贯穿在人生的始终。

投资本身,首先一定是一件反常识的事情,不然有钱人不会在人口比例中占少数。对于投资,其零和性要比赚钱大得多,所以有时候不止要看到趋势,还要评估周围其他人是否也看到了这个趋势,更要考虑到周围人是否知道自己知道对方知道。

如果说赚钱还是人和外物之间的事,那投资就一定是人和其他人之间的游戏,更加复杂诡异,不仅要防马路上的骗子、短信里的骗子,更要防资本市场的骗子,伪装成“创业家”的骗子、伪装成“好企业”的骗子、伪装成“合伙人”、“好朋友”的骗子、伪装成“情人”、“伴侣”的骗子、甚至是伪装成“养老金”、“社会福利保障”的骗子。

怎么阅读财务报告,怎样判断信息的真实性,知道怎么查一个生意伙伴的诚信水平;这些东西之重要,可以说分分钟能让你半生心血付之东流。但可惜这些东西对于普通人家的孩子,极少有地方系统的学习,只能靠自己一笔一笔“学费”去交。

过去我们小时候学校所提供的教育和家庭教育,对于以上怎样和钱打交道的知识可谓一片空白。大部分中国的家长和老师自己也不懂这些东西。既然自己一辈子也没活明白,那就更不用说教会下一代了。这样导致的社会差距只会进一步扩大。

数据挖掘

基本概念

相关话题:大数据 数据分析 

如何系统地学习数据挖掘

这个问题思考了很久,作为过来人谈一谈,建议先看下以前的一些回答。

磨刀不误砍柴工。在学习数据挖掘之前应该明白几点:

  • 数据挖掘目前在中国的尚未流行开,犹如屠龙之技。
  • 数据初期的准备通常占整个数据挖掘项目工作量的70%左右。
  • 数据挖掘本身融合了统计学、数据库和机器学习等学科,并不是新的技术。
  • 数据挖掘技术更适合业务人员学习(相比技术人员学习业务来的更高效)
  • 数据挖掘适用于传统的BI(报表、OLAP等)无法支持的领域。
  • 数据挖掘项目通常需要重复一些毫无技术含量的工作。

如果你阅读了以上内容觉得可以接受,那么继续往下看。

学习一门技术要和行业靠拢,没有行业背景的技术如空中楼阁。技术尤其是计算机领域的技术发展是宽泛且快速更替的(十年前做网页设计都能成立公司),一般人没有这个精力和时间全方位的掌握所有技术细节。但是技术在结合行业之后就能够独当一面了,一方面有利于抓住用户痛点和刚性需求,另一方面能够累计行业经验,使用互联网思维跨界让你更容易取得成功。不要在学习技术时想要面面俱到,这样会失去你的核心竞争力。

一、目前国内的数据挖掘人员工作领域大致可分为三类。

  • 1)数据分析师:在拥有行业数据的电商、金融、电信、咨询等行业里做业务咨询,商务智能,出分析报告。
  • 2)数据挖掘工程师:在多媒体、电商、搜索、社交等大数据相关行业里做机器学习算法实现和分析。
  • 3)科学研究方向:在高校、科研单位、企业研究院等高大上科研机构研究新算法效率改进及未来应用。

二、说说各工作领域需要掌握的技能。
(1).数据分析师

  • 需要有深厚的数理统计基础,但是对程序开发能力不做要求。
  • 需要熟练使用主流的数据挖掘(或统计分析)工具如Business Analytics and Business Intelligence Software(SAS)、SPSS、EXCEL等。
  • 需要对与所在行业有关的一切核心数据有深入的理解,以及一定的数据敏感性培养。
  • 经典图书推荐:《概率论与数理统计》、《统计学》推荐David Freedman版、《业务建模与数据挖掘》、《数据挖掘导论》、《SAS编程与数据挖掘商业案例》、《Clementine数据挖掘方法及应用 》、《Excel 2007 VBA参考大全》、《IBM SPSS Statistics 19 Statistical Procedures Companion》等。

(2).数据挖掘工程师

  • 需要理解主流机器学习算法的原理和应用。
  • 需要熟悉至少一门编程语言如(Python、C、C++、Java、Delphi等)。
  • 需要理解数据库原理,能够熟练操作至少一种数据库(Mysql、SQL、DB2、Oracle等),能够明白MapReduce的原理操作以及熟练使用Hadoop系列工具更好。
  • 经典图书推荐:《数据挖掘概念与技术》、《机器学习实战》、《人工智能及其应用》、《数据库系统概论》、《算法导论》、《Web数据挖掘》、《 Python标准库》、《thinking in Java》、《Thinking in C++》、《数据结构》等。

(3).科学研究方向

三、以下是通信行业数据挖掘工程师的工作感受。

真正从数据挖掘项目实践的角度讲,沟通能力对挖掘的兴趣爱好是最重要的,有了爱好才可以愿意钻研,有了不错的沟通能力,才可以正确理解业务问题,才能正确把业务问题转化成挖掘问题,才可以在相关不同专业人才之间清楚表达你的意图和想法,取得他们的理解和支持。所以我认为沟通能力和兴趣爱好是个人的数据挖掘的核心竞争力,是很难学到的;而其他的相关专业知识谁都可以学,算不上个人发展的核心竞争力。

说到这里可能很多数据仓库专家、程序员、统计师等等都要扔砖头了,对不起,我没有别的意思,你们的专业对于数据挖掘都很重要,大家本来就是一个整体的,但是作为单独一个个体的人来说,精力有限,时间有限,不可能这些领域都能掌握,在这种情况下,选择最重要的核心,我想应该是数据挖掘技能和相关业务能力吧(从另外的一个极端的例子,我们可以看, 比如一个迷你型的挖掘项目,一个懂得市场营销和数据挖掘技能的人应该可以胜任。这其中他虽然不懂数据仓库,但是简单的Excel就足以胜任高打6万个样本的数据处理;他虽然不懂专业的展示展现技能,但是只要他自己看的懂就行了,这就无需什么展示展现;前面说过,统计技能是应该掌握的,这对一个人的迷你项目很重要;他虽然不懂编程,但是专业挖掘工具和挖掘技能足够让他操练的;这样在迷你项目中,一个懂得挖掘技能和市场营销业务能力的人就可以圆满完成了,甚至在一个数据源中根据业务需求可以无穷无尽的挖掘不同的项目思路,试问就是这个迷你项目,单纯的一个数据仓库专家、单纯的一个程序员、单纯的一个展示展现技师、甚至单纯的一个挖掘技术专家,都是无法胜任的)。这从另一个方面也说明了为什么沟通能力的重要,这些个完全不同的专业领域,想要有效有机地整合在一起进行数据挖掘项目实践,你说没有好的沟通能力行吗?

数据挖掘能力只能在项目实践的熔炉中提升、升华,所以跟着项目学挖掘是最有效的捷径。国外学习挖掘的人都是一开始跟着老板做项目,刚开始不懂不要紧,越不懂越知道应该学什么,才能学得越快越有效果。我不知道国内的数据挖掘学生是怎样学的,但是从网上的一些论坛看,很多都是纸上谈兵,这样很浪费时间,很没有效率。

另外现在国内关于数据挖掘的概念都很混乱,很多BI只是局限在报表的展示和简单的统计分析,却也号称是数据挖掘;另一方面,国内真正规模化实施数据挖掘的行业是屈指可数(银行、保险公司、移动通讯),其他行业的应用就只能算是小规模的,比如很多大学都有些相关的挖掘课题、挖掘项目,但都比较分散,而且都是处于摸索阶段,但是我相信数据挖掘在中国一定是好的前景,因为这是历史发展的必然。

讲到移动方面的实践案例,如果你是来自移动的话,你一定知道国内有家叫华院分析的公司(申明,我跟这家公司没有任何关系,我只是站在数据挖掘者的角度分析过中国大多数的号称数据挖掘服务公司,觉得华院还不错,比很多徒有虚名的大公司来得更实际),他们的业务现在已经覆盖了绝大多数中国省级移动公司的分析挖掘项目,你上网搜索一下应该可以找到一些详细的资料吧。我对华院分析印象最深的一点就是2002年这个公司白手起家,自己不懂不要紧,一边自学一边开始拓展客户,到现在在中国的移动通讯市场全面开花,的确佩服佩服呀。他们最开始都是用EXCEL处理数据,用肉眼比较选择比较不同的模型,你可以想象这其中的艰难吧。

至于移动通讯的具体的数据挖掘的应用,那太多了,比如不同话费套餐的制订、客户流失模型、不同服务交叉销售模型、不同客户对优惠的弹性分析、客户群体细分模型、不同客户生命周期模型、渠道选择模型、恶意欺诈预警模型,太多了,记住,从客户的需求出发,从实践中的问题出发,移动中可以发现太多的挖掘项目。最后告诉你一个秘密,当你数据挖掘能力提升到一定程度时,你会发现无论什么行业,其实数据挖掘的应用有大部分是重合的相似的,这样你会觉得更轻松。

四、成为一名数据科学家需要掌握的技能图。(原文:Data Science: How do I become a data scientist?

人一能之,己十之;人十能之,己千之。果能此道矣,虽愚,必明;虽柔,必强。
与君共勉。

作者:Han Hsiao
链接:https://www.zhihu.com/question/20751219/answer/24345252
来源:知乎
著作权归作者所有,转载请联系作者获得授权。