Tag Archives: Spark

Data science Python notebooks

 

data-science-ipython-notebooks

Index

 

deep-learning

IPython Notebook(s) demonstrating deep learning functionality.

 

tensor-flow-tutorials

Additional TensorFlow tutorials:

Notebook Description
tsf-basics Learn basic operations in TensorFlow, a library for various kinds of perceptual and language understanding tasks from Google.
tsf-linear Implement linear regression in TensorFlow.
tsf-logistic Implement logistic regression in TensorFlow.
tsf-nn Implement nearest neighboars in TensorFlow.
tsf-alex Implement AlexNet in TensorFlow.
tsf-cnn Implement convolutional neural networks in TensorFlow.
tsf-mlp Implement multilayer perceptrons in TensorFlow.
tsf-rnn Implement recurrent neural networks in TensorFlow.
tsf-gpu Learn about basic multi-GPU computation in TensorFlow.
tsf-gviz Learn about graph visualization in TensorFlow.
tsf-lviz Learn about loss visualization in TensorFlow.

tensor-flow-exercises

Notebook Description
tsf-not-mnist Learn simple data curation by creating a pickle with formatted datasets for training, development and testing in TensorFlow.
tsf-fully-connected Progressively train deeper and more accurate models using logistic regression and neural networks in TensorFlow.
tsf-regularization Explore regularization techniques by training fully connected networks to classify notMNIST characters in TensorFlow.
tsf-convolutions Create convolutional neural networks in TensorFlow.
tsf-word2vec Train a skip-gram model over Text8 data in TensorFlow.
tsf-lstm Train a LSTM character model over Text8 data in TensorFlow.

 

theano-tutorials

Notebook Description
theano-intro Intro to Theano, which allows you to define, optimize, and evaluate mathematical expressions involving multi-dimensional arrays efficiently. It can use GPUs and perform efficient symbolic differentiation.
theano-scan Learn scans, a mechanism to perform loops in a Theano graph.
theano-logistic Implement logistic regression in Theano.
theano-rnn Implement recurrent neural networks in Theano.
theano-mlp Implement multilayer perceptrons in Theano.

 

keras-tutorials

Notebook Description
keras Keras is an open source neural network library written in Python. It is capable of running on top of either Tensorflow or Theano.
setup Learn about the tutorial goals and how to set up your Keras environment.
intro-deep-learning-ann Get an intro to deep learning with Keras and Artificial Neural Networks (ANN).
theano Learn about Theano by working with weights matrices and gradients.
keras-otto Learn about Keras by looking at the Kaggle Otto challenge.
ann-mnist Review a simple implementation of ANN for MNIST using Keras.
conv-nets Learn about Convolutional Neural Networks (CNNs) with Keras.
conv-net-1 Recognize handwritten digits from MNIST using Keras – Part 1.
conv-net-2 Recognize handwritten digits from MNIST using Keras – Part 2.
keras-models Use pre-trained models such as VGG16, VGG19, ResNet50, and Inception v3 with Keras.
auto-encoders Learn about Autoencoders with Keras.
rnn-lstm Learn about Recurrent Neural Networks (RNNs) with Keras.
lstm-sentence-gen Learn about RNNs using Long Short Term Memory (LSTM) networks with Keras.

deep-learning-misc

Notebook Description
deep-dream Caffe-based computer vision program which uses a convolutional neural network to find and enhance patterns in images.

 

scikit-learn

IPython Notebook(s) demonstrating scikit-learn functionality.

Notebook Description
intro Intro notebook to scikit-learn. Scikit-learn adds Python support for large, multi-dimensional arrays and matrices, along with a large library of high-level mathematical functions to operate on these arrays.
knn Implement k-nearest neighbors in scikit-learn.
linear-reg Implement linear regression in scikit-learn.
svm Implement support vector machine classifiers with and without kernels in scikit-learn.
random-forest Implement random forest classifiers and regressors in scikit-learn.
k-means Implement k-means clustering in scikit-learn.
pca Implement principal component analysis in scikit-learn.
gmm Implement Gaussian mixture models in scikit-learn.
validation Implement validation and model selection in scikit-learn.

 

statistical-inference-scipy

IPython Notebook(s) demonstrating statistical inference with SciPy functionality.

Notebook Description
scipy SciPy is a collection of mathematical algorithms and convenience functions built on the Numpy extension of Python. It adds significant power to the interactive Python session by providing the user with high-level commands and classes for manipulating and visualizing data.
effect-size Explore statistics that quantify effect size by analyzing the difference in height between men and women. Uses data from the Behavioral Risk Factor Surveillance System (BRFSS) to estimate the mean and standard deviation of height for adult women and men in the United States.
sampling Explore random sampling by analyzing the average weight of men and women in the United States using BRFSS data.
hypothesis Explore hypothesis testing by analyzing the difference of first-born babies compared with others.

 

pandas

IPython Notebook(s) demonstrating pandas functionality.

Notebook Description
pandas Software library written for data manipulation and analysis in Python. Offers data structures and operations for manipulating numerical tables and time series.
github-data-wrangling Learn how to load, clean, merge, and feature engineer by analyzing GitHub data from the Viz repo.
Introduction-to-Pandas Introduction to Pandas.
Introducing-Pandas-Objects Learn about Pandas objects.
Data Indexing and Selection Learn about data indexing and selection in Pandas.
Operations-in-Pandas Learn about operating on data in Pandas.
Missing-Values Learn about handling missing data in Pandas.
Hierarchical-Indexing Learn about hierarchical indexing in Pandas.
Concat-And-Append Learn about combining datasets: concat and append in Pandas.
Merge-and-Join Learn about combining datasets: merge and join in Pandas.
Aggregation-and-Grouping Learn about aggregation and grouping in Pandas.
Pivot-Tables Learn about pivot tables in Pandas.
Working-With-Strings Learn about vectorized string operations in Pandas.
Working-with-Time-Series Learn about working with time series in pandas.
Performance-Eval-and-Query Learn about high-performance Pandas: eval() and query() in Pandas.

 

matplotlib

IPython Notebook(s) demonstrating matplotlib functionality.

Notebook Description
matplotlib Python 2D plotting library which produces publication quality figures in a variety of hardcopy formats and interactive environments across platforms.
matplotlib-applied Apply matplotlib visualizations to Kaggle competitions for exploratory data analysis. Learn how to create bar plots, histograms, subplot2grid, normalized plots, scatter plots, subplots, and kernel density estimation plots.
Introduction-To-Matplotlib Introduction to Matplotlib.
Simple-Line-Plots Learn about simple line plots in Matplotlib.
Simple-Scatter-Plots Learn about simple scatter plots in Matplotlib.
Errorbars.ipynb Learn about visualizing errors in Matplotlib.
Density-and-Contour-Plots Learn about density and contour plots in Matplotlib.
Histograms-and-Binnings Learn about histograms, binnings, and density in Matplotlib.
Customizing-Legends Learn about customizing plot legends in Matplotlib.
Customizing-Colorbars Learn about customizing colorbars in Matplotlib.
Multiple-Subplots Learn about multiple subplots in Matplotlib.
Text-and-Annotation Learn about text and annotation in Matplotlib.
Customizing-Ticks Learn about customizing ticks in Matplotlib.
Settings-and-Stylesheets Learn about customizing Matplotlib: configurations and stylesheets.
Three-Dimensional-Plotting Learn about three-dimensional plotting in Matplotlib.
Geographic-Data-With-Basemap Learn about geographic data with basemap in Matplotlib.
Visualization-With-Seaborn Learn about visualization with Seaborn.

 

numpy

IPython Notebook(s) demonstrating NumPy functionality.

Notebook Description
numpy Adds Python support for large, multi-dimensional arrays and matrices, along with a large library of high-level mathematical functions to operate on these arrays.
Introduction-to-NumPy Introduction to NumPy.
Understanding-Data-Types Learn about data types in Python.
The-Basics-Of-NumPy-Arrays Learn about the basics of NumPy arrays.
Computation-on-arrays-ufuncs Learn about computations on NumPy arrays: universal functions.
Computation-on-arrays-aggregates Learn about aggregations: min, max, and everything in between in NumPy.
Computation-on-arrays-broadcasting Learn about computation on arrays: broadcasting in NumPy.
Boolean-Arrays-and-Masks Learn about comparisons, masks, and boolean logic in NumPy.
Fancy-Indexing Learn about fancy indexing in NumPy.
Sorting Learn about sorting arrays in NumPy.
Structured-Data-NumPy Learn about structured data: NumPy’s structured arrays.

 

python-data

IPython Notebook(s) demonstrating Python functionality geared towards data analysis.

Notebook Description
data structures Learn Python basics with tuples, lists, dicts, sets.
data structure utilities Learn Python operations such as slice, range, xrange, bisect, sort, sorted, reversed, enumerate, zip, list comprehensions.
functions Learn about more advanced Python features: Functions as objects, lambda functions, closures, *args, **kwargs currying, generators, generator expressions, itertools.
datetime Learn how to work with Python dates and times: datetime, strftime, strptime, timedelta.
logging Learn about Python logging with RotatingFileHandler and TimedRotatingFileHandler.
pdb Learn how to debug in Python with the interactive source code debugger.
unit tests Learn how to test in Python with Nose unit tests.

 

kaggle-and-business-analyses

IPython Notebook(s) used in kaggle competitions and business analyses.

Notebook Description
titanic Predict survival on the Titanic. Learn data cleaning, exploratory data analysis, and machine learning.
churn-analysis Predict customer churn. Exercise logistic regression, gradient boosting classifers, support vector machines, random forests, and k-nearest-neighbors. Includes discussions of confusion matrices, ROC plots, feature importances, prediction probabilities, and calibration/descrimination.

 

spark

IPython Notebook(s) demonstrating spark and HDFS functionality.

Notebook Description
spark In-memory cluster computing framework, up to 100 times faster for certain applications and is well suited for machine learning algorithms.
hdfs Reliably stores very large files across machines in a large cluster.

 

mapreduce-python

IPython Notebook(s) demonstrating Hadoop MapReduce with mrjob functionality.

Notebook Description
mapreduce-python Runs MapReduce jobs in Python, executing jobs locally or on Hadoop clusters. Demonstrates Hadoop Streaming in Python code with unit test and mrjob config file to analyze Amazon S3 bucket logs on Elastic MapReduce. Disco is another python-based alternative.

 

aws

IPython Notebook(s) demonstrating Amazon Web Services (AWS) and AWS tools functionality.

Also check out:

  • SAWS: A Supercharged AWS command line interface (CLI).
  • Awesome AWS: A curated list of libraries, open source repos, guides, blogs, and other resources.
Notebook Description
boto Official AWS SDK for Python.
s3cmd Interacts with S3 through the command line.
s3distcp Combines smaller files and aggregates them together by taking in a pattern and target file. S3DistCp can also be used to transfer large volumes of data from S3 to your Hadoop cluster.
s3-parallel-put Uploads multiple files to S3 in parallel.
redshift Acts as a fast data warehouse built on top of technology from massive parallel processing (MPP).
kinesis Streams data in real time with the ability to process thousands of data streams per second.
lambda Runs code in response to events, automatically managing compute resources.

 

commands

IPython Notebook(s) demonstrating various command lines for Linux, Git, etc.

Notebook Description
linux Unix-like and mostly POSIX-compliant computer operating system. Disk usage, splitting files, grep, sed, curl, viewing running processes, terminal syntax highlighting, and Vim.
anaconda Distribution of the Python programming language for large-scale data processing, predictive analytics, and scientific computing, that aims to simplify package management and deployment.
ipython notebook Web-based interactive computational environment where you can combine code execution, text, mathematics, plots and rich media into a single document.
git Distributed revision control system with an emphasis on speed, data integrity, and support for distributed, non-linear workflows.
ruby Used to interact with the AWS command line and for Jekyll, a blog framework that can be hosted on GitHub Pages.
jekyll Simple, blog-aware, static site generator for personal, project, or organization sites. Renders Markdown or Textile and Liquid templates, and produces a complete, static website ready to be served by Apache HTTP Server, Nginx or another web server.
pelican Python-based alternative to Jekyll.
django High-level Python Web framework that encourages rapid development and clean, pragmatic design. It can be useful to share reports/analyses and for blogging. Lighter-weight alternatives include Pyramid, Flask, Tornado, and Bottle.

misc

IPython Notebook(s) demonstrating miscellaneous functionality.

Notebook Description
regex Regular expression cheat sheet useful in data wrangling.
algorithmia Algorithmia is a marketplace for algorithms. This notebook showcases 4 different algorithms: Face Detection, Content Summarizer, Latent Dirichlet Allocation and Optical Character Recognition.

notebook-installation

anaconda

Anaconda is a free distribution of the Python programming language for large-scale data processing, predictive analytics, and scientific computing that aims to simplify package management and deployment.

Follow instructions to install Anaconda or the more lightweight miniconda.

dev-setup

For detailed instructions, scripts, and tools to set up your development environment for data analysis, check out the dev-setup repo.

running-notebooks

To view interactive content or to modify elements within the IPython notebooks, you must first clone or download the repository then run the notebook. More information on IPython Notebooks can be found here.

$ git clone https://github.com/donnemartin/data-science-ipython-notebooks.git
$ cd data-science-ipython-notebooks
$ jupyter notebook

Notebooks tested with Python 2.7.x.

credits

contributing

Contributions are welcome! For bug reports or requests please submit an issue.

contact-info

Feel free to contact me to discuss any issues, questions, or comments.

license

This repository contains a variety of content; some developed by Donne Martin, and some from third-parties. The third-party content is distributed under the license provided by those parties.

The content developed by Donne Martin is distributed under the following license:

I am providing code and resources in this repository to you under an open source license. Because this is my personal repository, the license you receive to my code and resources is from me and not my employer (Facebook).

Copyright 2015 Donne Martin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License

Flume+Spark+Hive+Spark SQL离线分析系统

前段时间把Scala和Spark一起学习了,所以借此机会在这里做个总结,顺便和大家一起分享一下目前最火的分布式计算技术Spark!当然Spark不光是可以做离线计算,还提供了许多功能强大的组件,比如说,Spark Streaming 组件做实时计算,和Kafka等消息系统也有很好的兼容性;Spark Sql,可以让用户通过标准SQL语句操作从不同的数据源中过来的结构化数据;还提供了种类丰富的MLlib库方便用户做机器学习等等。Spark是由Scala语言编写而成的,Scala是运行在JVM上的面向函数的编程语言,它的学习过程简直反人类,可读性就我个人来看,也不是能广为让大众接受的语言,但是它功能强大,熟练后能极大提高开发速度,对于实现同样的功能,所需要写的代码量比Java少得多得多,这都得益于Scala的语言特性。本文借鉴作者之前写的另一篇关于Hadoop离线计算的文章,继续使用那篇文章中点击流分析的案例,只不过MapReduce部分改为由Spark离线计算来完成,同时,你会发现做一模一样的日志清洗任务,相比上一篇文章,代码总数少了非常非常多,这都是Scala语言的功劳。本篇文章在Flume部分的内容和之前的Hadoop离线分析文章的内容基本一致,Hive部分新加了对Hive数据仓库的简单说明,同时还补充了对HDFS的说明和配置,并且新加了大量对Spark框架的详细介绍,文章的最后一如既往地添加了Troubleshooting段落,和大家分享作者在部署时遇到的各种问题,读者们可以有选择性的阅读。

PS:本文Spark说明部分的最后一段非常重要,作者总结了Spark在集群环境下不得忽略的一些特性,所有使用Spark的用户都应该要重点理解。或者读者们可以直接阅读官方文档加深理解:http://spark.apache.org/docs/latest/programming-guide.html

Spark离线分析系统架构图

这里写图片描述
整个离线分析的总体架构就是使用Flume从FTP服务器上采集日志文件,并存储在Hadoop HDFS文件系统上,再接着用Spark的RDDs操作函数清洗日志文件,最后使用Spark SQL配合HIVE构建数据仓库做离线分析。任务的调度使用Shell脚本完成,当然大家也可以尝试一些自动化的任务调度工具,比如说AZKABAN或者OOZIE等。
分析所使用的点击流日志文件主要来自Nginx的access.log日志文件,需要注意的是在这里并不是用Flume直接去生产环境上拉取nginx的日志文件,而是多设置了一层FTP服务器来缓冲所有的日志文件,然后再用Flume监听FTP服务器上指定的目录并拉取目录里的日志文件到HDFS服务器上(具体原因下面分析)。从生产环境推送日志文件到FTP服务器的操作可以通过Shell脚本配合Crontab定时器来实现。

网站点击流数据


图片来源:http://webdataanalysis.net/data-collection-and-preprocessing/weblog-to-clickstream/#comments

一般在WEB系统中,用户对站点的页面的访问浏览,点击行为等一系列的数据都会记录在日志中,每一条日志记录就代表着上图中的一个数据点;而点击流数据关注的就是所有这些点连起来后的一个完整的网站浏览行为记录,可以认为是一个用户对网站的浏览session。比如说用户从哪一个外站进入到当前的网站,用户接下来浏览了当前网站的哪些页面,点击了哪些图片链接按钮等一系列的行为记录,这一个整体的信息就称为是该用户的点击流记录。这篇文章中设计的离线分析系统就是收集WEB系统中产生的这些数据日志,并清洗日志内容存储分布式的HDFS文件存储系统上,接着使用离线分析工具HIVE去统计所有用户的点击流信息。
本系统中我们采用Nginx的access.log来做点击流分析的日志文件。access.log日志文件的格式如下:

样例数据格式:
124.42.13.230 – – [18/Sep/2013:06:57:50 +0000] “GET /shoppingMall?ver=1.2.1 HTTP/1.1” 200 7200 “http://www.baidu.com.cn” “Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)”

格式分析:
1. 访客ip地址:124.42.13.230
2. 访客用户信息: – –
3. 请求时间:[18/Sep/2013:06:57:50 +0000]
4. 请求方式:GET
5. 请求的url:/shoppingMall?ver=1.10.2
6. 请求所用协议:HTTP/1.1
7. 响应码:200
8. 返回的数据流量:7200
9. 访客的来源url:http://www.baidu.com.cn
10. 访客所用浏览器:Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)

HDFS

Apache Hadoop是用来支持海量数据分布式计算的软件框架,它具备高可靠性,高稳定性,动态扩容,运用简单的计算模型(MapReduce)在集群上进行分布式计算,并支持海量数据的存储。Apache Hadoop主要包含4个重要的模块,一个是 Hadoop Common,支持其它模块运行的通用组件;Hadoop Distributed File System(HDFS), 分布式文件存储系统;Hadoop Yarn,负责计算任务的调度和集群上资源的管理;Hadoop MapReduce,基于Hadoop Yarn的分布式计算框架。在本文的案例中,我们主要用到HDFS作为点击流数据存储,分布式计算框架我们将采用Spark RDDs Operations去替代MapReduce。

要配置Hadoop集群,首先需要配置Hadoop daemons, 它是所有其它Hadoop组件运行所必须的守护进程, 它的配置文件是

etc/hadoop/hadoop-env.sh

# set to the root of your Java installation
export JAVA_HOME=/usr/java/latest

Hadoop的运行需要Java开发环境的支持,一定要显示地标明集群上所有机器的JDK安装目录,即使你自己本机的环境已经配置好了JAVA_HOME,因为Hadoop是通过SSH来启动守护进程的,即便是NameNode启动自己本机的守护进程;如果不显示配置JDK安装目录,那么Hadoop在通过SSH启动守护进程时会找不到Java环境而报错。

在本文的案例中,我们只使用Hadoop HDFS组件,所以我们只需要配置HDFS的守护进程,NameNode daemons,SecondaryNameNode daemons以及DataNode daemons,它们的配置文件主要是core-site.xml和hdfs-site.xml:

etc/hadoop/core-site.xml



<configuration>
   <property>
      <name>fs.defaultFSname>
      <value>hdfs://ymhHadoop:9000value>
   property>
   <property>
       <name>hadoop.tmp.dirname>
       <value>/root/apps/hadoop/tmpvalue>
   property>
configuration>

fs.defaultFS属性是指定用来做NameNode的主机URI;而hadoop.tmp.dir是配置Hadoop依赖的一些系统运行时产生的文件的目录,默认是在/tmp/${username}目录下的,但是系统一重启这个目录下的文件就会被清空,所以我们重新指定它的目录

etc/hadoop/hdfs-site.xml




<configuration>
   <property>
      <name>dfs.replicationname>
      <value>1value>
   property>
    <property>
      <name>dfs.namenode.name.dirname>
      <value>/your/pathvalue>
   property>
   <property>
      <name>dfs.blocksizename>
      <value>268435456value>
   property>
   <property>
      <name>dfs.datanode.data.dirname>
      <value>/your/pathvalue>
   property>

configuration>

dfs.replication 是配置每一份在HDFS系统上的文件有几个备份;dfs.namenode.name.dir 是配置用户自定义的目录存储HDFS的业务日志和命名空间日志,也就是操作日志,集群发生故障时可以通过这份文件来恢复数据。dfs.blocksize,定义HDFS最大的文件分片是多大,默认256M,我们不需要改动;dfs.datanode.data.dir, 用来配置DataNode中的数据Blocks应该存储在哪个文件目录下。

最后把配置文件拷贝到集群的所有机子上,接下来就是启动HDFS集群,如果是第一次启动,记得一定要格式化整个HDFS文件系统

$HADOOP_PREFIX/bin/hdfs namenode -format 

接下来就是通过下面的命令分别启动NameNode和DataNode

$HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start namenode
$HADOOP_PREFIX/sbin/hadoop-daemons.sh --config $HADOOP_CONF_DIR --script hdfs start datanode

收集用户数据

网站会通过前端JS代码或服务器端的后台代码收集用户浏览数据并存储在网站服务器中。一般运维人员会在离线分析系统和真实生产环境之间部署FTP服务器,并将生产环境上的用户数据每天定时发送到FTP服务器上,离线分析系统就会从FTP服务上采集数据而不会影响到生产环境。
采集数据的方式有多种,一种是通过自己编写shell脚本或Java编程采集数据,但是工作量大,不方便维护,另一种就是直接使用第三方框架去进行日志的采集,一般第三方框架的健壮性,容错性和易用性都做得很好也易于维护。本文采用第三方框架Flume进行日志采集,Flume是一个分布式的高效的日志采集系统,它能把分布在不同服务器上的海量日志文件数据统一收集到一个集中的存储资源中,Flume是Apache的一个顶级项目,与Hadoop也有很好的兼容性。不过需要注意的是Flume并不是一个高可用的框架,这方面的优化得用户自己去维护。
Flume的agent是运行在JVM上的,所以各个服务器上的JVM环境必不可少。每一个Flume agent部署在一台服务器上,Flume会收集web server 产生的日志数据,并封装成一个个的事件发送给Flume Agent的Source,Flume Agent Source会消费这些收集来的数据事件并放在Flume Agent Channel,Flume Agent Sink会从Channel中收集这些采集过来的数据,要么存储在本地的文件系统中要么作为一个消费资源分发给下一个装在分布式系统中其它服务器上的Flume进行处理。Flume提供了点对点的高可用的保障,某个服务器上的Flume Agent Channel中的数据只有确保传输到了另一个服务器上的Flume Agent Channel里或者正确保存到了本地的文件存储系统中,才会被移除。
本系统中每一个FTP服务器以及Hadoop的name node服务器上都要部署一个Flume Agent;FTP的Flume Agent采集Web Server的日志并汇总到name node服务器上的Flume Agent,最后由hadoop name node服务器将所有的日志数据下沉到分布式的文件存储系统HDFS上面。
需要注意的是Flume的Source在本文的系统中选择的是Spooling Directory Source,而没有选择Exec Source,因为当Flume服务down掉的时候Spooling Directory Source能记录上一次读取到的位置,而Exec Source则没有,需要用户自己去处理,当重启Flume服务器的时候如果处理不好就会有重复数据的问题。当然Spooling Directory Source也是有缺点的,会对读取过的文件重命名,所以多架一层FTP服务器也是为了避免Flume“污染”生产环境。Spooling Directory Source另外一个比较大的缺点就是无法做到灵活监听某个文件夹底下所有子文件夹里的所有文件里新追加的内容。关于这些问题的解决方案也有很多,比如选择其它的日志采集工具,像logstash等。

FTP服务器上的Flume配置文件如下:

    agent.channels = memorychannel  
    agent.sinks = target  

    agent.sources.origin.type = spooldir  
    agent.sources.origin.spoolDir = /export/data/trivial/weblogs  
    agent.sources.origin.channels = memorychannel  
    agent.sources.origin.deserializer.maxLineLength = 2048  

    agent.sources.origin.interceptors = i2  
    agent.sources.origin.interceptors.i2.type = host  
    agent.sources.origin.interceptors.i2.hostHeader = hostname  

    agent.sinks.loggerSink.type = logger  
    agent.sinks.loggerSink.channel = memorychannel  

    agent.channels.memorychannel.type = memory  
    agent.channels.memorychannel.capacity = 10000  

    agent.sinks.target.type = avro  
    agent.sinks.target.channel = memorychannel  
    agent.sinks.target.hostname = 172.16.124.130  
    agent.sinks.target.port = 4545  

这里有几个参数需要说明,Flume Agent Source可以通过配置deserializer.maxLineLength这个属性来指定每个Event的大小,默认是每个Event是2048个byte。Flume Agent Channel的大小默认等于于本地服务器上JVM所获取到的内存的80%,用户可以通过byteCapacityBufferPercentage和byteCapacity两个参数去进行优化。
需要特别注意的是FTP上放入Flume监听的文件夹中的日志文件不能同名,不然Flume会报错并停止工作,最好的解决方案就是为每份日志文件拼上时间戳。

在Hadoop服务器上的配置文件如下:

    agent.sources = origin  
    agent.channels = memorychannel  
    agent.sinks = target  

    agent.sources.origin.type = avro  
    agent.sources.origin.channels = memorychannel  
    agent.sources.origin.bind = 0.0.0.0  
    agent.sources.origin.port = 4545  

    agent.sinks.loggerSink.type = logger  
    agent.sinks.loggerSink.channel = memorychannel  

    agent.channels.memorychannel.type = memory  
    agent.channels.memorychannel.capacity = 5000000  
    agent.channels.memorychannel.transactionCapacity = 1000000  

    agent.sinks.target.type = hdfs  
    agent.sinks.target.channel = memorychannel  
    agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S  
    agent.sinks.target.hdfs.filePrefix = data-%{hostname}  
    agent.sinks.target.hdfs.rollInterval = 60  
    agent.sinks.target.hdfs.rollSize = 1073741824  
    agent.sinks.target.hdfs.rollCount = 1000000  
    agent.sinks.target.hdfs.round = true  
    agent.sinks.target.hdfs.roundValue = 10  
    agent.sinks.target.hdfs.roundUnit = minute  
    agent.sinks.target.hdfs.useLocalTimeStamp = true  
    agent.sinks.target.hdfs.minBlockReplicas=1  
    agent.sinks.target.hdfs.writeFormat=Text  
    agent.sinks.target.hdfs.fileType=DataStream  

round, roundValue,roundUnit三个参数是用来配置每10分钟在hdfs里生成一个文件夹保存从FTP服务器上拉取下来的数据。用户分别在日志文件服务器及HDFS服务器端启动如下命令,便可以一直监听是否有新日志产生,然后拉取到HDFS文件系统中:

$ nohup bin/flume-ng agent -n $your_agent_name -c conf -f conf/$your_conf_name &

Spark

Spark是最近特别火的一个分布式计算框架,最主要原因就是快!和男人不一样,在大数据领域,一个框架会不会火,快是除了可靠性之外一个最重要的话语权,几乎所有新出的分布式框架或即将推出的新版本的MapReduce都在强调一点,我很快。Spark官网上给出的数据是Spark程序和中间数据运行在内存上时计算速度是Hadoop的100倍,即使在磁盘上也是比Hadoop快10倍。
每一个Spark程序都是提供了一个Driver进程来负责运行用户提供的程序,这个Driver进程会生成一个SparkContext,负责和Cluster Manager(可以是Spark自己提供的集群管理工具,也可以是Hadoop 的资源调度工具 Yarn)沟通,Cluster负责协调和调度集群上的Worker Node资源,当Driver获取到集群上Worker Node资源后,就会向Worker Node的Executor发送计算程序(通过Jar或者python文件),接着再向Exectutor发送计算任务去执行,Executor会启动多个线程并行运行计算任务,同时还会根据需求在Worker Node上缓存计算过程中的中间数据。需要注意的虽然Worker Node上可以启动多个物理JVM来运行不同Spark程序的Executor,但是不同的Spark程序之间不能进行通讯和数据交换。另一方面,对于Cluster Manager来说,不需要知道Spark Driver的底层,只要Spark Driver和Cluster Manager能互相通信并获取计算资源就可以协同工作,所以Spark Driver能较为方便地和各种资源调度框架整合,比如Yarn,Mesos等。
这里写图片描述
图片来源:http://spark.apache.org/docs/latest/cluster-overview.html

Spark就是通过Driver来发送用户的计算程序到集群的工作节点中,然后去并行计算数据,这其中有一个很重要的Spark专有的数据模型叫做RDD(Resilient
distributed dataset), 它代表着每一个计算阶段的数据集合,这些数据集合可以继续它所在的工作节点上,或者通过“shuffle”动作在集群中重新分发后,进行下一步的并行计算,形成新的RDD数据集。这些RDD有一个最重要的特点就是可以并行计算。RDD最开始有两种方式进行创建,一种是从Driver程序中的Scala Collections创建而来(或者其它语言的Collections),将它们转化成RDD然后在工作结点中并发处理,另一种就是从外部的分布式数据文件系统中创建RDD,如HDFS,HBASE或者任何实现了Hadoop InputFormat接口的对象。

对于Driver程序中的Collections数据,可以使用parallelize()方法将数据根据集群节点数进行切片(partitions),然后发送到集群中并发处理,一般一个节点一个切片一个task进行处理,用户也可以自定义数据的切片数。而对于外部数据源的数据,Spark可以从任何基于Hadoop框架的数据源创建RDD,一般一个文件块(blocks)创建一个RDD切片,然后在集群上并行计算。

在Spark中,对于RDDs的计算操作有两种类型,一种是Transformations,另一种是Actions。Transformations相当于Hadoop的Map组件,通过对RDDs的并发计算,然后返回新的RDDs对象;而actions则相当于Hadoop的Reduce组件,通过计算(我们这里说的计算就是function)汇总之前Transformation操作产生的RDDs对象,产生最终结果,然后返回到Driver程序中。特别需要说明的是,所有的Transformations操作都是延迟计算的(lazy), 它们一开始只会记录这个Transformations是用在哪一个RDDs上,并不会开始执行计算,除非遇到了需要返回最终结果到Driver程序中的Action操作,这时候Transformations才会开始真正意义上的计算。所以用户的Spark程序最后一步都需要一个Actions类型的操作,否则这个程序并不会触发任何计算。这么做的好处在于能提高Spark的运行效率,因为通过Transformations操作创建的RDDs对象最终只会在Actions类型的方法中用到,而且只会返回包含最终结果的RDDs到Driver中,而不是大量的中间结果。有时候,有些RDDs的计算结果会多次被重复调用,这就触发多次的重复计算,用户可以使用persist()或者cache()方法将部分RDDs的计算结果缓存在整个集群的内存中,这样当其它的RDDs需要之前的RDDs的计算结果时就可以直接从集群的内存中获得,提高运行效率。

在Spark中,另外一个需要了解的概念就是“Shuffle”,当遇到类似“reduceByKey”的Actions操作时,会把集群上所有分片的RDDs都读一遍,然后在集群之间相互拷贝并全部收集起来,统一计算这所有的RDDs,获得一个整体的结果而不再是单个分片的计算结果,接着再重新分发到集群中或者发送回Driver程序。在Shuffle过程中,Spark会产生两种类型的任务,一种是Map task,用于匹配本地分片需要shuffle的数据并将这些数据写入文件中,然后Reduce task就会读取这些文件并整合所有的数据。 所以说”Shuffle”过程会消耗许多本地磁盘的I/O资源,内存资源,网络I/O,附带还会产生许多的序列化过程。通常,repartition类型的操作,比如:repartitions和coalesce,ByKey类型的操作,比如:reduceByKey,groupByKey,join类型的操作,如:cogroup和join等,都会产生Shuffle过程。

接下来,来谈一谈Spark在集群环境下的一些特性,这部分内容非常非常重要,请大家一定要重点理解。首先,读者们一定要记住,Spark是通过Driver把用户打包提交的Spark程序序列化以后,分发到集群中的工作节点上去运行,对于计算结果的汇总是返回到Driver端,也就是说通常用户都是从Driver服务器上获取到最终的计算结果!在这个大前提下我们来探讨下面几个问题:
1. 关于如何正确地将函数传入RDD operation中,有两种推荐的方式,一种就是直接传函数体,另一种是在伴生对象中创建方法,然后通过类名.方法名的方式传入;如下面的代码所示

object DateHandler {
  def parseDate(s: String): String = { ... }
}

rdd.map(DateHandler.parseDate)

错误的传函数的方式如下:

Class MySpark {
 def parseDate(s: String): String = { ... }
 def rddOperation(rdd:RDD[String]):RDD[String] = {rdd.map(x => this.parseDate(x))}
}
…………
val myspark = new MySpark
myspark.rddOperation(sc.rdd)
这样子的传递方式会把整个mySpark对象序列化后传到集群中,会造成不必要的内存开支。
因为向map中传入的“this.parseDate(x)”是一个对象实例和它里面的函数。

当在RDD operation中访问类中的变量时,也会造成传递整个对象的开销,比如:

Class MySpark {
 val myVariable
 def rddOperation(rdd:RDD[String]):RDD[String] = {rdd.map(x => x + myVariable)}
}
这样也相当于x => this.x + myVariable,又关联了这个对象实例,
解决方法就是把这个类的变量传入方法内部做局部变量,
就会从访问对象中的变量变为访问局部变量值
def rddOperation(rdd:RDD[String]):RDD[String] = {val _variable = this.myVariable;rdd.map(x => x + _variable)}

2.第二个特别需要注意的问题就是在RDD operations中去更改一个全局变量,
在集群环境中也是很容易出现错误的,注意下面的代码:

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

这段代码最终返回的结果还是0。这是因为这段代码连同counter是序列化后分发到集群上所有的节点机器上,不同的节点上拥有各自独立的counter,并不会是原先Driver上counter的引用,并且统计的值也不一样,最后统计结果也不会返回给Driver去重新赋值。Driver主机上的counter还是它原来的值,不会发生任何变化。如果需要在RDD operations中操作全局变量,就需要使用accumulator()方法,这是一个线程安全的方法,能在并发环境下原子性地改变全局变量的值。

3.对于集群环境下的Spark,第三个重要的是如何去合理地打印RDDs中的值。如果只是使用rdd.foreach(println()) 或者 rdd.map(println())是行不通的,一定要记住,程序会被分送到集群的工作节点上各自运行,println方法调用的也是工作节点上的输入输出接口,而用户获取数据和计算结果都是在Driver主机上的,所以是无法看到这些打印的结果。解决方法之一就是打印前将所有数据先返回Driver,如rdd.collect().foreach(println),但是这可能会让Driver瞬间耗光内存,因为collect操作将集群上的所有数据全部一次性返回给Driver。较为合理的操作为使用take() 方法先获取部分数据,然后再打印,如:rdd.take(100).foreach(println)。
4. 另外需要补充说明的是foreach(func)这个Action操作,它的作用是对集群上每一个datasets元素执行传入的func方法,这个func方法是在各个工作节点上分别执行的。虽然foreach是action操作,但是它并不是先全部将数据返回给Driver然后再在Driver上执行func方法,它返回的给Driver的Unit,这点要特别注意。所以foreach(func)操作里传入的func函数对Driver中的全局变量的操作或者打印数据等操作对于Driver来说都是无效的,这个func函数只运行在工作节点上。
5. 最后要提的是Spark的共享变量,其中一个共享变量就是使用accumulator方法封装的变量,而另一个共享变量就是广播变量(Broadcast Variables)。在谈广播变量之前,大家需要了解一个概念叫“stage”,每次进行shuffle操作之前的所有RDDs的操作都属于同一个stage。所以每次在shuffle操作时,上一个stage计算的结果都会被Spark封装成广播变量,并通过一定的高效算法将这些计算结果在集群上的每个节点里都缓存上一份,并且是read-only的,这样当下一个stage的任务再次需要之前stage的计算结果时就不用再重新计算了。用户可以自定义广播变量,一般是在某个stage的datasets需要被后续多个stage的任务重复使用的情况下设置会比较有意义。

日志清洗

当Flume从日志服务器上获取到Nginx访问日志并拉取到HDFS系统后,我们接下来要做的就是使用Spark进行日志清洗。
首先是启动Spark集群,Spark目前主要有三种集群部署方式,一种是Spark自带Standalone模式做为cluster manager,另外两种分别是Yarn和Mesos作为cluster manager。在Yarn的部署方式下,又细分了两种提交Spark程序的模式,一种是cluster模式,Driver程序直接运行在Application Master上,并直接由Yarn管理,当程序完成初始化工作后相关的客户端进程就会退出;另一种是client模式,提交程序后,Driver一直运行在客户端进程中并和Yarn的Application Master通信获取工作节点资源。在Standalone的部署方式下,也同样是细分了cluster模式和client模式的Spark程序提交方式,cluster模式下Driver是运行在工作节点的进程中,一旦完成提交程序的任务,相关的客户端进程就会退出;而client模式中,Driver会一直运行在客户端进程中并一直向console输出运行信息。本文案例中,使用Standalone模式部署Spark集群,同时我们选择手动部署的方式来启动Spark集群:

//启动 master 节点 启动完后可以通过 localhost:8080 访问Spark自带的UI界面
./sbin/start-master.sh

//启动 Worker 节点 
./sbin/start-slave.sh spark://HOST:PORT

//然后通过spark-submit script 提交Spark程序
//默认是使用client模式运行,也可以手动设置成 cluster模式
//--deploy-mode cluster
$bin/spark-submit --class com.guludada.Spark_ClickStream.VisitsInfo --master spark://ymhHadoop:7077 --executor-memory 1G --total-executor-cores 2 /export/data/spark/sparkclickstream.jar

下面是清洗日志的Spark代码,主要是过滤掉无效的访问日志信息:

package com.guludada.Spark_ClickStream

import scala.io.Source
import java.text.SimpleDateFormat;
import java.util.Locale;
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.util.Date;

class WebLogClean extends Serializable {

  def weblogParser(logLine:String):String =  {

      //过滤掉信息不全或者格式不正确的日志信息
      val isStandardLogInfo = logLine.split(" ").length >= 12;

      if(isStandardLogInfo) {

        //过滤掉多余的符号
        val newLogLine:String = logLine.replace("- - ", "").replaceFirst("""\[""", "").replace(" +0000]", "");
        //将日志格式替换成正常的格式
        val logInfoGroup:Array[String] = newLogLine.split(" ");
        val oldDateFormat = logInfoGroup(1);
        //如果访问时间不存在,也是一个不正确的日志信息
        if(oldDateFormat == "-") return ""
        val newDateFormat = WebLogClean.sdf_standard.format(WebLogClean.sdf_origin.parse(oldDateFormat)) 
        return newLogLine.replace(oldDateFormat, newDateFormat)

      } else {

        return ""

      }
  }
}

object WebLogClean {

   val sdf_origin = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.ENGLISH);
   val sdf_standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
   val sdf_hdfsfolder = new SimpleDateFormat("yy-MM-dd");

   def main(args: Array[String]) {

    val curDate = new Date(); 
    val weblogclean = new WebLogClean
    val logFile = "hdfs://ymhHadoop:9000/flume/events/"+WebLogClean.sdf_hdfsfolder.format(curDate)+"/*" // Should be some file on your system
    val conf = new SparkConf().setAppName("WebLogCleaner").setMaster("local")
    val sc = new SparkContext(conf)
    val logFileSource = sc.textFile(logFile,1).cache()

    val logLinesMapRDD = logFileSource.map(x => weblogclean.weblogParser(x)).filter(line => line != "");
    logLinesMapRDD.saveAsTextFile("hdfs://ymhHadoop:9000/spark_clickstream/cleaned_log/"+WebLogClean.sdf_hdfsfolder.format(curDate)) 

  }

}

经过清洗后的日志格式如下:
这里写图片描述

接着为每一条访问记录拼上sessionID

package com.guludada.Spark_ClickStream

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.UUID;
import java.util.Date;

class WebLogSession {

}

object WebLogSession {

   val sdf_standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
   val sdf_hdfsfolder = new SimpleDateFormat("yy-MM-dd");

   //自定义的将日志信息按日志创建的时间升序排序
   def dateComparator(elementA:String ,elementB:String):Boolean = {     
     WebLogSession.sdf_standard.parse(elementA.split(" ")(1)).getTime < WebLogSession.sdf_standard.parse(elementB.split(" ")(1)).getTime
   }

   import scala.collection.mutable.ListBuffer
   def distinctLogInfoBySession(logInfoGroup:List[String]):List[String] = {

       val logInfoBySession:ListBuffer[String] = new ListBuffer[String]
       var lastRequestTime:Long = 0;
       var lastSessionID:String = "";

       for(logInfo <- logInfoGroup) {

         //某IP的用户第一次访问网站的记录做为该用户的第一个session日志
         if(lastRequestTime == 0) {

           lastSessionID = UUID.randomUUID().toString();
           //将该次访问日志记录拼上sessionID并放进按session分类的日志信息数组中
           logInfoBySession += lastSessionID + " " +logInfo
           //记录该次访问日志的时间,并用户和下一条访问记录比较,看时间间隔是否超过30分钟,是的话就代表新Session开始
           lastRequestTime = sdf_standard.parse(logInfo.split(" ")(1)).getTime

         } else {

           //当前日志记录和上一次的访问时间相比超过30分钟,所以认为是一个新的Session,重新生成sessionID
           if(sdf_standard.parse(logInfo.split(" ")(1)).getTime - lastRequestTime >= 30 * 60 * 1000) {
               //和上一条访问记录相比,时间间隔超过了30分钟,所以当做一次新的session,并重新生成sessionID
               lastSessionID = UUID.randomUUID().toString();
               logInfoBySession += lastSessionID + " " +logInfo
               //记录该次访问日志的时间,做为一个新session开始的时间,并继续和下一条访问记录比较,看时间间隔是否又超过30分钟
               lastRequestTime = sdf_standard.parse(logInfo.split(" ")(1)).getTime

           } else { //当前日志记录和上一次的访问时间相比没有超过30分钟,所以认为是同一个Session,继续沿用之前的sessionID

               logInfoBySession += lastSessionID + " " +logInfo
           }           
         }         
       }
       return logInfoBySession.toList
   }

   def main(args: Array[String]) {



      val curDate = new Date(); 
      val logFile = "hdfs://ymhHadoop:9000/spark_clickstream/cleaned_log/"+WebLogSession.sdf_hdfsfolder.format(curDate) // Should be some file on your system
      val conf = new SparkConf().setAppName("WebLogSession").setMaster("local")
      val sc = new SparkContext(conf)
      val logFileSource = sc.textFile(logFile, 1).cache()

      //将log信息变为(IP,log信息)的tuple格式,也就是按IP地址将log分组
      val logLinesKVMapRDD = logFileSource.map(line => (line.split(" ")(0),line)).groupByKey();
      //对每个(IP[String],log信息[Iterator])中的日志按时间的升序排序
      //(其实这一步没有必要,本来Nginx的日志信息就是按访问先后顺序记录的,这一步只是为了演示如何在Scala语境下进行自定义排序) 
      //排完序后(IP[String],log信息[Iterator])的格式变为log信息[Iterator]
      val sortedLogRDD = logLinesKVMapRDD.map(_._2.toList.sortWith((A,B) => WebLogSession.dateComparator(A,B)))

      //将每一个IP的日志信息按30分钟的session分类并拼上session信息
      val logInfoBySessionRDD = sortedLogRDD.map(WebLogSession.distinctLogInfoBySession(_))
      //将List中的日志信息拆分成单条日志信息输出
      val logInfoWithSessionRDD =  logInfoBySessionRDD.flatMap(line => line).saveAsTextFile("hdfs://ymhHadoop:9000/spark_clickstream/session_log/"+WebLogSession.sdf_hdfsfolder.format(curDate))

   } 
}

拼接上sessionID的日志如下所示:
这里写图片描述

最后一步就是根据SessionID来整理用户的浏览信息,代码如下:

package com.guludada.Spark_ClickStream

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.Date;

class VisitsInfo {

}

object VisitsInfo {

  val sdf_standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
  val sdf_hdfsfolder = new SimpleDateFormat("yy-MM-dd");

   //自定义的将日志信息按日志创建的时间升序排序
   def dateComparator(elementA:String ,elementB:String):Boolean = {     
     WebLogSession.sdf_standard.parse(elementA.split(" ")(2)).getTime < WebLogSession.sdf_standard.parse(elementB.split(" ")(2)).getTime
   }

   import scala.collection.mutable.ListBuffer
   def getVisitsInfo(logInfoGroup:List[String]):String = {

     //获取用户在该次session里所访问的页面总数
     //先用map函数将某次session里的所有访问记录变成(url,logInfo)元组的形式,然后再用groupBy函数按url分组,最后统计共有几个组
    val visitPageNum = logInfoGroup.map(log => (log.split(" ")(4),log)).groupBy(x => x._1).count(p => true)

    //获取该次session的ID
    val sessionID = logInfoGroup(0).split(" ")(0)

    //获取该次session的开始时间
    val startTime = logInfoGroup(0).split(" ")(2)

    //获取该次session的结束时间
    val endTime = logInfoGroup(logInfoGroup.length-1).split(" ")(2)

    //获取该次session第一次访问的url
    val entryPage = logInfoGroup(0).split(" ")(4)

    //获取该次session最后一次访问的url
    val leavePage = logInfoGroup(logInfoGroup.length-1).split(" ")(4)

    //获取该次session的用户IP
    val IP = logInfoGroup(0).split(" ")(1)

    //获取该次session的用户从哪个网站过来
    val referal = logInfoGroup(0).split(" ")(8)

     return sessionID + " " + startTime + " " + endTime + " " + entryPage + " " + leavePage + " " + visitPageNum + " " + IP + " " + referal;

   }

   def main(args: Array[String]) {

      val curDate = new Date();      
      val logFile = "hdfs://ymhHadoop:9000/spark_clickstream/session_log/"+WebLogSession.sdf_hdfsfolder.format(curDate) // Should be some file on your system
      val conf = new SparkConf().setAppName("VisitsInfo").setMaster("local")
      val sc = new SparkContext(conf)
      val logFileSource = sc.textFile(logFile,1).cache()

      //将log信息变为(session,log信息)的tuple格式,也就是按session将log分组
      val logLinesKVMapRDD = logFileSource.map(line => (line.split(" ")(0),line)).groupByKey();
      //对每个(session[String],log信息[Iterator])中的日志按时间的升序排序
      //排完序后(session[String],log信息[Iterator])的格式变为log信息[Iterator]
      val sortedLogRDD = logLinesKVMapRDD.map(_._2.toList.sortWith((A,B) => VisitsInfo.dateComparator(A,B)))

      //统计每一个单独的Session的相关信息
      sortedLogRDD.map(VisitsInfo.getVisitsInfo(_)).saveAsTextFile("hdfs://ymhHadoop:9000/spark_clickstream/visits_log/"+WebLogSession.sdf_hdfsfolder.format(curDate))

   }
}

最后整理出来的日志信息的格式和示例图:
SessionID 访问时间 离开时间 第一次访问页面 最后一次访问的页面 访问的页面总数 IP Referal
Session1 2016-05-30 15:17:00 2016-05-30 15:19:00 /blog/me /blog/others 5 192.168.12.130 www.baidu.com
Session2 2016-05-30 14:17:00 2016-05-30 15:19:38 /home /profile 10 192.168.12.140 www.178.com
Session3 2016-05-30 12:17:00 2016-05-30 15:40:00 /products /detail 6 192.168.12.150 www.78dm.com

这里写图片描述

Hive

Hive是一个数据仓库,让用户可以使用SQL语言操作分布式存储系统中的数据。在客户端,用户可以使用如何关系型数据库一样的建表SQL语句来创建数据仓库的数据表,并将HDFS中的数据导入到数据表中,接着就可以使用Hive SQL语句非常方便地对HDFS中的数据做一些增删改查的操作;在底层,当用户输入Hive Sql语句后,Hive会将SQL语句发送到它的Driver进程中的语义分析器进行分析,然后根据Hive SQL的语义转化为对应的Hadoop MapReduce程序来对HDFS中数据来进行操作;同时,Hive还将表的表名,列名,分区,属性,以及表中的数据的路径等元数据信息都存储在外部的数据库中,如:Mysql或者自带的Derby数据库等。
Hive中主要由以下几种数据模型组成:
1. Databases,相当于命名空间的作用,用来避免同名的表,视图,列名的冲突,就相当于管理同一类别的一组表的库。具体的表现为HDFS中/user/hive/warehouse/中的一个目录。
2. Tables,是具有同一模式的数据的抽象,简单点来说就是传统关系型数据库中的表。具体的表现形式为Databases下的子目录,里面存储着表中的数据块文件,而这些文件是从经过MapReduce清洗后的贴源数据文件块拷贝过来的,也就是使用Hive SQL 中的Load语句,Load语句就是将原先HDFS系统中的某个路径里的数据拷贝到/user/hive/warehouse/路径里的过程,然后通过Mysql中存储的元数据信息将这些数据和Hive的表映射起来。
3. Partitions,创建表时,用户可以指定以某个Key值来为表中的数据分片。从Tables的层面来讲,Partition就是表中新加的一个虚拟字段,用来为数据分类,在HDFS文件系统中的体现就是这个表的数据分片都按Key来划分并进入到不同的目录中,但是Hive不会保证属于某个Key的内容就一定会进入到某个分片中,因为Hive无法感知,所以需要用户在插入数据时自己要将数据根据key值划分到所对应的数据分片中,这样在以后才能提高查询效率。
4. Buckets(Clusters),是指每一个分片上的数据根据表中某个列的hash值组织在一起,也就是进入到同一个桶中,这样能提升数据查询的效率。分桶最大的意义在于增加join的效率。比如 select user.id, user.name,admin.tele from user join admin on user.id=admin.id, 已经根据id将数据分进不同的桶里,两个数据表join的时候,只要把hash结果相同的桶直接相连就行,提高join的效率。一般两张表的分桶数量要一致,才能达到join的最高效率,如果是倍数关系,也会提高join的效率但没有一致数量的分桶效率高,如果不是倍数关系分桶又不一致,那么效率和没分桶没什么区别。

Spark SQL

在作者之前的Hadoop文章里,使用MapReduce清洗完日志文件后,在Hive的客户端中使用Hive SQL去构建对应的数据仓库并对数据进行分析。和之前不同的是,在本篇文章中, 作者使用的是Spark SQL去对Hive数据仓库进行操作。因为文章篇幅有限,下面只对Spark SQL进行一个简单的介绍,更多具体的内容读者们可以去阅读官方文档。

Spark SQL是Spark项目中专门用来处理结构化数据的一个模块,用户可以通过SQL,DataFrames API,DataSets API和Spark SQL进行交互。Spark SQL可以通过标准的SQL语句对各种数据源中的数据进行操作,如Json,Parquet等,也可以通过Hive SQL操作Hive中的数据;DataFrames是一组以列名组织的数据结构,相当于关系型数据库中的表,DataFrames可以从结构化的数据文件中创建而来,如Json,Parquet等,也可以从Hive中的表,外部数据库,RDDs等创建出来;Datasets是Spark1.6后新加入的API,类似于RDDs,可以使用Transformations和Actions API 操作数据,同时提供了很多运行上的优化,并且用Encoder来替代Java Serialization接口进行序列化相关的操作。

DataFrames可以通过RDDs转化而来,其中一种转化方式就是通过case class来定义DataFrames中的列结构,也可以说是表结构,然后将RDDs中的数据转化为case class对象,接着通过反射机制获取到case class对表结构的定义并转化成DataFrames对象。转化成DF对象后,用户可以方便地使用DataFrames提供的“domain-specific”操作语言来操作里面的数据,亦或是将DataFrames对象注册成其对应的表,然后通过标准SQL语句来操作里面的数据。总之,Spark SQL提供了多样化的数据结构和操作方法让我们能以SQL语句方便地对数据进行操作,减少运维和开发成本,十分方便和强大!

而在本案例里,我们将使用星型模型来构建数据仓库的ODS(OperationalData Store)层。
Visits数据分析
页面具体访问记录Visits的事实表和维度表结构
这里写图片描述

接下来启动spark shell,然后使用Spark SQL去操作Hive数据仓库

$bin/spark-shell --jars lib/mysql-connector-java-5.0.5.jar

在spark shell顺序执行如下命令操作Hive数据仓库,在此过程中,大家会发现执行速度比在Hive客户端中快很多,原因就在于使用Spark SQL去操作Hive,其底层使用的是Spark RDDs去操作HDFS中的数据,而不再是原来的Hadoop MapReduce。

//创建HiveContext对象,并且该对象继承了SqlContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

//在数据仓库中创建Visits信息的贴源数据表:
sqlContext.sql("create table visitsinfo_spark(session string,startdate string,enddate string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(startdate) into 4 buckets row format delimited fields terminated by ' '")

//将HDFS中的数据导入到HIVE的Visits信息贴源数据表中
sqlContext.sql("load data inpath '/spark_clickstream/visits_log/16-07-18' overwrite into table visitsinfo_spark partition(inputDate='2016-07-27')")

这里写图片描述

//  根据具体的业务分析逻辑创建ODS层的Visits事实表,并从visitsinfo_spark的贴源表中导入数据
sqlContext.sql("create table ods_visits_spark(session string,entrytime string,leavetime string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(entrytime) into 4 buckets row format delimited fields terminated by ' '")

sqlContext.sql("insert into table ods_visits_spark partition(inputDate='2016-07-27') select vi.session,vi.startdate,vi.enddate,vi.entrypage,vi.leavepage,vi.viewpagenum,vi.ip,vi.referal from visitsinfo_spark as vi where vi.inputDate='2016-07-27'")

//创建Visits事实表的时间维度表并从当天的事实表里导入数据
sqlContext.sql("create table ods_dim_visits_time_spark(time string,year string,month string,day string,hour string,minutes string,seconds string) partitioned by(inputDate String) clustered by(year,month,day) sorted by(time) into 4 buckets row format delimited fields terminated by ' '")

// 将“访问时间”和“离开时间”两列的值合并后再放入时间维度表中,减少数据的冗余
sqlContext.sql("insert overwrite table ods_dim_visits_time_spark partition(inputDate='2016-07-27') select distinct ov.timeparam, substring(ov.timeparam,0,4),substring(ov.timeparam,6,2),substring(ov.timeparam,9,2),substring(ov.timeparam,12,2),substring(ov.timeparam,15,2),substring(ov.timeparam,18,2) from (select ov1.entrytime as timeparam from ods_visits_spark as ov1 union select ov2.leavetime as timeparam from ods_visits_spark as ov2) as ov")

这里写图片描述

//创建visits事实表的URL维度表并从当天的事实表里导入数据
sqlContext.sql("create table ods_dim_visits_url_spark(pageurl string,host string,path string,query string) partitioned by(inputDate string) clustered by(pageurl) sorted by(pageurl) into 4 buckets row format delimited fields terminated by ' '")

//将每个session的进入页面和离开页面的URL合并后存入到URL维度表中
sqlContext.sql("insert into table ods_dim_visits_url_spark partition(inputDate='2016-07-27') select distinct ov.pageurl,b.host,b.path,b.query from (select ov1.entrypage as pageurl from ods_visits_spark as ov1 union select ov2.leavepage as pageurl from ods_visits_spark as ov2 ) as ov lateral view parse_url_tuple(concat('https://localhost',ov.pageurl),'HOST','PATH','QUERY') b as host,path,query")

//将每个session从哪个外站进入当前网站的信息存入到URL维度表中
sqlContext.sql("insert into table ods_dim_visits_url_spark partition(inputDate='2016-07-27') select distinct ov.referal,b.host,b.path,b.query from ods_visits_spark as ov lateral view parse_url_tuple(substr(ov.referal,2,length(ov.referal)-2),'HOST','PATH','QUERY') b as host,path,query")

这里写图片描述

//查询访问网站页面最多的前20个session的信息
sqlContext.sql("select * from ods_visits_spark as ov sort by viewpagenum desc").show()

这里写图片描述

Troubleshooting

使用Flume拉取文件到HDFS中会遇到将文件分散成多个1KB-5KB的小文件的问题

需要注意的是如果遇到Flume会将拉取过来的文件分成很多份1KB-5KB的小文件存储到HDFS上,那么很可能是HDFS Sink的配置不正确,导致系统使用了默认配置。spooldir类型的source是将指定目录中的文件的每一行封装成一个event放入到channel中,默认每一行最大读取1024个字符。在HDFS Sink端主要是通过rollInterval(默认30秒), rollSize(默认1KB), rollCount(默认10个event)3个属性来决定写进HDFS的分片文件的大小。rollInterval表示经过多少秒后就将当前.tmp文件(写入的是从channel中过来的events)下沉到HDFS文件系统中,rollSize表示一旦.tmp文件达到一定的size后,就下沉到HDFS文件系统中,rollCount表示.tmp文件一旦写入了指定数量的events就下沉到HDFS文件系统中。

使用Flume拉取到HDFS中的文件格式错乱

这是因为HDFS Sink的配置中,hdfs.writeFormat属性默认为“Writable”会将原先的文件的内容序列化成HDFS的格式,应该手动设置成hdfs.writeFormat=“text”; 并且hdfs.fileType默认是“SequenceFile”类型的,是将所有event拼成一行,应该该手动设置成hdfs.fileType=“DataStream”,这样就可以是一行一个event,与原文件格式保持一致

启动Spark任务的时候会报任务无法序列化的错误

这里写图片描述
而这个错误的主要原因是Driver向worker通过RPC通信发送的任务无法序列化,很有可能就是用户在使用transformations或actions方法的时候,向这个方法中传入的函数里包含不可序列化的对象,如上面的程序中 logFileSource.map(x => weblogclean.weblogParser(x)) 向map中传入的函数包含不可序列化的对象weblogclean,所以要将该对象的相关类变为可序列化的类,通过extends Serializable的方法解决

在分布式环境下如何设置每个用户的SessionID

可以使用UUID,UUID是分布式环境下唯一的元素识别码,它由日期和时间,时钟序列,机器识别码(一般为网卡MAC地址)三部分组成。这样就保证了每个用户的SessionID的唯一性。

使用maven编译Spark程序时报错

在使用maven编译Spark程序时会报错,[ERROR] error: error while loading CharSequence, class file ‘/Library/Java/JavaVirtualMachines/jdk1.8.0_77.jdk/Contents/Home/jre/lib/rt.jar(java/lang/CharSequence.class)’ is broken
如图:
这里写图片描述
主要原因是Scala 2.10 和 JDK1.8的版本冲突问题,解决方案只能是将JDK降到1.7去编译

要在Spark中使用HiveContext,配置完后启动spark-shell报错

要在Spark中使用HiveContext,将所需的Hive配置文件拷贝到Spark项目的conf目录下,并且把连接数据库的Driver包也放到了Spark项目中的lib目录下,然后启动spark-shell报错,主要还是找不到CLASSPATH中的数据库连接驱动包,如下图:
这里写图片描述
这里写图片描述
目前作者想到的解决方案比较笨拙:就是启动spark-shell的时候显示地告诉驱动jar包的位置

$bin/spark-shell --jars lib/mysql-connector-java-5.0.5.jar

from:http://blog.csdn.net/ymh198816/article/details/52014315

BigData

作者:Xiaoyu Ma
链接:https://www.zhihu.com/question/27974418/answer/38965760
来源:知乎
著作权归作者所有,转载请联系作者获得授权。

大数据本身是个很宽泛的概念,Hadoop生态圈(或者泛生态圈)基本上都是为了处理超过单机尺度的数据处理而诞生的。你可以把它比作一个厨房所以需要的各种工具。锅碗瓢盆,各有各的用处,互相之间又有重合。你可以用汤锅直接当碗吃饭喝汤,你可以用小刀或者刨子去皮。但是每个工具有自己的特性,虽然奇怪的组合也能工作,但是未必是最佳选择。大数据,首先你要能存的下大数据。
传统的文件系统是单机的,不能横跨不同的机器。HDFS(Hadoop Distributed FileSystem)的设计本质上是为了大量的数据能横跨成百上千台机器,但是你看到的是一个文件系统而不是很多文件系统。比如你说我要获取/hdfs/tmp/file1的数据,你引用的是一个文件路径,但是实际的数据存放在很多不同的机器上。你作为用户,不需要知道这些,就好比在单机上你不关心文件分散在什么磁道什么扇区一样。HDFS为你管理这些数据。

存的下数据之后,你就开始考虑怎么处理数据。虽然HDFS可以为你整体管理不同机器上的数据,但是这些数据太大了。一台机器读取成T上P的数据(很大的数据哦,比如整个东京热有史以来所有高清电影的大小甚至更大),一台机器慢慢跑也许需要好几天甚至好几周。对于很多公司来说,单机处理是不可忍受的,比如微博要更新24小时热博,它必须在24小时之内跑完这些处理。那么我如果要用很多台机器处理,我就面临了如何分配工作,如果一台机器挂了如何重新启动相应的任务,机器之间如何互相通信交换数据以完成复杂的计算等等。这就是MapReduce / Tez / Spark的功能。MapReduce是第一代计算引擎,Tez和Spark是第二代。MapReduce的设计,采用了很简化的计算模型,只有Map和Reduce两个计算过程(中间用Shuffle串联),用这个模型,已经可以处理大数据领域很大一部分问题了。
那什么是Map什么是Reduce?
考虑如果你要统计一个巨大的文本文件存储在类似HDFS上,你想要知道这个文本里各个词的出现频率。你启动了一个MapReduce程序。Map阶段,几百台机器同时读取这个文件的各个部分,分别把各自读到的部分分别统计出词频,产生类似
(hello, 12100次),(world,15214次)等等这样的Pair(我这里把Map和Combine放在一起说以便简化);这几百台机器各自都产生了如上的集合,然后又有几百台机器启动Reduce处理。Reducer机器A将从Mapper机器收到所有以A开头的统计结果,机器B将收到B开头的词汇统计结果(当然实际上不会真的以字母开头做依据,而是用函数产生Hash值以避免数据串化。因为类似X开头的词肯定比其他要少得多,而你不希望数据处理各个机器的工作量相差悬殊)。然后这些Reducer将再次汇总,(hello,12100)+(hello,12311)+(hello,345881)= (hello,370292)。每个Reducer都如上处理,你就得到了整个文件的词频结果。
这看似是个很简单的模型,但很多算法都可以用这个模型描述了。
Map+Reduce的简单模型很黄很暴力,虽然好用,但是很笨重。第二代的Tez和Spark除了内存Cache之类的新feature,本质上来说,是让Map/Reduce模型更通用,让Map和Reduce之间的界限更模糊,数据交换更灵活,更少的磁盘读写,以便更方便地描述复杂算法,取得更高的吞吐量。

有了MapReduce,Tez和Spark之后,程序员发现,MapReduce的程序写起来真麻烦。他们希望简化这个过程。这就好比你有了汇编语言,虽然你几乎什么都能干了,但是你还是觉得繁琐。你希望有个更高层更抽象的语言层来描述算法和数据处理流程。于是就有了Pig和Hive。Pig是接近脚本方式去描述MapReduce,Hive则用的是SQL。它们把脚本和SQL语言翻译成MapReduce程序,丢给计算引擎去计算,而你就从繁琐的MapReduce程序中解脱出来,用更简单更直观的语言去写程序了。

有了Hive之后,人们发现SQL对比Java有巨大的优势。一个是它太容易写了。刚才词频的东西,用SQL描述就只有一两行,MapReduce写起来大约要几十上百行。而更重要的是,非计算机背景的用户终于感受到了爱:我也会写SQL!于是数据分析人员终于从乞求工程师帮忙的窘境解脱出来,工程师也从写奇怪的一次性的处理程序中解脱出来。大家都开心了。Hive逐渐成长成了大数据仓库的核心组件。甚至很多公司的流水线作业集完全是用SQL描述,因为易写易改,一看就懂,容易维护。

自从数据分析人员开始用Hive分析数据之后,它们发现,Hive在MapReduce上跑,真鸡巴慢!流水线作业集也许没啥关系,比如24小时更新的推荐,反正24小时内跑完就算了。但是数据分析,人们总是希望能跑更快一些。比如我希望看过去一个小时内多少人在充气娃娃页面驻足,分别停留了多久,对于一个巨型网站海量数据下,这个处理过程也许要花几十分钟甚至很多小时。而这个分析也许只是你万里长征的第一步,你还要看多少人浏览了跳蛋多少人看了拉赫曼尼诺夫的CD,以便跟老板汇报,我们的用户是猥琐男闷骚女更多还是文艺青年/少女更多。你无法忍受等待的折磨,只能跟帅帅的工程师蝈蝈说,快,快,再快一点!
于是Impala,Presto,Drill诞生了(当然还有无数非著名的交互SQL引擎,就不一一列举了)。三个系统的核心理念是,MapReduce引擎太慢,因为它太通用,太强壮,太保守,我们SQL需要更轻量,更激进地获取资源,更专门地对SQL做优化,而且不需要那么多容错性保证(因为系统出错了大不了重新启动任务,如果整个处理时间更短的话,比如几分钟之内)。这些系统让用户更快速地处理SQL任务,牺牲了通用性稳定性等特性。如果说MapReduce是大砍刀,砍啥都不怕,那上面三个就是剔骨刀,灵巧锋利,但是不能搞太大太硬的东西。

这些系统,说实话,一直没有达到人们期望的流行度。因为这时候又两个异类被造出来了。他们是Hive on Tez / Spark和SparkSQL。它们的设计理念是,MapReduce慢,但是如果我用新一代通用计算引擎Tez或者Spark来跑SQL,那我就能跑的更快。而且用户不需要维护两套系统。这就好比如果你厨房小,人又懒,对吃的精细程度要求有限,那你可以买个电饭煲,能蒸能煲能烧,省了好多厨具。

上面的介绍,基本就是一个数据仓库的构架了。底层HDFS,上面跑MapReduce/Tez/Spark,在上面跑Hive,Pig。或者HDFS上直接跑Impala,Drill,Presto。这解决了中低速数据处理的要求。

那如果我要更高速的处理呢?
如果我是一个类似微博的公司,我希望显示不是24小时热博,我想看一个不断变化的热播榜,更新延迟在一分钟之内,上面的手段都将无法胜任。于是又一种计算模型被开发出来,这就是Streaming(流)计算。Storm是最流行的流计算平台。流计算的思路是,如果要达到更实时的更新,我何不在数据流进来的时候就处理了?比如还是词频统计的例子,我的数据流是一个一个的词,我就让他们一边流过我就一边开始统计了。流计算很牛逼,基本无延迟,但是它的短处是,不灵活,你想要统计的东西必须预先知道,毕竟数据流过就没了,你没算的东西就无法补算了。因此它是个很好的东西,但是无法替代上面数据仓库和批处理系统。

还有一个有些独立的模块是KV Store,比如Cassandra,HBase,MongoDB以及很多很多很多很多其他的(多到无法想象)。所以KV Store就是说,我有一堆键值,我能很快速滴获取与这个Key绑定的数据。比如我用身份证号,能取到你的身份数据。这个动作用MapReduce也能完成,但是很可能要扫描整个数据集。而KV Store专用来处理这个操作,所有存和取都专门为此优化了。从几个P的数据中查找一个身份证号,也许只要零点几秒。这让大数据公司的一些专门操作被大大优化了。比如我网页上有个根据订单号查找订单内容的页面,而整个网站的订单数量无法单机数据库存储,我就会考虑用KV Store来存。KV Store的理念是,基本无法处理复杂的计算,大多没法JOIN,也许没法聚合,没有强一致性保证(不同数据分布在不同机器上,你每次读取也许会读到不同的结果,也无法处理类似银行转账那样的强一致性要求的操作)。但是丫就是快。极快。
每个不同的KV Store设计都有不同取舍,有些更快,有些容量更高,有些可以支持更复杂的操作。必有一款适合你。

除此之外,还有一些更特制的系统/组件,比如Mahout是分布式机器学习库,Protobuf是数据交换的编码和库,ZooKeeper是高一致性的分布存取协同系统,等等。

有了这么多乱七八糟的工具,都在同一个集群上运转,大家需要互相尊重有序工作。所以另外一个重要组件是,调度系统。现在最流行的是Yarn。你可以把他看作中央管理,好比你妈在厨房监工,哎,你妹妹切菜切完了,你可以把刀拿去杀鸡了。只要大家都服从你妈分配,那大家都能愉快滴烧菜。

你可以认为,大数据生态圈就是一个厨房工具生态圈。为了做不同的菜,中国菜,日本菜,法国菜,你需要各种不同的工具。而且客人的需求正在复杂化,你的厨具不断被发明,也没有一个万用的厨具可以处理所有情况,因此它会变的越来越复杂。

Spark

Spark在美团的实践

技术在京东智能供应链预测的应用

性能优化指南——基础篇

Spark性能优化指南——高级篇

Flume+Spark+Hive+Spark SQL离线分析系统

[原]Spark MLlib系列(二):基于协同过滤的电影推荐系统

基于 Spark 的文本情感分析

IBM 公司在 2015 年对外宣告了一个新的科技和商务时代的来临—认知时代。这个巨大的转变,来自 IBM 对技术和商业领域的三个重要的洞察力[1]。第一,这个世界被数据所充斥。第二,这个世界通过代码被改造。第三,认知计算的出现。其中,认知计算可以:

  • 通过感知与互动,理解非结构化数据
  • 通过生成假设、评估、辩证、和建议来推理
  • 从专家培训、每一次互动、及持续取得数据中学习。

本文描述了一个基于 Spark 构建的认知系统:文本情感分析系统,分析和理解社交论坛的非结构化文本数据。

基于 Spark 的文本情感分析

文本情感分析是指对具有人为主观情感色彩文本材料进行处理、分析和推理的过程。文本情感分析主要的应用场景是对用户关于某个主题的评论文本进行处理和分析。比如,人们在打算去看一部电影之前,通常会去看豆瓣电影板块上的用户评论,再决定是否去看这部电影。另外一方面,电影制片人会通过对专业论坛上的用户评论进行分析,了解市场对于电影的总体反馈。本文中文本分析的对象为网络短评,为非正式场合的短文本语料,在只考虑正面倾向和负面倾向的情况下,实现文本倾向性的分类。

文本情感分析主要涉及如下四个技术环节。

  1. 收集数据集:本文中,以分析电影《疯狂动物城》的用户评论为例子,采集豆瓣上《疯狂动物城》的用户短评和短评评分作为样本数据,通过样本数据训练分类模型来判断微博上的一段话对该电影的情感倾向。
  2. 设计文本的表示模型:让机器“读懂”文字,是文本情感分析的基础,而这首先要解决的问题是文本的表示模型。通常,文本的表示采用向量空间模型,也就是说采用向量表示文本。向量的特征项是模型中最小的单元,可以是一个文档中的字、词或短语,一个文档的内容可以看成是它的特征项组成的集合,而每一个特征项依据一定的原则都被赋予上权重。
  3. 选择文本的特征:当可以把一个文档映射成向量后,那如何选择特征项和特征值呢?通常的做法是先进行中文分词(­­­­本文使用 jieba 分词工具),把用户评论转化成词语后,可以使用 TF-IDF(Term Frequency–Inverse Document Frequency,词频-逆文档频率)算法来抽取特征,并计算出特征值。
  4. 选择分类模型:常用的分类算法有很多,如:决策树、贝叶斯、人工神经网络、K-近邻、支持向量机等等。在文本分类上使用较多的是贝叶斯和支持向量机。本文中,也以这两种方法来进行模型训练。

为什么采用 Spark

传统的单节点计算已经难以满足用户生成的海量数据的处理和分析的要求。比如,豆瓣网站上《疯狂动物城》电影短评就有 111421 条,如果需要同时处理来自多个大型专业网站上所有电影的影评,单台服务器的计算能力和存储能力都很难满足需求。这个时候需要考虑引入分布式计算的技术,使得计算能力和存储能力能够线性扩展。

Spark 是一个快速的、通用的集群计算平台,也是业内非常流行的开源分布式技术。Spark 围绕着 RDD(Resilient Distributed Dataset)弹性分布式数据集,扩展了广泛使用的 MapReduce[5]计算模型,相比起 Hadoop[6]的 MapReduce 计算框架,Spark 更为高效和灵活。Spark 主要的特点如下:

  1. 内存计算:能够在内存中进行计算,它会优先考虑使用各计算节点的内存作为存储,当内存不足时才会考虑使用磁盘,这样极大的减少了磁盘 I/O,提高了效率。
  2. 惰性求值:RDD 丰富的计算操作可以分为两类,转化操作和行动操作。而当程序调用 RDD 的转化操作(如数据的读取、Map、Filter)的时候,Spark 并不会立刻开始计算,而是记下所需要执行的操作,尽可能的将一些转化操作合并,来减少计算数据的步骤,只有在调用行动操作(如获取数据的行数 Count)的时候才会开始读入数据,进行转化操作、行动操作,得到结果。
  3. 接口丰富:Spark 提供 Scala,Java,Python,R 四种编程语言接口,可以满足不同技术背景的工程人员的需求。并且还能和其他大数据工具密切配合。例如 Spark 可以运行在 Hadoop 之上,能够访问所有支持 Hadoop 的数据源(如 HDFS、Cassandra、Hbase)。

本文以 Spark 的 Python 接口为例,介绍如何构建一个文本情感分析系统。作者采用 Python 3.5.0,Spark1.6.1 作为开发环境,使用 Jupyter Notebook[7]编写代码。Jupyter Notebook 是由 IPython Notebook 演化而来,是一套基于 Web 的交互环境,允许大家将代码、代码执行、数学函数、富文档、绘图以及其它元素整合为单一文件。在运行 pyspark 的之前,需要指定一下 pyspark 的运行环境,如下所示:

清单 1. 指定 pyspark 的 ipython notebook 运行环境
export PYSPARK_PYTHON=ipython3 PYSPARK_DRIVER_PYTHON_OPTS="notebook"

接下里就可以在 Jupyter Notebook 里编写代码了。


基于 Spark 如何构建文本情感分析系统

在本文第 1 章,介绍了文本情感分析主要涉及的四个技术环节。基于 Spark 构建的文本分类系统的技术流程也是这样的。在大规模的文本数据的情况下,有所不同的是文本的特征维度一般都是非常巨大的。试想一下所有的中文字、词有多少,再算上其他的语言和所有能在互联网上找到的文本,那么文本数据按照词的维度就能轻松的超过数十万、数百万维,所以需要寻找一种可以处理极大维度文本数据的方法。

在本文后续章节中,将依次按照基于 Spark 做数据预处理、文本建模、特征提取、训练分类模型、实现待输入文本分类展开讨论。系统的上下文关系图如图 1 所示,系统的功能架构图如图 2 所示。

图 1. 基于 Spark 文本情感分析系统上下文

img001

图 2. 基于 Spark 文本情感分析系统功能架构图

img002

爬取的数据说明

为了说明文本分类系统的构建过程,作者爬取了豆瓣网络上《疯狂动物城》的短评和评分(https://movie.douban.com/subject/25662329/comments)。示例数据如下所示:

表 1. 示例数据
评分 评论文本
5 做冰棍那机智的不像话!!!全片最爱!!!想吃!!!
5 绝对的好片子裂墙推荐。实在是因为另一场满了…随手挑了这个片子。真是 5 分钟一小笑 10 分钟哄堂大笑。看那个又懒又慢树獭简直要锤墙了。旁边法国妹子精辟的吐槽!看!这是我们法国人。我要憋到内伤了。最后散场大家都静坐着等着整首歌放完…五星好评。2016 年度十佳。
5 不要看任何影评,如果可以预告片都别看,直接买票就好了。你要啥这电影里有啥!
3 最精彩的动画是用想象力拍出真实世界难以实现的故事,而不是用动物化填充一段如果是真人就普通到不能再普通的烂俗故事。笑料有,萌趣有,但更有的是莫名其妙的主旋律和政治正确,恐怕没有评分所体现的那么出色。
4 换了新领导就是不一样。迪士尼暗黑大电影,洛杉矶罪案片风格和内核。还真是动物乌托邦,美国针对有色人种,欧洲针对难民,天朝针对公知和五毛吗?人设精彩,细节丰富,但要说创意超《头脑特工队》显然就不实事求是了。
…… ………………

表格中每一行为一条评论数据,按照“评分,评论文本”排放,中间以制表符切分,评分范围从 1 分到 5 分,这样的数据共采集了 116567 条。

数据预处理

这一节本文是要说明用 Spark 是如何做数据清洗和抽取的。在该子系统中输入为爬虫的数据,输出为包含相同数量好评和坏评的 Saprk 弹性分布式数据集。

Spark 数据处理主要是围绕 RDD(Resilient Distributed Datasets) 弹性分布式数据集对象展开,本文首先将爬虫数据载入到 Spark 系统,抽象成为一个 RDD。可以用 distinct 方法对数据去重。数据转换主要是用了 map 方法,它接受传入的一个数据转换的方法来按行执行方法,从而达到转换的操作它只需要用一个函数将输入和输出映射好,那么就能完成转换。数据过滤使用 filter 方法,它能够保留判断条件为真的数据。可以用下面这个语句,将每一行文本变成一个 list,并且只保留长度为 2 的数据。

清单 2. Spark 做数据预处理
originData=sc.textFile('YOUR_FILE_PATH')
originDistinctData=originData.distinct()
rateDocument=originDistinctData.map(lambda line : line.split('\t')).\
filter(lambda line : len(line)==2)
清单 3. 统计数据基本信息
fiveRateDocument=rateDocument.filter(lambda line : int(line[0])==5)
fiveRateDocument.count()

本文得到,五分的数据有 30447 条,4 分、3 分、2 分、1 分的数据分别有 11711 条,123 条,70 条。打五分的毫无疑问是好评;考虑到不同人对于评分的不同偏好,对于打四分的数据,本文无法得知它是好评还是坏评;对于打三分及三分以下的是坏评。

下面就可以将带有评分数据转化成为好评数据和坏评数据,为了提高计算效率,本文将其重新分区。

清单 4. 合并负样本数据
negRateDocument=oneRateDocument.union(twoRateDocument).\
union(threeRateDocument)
negRateDocument.repartition(1)

通过计算得到,好评和坏评分别有 30447 条和 2238 条,属于非平衡样本的机器模型训练。本文只取部分好评数据,好评和坏评的数量一样,这样训练的正负样本就是均衡的。最后把正负样本放在一起,并把分类标签和文本分开,形成训练数据集

清单 5. 生̧成训练数̧据集
posRateDocument=sc.parallelize(fiveRateDocument.take(negRateDocument.count())).repartition(1)
allRateDocument=negRateDocument.union(posRateDocument)
allRateDocument.repartition(1)
rate=allRateDocument.map(lambda s : ReduceRate(s[0]))
document=allRateDocument.map(lambda s: s[1])

文本的向量表示和文本特征提取

这一节中,本文主要介绍如何做文本分词,如何用 TF-IDF 算法抽取文本特征。将输入的文本数据转化为向量,让计算能够“读懂”文本。

解决文本分类问题,最重要的就是要让文本可计算,用合适的方式来表示文本,其中的核心就是找到文本的特征和特征值。相比起英文,中文多了一个分词的过程。本文首先用 jieba 分词器将文本分词,这样每个词都可以作为文本的一个特征。jieba 分词器有三种模式的分词:

  1. 精确模式,试图将句子最精确地切开,适合文本分析;
  2. 全模式,把句子中所有的可以成词的词语都扫描出来, 速度非常快,但是不能解决歧义;
  3. 搜索引擎模式,在精确模式的基础上,对长词再次切分,提高召回率,适合用于搜索引擎分词。

这里本文用的是搜索引擎模式将每一句评论转化为词。

清单 6. 分词
words=document.map(lambda w:"/".\
join(jieba.cut_for_search(w))).\
map(lambda line: line.split("/"))

出于对大规模数据计算需求的考虑,spark 的词频计算是用特征哈希(HashingTF)来计算的。特征哈希是一种处理高维数据的技术,经常应用在文本和分类数据集上。普通的 k 分之一特征编码需要在一个向量中维护可能的特征值及其到下标的映射,而每次构建这个映射的过程本身就需要对数据集进行一次遍历。这并不适合上千万甚至更多维度的特征处理。

特征哈希是通过哈希方程对特征赋予向量下标的,所以在不同情况下,同样的特征就是能够得到相同的向量下标,这样就不需要维护一个特征值及其下表的向量。

要使用特征哈希来处理文本,需要先实例化一个 HashingTF 对象,将词转化为词频,为了高效计算,本文将后面会重复使用的词频缓存。

清单 7. 训练词频矩阵
hashingTF = HashingTF()
tf = hashingTF.transform(words)
tf.cache()

缺省情况下,实例化的 HashingTF 特征维数 numFeatures 取了 220次方维,在 spark 的源码中可以看到,HashingTF 的过程就是对每一个词作了一次哈希并对特征维数取余得到该词的位置,然后按照该词出现的次数计次。所以就不用像传统方法一样每次维护一张词表,运用 HashingTF 就可以方便的得到该词所对应向量元素的位置。当然这样做的代价就是向量维数会非常大,好在 spark 可以支持稀疏向量,所以计算开销并不大。

图 3. HashingTF 源码

img003

词频是一种抽取特征的方法,但是它还有很多问题,比如在这句话中“这几天的天气真好,项目组的老师打算组织大家一起去春游。“的”相比于“项目组”更容易出现在人们的语言中,“的”和“项目组”同样只出现一次,但是项目组对于这句话来说更重要。

本文采用 TF-IDF 作为特征提取的方法,它的权重与特征项在文档中出现的评率成正相关,与在整个语料中出现该特征项的文档成反相关。下面依据 tf 来计算逆词频 idf,并计算出 TF-IDF

清单 8. 计算 TF-IDF 矩阵
idfModel = IDF().fit(tf)
tfidf = idfModel.transform(tf)

至此,本文就抽取出了文本的特征,并用向量去表示了文本。

训练分类模型

在这一小节中,本文介绍如何用 Spark 训练朴素贝叶斯分类模型,这一流程的输入是文本的特征向量及已经标记好的分类标签。在这里本文得到的是分类模型及文本分类的正确率。

现在,有了文本的特征项及特征值,也有了分类标签,需要用 RDD 的 zip 算子将这两部分数据连接起来,并将其转化为分类模型里的 LabeledPoint 类型。并随机将数据分为训练集和测试集,60%作为训练集,40%作为测试集。

清单 9. 生成训练集和测试集
zipped=rate.zip(tfidf)
data=zipped.map(lambda line:LabeledPoint(line[0],line[1]))
training, test = data.randomSplit([0.6, 0.4], seed = 0)

本文用训练数据来训练贝叶斯模型,得到 NBmodel 模型来预测测试集的文本特征向量,并且计算出各个模型的正确率,这个模型的正确率为 74.83%。

清单 10. 训练贝叶斯分类模型
NBmodel = NaiveBayes.train(training, 1.0)
predictionAndLabel = test.map(lambda p : (NBmodel.predict(p.features), p.label))
accuracy = 1.0 * predictionAndLabel.filter(lambda x: 1.0 \
if x[0] == x[1] else 0.0).count() / test.count()

可以看出贝叶斯模型最后的预测模型并不高,但是基于本文采集的数据资源有限,特征提取过程比较简单直接。所以还有很大的优化空间,在第四章中,本文将介绍提高正确率的方法。

分类未标记文档

现在可以用本文训练好的模型来对未标记文本分类,流程是获取用户输入的评论,然后将输入的评论文本分词并转化成 tf-idf 特征向量,然后用 3.4 节中训练好的分类模型来分类。

清单 11. 分类未分类文本
 yourDocument=input("输入待分类的评论:")
 yourwords="/".join(jieba.cut_for_search(yourDocument)).split("/")
yourtf = hashingTF.transform(yourwords)
yourtfidf=idfModel.transform(yourtf)
print('NaiveBayes Model Predict:',NBmodel.predict(yourtfidf),'

当程序输入待分类的评论:“这部电影没有意思,剧情老套,真没劲, 后悔来看了”

程序输出为“NaiveBayes Model Predict: 0.0”。

当程序输入待分类的评论:“太精彩了讲了一个关于梦想的故事剧情很反转制作也很精良”

程序输出为“NaiveBayes Model Predict: 1.0”。

至此,最为简单的文本情感分类系统就构建完整了。


提高正确率的方法

在第三章中,本文介绍了构建文本分类系统的方法,但是正确率只有 74.83%,在这一章中,本文将讲述文本分类正确率低的原因及改进方法。

文本分类正确率低的原因主要有:

  1. 文本预处理比较粗糙,可以进一步处理,比如去掉停用词,去掉低频词;
  2. 特征词抽取信息太少,搜索引擎模式的分词模式不如全分词模式提供的特征项多;
  3. 朴素贝叶斯模型比较简单,可以用其他更为先进的模型算法,如 SVM;
  4. 数据资源太少,本文只能利用了好评、坏评论各 2238 条。数据量太少,由于爬虫爬取的数据,没有进行人工的进一步的筛选,数据质量也得不到 100%的保证。

下面分别就这四个方面,本文进一步深入的进行处理,对模型进行优化。

数据预处理中去掉停用词

停用词是指出现在所有文档中很多次的常用词,比如“的”、“了”、“是”等,可以在提取特征的时候将这些噪声去掉。

首先需要统计一下词频,看哪些词是使用最多的,然后定义一个停用词表,在构建向量前,将这些词去掉。本文先进行词频统计,查看最常用的词是哪些。

清单 12. 统计词频
text=words.flatMap(lambda w:w)
wordCounts = text.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b).\
sortBy(lambda x: x[1],ascending=False)
wordCounts.take(10)

通过观察,选择出现次数比较多,但是对于文本情感表达没有意义的词,作为停用词,构建停用词表。然后定义一个过滤函数,如果该词在停用词表中那么需要将这个词过滤掉。

清单 13. 去掉停用词

stopwords = set([" ","","","","","",……])

def filterStopWords(line):
 for i in line:
 if i in stopwords:
 line.remove(i)
return line
words=words.map(lambda w : filterStopWords(w))

尝试不用的分词模式

本文在分词的时候使用的搜索引擎分词模式,在这种模式下只抽取了重要的关键字,可能忽略了一些可能的特征词。可以把分词模式切换到全分词模式,尽可能的不漏掉特征词,同样的模型训练,正确率会有 1%~2%的提升。

清单 14. 全分词模式分词
words=document.map(lambda w:"/".join(jieba.\
cut(w, cut_all=True))).\
map(lambda line: line.split("/"))

更换训练模型方法

在不进行深入优化的情况下,SVM 往往有着比其他分类模型更好的分类效果。下面在相同的条件下,运用 SVM 模型训练,最后得到的正确率有 78.59%。

清单 15. 用支持向量机训练分类模型
SVMmodel = SVMWithSGD.train(training, iterations=100)
predictionAndLabel = test.map(lambda p : (SVMmodel.predict(p.features), p.label))
accuracy = 1.0 * predictionAndLabel.filter(lambda x: 1.0 if x[0] == x[1] else 0.0).count() / test.count()

训练数据的问题

本文只是为了演示如何构建这套系统,所以爬取的数据量并不多,获取的文本数据也没有人工的进一步核对其正确性。如果本文能够有更丰富且权威的数据源,那么模型的正确率将会有较大的提高。

作者对中国科学院大学的谭松波教授发布的酒店产品评论文本做了分类系统测试,该数据集是多数学者公认并且使用的。用 SVM 训练的模型正确率有 87.59%。


总结

本文向读者详细的介绍了利用 Spark 构建文本情感分类系统的过程,从数据的清洗、转换,Spark 的 RDD 有 Filter、Map 方法可以轻松胜任;对于抽取文本特征,Spark 针对大规模数据的处理不仅在计算模型上有优化,还做了算法的优化,它利用哈希特征算法来实现 TF-IDF,从而能够支持上千万维的模型训练;对于选择分类模型,Spark 也实现好了常用的分类模型,调用起来非常方便。最后希望这篇文章可以对大家学习 spark 和文本分类有帮助

参考资料

学习

from:http://www.ibm.com/developerworks/cn/cognitive/library/cc-1606-spark-seniment-analysis/index.html