
40 Interview Questions asked at Startups in Machine Learning / Data Science

Machine learning and data science are being seen as the drivers of the next industrial revolution happening in the world today. This also means that there are numerous exciting startups looking for data scientists. What could be a better start for your aspiring career!

However, getting into these roles is still not easy. You obviously need to get excited about the idea, the team and the vision of the company. You might also face some really difficult technical questions along the way. The set of questions asked depends on what the startup does. Do they provide consulting? Do they build ML products? You should always find this out before beginning your interview preparation.

To help you prepare for your next interview, I’ve prepared a list of 40 plausible & tricky questions which are likely to come your way in interviews. If you can answer and understand these questions, rest assured, you will put up a tough fight in your job interview.

Note: The key to answering these questions is a concrete, practical understanding of ML and related statistical concepts.


 

Interview Questions on Machine Learning

Q1. You are given a training data set having 1000 columns and 1 million rows. The data set is based on a classification problem. Your manager has asked you to reduce the dimension of this data so that model computation time can be reduced. Your machine has memory constraints. What would you do? (You are free to make practical assumptions.)

Answer: Processing high dimensional data on a limited memory machine is a strenuous task, and your interviewer will be fully aware of that. Following are the methods you can use to tackle such a situation:

  1. Since we have lower RAM, we should close all other applications in our machine, including the web browser, so that most of the memory can be put to use.
  2. We can randomly sample the data set. This means we can create a smaller data set, let’s say, having 1000 variables and 300000 rows, and do the computations on it.
  3. To reduce dimensionality, we can separate the numerical and categorical variables and remove the correlated variables. For numerical variables, we’ll use correlation. For categorical variables, we’ll use the chi-square test.
  4. Also, we can use PCA and pick the components which explain the maximum variance in the data set.
  5. Using online learning algorithms like Vowpal Wabbit (which has a Python interface) is a possible option.
  6. Building a linear model using Stochastic Gradient Descent is also helpful.
  7. We can also apply our business understanding to estimate which predictors can impact the response variable. But this is an intuitive approach; failing to identify useful predictors might result in a significant loss of information.

Note: For points 5 & 6, make sure you read about online learning algorithms & Stochastic Gradient Descent. These are advanced methods.
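Points 5 and 6 can be combined into a short out-of-core sketch: stream the rows in chunks and update a linear (logistic) model with mini-batch stochastic gradient descent, so only one chunk sits in memory at a time. This is a minimal NumPy illustration under stated assumptions, not Vowpal Wabbit itself: `stream_chunks` is a hypothetical stand-in for reading the file piece by piece (here it generates synthetic data), and the learning rate, batch size and chunk size are arbitrary values, not tuned settings.

```python
import numpy as np

def stream_chunks(n_chunks, chunk_size, n_features, true_w, seed=0):
    """Hypothetical data source: yields (X, y) chunks instead of one big array.
    In practice this would read successive rows from disk."""
    rng = np.random.default_rng(seed)
    for _ in range(n_chunks):
        X = rng.standard_normal((chunk_size, n_features))
        y = (X @ true_w + 0.5 * rng.standard_normal(chunk_size) > 0).astype(float)
        yield X, y

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def sgd_logistic(chunks, n_features, lr=0.1, batch_size=100):
    """Logistic regression trained with mini-batch SGD; one chunk in memory at a time."""
    w = np.zeros(n_features)
    b = 0.0
    for X, y in chunks:
        for start in range(0, len(y), batch_size):
            xb = X[start:start + batch_size]
            yb = y[start:start + batch_size]
            err = sigmoid(xb @ w + b) - yb        # gradient of log-loss w.r.t. the logit
            w -= lr * xb.T @ err / len(yb)
            b -= lr * err.mean()
    return w, b

n_features = 20
true_w = np.random.default_rng(42).standard_normal(n_features)
w, b = sgd_logistic(stream_chunks(50, 2000, n_features, true_w), n_features)

# Evaluate on a fresh held-out chunk generated with a different seed
X_test, y_test = next(stream_chunks(1, 5000, n_features, true_w, seed=1))
accuracy = ((sigmoid(X_test @ w + b) > 0.5) == y_test).mean()
print(f"held-out accuracy: {accuracy:.3f}")
```

In scikit-learn, `SGDClassifier.partial_fit` follows the same chunk-by-chunk pattern.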

Q2. Is rotation necessary in PCA? If yes, Why? What will happen if you don’t rotate the components?

Answer: Yes, rotation (orthogonal) is necessary because it maximizes the difference between the variances captured by the components. This makes the components easier to interpret. Not to forget, that’s the motive of doing PCA, where we aim to select fewer components (than features) which can explain the maximum variance in the data set. Rotation doesn’t change the relative location of the components; it only changes the actual coordinates of the points.

If we don’t rotate the components, the effect of PCA will diminish and we’ll have to select more components to explain the variance in the data set.
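The claim that an orthogonal rotation changes coordinates but not the relative location of the points can be checked numerically. This NumPy sketch (synthetic data; PCA computed via the SVD of the centered data, which is one standard way to do it) rotates the data onto its principal axes, confirms that pairwise distances are unchanged, and prints how the variance concentrates in the first component.

```python
import numpy as np

rng = np.random.default_rng(0)
# Synthetic data: 200 points, 3 features, two of them strongly related
x1 = rng.standard_normal(200)
X = np.column_stack([x1, 2 * x1 + 0.3 * rng.standard_normal(200), rng.standard_normal(200)])

Xc = X - X.mean(axis=0)                       # PCA works on centered data
U, S, Vt = np.linalg.svd(Xc, full_matrices=False)
scores = Xc @ Vt.T                            # coordinates along the principal axes

explained_ratio = S**2 / np.sum(S**2)         # share of variance per component, largest first
print("explained variance ratio:", np.round(explained_ratio, 3))

def pairwise_dist(A):
    """Euclidean distance between every pair of rows of A."""
    return np.linalg.norm(A[:, None, :] - A[None, :, :], axis=-1)

# The rotation matrix is orthogonal, so distances between points are preserved
print(np.allclose(pairwise_dist(Xc), pairwise_dist(scores)))
```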

Know more: PCA

 

Q3. You are given a data set. The data set has missing values which spread along 1 standard deviation from the median. What percentage of data would remain unaffected? Why?

Answer: This question has enough hints for you to start thinking! Since the data is spread around the median, let’s assume it follows a normal distribution. We know that in a normal distribution, ~68% of the data lies within 1 standard deviation of the mean (which equals the median and mode), which leaves ~32% of the data outside that range. Therefore, ~32% of the data would remain unaffected by missing values.
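The ~68% figure comes from the standard normal distribution: the share of values within k standard deviations of the mean is erf(k/√2). A quick check with Python’s standard library, assuming the data really is normal:

```python
import math

def fraction_within(k):
    """P(|Z| < k) for a standard normal Z, i.e. erf(k / sqrt(2))."""
    return math.erf(k / math.sqrt(2))

affected = fraction_within(1)       # the ~68% quoted above
unaffected = 1 - affected           # the remaining ~32%
print(f"affected: {affected:.4f}, unaffected: {unaffected:.4f}")
# affected: 0.6827, unaffected: 0.3173
```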

 

Q4. You are given a data set on cancer detection. You’ve built a classification model and achieved an accuracy of 96%. Why shouldn’t you be happy with your model performance? What can you do about it?

Answer: If you have worked on enough data sets, you should deduce that cancer detection results in imbalanced data. In an imbalanced data set, accuracy should not be used as a measure of performance: a 96% accuracy (as given) might come from predicting only the majority class correctly, while our class of interest is the minority class (4%), i.e. the people who were actually diagnosed with cancer. Hence, to evaluate model performance, we should use Sensitivity (True Positive Rate), Specificity (True Negative Rate) and the F measure to determine the class-wise performance of the classifier. If the minority class performance is found to be poor, we can undertake the following steps:

  1. We can use undersampling, oversampling or SMOTE to make the data balanced.
  2. We can alter the prediction threshold by calibrating the predicted probabilities and finding an optimal threshold using the ROC curve.
  3. We can assign weights to classes such that the minority class gets a larger weight.
  4. We can also use anomaly detection.
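To see why 96% accuracy can be meaningless here, the sketch below computes the class-wise metrics mentioned above for a hypothetical test set of 1000 patients (4% positive) and a useless model that always predicts “no cancer”. The data and the zero-division conventions (returning 0.0) are assumptions for illustration.

```python
def classification_report(y_true, y_pred):
    """Class-wise metrics for a binary problem where 1 is the class of interest."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    accuracy = (tp + tn) / len(pairs)
    sensitivity = tp / (tp + fn) if tp + fn else 0.0   # true positive rate (recall)
    specificity = tn / (tn + fp) if tn + fp else 0.0   # true negative rate
    precision = tp / (tp + fp) if tp + fp else 0.0
    f1 = (2 * precision * sensitivity / (precision + sensitivity)
          if precision + sensitivity else 0.0)
    return {"accuracy": accuracy, "sensitivity": sensitivity,
            "specificity": specificity, "f1": f1}

# Hypothetical data: 40 cancer cases out of 1000, model always predicts 0
y_true = [1] * 40 + [0] * 960
y_pred = [0] * 1000
report = classification_report(y_true, y_pred)
print(report)
# {'accuracy': 0.96, 'sensitivity': 0.0, 'specificity': 1.0, 'f1': 0.0}
```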

Know more: Imbalanced Classification

 

Q5. Why is naive Bayes so ‘naive’?

Answer: Naive Bayes is so ‘naive’ because it assumes that all of the features in a data set are equally important and independent of each other given the class. As we know, these assumptions are rarely true in real-world scenarios.

 

Q6. Explain prior probability, likelihood and marginal likelihood in the context of the naive Bayes algorithm.

Answer: Prior probability is simply the proportion of each class of the dependent (binary) variable in the data set. It is the closest guess you can make about a class without any further information. For example: in a data set, the dependent variable is binary (1 and 0). The proportion of 1 (spam) is 70% and 0 (not spam) is 30%. Hence, we can estimate that there is a 70% chance that any new email is spam.

Likelihood is the probability of observing a feature value given the class. For example: the probability that the word ‘FREE’ is used in previous spam messages is a likelihood. Marginal likelihood is the probability that the word ‘FREE’ is used in any message.
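Putting the three quantities together with Bayes’ rule gives the posterior probability that a message containing ‘FREE’ is spam. The 70/30 prior is taken from the example above; the two conditional probabilities for ‘FREE’ are hypothetical numbers chosen for illustration.

```python
prior_spam = 0.70              # prior from the example above
prior_not_spam = 0.30
p_free_given_spam = 0.40       # hypothetical likelihood
p_free_given_not_spam = 0.05   # hypothetical likelihood under the other class

# Marginal likelihood (evidence): probability of 'FREE' in any message
p_free = p_free_given_spam * prior_spam + p_free_given_not_spam * prior_not_spam

# Posterior via Bayes' rule: P(spam | FREE) = P(FREE | spam) P(spam) / P(FREE)
p_spam_given_free = p_free_given_spam * prior_spam / p_free
print(f"P(FREE) = {p_free:.3f}, P(spam | FREE) = {p_spam_given_free:.3f}")
# P(FREE) = 0.295, P(spam | FREE) = 0.949
```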

 

Q7. You are working on a time series data set. Your manager has asked you to build a high accuracy model. You start with the decision tree algorithm, since you know it works fairly well on all kinds of data. Later, you try a time series regression model and get higher accuracy than with the decision tree model. Can this happen? Why?

Answer: Time series data often exhibits linearity (for example, a linear trend). On the other hand, a decision tree algorithm is known to work best at detecting non-linear interactions. The decision tree failed to provide robust predictions because it couldn’t map the linear relationship as well as a regression model did. Therefore, we learn that a linear regression model can provide robust predictions, given that the data set satisfies its linearity assumptions.

 

Q8. You are assigned a new project which involves helping a food delivery company save more money. The problem is, the company’s delivery team isn’t able to deliver food on time. As a result, their customers get unhappy. And, to keep them happy, they end up delivering food for free. Which machine learning algorithm can save them?

Answer: You might have started hopping through the list of ML algorithms in your mind. But, wait! Such questions are asked to test your machine learning fundamentals.

This is not a machine learning problem. This is a route optimization problem. A machine learning problem consists of three things:

  1. There exists a pattern.
  2. You cannot solve it mathematically (even by writing exponential equations).
  3. You have data on it.

Always look for these three factors to decide if machine learning is a tool to solve a particular problem.

 

Q9. You came to know that your model is suffering from low bias and high variance. Which algorithm should you use to tackle it? Why?

Answer: Low bias occurs when the model’s predicted values are near the actual values on the training data. In other words, the model becomes flexible enough to mimic the training data distribution. While that sounds like a great achievement, not to forget, such an overly flexible model has poor generalization capabilities. It means that when this model is tested on unseen data, it gives disappointing results.

In such situations, we can use a bagging algorithm (like random forest) to tackle the high variance problem. Bagging algorithms divide a data set into subsets made with repeated randomized sampling (with replacement). Then, these samples are used to generate a set of models using a single learning algorithm. Later, the model predictions are combined using voting (classification) or averaging (regression).

Also, to combat high variance, we can:

  1. Use a regularization technique, where large model coefficients get penalized, hence lowering model complexity.
  2. Use the top n features from the variable importance chart. Maybe, with all the variables in the data set, the algorithm is having difficulty finding the meaningful signal.

 

Q10. You are given a data set. The data set contains many variables, some of which are highly correlated and you know about it. Your manager has asked you to run PCA. Would you remove correlated variables first? Why?

Answer: Chances are, you might be tempted to say No, but that would be incorrect. Discarding correlated variables has a substantial effect on PCA because, in the presence of correlated variables, the variance explained by a particular component gets inflated.

For example: you have 3 (standardized) variables in a data set, of which 2 are perfectly correlated. If you run PCA on this data set, the first principal component would exhibit twice the variance it would exhibit with uncorrelated variables. Also, adding correlated variables lets PCA put more importance on those variables, which is misleading.
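The “twice the variance” effect is easiest to see on standardized data, where the principal component variances are the eigenvalues of the correlation matrix. A NumPy sketch with synthetic data, assuming PCA is run on the correlation matrix (i.e. on standardized variables):

```python
import numpy as np

rng = np.random.default_rng(0)
n = 5000
a = rng.standard_normal(n)
uncorrelated = np.column_stack([a, rng.standard_normal(n), rng.standard_normal(n)])
# Two of the three variables nearly identical (correlation close to 1)
correlated = np.column_stack([a, a + 0.05 * rng.standard_normal(n), rng.standard_normal(n)])

def pc_variances(X):
    """Variances of the principal components of standardized data:
    eigenvalues of the correlation matrix, largest first."""
    return np.sort(np.linalg.eigvalsh(np.corrcoef(X, rowvar=False)))[::-1]

print(np.round(pc_variances(uncorrelated), 2))   # all close to 1
print(np.round(pc_variances(correlated), 2))     # first close to 2
```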

 

Q11. After spending several hours, you are now anxious to build a high accuracy model. As a result, you build 5 GBM models, thinking a boosting algorithm would do the magic. Unfortunately, none of the models could perform better than the benchmark score. Finally, you decide to combine those models. Though ensembled models are known to return high accuracy, you are unlucky. Where did you go wrong?

Answer: As we know, ensemble learners are based on the idea of combining weak learners to create strong learners. But these learners provide superior results only when the combined models are uncorrelated. Since we used 5 GBM models and got no accuracy improvement, this suggests that the models are correlated. The problem with correlated models is that all the models provide the same information.

For example: if model 1 has classified User1122 as 1, there is a high chance that models 2 and 3 have done the same, even if its actual value is 0. Therefore, ensemble learners are built on the premise of combining weak, uncorrelated models to obtain better predictions.
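A small simulation makes the point quantitative. Assuming each model’s error is Gaussian with unit variance and the same pairwise correlation rho, the variance of the averaged error is rho + (1 - rho)/k for k models, so averaging helps a lot when rho is near 0 and barely at all when rho is near 1. The Gaussian error model is an assumption for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
N_MODELS = 5

def ensemble_error_variance(rho, n_trials=200_000):
    """Simulate the errors of N_MODELS models (unit variance, pairwise correlation rho),
    average them as an ensemble would, and return the variance of the averaged error."""
    cov = np.full((N_MODELS, N_MODELS), rho) + (1 - rho) * np.eye(N_MODELS)
    errors = rng.multivariate_normal(np.zeros(N_MODELS), cov, size=n_trials)
    return errors.mean(axis=1).var()

# Theory: Var(average) = rho + (1 - rho) / N_MODELS
print("rho = 0.0:", round(ensemble_error_variance(0.0), 3))   # close to 0.20
print("rho = 0.9:", round(ensemble_error_variance(0.9), 3))   # close to 0.92
```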

 

Q12. How is kNN different from kmeans clustering?

Answer: Don’t get misled by the ‘k’ in their names. You should know that the fundamental difference between these two algorithms is that kmeans is unsupervised in nature and kNN is supervised in nature. kmeans is a clustering algorithm. kNN is a classification (or regression) algorithm.

kmeans algorithm partitions a data set into clusters such that each cluster is homogeneous and the points in each cluster are close to each other. The algorithm tries to maintain enough separability between these clusters. Due to its unsupervised nature, the clusters have no labels.

kNN algorithm tries to classify an unlabeled observation based on its k (can be any number) surrounding neighbors. It is also known as a lazy learner because it involves minimal training of the model: it doesn’t build a generalized model from the training data in advance, and instead searches the stored training data at prediction time.
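A minimal kNN classifier shows what “lazy” means in practice: there is no fitting step, and all the work happens at prediction time by searching the stored training points. This is a plain-Python sketch with Euclidean distance and majority voting; the toy points and labels are made up.

```python
import math
from collections import Counter

def knn_predict(train_X, train_y, query, k=3):
    """Classify `query` by majority vote among its k nearest training points.
    No training step: the training set is stored and searched at prediction time."""
    dists = sorted((math.dist(x, query), label) for x, label in zip(train_X, train_y))
    votes = Counter(label for _, label in dists[:k])
    return votes.most_common(1)[0][0]

train_X = [(1, 1), (1, 2), (2, 1), (8, 8), (8, 9), (9, 8)]
train_y = ["A", "A", "A", "B", "B", "B"]
print(knn_predict(train_X, train_y, (2, 2)))   # A
print(knn_predict(train_X, train_y, (7, 8)))   # B
```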

 

Q13. How are True Positive Rate and Recall related? Write the equation.

Answer: True Positive Rate = Recall. Yes, they are equal, with the formula TP / (TP + FN).

Know more: Evaluation Metrics

 

Q14. You have built a multiple regression model. Your model’s R² isn’t as good as you wanted. For improvement, you remove the intercept term, and your model’s R² goes from 0.3 to 0.8. Is it possible? How?

Answer: Yes, it is possible. We need to understand the significance of the intercept term in a regression model. The intercept term shows the model prediction without any independent variable, i.e. the mean prediction. The formula is R² = 1 – ∑(y – ŷ)²/∑(y – ȳ)², where ŷ is the predicted value and ȳ is the mean of y.

When the intercept term is present, R² evaluates your model with respect to the mean model. In the absence of the intercept term, the model can make no such evaluation, and R² is instead computed against a zero baseline: R² = 1 – ∑(y – ŷ)²/∑y². Since ∑y² is usually much larger than ∑(y – ȳ)², the ratio becomes smaller, resulting in a higher R².
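The effect can be reproduced with synthetic data where y has a large mean but only a weak relationship with x. This NumPy sketch assumes the common convention that R² without an intercept is computed against ∑y², as described above; the no-intercept model has a larger residual sum of squares, yet reports a much higher R².

```python
import numpy as np

rng = np.random.default_rng(0)
n = 1000
x = rng.uniform(0, 1, n)
y = 10 + 0.5 * x + rng.standard_normal(n)     # large mean, weak relationship with x

# With intercept: R² compares against the mean model
X1 = np.column_stack([np.ones(n), x])
beta1, *_ = np.linalg.lstsq(X1, y, rcond=None)
rss1 = np.sum((y - X1 @ beta1) ** 2)
r2_with_intercept = 1 - rss1 / np.sum((y - y.mean()) ** 2)

# Without intercept: R² compares against a zero baseline (sum of y²)
X0 = x[:, None]
beta0, *_ = np.linalg.lstsq(X0, y, rcond=None)
rss0 = np.sum((y - X0 @ beta0) ** 2)
r2_no_intercept = 1 - rss0 / np.sum(y ** 2)

print(f"R² with intercept:    {r2_with_intercept:.3f}")
print(f"R² without intercept: {r2_no_intercept:.3f}")
```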

 

Q15. After analyzing the model, your manager has informed you that your regression model is suffering from multicollinearity. How would you check if he’s right? Without losing any information, can you still build a better model?

Answer: To check multicollinearity, we can create a correlation matrix to identify & remove variables having a correlation above 75% (deciding a threshold is subjective). In addition, we can calculate the VIF (variance inflation factor) to check for the presence of multicollinearity. A VIF value <= 4 suggests no multicollinearity, whereas a value >= 10 implies serious multicollinearity. Also, we can use tolerance (1 / VIF) as an indicator of multicollinearity.

But removing correlated variables might lead to loss of information. In order to retain those variables, we can use penalized regression models like ridge or lasso regression. Also, we can add some random noise to the correlated variables so that the variables become different from each other. But adding noise might affect the prediction accuracy, hence this approach should be used carefully.
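VIF can be computed directly from its definition, VIF_j = 1 / (1 - R²_j), where R²_j comes from regressing predictor j on the other predictors. A NumPy sketch on synthetic data (two nearly collinear columns plus one independent column); statsmodels also provides a ready-made function, but this version needs only NumPy.

```python
import numpy as np

def vif(X):
    """Variance inflation factor of each column: 1 / (1 - R²_j), where R²_j comes
    from regressing column j on all other columns plus an intercept."""
    n, p = X.shape
    out = []
    for j in range(p):
        others = np.column_stack([np.ones(n), np.delete(X, j, axis=1)])
        coef, *_ = np.linalg.lstsq(others, X[:, j], rcond=None)
        resid = X[:, j] - others @ coef
        r2 = 1 - resid @ resid / np.sum((X[:, j] - X[:, j].mean()) ** 2)
        out.append(1 / (1 - r2))
    return np.array(out)

rng = np.random.default_rng(0)
a = rng.standard_normal(500)
X = np.column_stack([a, a + 0.1 * rng.standard_normal(500), rng.standard_normal(500)])
vifs = vif(X)
tolerance = 1 / vifs
print(np.round(vifs, 1))   # first two large (serious multicollinearity), third near 1
```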

Know more: Regression

 

Q16. When is Ridge regression favorable over Lasso regression?

Answer: You can quote ISLR’s authors Hastie and Tibshirani, who asserted that in the presence of few variables with medium / large sized effects, you should use lasso regression, and in the presence of many variables with small / medium sized effects, you should use ridge regression.

Conceptually, we can say lasso regression (L1) does both variable selection and parameter shrinkage, whereas ridge regression (L2) only does parameter shrinkage and ends up including all the coefficients in the model. In the presence of correlated variables, ridge regression might be the preferred choice. Also, ridge regression works best in situations where the least squares estimates have high variance. Therefore, it depends on our model objective.
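The selection-versus-shrinkage difference shows up clearly on synthetic data where only 2 of 10 features matter. This NumPy sketch uses closed-form ridge and a simple cyclic coordinate descent for lasso (one common way to fit it, not necessarily what a given library does); the penalty values `alpha` and `lam` are arbitrary illustrative choices.

```python
import numpy as np

def ridge(X, y, alpha):
    """Closed-form ridge: minimizes ||y - Xb||² + alpha * ||b||²."""
    p = X.shape[1]
    return np.linalg.solve(X.T @ X + alpha * np.eye(p), X.T @ y)

def lasso(X, y, lam, n_iter=200):
    """Lasso by cyclic coordinate descent: minimizes (1/2n)||y - Xb||² + lam * ||b||₁."""
    n, p = X.shape
    b = np.zeros(p)
    col_sq = (X ** 2).sum(axis=0) / n
    for _ in range(n_iter):
        for j in range(p):
            # Partial residual: keep every feature's contribution except feature j
            r_j = y - X @ b + X[:, j] * b[j]
            rho = X[:, j] @ r_j / n
            # Soft-thresholding sets small coefficients exactly to zero
            b[j] = np.sign(rho) * max(abs(rho) - lam, 0.0) / col_sq[j]
    return b

rng = np.random.default_rng(0)
n, p = 200, 10
X = rng.standard_normal((n, p))
y = 3 * X[:, 0] - 2 * X[:, 1] + 0.5 * rng.standard_normal(n)   # only 2 features matter

ridge_coef = ridge(X, y, alpha=10.0)
lasso_coef = lasso(X, y, lam=0.1)
print("ridge:", np.round(ridge_coef, 2))   # all shrunk, none exactly zero
print("lasso:", np.round(lasso_coef, 2))   # irrelevant features set to zero
```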

Know more: Ridge and Lasso Regression

 

Q17. A rise in global average temperature led to a decrease in the number of pirates around the world. Does that mean that the decrease in the number of pirates caused the climate change?

Answer: After reading this question, you should have understood that this is a classic case of “causation and correlation”. No, we can’t conclude that the decrease in the number of pirates caused the climate change, because there might be other factors (lurking or confounding variables) influencing this phenomenon.

Therefore, there might be a correlation between global average temperature and the number of pirates, but based on this information alone we can’t establish a causal link in either direction.

Know more: Causation and Correlation

 

Q18. While working on a data set, how do you select important variables? Explain your methods.

Answer: Following are the methods of variable selection you can use:

  1. Remove the correlated variables prior to selecting important variables
  2. Use linear regression and select variables based on p values
  3. Use Forward Selection, Backward Selection, Stepwise Selection
  4. Use Random Forest, Xgboost and plot variable importance chart
  5. Use Lasso Regression
  6. Measure information gain for the available set of features and select top n features accordingly.

 

Q19. What is the difference between covariance and correlation?

Answer: Correlation is the standardized form of covariance.

Covariances are difficult to compare. For example: if we calculate the covariance of salary ($) and age (years), the value depends on the units used, so covariances computed on different scales can’t be compared. To combat such a situation, we calculate correlation to get a value between -1 and 1, irrespective of the variables’ respective scales.
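A quick NumPy check of the scale problem: expressing the same synthetic salaries in thousands of dollars instead of dollars divides the covariance by 1000 but leaves the correlation unchanged. The salary-age relationship here is made up for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
age = rng.uniform(22, 60, 100)                                   # years
salary_usd = 20_000 + 1_500 * age + rng.normal(0, 10_000, 100)   # synthetic salaries
salary_kusd = salary_usd / 1000                                  # same data, different unit

cov_usd = np.cov(age, salary_usd)[0, 1]
cov_kusd = np.cov(age, salary_kusd)[0, 1]
corr_usd = np.corrcoef(age, salary_usd)[0, 1]
corr_kusd = np.corrcoef(age, salary_kusd)[0, 1]

print(cov_usd, cov_kusd)     # covariance changes by a factor of 1000 with the unit
print(corr_usd, corr_kusd)   # correlation is identical and lies in [-1, 1]
```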

 

Q20. Is it possible to capture the correlation between a continuous and a categorical variable? If yes, how?

Answer: Yes, we can use ANCOVA (analysis of covariance) technique to capture association between continuous and categorical variables.

 

Q21. Both being tree-based algorithms, how is random forest different from the Gradient Boosting algorithm (GBM)?

Answer: The fundamental difference is, random forest uses bagging technique to make predictions. GBM uses boosting techniques to make predictions.

In the bagging technique, a data set is divided into n samples using randomized sampling with replacement. Then, using a single learning algorithm, a model is built on each sample. Later, the resultant predictions are combined using voting or averaging. Bagging is done in parallel. In boosting, models are built sequentially: after the first round of predictions, the algorithm focuses on the observations it got wrong so that they can be corrected in the succeeding round (AdaBoost does this by giving misclassified observations higher weights; gradient boosting fits each new tree to the residual errors of the current ensemble). This sequential process continues until a stopping criterion is reached.

Random forest improves model accuracy mainly by reducing variance. The trees are decorrelated (each split considers only a random subset of features) to maximize the decrease in variance. On the other hand, GBM improves accuracy by reducing both bias and variance in a model.

Know more: Tree based modeling

 

Q22. Running a binary classification tree algorithm is the easy part. Do you know how tree splitting takes place, i.e. how the tree decides which variable to split on at the root node and succeeding nodes?

Answer: A classification tree makes decisions based on the Gini index and node entropy. In simple words, the tree algorithm finds the best possible feature which can divide the data set into the purest possible child nodes.

The Gini index says: if we select two items from a population at random (with replacement), the probability that they are of the same class is 1 if the population is pure. Under this definition a higher Gini score means a purer node; many libraries instead report the Gini impurity, 1 – (p² + q²), where lower is better. We can calculate Gini as follows:

  1. Calculate Gini for the sub-nodes, using the sum of the squares of the probabilities of success and failure (p² + q²).
  2. Calculate Gini for the split using the weighted Gini score of each node of that split.

Entropy is a measure of impurity, given by (for a binary class):

$$\text{Entropy} = -p \log_2 p - q \log_2 q$$

Here p and q are the probabilities of success and failure respectively in that node. Entropy is zero when a node is homogeneous. It is maximal when both classes are present in a node at 50%/50%. Lower entropy is desirable.
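The formulas above translate directly into code. This plain-Python sketch implements the Gini score (p² + q²) as described, the more common Gini impurity 1 - (p² + q²), binary entropy, and the weighted Gini of a split; the split counts at the end are hypothetical.

```python
import math

def gini_score(p):
    """Gini score as described above: p² + q² (1 for a pure node, 0.5 for a 50/50 node)."""
    q = 1 - p
    return p ** 2 + q ** 2

def gini_impurity(p):
    """The form most libraries report: 1 - (p² + q²); lower is purer."""
    return 1 - gini_score(p)

def entropy(p):
    """Binary entropy in bits: -p log2 p - q log2 q, with 0 log 0 taken as 0."""
    return sum(-x * math.log2(x) for x in (p, 1 - p) if x > 0)

def weighted_split_gini(children):
    """children: list of (n_samples, n_successes) for each child node of a split."""
    total = sum(n for n, _ in children)
    return sum(n / total * gini_score(s / n) for n, s in children)

print(entropy(0.5), entropy(1.0))          # 1.0 0.0
print(gini_score(0.5), gini_score(1.0))    # 0.5 1.0
# Hypothetical split: left child 10 samples (8 positive), right child 20 samples (2 positive)
print(round(weighted_split_gini([(10, 8), (20, 2)]), 3))   # 0.773
```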

 

Q23. You’ve built a random forest model with 10000 trees. You were delighted to get a training error of 0.00, but the validation error is 34.23. What is going on? Haven’t you trained your model perfectly?

Answer: The model has overfitted. A training error of 0.00 means the classifier has mimicked the training data patterns to such an extent that they are not available in the unseen data. Hence, when this classifier was run on an unseen sample, it couldn’t find those patterns and returned predictions with a higher error. In random forest, adding more trees does not by itself cause overfitting; it usually happens when the individual trees are grown too deep and fit noise in the training data. Hence, to avoid this situation, we should tune parameters such as tree depth, minimum leaf size and the number of features tried at each split using cross validation.

 

Q24. You’ve got a data set to work on having p (no. of variables) > n (no. of observations). Why is OLS a bad option to work with? Which techniques would be best to use? Why?

Answer: In such high dimensional data sets, we can’t use classical regression techniques, since their assumptions tend to fail. When p > n, we can no longer calculate a unique least squares coefficient estimate and the variances become infinite, so OLS cannot be used at all.

To combat this situation, we can use penalized regression methods like lasso, LARS and ridge, which can shrink the coefficients to reduce variance. Precisely, ridge regression works best in situations where the least squares estimates have high variance.

Other methods include subset regression and forward stepwise regression.
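The rank problem is easy to demonstrate: with p > n, XᵀX is a p × p matrix of rank at most n, so the OLS normal equations have no unique solution, while adding the ridge penalty makes the system solvable. A NumPy sketch on synthetic data (n = 20, p = 50, with `alpha` chosen arbitrarily):

```python
import numpy as np

rng = np.random.default_rng(0)
n, p = 20, 50                                  # more variables than observations
X = rng.standard_normal((n, p))
y = X[:, 0] - X[:, 1] + 0.1 * rng.standard_normal(n)

# X'X is p x p but has rank at most n, so it cannot be inverted
rank = np.linalg.matrix_rank(X.T @ X)
print(f"rank of X'X: {rank} (p = {p})")

# Ridge adds alpha * I, which makes the system invertible for any alpha > 0
alpha = 1.0
beta_ridge = np.linalg.solve(X.T @ X + alpha * np.eye(p), X.T @ y)
print(np.round(beta_ridge[:3], 3))
```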

 

Q25. What is a convex hull? (Hint: Think SVM)

Answer: In the case of linearly separable data, the convex hull represents the outer boundary of each of the two groups of data points. Once the convex hulls are created, we get the maximum margin hyperplane (MMH) as the perpendicular bisector of the shortest line connecting the two convex hulls. The MMH is the line which attempts to create the greatest separation between the two groups.
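Computing a convex hull is the first step in this geometric view of SVM. Below is a plain-Python sketch of Andrew’s monotone chain algorithm, a standard O(n log n) hull algorithm (not necessarily what any SVM library uses internally); finding the maximum margin hyperplane between two hulls is not shown.

```python
def cross(o, a, b):
    """Z-component of (a - o) x (b - o); positive means a counter-clockwise turn."""
    return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])

def convex_hull(points):
    """Andrew's monotone chain: returns hull vertices in counter-clockwise order."""
    pts = sorted(set(points))
    if len(pts) <= 2:
        return pts
    lower, upper = [], []
    for p in pts:                      # build the lower hull left to right
        while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0:
            lower.pop()
        lower.append(p)
    for p in reversed(pts):            # build the upper hull right to left
        while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0:
            upper.pop()
        upper.append(p)
    return lower[:-1] + upper[:-1]     # drop duplicated endpoints

group_a = [(0, 0), (2, 0), (2, 2), (0, 2), (1, 1)]   # (1, 1) is an interior point
print(convex_hull(group_a))   # [(0, 0), (2, 0), (2, 2), (0, 2)]
```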

 

Q26. We know that one hot encoding increases the dimensionality of a data set, but label encoding doesn’t. How?

Answer: Don’t get baffled at this question. It’s a simple question asking the difference between the two.

Using one hot encoding, the dimensionality (a.k.a. number of features) of a data set increases because it creates a new variable for each level present in the categorical variables. For example: let’s say we have a variable ‘color’. The variable has 3 levels, namely Red, Blue and Green. One hot encoding the ‘color’ variable will generate three new variables, Color.Red, Color.Blue and Color.Green, containing 0 and 1 values.

In label encoding, the levels of a categorical variable get encoded as integers (0 and 1 for a binary variable), so no new variable is created. Label encoding is mainly used for binary variables.
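The difference in dimensionality is visible in a few lines of plain Python, using the ‘color’ example above (the sample values are made up):

```python
colors = ["Red", "Blue", "Green", "Blue", "Red"]
levels = sorted(set(colors))                  # ['Blue', 'Green', 'Red']

# One hot encoding: one new 0/1 column per level -> 3 columns
one_hot = [[int(c == level) for level in levels] for c in colors]

# Label encoding: each level mapped to an integer -> still a single column
codes = {level: i for i, level in enumerate(levels)}
label_encoded = [codes[c] for c in colors]

print(levels)          # ['Blue', 'Green', 'Red']
print(one_hot[0])      # [0, 0, 1]  (Red)
print(label_encoded)   # [2, 0, 1, 0, 2]
```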

 

Q27. What cross validation technique would you use on time series data set? Is it k-fold or LOOCV?

Answer: Neither.

In a time series problem, k-fold can be troublesome because there might be some pattern in year 4 or 5 which is not in year 3. Resampling the data set will separate these trends, and we might end up validating on past years, which is incorrect. Instead, we can use a forward chaining strategy with 5 folds as shown below:

  • fold 1 : training [1], test [2]
  • fold 2 : training [1 2], test [3]
  • fold 3 : training [1 2 3], test [4]
  • fold 4 : training [1 2 3 4], test [5]
  • fold 5 : training [1 2 3 4 5], test [6]

where 1,2,3,4,5,6 represents “year”.
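The forward chaining scheme above is simple to generate. A plain-Python sketch that yields expanding-window splits over ordered periods:

```python
def forward_chaining_folds(periods):
    """Yield (train, test) splits where each test period comes strictly after
    all of its training periods (expanding training window)."""
    for i in range(1, len(periods)):
        yield periods[:i], [periods[i]]

years = [1, 2, 3, 4, 5, 6]
for k, (train, test) in enumerate(forward_chaining_folds(years), start=1):
    print(f"fold {k} : training {train}, test {test}")
```

scikit-learn’s `TimeSeriesSplit` implements a similar expanding-window split on row indices.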

 

Q28. You are given a data set consisting of variables having more than 30% missing values. Let’s say, out of 50 variables, 8 variables have missing values higher than 30%. How will you deal with them?

Answer: We can deal with them in the following ways:

  1. Assign a unique category to the missing values; who knows, the missing values might reveal some trend.
  2. We can simply remove them.
  3. Or, we can sensibly check their distribution against the target variable, and if we find any pattern, we’ll keep those missing values and assign them a new category while removing the others.

 

Q29. ‘People who bought this, also bought…’ recommendations seen on Amazon are the result of which algorithm?

Answer: The basic idea for this kind of recommendation engine comes from collaborative filtering.

A collaborative filtering algorithm considers “user behavior” for recommending items. It exploits the behavior of other users and items in terms of transaction history, ratings, selection and purchase information. Other users’ behavior and preferences over the items are used to recommend items to new users. In this case, the features of the items are not needed.
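A simplified, item-to-item flavor of this idea can be sketched by counting how often two items appear in the same basket. This plain-Python example is an illustration under stated assumptions (hypothetical baskets, raw co-occurrence counts with no normalization), not Amazon’s actual system.

```python
from collections import Counter, defaultdict
from itertools import combinations

def also_bought(baskets):
    """Item-to-item co-occurrence counts: how often each pair of items
    appears in the same basket."""
    co = defaultdict(Counter)
    for basket in baskets:
        for a, b in combinations(set(basket), 2):
            co[a][b] += 1
            co[b][a] += 1
    return co

# Hypothetical purchase histories
baskets = [
    ["book", "lamp", "pen"],
    ["book", "pen"],
    ["book", "mug"],
    ["pen", "mug"],
]
co = also_bought(baskets)
print(co["book"].most_common(1))   # [('pen', 2)]: people who bought a book also bought a pen
```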

Know more: Recommender System

 

Q30. What do you understand by Type I vs Type II error?

Answer: A Type I error is committed when the null hypothesis is true and we reject it, also known as a ‘False Positive’. A Type II error is committed when the null hypothesis is false and we fail to reject it, also known as a ‘False Negative’.

In the context of a confusion matrix, we can say a Type I error occurs when we classify a value as positive (1) when it is actually negative (0). A Type II error occurs when we classify a value as negative (0) when it is actually positive (1).

 

Q31. You are working on a classification problem. For validation purposes, you’ve randomly sampled the training data set into train and validation sets. You are confident that your model will work incredibly well on unseen data since your validation accuracy is high. However, you are shocked to get poor test accuracy. What went wrong?

Answer: In the case of a classification problem, we should always use stratified sampling instead of random sampling. Random sampling doesn’t take into consideration the proportion of target classes. On the contrary, stratified sampling helps maintain the distribution of the target variable in the resulting samples.
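Stratified sampling can be implemented by splitting each class separately. A plain-Python sketch with a hypothetical 5% minority class:

```python
import random
from collections import Counter, defaultdict

def stratified_split(labels, val_fraction, seed=0):
    """Return (train_idx, val_idx) so that each class keeps (approximately)
    the same proportion in both parts."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for i, y in enumerate(labels):
        by_class[y].append(i)
    train_idx, val_idx = [], []
    for idx in by_class.values():
        rng.shuffle(idx)                          # randomize within each class
        n_val = round(len(idx) * val_fraction)
        val_idx += idx[:n_val]
        train_idx += idx[n_val:]
    return train_idx, val_idx

labels = [1] * 50 + [0] * 950                     # hypothetical 5% minority class
train_idx, val_idx = stratified_split(labels, val_fraction=0.2)
print(Counter(labels[i] for i in val_idx))        # 10 of class 1, 190 of class 0
```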

 

Q32. You have been asked to evaluate a regression model based on R², adjusted R² and tolerance. What will be your criteria?

Answer: Tolerance (1 / VIF) is used as an indicator of multicollinearity. It indicates the percentage of variance in a predictor which cannot be accounted for by the other predictors. Large values of tolerance are desirable.

We will consider adjusted R² as opposed to R² to evaluate model fit, because R² never decreases as we add more variables, irrespective of any improvement in prediction accuracy. Adjusted R², however, only increases if an additional variable improves the model more than would be expected by chance; otherwise it decreases. It is difficult to commit to a general threshold value for adjusted R² because it varies between data sets. For example: a gene mutation data set might result in a lower adjusted R² and still provide fairly good predictions, as compared to stock market data, where a lower adjusted R² implies that the model is not good.
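The adjusted R² formula, 1 - (1 - R²)(n - 1)/(n - p - 1) for n observations and p predictors, shows the penalty at work. In this sketch the numbers are hypothetical: adding a sixth predictor that barely raises R² lowers adjusted R².

```python
def adjusted_r2(r2, n, p):
    """Adjusted R² for n observations and p predictors (intercept not counted in p)."""
    return 1 - (1 - r2) * (n - 1) / (n - p - 1)

# Hypothetical: a nearly useless 6th predictor nudges R² from 0.700 to 0.701
print(round(adjusted_r2(0.700, n=50, p=5), 4))   # 0.6659
print(round(adjusted_r2(0.701, n=50, p=6), 4))   # 0.6593
```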

 

Q33. In k-means or kNN, we use Euclidean distance to calculate the distance between nearest neighbors. Why not Manhattan distance?

Answer: Manhattan distance measures distance only along the axes, i.e. horizontally or vertically, by summing the absolute differences in each coordinate. Euclidean distance measures the straight-line distance between points, which is the more natural notion of closeness when features are continuous; k-means also relies on it, since updating each centroid to the mean of its points minimizes the squared Euclidean distance. Both metrics can be computed in any number of dimensions, so Manhattan distance remains a valid option for kNN, but Euclidean distance is the more common default.

Example: Think of a chess board: the distance a rook covers, moving only horizontally and vertically, is measured by Manhattan distance.
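Both metrics are one-liners, and both can be computed for points in any number of dimensions; they differ only in how the per-axis differences are combined. A plain-Python sketch:

```python
import math

def euclidean(a, b):
    """Straight-line distance."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def manhattan(a, b):
    """Sum of absolute per-axis differences."""
    return sum(abs(x - y) for x, y in zip(a, b))

a, b = (0, 0, 0), (3, 4, 12)   # 3-dimensional points
print(euclidean(a, b))   # 13.0
print(manhattan(a, b))   # 19
```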

 

Q34. Explain machine learning to me like I’m a 5 year old.

Answer: It’s simple. It’s just like how babies learn to walk. Every time they fall down, they learn (unconsciously) & realize that their legs should be straight and not bent. The next time they fall down, they feel pain. They cry. But they learn ‘not to stand like that again’. In order to avoid that pain, they try harder. To succeed, they even seek support from the door or wall or anything near them, which helps them stand firm.

This is how a machine learns & develops intuition from its environment.

Note: The interviewer is only trying to test whether you have the ability to explain complex concepts in simple terms.

 

Q35. I know that a linear regression model is generally evaluated using Adjusted R² or F value. How would you evaluate a logistic regression model?

Answer: We can use the following methods:

  1. Since logistic regression is used to predict probabilities, we can use AUC-ROC curve along with confusion matrix to determine its performance.
  2. Also, the analogue of adjusted R² in logistic regression is AIC. AIC is a measure of fit which penalizes the model for the number of model coefficients. Therefore, we prefer the model with the minimum AIC value.
  3. Null deviance indicates how well the response is predicted by a model with nothing but an intercept; it serves as a baseline. Residual deviance indicates how well the response is predicted after adding the independent variables. The lower the residual deviance, and the larger its drop from the null deviance, the better the model.
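Null deviance, residual deviance and AIC can all be computed from the Bernoulli log-likelihood. In this sketch the labels and predicted probabilities are hypothetical, standing in for a fitted logistic regression with 2 coefficients (intercept + 1 slope); for 0/1 outcomes the deviance is simply -2 × log-likelihood.

```python
import math

def log_likelihood(y, p):
    """Bernoulli log-likelihood of 0/1 labels y under predicted probabilities p."""
    return sum(math.log(pi) if yi == 1 else math.log(1 - pi) for yi, pi in zip(y, p))

y = [1, 0, 1, 1, 0, 0, 1, 0]
# Hypothetical predicted probabilities from a fitted model with k = 2 coefficients
p_model = [0.9, 0.2, 0.7, 0.8, 0.3, 0.1, 0.6, 0.4]
# Intercept-only (null) model predicts the overall positive rate for everyone
p_null = [sum(y) / len(y)] * len(y)

null_deviance = -2 * log_likelihood(y, p_null)
residual_deviance = -2 * log_likelihood(y, p_model)
aic = 2 * 2 - 2 * log_likelihood(y, p_model)       # AIC = 2k - 2 log L

print(f"null deviance: {null_deviance:.2f}, residual deviance: {residual_deviance:.2f}, AIC: {aic:.2f}")
```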

Know more: Logistic Regression

 

Q36. Considering the long list of machine learning algorithms, given a data set, how do you decide which one to use?

Answer: You should say that the choice of machine learning algorithm depends largely on the type of data. If you are given a data set which exhibits linearity, then linear regression would be the best algorithm to use. If you are given images or audio to work on, then a neural network would help you build a robust model.

If the data comprises non-linear interactions, then a boosting or bagging algorithm should be the choice. If the business requirement is to build a model which can be easily interpreted and explained, then we’ll use regression or a decision tree model instead of black box algorithms like SVM, GBM etc.

In short, there is no one master algorithm for all situations. We must be scrupulous enough to understand which algorithm to use.

 

Q37. Do you suggest that treating a categorical variable as continuous variable would result in a better predictive model?

Answer: For better predictions, categorical variable can be considered as a continuous variable only when the variable is ordinal in nature.

 

Q38. When does regularization become necessary in Machine Learning?

Answer: Regularization becomes necessary when the model begins to overfit. This technique adds a cost term to the objective function that penalizes large coefficients, and hence penalizes bringing in more features. As a result, it pushes the coefficients of many variables toward zero (L1 regularization can set them exactly to zero), reducing the cost term. This helps to reduce model complexity so that the model becomes better at predicting (generalizing).

 

Q39. What do you understand by Bias Variance trade off?

Answer: The error emerging from any model can be broken down mathematically into three components:

$$\text{Error} = \text{Bias}^2 + \text{Variance} + \text{Irreducible error}$$

Bias error quantifies how much, on average, the predicted values differ from the actual values. A high bias error means we have an under-performing model which keeps missing important trends. Variance, on the other hand, quantifies how much the predictions made for the same observation differ from each other across models trained on different samples. A high variance model will overfit on your training population and perform badly on any observation beyond training.

 

Q40. OLS is to linear regression. Maximum likelihood is to logistic regression. Explain the statement.

Answer: OLS and maximum likelihood are the methods used by the respective regression methods to approximate the unknown parameter (coefficient) values. In simple words:

Ordinary least squares (OLS) is a method used in linear regression which approximates the parameters resulting in the minimum (squared) distance between actual and predicted values. Maximum likelihood chooses the parameter values under which the observed data are most likely.
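The contrast can be seen side by side: OLS has a closed-form least squares solution, while the logistic regression maximum likelihood estimate has none and is usually found iteratively. This NumPy sketch uses Newton-Raphson on synthetic data (the true coefficients are assumptions for the simulation); at convergence the gradient of the log-likelihood is zero.

```python
import numpy as np

rng = np.random.default_rng(0)
n = 500
X = np.column_stack([np.ones(n), rng.standard_normal(n)])   # intercept + one feature

# OLS: closed-form least squares estimate for a linear target
y_lin = X @ np.array([1.0, 2.0]) + 0.5 * rng.standard_normal(n)
beta_ols, *_ = np.linalg.lstsq(X, y_lin, rcond=None)

# Logistic regression: maximize the log-likelihood with Newton-Raphson
true_beta = np.array([-0.5, 1.5])
y_bin = (rng.uniform(size=n) < 1 / (1 + np.exp(-X @ true_beta))).astype(float)

beta_mle = np.zeros(2)
for _ in range(25):
    p = 1 / (1 + np.exp(-X @ beta_mle))
    gradient = X.T @ (y_bin - p)                  # first derivative of the log-likelihood
    hessian = -(X.T * (p * (1 - p))) @ X          # second derivative (negative definite)
    beta_mle -= np.linalg.solve(hessian, gradient)

p = 1 / (1 + np.exp(-X @ beta_mle))
final_gradient = X.T @ (y_bin - p)                # ~0 at the maximum

print("OLS estimate:", np.round(beta_ols, 2))
print("MLE estimate:", np.round(beta_mle, 2))
```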

 

End Notes

You might have been able to answer all the questions, but the real value lies in understanding them and generalizing your knowledge to similar questions. If you have struggled with these questions, no worries; now is the time to learn and not to perform. You should focus on learning these topics scrupulously right now.

These questions are meant to give you wide exposure to the types of questions asked at startups in machine learning. I’m sure these questions will leave you curious enough to do deeper topic research on your end. If you are planning for it, that’s a good sign.

Did you like reading this article? Have you appeared in any startup interview recently for data scientist profile? Do share your experience in comments below. I’d love to know your experience.

from:https://www.analyticsvidhya.com/blog/2016/09/40-interview-questions-asked-at-startups-in-machine-learning-data-science/

Data science Python notebooks

 

data-science-ipython-notebooks

Index

 

deep-learning

IPython Notebook(s) demonstrating deep learning functionality.

 

tensor-flow-tutorials

Additional TensorFlow tutorials:

Notebook Description
tsf-basics Learn basic operations in TensorFlow, a library for various kinds of perceptual and language understanding tasks from Google.
tsf-linear Implement linear regression in TensorFlow.
tsf-logistic Implement logistic regression in TensorFlow.
tsf-nn Implement nearest neighbors in TensorFlow.
tsf-alex Implement AlexNet in TensorFlow.
tsf-cnn Implement convolutional neural networks in TensorFlow.
tsf-mlp Implement multilayer perceptrons in TensorFlow.
tsf-rnn Implement recurrent neural networks in TensorFlow.
tsf-gpu Learn about basic multi-GPU computation in TensorFlow.
tsf-gviz Learn about graph visualization in TensorFlow.
tsf-lviz Learn about loss visualization in TensorFlow.

tensor-flow-exercises

Notebook Description
tsf-not-mnist Learn simple data curation by creating a pickle with formatted datasets for training, development and testing in TensorFlow.
tsf-fully-connected Progressively train deeper and more accurate models using logistic regression and neural networks in TensorFlow.
tsf-regularization Explore regularization techniques by training fully connected networks to classify notMNIST characters in TensorFlow.
tsf-convolutions Create convolutional neural networks in TensorFlow.
tsf-word2vec Train a skip-gram model over Text8 data in TensorFlow.
tsf-lstm Train a LSTM character model over Text8 data in TensorFlow.

 

theano-tutorials

Notebook Description
theano-intro Intro to Theano, which allows you to define, optimize, and evaluate mathematical expressions involving multi-dimensional arrays efficiently. It can use GPUs and perform efficient symbolic differentiation.
theano-scan Learn scans, a mechanism to perform loops in a Theano graph.
theano-logistic Implement logistic regression in Theano.
theano-rnn Implement recurrent neural networks in Theano.
theano-mlp Implement multilayer perceptrons in Theano.

 

keras-tutorials

Notebook Description
keras Keras is an open source neural network library written in Python. It is capable of running on top of either TensorFlow or Theano.
setup Learn about the tutorial goals and how to set up your Keras environment.
intro-deep-learning-ann Get an intro to deep learning with Keras and Artificial Neural Networks (ANN).
theano Learn about Theano by working with weights matrices and gradients.
keras-otto Learn about Keras by looking at the Kaggle Otto challenge.
ann-mnist Review a simple implementation of ANN for MNIST using Keras.
conv-nets Learn about Convolutional Neural Networks (CNNs) with Keras.
conv-net-1 Recognize handwritten digits from MNIST using Keras – Part 1.
conv-net-2 Recognize handwritten digits from MNIST using Keras – Part 2.
keras-models Use pre-trained models such as VGG16, VGG19, ResNet50, and Inception v3 with Keras.
auto-encoders Learn about Autoencoders with Keras.
rnn-lstm Learn about Recurrent Neural Networks (RNNs) with Keras.
lstm-sentence-gen Learn about RNNs using Long Short Term Memory (LSTM) networks with Keras.

deep-learning-misc

Notebook Description
deep-dream Caffe-based computer vision program which uses a convolutional neural network to find and enhance patterns in images.

 

scikit-learn

IPython Notebook(s) demonstrating scikit-learn functionality.

Notebook Description
intro Intro notebook to scikit-learn. Scikit-learn is an open source machine learning library for Python that provides tools for classification, regression, clustering, dimensionality reduction, and model selection.
knn Implement k-nearest neighbors in scikit-learn.
linear-reg Implement linear regression in scikit-learn.
svm Implement support vector machine classifiers with and without kernels in scikit-learn.
random-forest Implement random forest classifiers and regressors in scikit-learn.
k-means Implement k-means clustering in scikit-learn.
pca Implement principal component analysis in scikit-learn.
gmm Implement Gaussian mixture models in scikit-learn.
validation Implement validation and model selection in scikit-learn.

 

statistical-inference-scipy

IPython Notebook(s) demonstrating statistical inference with SciPy functionality.

Notebook Description
scipy SciPy is a collection of mathematical algorithms and convenience functions built on the NumPy extension of Python. It adds significant power to the interactive Python session by providing the user with high-level commands and classes for manipulating and visualizing data.
effect-size Explore statistics that quantify effect size by analyzing the difference in height between men and women. Uses data from the Behavioral Risk Factor Surveillance System (BRFSS) to estimate the mean and standard deviation of height for adult women and men in the United States.
sampling Explore random sampling by analyzing the average weight of men and women in the United States using BRFSS data.
hypothesis Explore hypothesis testing by analyzing the difference of first-born babies compared with others.

 

pandas

IPython Notebook(s) demonstrating pandas functionality.

Notebook Description
pandas Software library written for data manipulation and analysis in Python. Offers data structures and operations for manipulating numerical tables and time series.
github-data-wrangling Learn how to load, clean, merge, and feature engineer by analyzing GitHub data from the Viz repo.
Introduction-to-Pandas Introduction to Pandas.
Introducing-Pandas-Objects Learn about Pandas objects.
Data Indexing and Selection Learn about data indexing and selection in Pandas.
Operations-in-Pandas Learn about operating on data in Pandas.
Missing-Values Learn about handling missing data in Pandas.
Hierarchical-Indexing Learn about hierarchical indexing in Pandas.
Concat-And-Append Learn about combining datasets: concat and append in Pandas.
Merge-and-Join Learn about combining datasets: merge and join in Pandas.
Aggregation-and-Grouping Learn about aggregation and grouping in Pandas.
Pivot-Tables Learn about pivot tables in Pandas.
Working-With-Strings Learn about vectorized string operations in Pandas.
Working-with-Time-Series Learn about working with time series in pandas.
Performance-Eval-and-Query Learn about high-performance Pandas: eval() and query() in Pandas.

 

matplotlib

IPython Notebook(s) demonstrating matplotlib functionality.

Notebook Description
matplotlib Python 2D plotting library which produces publication quality figures in a variety of hardcopy formats and interactive environments across platforms.
matplotlib-applied Apply matplotlib visualizations to Kaggle competitions for exploratory data analysis. Learn how to create bar plots, histograms, subplot2grid, normalized plots, scatter plots, subplots, and kernel density estimation plots.
Introduction-To-Matplotlib Introduction to Matplotlib.
Simple-Line-Plots Learn about simple line plots in Matplotlib.
Simple-Scatter-Plots Learn about simple scatter plots in Matplotlib.
Errorbars Learn about visualizing errors in Matplotlib.
Density-and-Contour-Plots Learn about density and contour plots in Matplotlib.
Histograms-and-Binnings Learn about histograms, binnings, and density in Matplotlib.
Customizing-Legends Learn about customizing plot legends in Matplotlib.
Customizing-Colorbars Learn about customizing colorbars in Matplotlib.
Multiple-Subplots Learn about multiple subplots in Matplotlib.
Text-and-Annotation Learn about text and annotation in Matplotlib.
Customizing-Ticks Learn about customizing ticks in Matplotlib.
Settings-and-Stylesheets Learn about customizing Matplotlib: configurations and stylesheets.
Three-Dimensional-Plotting Learn about three-dimensional plotting in Matplotlib.
Geographic-Data-With-Basemap Learn about geographic data with basemap in Matplotlib.
Visualization-With-Seaborn Learn about visualization with Seaborn.

 

numpy

IPython Notebook(s) demonstrating NumPy functionality.

Notebook Description
numpy Adds Python support for large, multi-dimensional arrays and matrices, along with a large library of high-level mathematical functions to operate on these arrays.
Introduction-to-NumPy Introduction to NumPy.
Understanding-Data-Types Learn about data types in Python.
The-Basics-Of-NumPy-Arrays Learn about the basics of NumPy arrays.
Computation-on-arrays-ufuncs Learn about computations on NumPy arrays: universal functions.
Computation-on-arrays-aggregates Learn about aggregations: min, max, and everything in between in NumPy.
Computation-on-arrays-broadcasting Learn about computation on arrays: broadcasting in NumPy.
Boolean-Arrays-and-Masks Learn about comparisons, masks, and boolean logic in NumPy.
Fancy-Indexing Learn about fancy indexing in NumPy.
Sorting Learn about sorting arrays in NumPy.
Structured-Data-NumPy Learn about structured data: NumPy’s structured arrays.

 

python-data

IPython Notebook(s) demonstrating Python functionality geared towards data analysis.

Notebook Description
data structures Learn Python basics with tuples, lists, dicts, sets.
data structure utilities Learn Python operations such as slice, range, xrange, bisect, sort, sorted, reversed, enumerate, zip, list comprehensions.
functions Learn about more advanced Python features: functions as objects, lambda functions, closures, *args, **kwargs, currying, generators, generator expressions, itertools.
datetime Learn how to work with Python dates and times: datetime, strftime, strptime, timedelta.
logging Learn about Python logging with RotatingFileHandler and TimedRotatingFileHandler.
pdb Learn how to debug in Python with the interactive source code debugger.
unit tests Learn how to test in Python with Nose unit tests.

 

kaggle-and-business-analyses

IPython Notebook(s) used in kaggle competitions and business analyses.

Notebook Description
titanic Predict survival on the Titanic. Learn data cleaning, exploratory data analysis, and machine learning.
churn-analysis Predict customer churn. Exercise logistic regression, gradient boosting classifiers, support vector machines, random forests, and k-nearest-neighbors. Includes discussions of confusion matrices, ROC plots, feature importances, prediction probabilities, and calibration/discrimination.

 

spark

IPython Notebook(s) demonstrating spark and HDFS functionality.

Notebook Description
spark In-memory cluster computing framework that is up to 100 times faster for certain applications and well suited for machine learning algorithms.
hdfs Reliably stores very large files across machines in a large cluster.

 

mapreduce-python

IPython Notebook(s) demonstrating Hadoop MapReduce with mrjob functionality.

Notebook Description
mapreduce-python Runs MapReduce jobs in Python, executing jobs locally or on Hadoop clusters. Demonstrates Hadoop Streaming in Python code with unit test and mrjob config file to analyze Amazon S3 bucket logs on Elastic MapReduce. Disco is another Python-based alternative.

 

aws

IPython Notebook(s) demonstrating Amazon Web Services (AWS) and AWS tools functionality.

Also check out:

  • SAWS: A Supercharged AWS command line interface (CLI).
  • Awesome AWS: A curated list of libraries, open source repos, guides, blogs, and other resources.
Notebook Description
boto Official AWS SDK for Python.
s3cmd Interacts with S3 through the command line.
s3distcp Combines smaller files and aggregates them together by taking in a pattern and target file. S3DistCp can also be used to transfer large volumes of data from S3 to your Hadoop cluster.
s3-parallel-put Uploads multiple files to S3 in parallel.
redshift Acts as a fast data warehouse built on top of massively parallel processing (MPP) technology.
kinesis Streams data in real time with the ability to process thousands of data streams per second.
lambda Runs code in response to events, automatically managing compute resources.

 

commands

IPython Notebook(s) demonstrating various command lines for Linux, Git, etc.

Notebook Description
linux Unix-like and mostly POSIX-compliant computer operating system. Disk usage, splitting files, grep, sed, curl, viewing running processes, terminal syntax highlighting, and Vim.
anaconda Distribution of the Python programming language for large-scale data processing, predictive analytics, and scientific computing that aims to simplify package management and deployment.
ipython notebook Web-based interactive computational environment where you can combine code execution, text, mathematics, plots and rich media into a single document.
git Distributed revision control system with an emphasis on speed, data integrity, and support for distributed, non-linear workflows.
ruby Used to interact with the AWS command line and for Jekyll, a blog framework that can be hosted on GitHub Pages.
jekyll Simple, blog-aware, static site generator for personal, project, or organization sites. Renders Markdown or Textile and Liquid templates, and produces a complete, static website ready to be served by Apache HTTP Server, Nginx or another web server.
pelican Python-based alternative to Jekyll.
django High-level Python Web framework that encourages rapid development and clean, pragmatic design. It can be useful to share reports/analyses and for blogging. Lighter-weight alternatives include Pyramid, Flask, Tornado, and Bottle.

misc

IPython Notebook(s) demonstrating miscellaneous functionality.

Notebook Description
regex Regular expression cheat sheet useful in data wrangling.
algorithmia Algorithmia is a marketplace for algorithms. This notebook showcases 4 different algorithms: Face Detection, Content Summarizer, Latent Dirichlet Allocation and Optical Character Recognition.

notebook-installation

anaconda

Anaconda is a free distribution of the Python programming language for large-scale data processing, predictive analytics, and scientific computing that aims to simplify package management and deployment.

Follow instructions to install Anaconda or the more lightweight miniconda.

dev-setup

For detailed instructions, scripts, and tools to set up your development environment for data analysis, check out the dev-setup repo.

running-notebooks

To view interactive content or to modify elements within the IPython notebooks, you must first clone or download the repository and then run the notebook. More information on IPython Notebooks is available in the IPython/Jupyter Notebook documentation.

$ git clone https://github.com/donnemartin/data-science-ipython-notebooks.git
$ cd data-science-ipython-notebooks
$ jupyter notebook

Notebooks tested with Python 2.7.x.

contributing

Contributions are welcome! For bug reports or requests please submit an issue.

contact-info

Feel free to contact me to discuss any issues, questions, or comments.

license

This repository contains a variety of content; some developed by Donne Martin, and some from third-parties. The third-party content is distributed under the license provided by those parties.

The content developed by Donne Martin is distributed under the following license:

I am providing code and resources in this repository to you under an open source license. Because this is my personal repository, the license you receive to my code and resources is from me and not my employer (Facebook).

Copyright 2015 Donne Martin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Basic Architecture of a Flume+Kafka+Storm+Redis Real-Time Analytics System

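In this article I use a simple real-time order analytics system for an e-commerce website to walk through the architecture of a real-time analytics system in a big data environment. This architecture is only an entry-level introduction to real-time analytics. Real production systems involve many more details, such as:

- using Storm's ack mechanism to make sure every record is processed correctly;
- building a highly available cluster;
- handling duplicate or lost data during consumption.

The required data reliability and the acceptable system complexity also differ from one business scenario to another. The aim here is simply to get you started, give you a basic understanding of real-time analytics, and learn together.
At the end of the article there is a Troubleshooting section. It covers the problems I ran into while deploying the example programs, along with their solutions.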

Basic System Architecture

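The overall architecture works as follows. The order servers of the e-commerce system produce order logs. Flume watches the order logs, captures every new log entry in real time, and stores it in the Kafka messaging system. Storm then consumes the messages in Kafka. The consumption records (offsets) are managed by a ZooKeeper cluster, so even after Kafka goes down and restarts, the last consumption position can be found and consumption resumes from the Kafka brokers where it stopped.

However, consuming a message and recording its offset are not one atomic operation: the message may be consumed before the offset is recorded, or the other way round. Suppose the system crashes right after a message has been consumed but before the offset has been written to ZooKeeper, or a similar failure occurs. Then a small amount of data will be lost or consumed twice. One suggested mitigation is to deploy the Kafka brokers and ZooKeeper on the same machine.

Next, a user-defined Storm topology analyzes the log entries and writes the results to the Redis cache database (they can also be persisted). Finally, a web app reads the analyzed order information from Redis and displays it to users.

A Kafka layer sits between Flume and Storm because, under high concurrency, the volume of order log data can grow explosively. Storm is one of the fastest real-time computing frameworks, but there are exceptions, and Twitter's open-source real-time framework Heron is reportedly even faster. If Storm consumes more slowly than logs are produced, then, combined with Flume's own limitations, a large amount of data would inevitably fall behind and be lost. Kafka is therefore added as a data buffer. Kafka is a log-file-based messaging system, which means messages are persisted on disk. Because it makes full use of Linux I/O features, it also provides considerable throughput. Redis is used as the database in this architecture because its very high read and write speed suits a real-time environment.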

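Business Background
Running promotions at the right moments has become routine for every major e-commerce website. Promotions bring the site a large amount of traffic and orders while giving users real discounts, so everyone is happy. During a promotion, the boss and the operations team want to see orders in real time: the boss is pleased, and operations can adjust its strategy based on live order data. Showing users the site's order figures in real time can also stimulate their desire to buy. However, ordinary offline (batch) computing systems cannot meet real-time computing requirements under high concurrency. We therefore need a dedicated real-time computing system, such as Storm, Heron, or Spark Streaming, for this kind of requirement.
Since we want to analyze order data, order information has to be written to a log file when each order is created. For this article I wrote an order log simulator (code below) that randomly generates order logs. It uses log4j2 and draws on my earlier experience developing e-commerce systems. The order log format and data shown below are what this article analyzes: the example computes the total sales of every merchant and finds the top 20 merchants by sales.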

Order data format:
orderNumber: XX | orderDate: XX | paymentNumber: XX | paymentDate: XX | merchantName: XX | sku: [ skuName: XX skuNum: XX skuCode: XX skuPrice: XX totalSkuPrice: XX;skuName: XX skuNum: XX skuCode: XX skuPrice: XX totalSkuPrice: XX;] | price: [ totalPrice: XX discount: XX paymentPrice: XX ]
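As a rough illustration of the analysis target, the Java sketch below parses lines in this format, keeps a running total per merchant, and returns the top N. It makes these assumptions:

- "Sales" means the paymentPrice field. The article does not say whether totalPrice or paymentPrice is meant.
- Each log line holds one order.
- The class and method names are hypothetical.

In the article's pipeline this logic would live in a Storm bolt, but the Storm API is not used here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: running sales totals per merchant from order log lines.
class MerchantSalesRanking {
    private static final Pattern MERCHANT = Pattern.compile("merchantName: (\\S+)");
    // Assumption: "sales" = paymentPrice (amount actually paid after discount).
    private static final Pattern PAYMENT = Pattern.compile("paymentPrice: (-?[0-9.]+)");

    private final Map<String, Double> salesByMerchant = new HashMap<>();

    // Parse one log line and add its payment to the merchant's total.
    // Lines missing either field are ignored and reported with false.
    boolean accept(String line) {
        Matcher merchant = MERCHANT.matcher(line);
        Matcher payment = PAYMENT.matcher(line);
        if (!merchant.find() || !payment.find()) {
            return false;
        }
        salesByMerchant.merge(merchant.group(1), Double.parseDouble(payment.group(1)), Double::sum);
        return true;
    }

    // The n merchants with the highest totals, highest first.
    List<Map.Entry<String, Double>> top(int n) {
        List<Map.Entry<String, Double>> entries = new ArrayList<>(salesByMerchant.entrySet());
        entries.sort(Map.Entry.<String, Double>comparingByValue().reversed());
        return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
    }
}
```

After every log line has been passed to `accept`, calling `top(20)` yields the 20 merchants with the highest totals.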

Order log generator:
log4j2 writes the log entries to a file and rolls the log file over every hour:

filePattern="/Users/guludada/Desktop/logs/app-%d{yyyy-MM-dd-HH}.log"

Generator code:

package com.guludada.ordersInfo;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

// Import log4j classes.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Writes randomly generated e-commerce orders to the log through log4j2,
// one order per line, in the order data format shown above.
public class ordersInfoGenerator {

	public enum paymentWays {
		Wechat,Alipay,Paypal
	}

	public enum merchantNames {
		优衣库,天猫,淘宝,咕噜大大,快乐宝贝,守望先峰,哈毒妇,Storm,Oracle,Java,CSDN,跑男,路易斯威登,
		暴雪公司,Apple,Sumsam,Nissan,Benz,BMW,Maserati
	}

	// Product names (e.g. 黑色连衣裙 = black dress, 人字拖鞋 = flip-flops).
	public enum productNames {
		黑色连衣裙, 灰色连衣裙, 棕色衬衫, 性感牛仔裤, 圆脚牛仔裤,塑身牛仔裤, 朋克卫衣,高腰阔腿休闲裤,人字拖鞋,
		沙滩拖鞋
	}

	private static final Logger logger = LogManager.getLogger(ordersInfoGenerator.class);

	// One shared random generator for all fields.
	private final Random random = new Random();

	private final float[] skuPriceGroup = {299,399,699,899,1000,2000};
	private final float[] discountGroup = {10,20,50,100};

	// Per-order totals: set by randomSkus(), then read by calculateOrderPrice().
	private float totalPrice = 0;
	private float discount = 0;
	private float paymentPrice = 0;

	private int logsNumber = 1000;

	// Emit exactly logsNumber order records.
	public void generate() {
		for (int i = 0; i < logsNumber; i++) {
			logger.info(randomOrderInfo());
		}
	}

	public String randomOrderInfo() {
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		Date date = new Date();

		// Order number: 5 random digits followed by the current epoch milliseconds.
		String orderNumber = randomNumbers(5) + date.getTime();
		String orderDate = sdf.format(date);
		String paymentNumber = randomPaymentWays() + "-" + randomNumbers(8);
		String paymentDate = sdf.format(date);
		String merchantName = randomMerchantNames();
		// randomSkus() must run before calculateOrderPrice(), which uses totalPrice.
		String skuInfo = randomSkus();
		String priceInfo = calculateOrderPrice();

		return "orderNumber: " + orderNumber + " | orderDate: " + orderDate + " | paymentNumber: " +
			paymentNumber + " | paymentDate: " + paymentDate + " | merchantName: " + merchantName +
			" | sku: " + skuInfo + " | price: " + priceInfo;
	}

	private String randomPaymentWays() {
		paymentWays[] paymentWayGroup = paymentWays.values();
		return paymentWayGroup[random.nextInt(paymentWayGroup.length)].name();
	}

	private String randomMerchantNames() {
		merchantNames[] merchantNameGroup = merchantNames.values();
		return merchantNameGroup[random.nextInt(merchantNameGroup.length)].name();
	}

	private String randomProductNames() {
		productNames[] productNameGroup = productNames.values();
		return productNameGroup[random.nextInt(productNameGroup.length)].name();
	}

	// Build between 1 and 3 SKU entries and accumulate their prices into totalPrice.
	private String randomSkus() {
		int skuCategoryNum = random.nextInt(3) + 1;
		StringBuilder skuInfo = new StringBuilder("[");
		totalPrice = 0;
		for (int i = 0; i < skuCategoryNum; i++) {
			int skuNum = random.nextInt(3) + 1;
			float skuPrice = skuPriceGroup[random.nextInt(skuPriceGroup.length)];
			float totalSkuPrice = skuPrice * skuNum;
			String skuName = randomProductNames();
			String skuCode = randomCharactersAndNumbers(10);
			skuInfo.append(" skuName: ").append(skuName)
				.append(" skuNum: ").append(skuNum)
				.append(" skuCode: ").append(skuCode)
				.append(" skuPrice: ").append(skuPrice)
				.append(" totalSkuPrice: ").append(totalSkuPrice).append(";");
			totalPrice += totalSkuPrice;
		}
		skuInfo.append(" ]");
		return skuInfo.toString();
	}

	// Apply a random discount to totalPrice and format the price section.
	private String calculateOrderPrice() {
		discount = discountGroup[random.nextInt(discountGroup.length)];
		paymentPrice = totalPrice - discount;
		return "[ totalPrice: " + totalPrice + " discount: " + discount + " paymentPrice: " + paymentPrice + " ]";
	}

	// Random lowercase alphanumeric string (used for SKU codes).
	private String randomCharactersAndNumbers(int length) {
		return randomString("abcdefghijklmnopqrstuvwxyz0123456789", length);
	}

	// Random string of digits.
	private String randomNumbers(int length) {
		return randomString("0123456789", length);
	}

	private String randomString(String characters, int length) {
		StringBuilder sb = new StringBuilder(length);
		for (int i = 0; i < length; i++) {
			sb.append(characters.charAt(random.nextInt(characters.length())));
		}
		return sb.toString();
	}

	public static void main(String[] args) {
		ordersInfoGenerator generator = new ordersInfoGenerator();
		generator.generate();
	}
}

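Collecting Log Data
There are several ways to collect data. One is to write your own shell scripts or Java programs, but this takes a lot of work and is inconvenient to maintain. The other is to use a third-party framework for log collection; such frameworks are generally robust, fault tolerant, easy to use, and easy to maintain. This article uses Flume for log collection. Flume is a distributed, efficient log collection system that can gather massive amounts of log data spread across different servers into one centralized store. Flume is a top-level Apache project and works well with Kafka. Note, however, that Flume is not a highly available framework, so any optimization in that respect is left to the user.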

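A Flume agent runs on the JVM, so every server needs a JVM environment. One Flume agent is deployed on each server, and data flows through it as follows:

1. Flume collects the log data produced by the web server and wraps it into events, which are sent to the agent's source.
2. The source consumes these collected events (Flume events) and puts them into the agent's channel.
3. The agent's sink takes the events from the channel. It either stores them in the local file system or passes them on as input to a Flume agent on another server in the distributed system.

Flume provides point-to-point reliability guarantees. Data in an agent's channel is removed only after it has either been safely transferred into the channel of the Flume agent on the next server or been correctly stored in the local file system.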
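The rule that an event leaves the channel only after delivery can be illustrated with a toy, single-threaded Java model. This is not Flume's API: real channels use explicit transactions with put/take, commit, and rollback. The class, the capacity limit, and the nextHop predicate standing in for the downstream agent are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Toy, single-threaded model of one Flume agent's channel (hypothetical; not the Flume API).
class ToyFlumeAgent {
    private final Deque<String> channel = new ArrayDeque<>();
    private final int capacity;

    ToyFlumeAgent(int capacity) {
        this.capacity = capacity;
    }

    // Source side: accept an event into the channel unless the channel is full.
    boolean put(String event) {
        if (channel.size() >= capacity) {
            return false;
        }
        channel.addLast(event);
        return true;
    }

    // Sink side: offer the oldest event to the next hop (another agent or local storage).
    // The event leaves the channel only if the next hop confirms it; otherwise it stays for a retry.
    boolean drainOne(Predicate<String> nextHop) {
        String event = channel.peekFirst();
        if (event == null) {
            return false;
        }
        if (nextHop.test(event)) {
            channel.pollFirst(); // like a commit: delivery confirmed
            return true;
        }
        return false;            // like a rollback: event is kept
    }

    int size() {
        return channel.size();
    }
}
```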

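In this article we use the Exec Source as Flume's source: because this is a real-time system, we simply watch the log file with the tail command. On the Flume agent on the Kafka broker cluster side, we use the Kafka Sink to deliver the data into the Kafka messaging system.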

The figure below, taken from the Flume website, shows Flume's data-flow architecture:

[Figure: Flume agent data flow. Image source: http://flume.apache.org/FlumeUserGuide.html]

Flume configuration on the order log server:

agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = exec
agent.sources.origin.command = tail -F /export/data/trivial/app.log
agent.sources.origin.channels = memorychannel

agent.sources.origin.interceptors = i1
agent.sources.origin.interceptors.i1.type = static
agent.sources.origin.interceptors.i1.key = topic
agent.sources.origin.interceptors.i1.value = ordersInfo

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000

agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = 172.16.124.130
agent.sinks.target.port = 4545

Flume configuration on the Kafka side:

agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = 0.0.0.0
agent.sources.origin.port = 4545

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000

agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
#agent.sinks.target.topic = bigdata
agent.sinks.target.brokerList=localhost:9092
agent.sinks.target.requiredAcks=1
agent.sinks.target.batchSize=100
agent.sinks.target.channel = memorychannel

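Note that in the Flume agent on the log server we configured an interceptor. It adds a key-value pair with the key "topic" to the header of every Flume event (a Flume event is one captured log line). Each captured log line is therefore routed to the Kafka topic named by that value. You can also give a Flume event a header with the key "key". The Kafka Sink then uses that value to decide which Kafka partition the message goes to; without it, the partition is chosen at random.

In the Flume configuration on the Kafka cluster side, a few parameters deserve attention:

- "topic" specifies which Kafka topic the captured log data is written to. If the sending Flume agent has already added a topic header to the event, it can be left unset here.
- brokerList sets the host addresses and ports of the Kafka cluster.
- requiredAcks=1 means an acknowledgment is returned once the message has been stored successfully on the leader of its partition.
- requiredAcks=0 means no acknowledgment that the message was written to Kafka is required.
- requiredAcks=-1 means the acknowledgment waits not only for the partition leader to write the message but also for all in-sync replicas to store it.
- batchSize sets how many messages are sent to Kafka in each batch. The larger the batch, the higher the latency.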
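The header-driven routing described above can be sketched in Java as follows. This is a hypothetical illustration of the rules, not Flume's KafkaSink source. The default topic "bigdata" is taken from the commented-out topic line in the configuration, and the partition count is an assumption. String.hashCode stands in for whatever hash function the Kafka partitioner actually uses.

```java
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of header-based routing; not Flume's KafkaSink implementation.
class HeaderRouter {
    private final String defaultTopic; // topic configured on the sink
    private final int partitions;      // number of partitions in the topic (assumed)

    HeaderRouter(String defaultTopic, int partitions) {
        this.defaultTopic = defaultTopic;
        this.partitions = partitions;
    }

    // A "topic" header, e.g. the one added by the static interceptor, overrides the configured topic.
    String topicFor(Map<String, String> headers) {
        return headers.getOrDefault("topic", defaultTopic);
    }

    // A "key" header pins all events with that key to the same partition;
    // without one, a partition is picked at random.
    int partitionFor(Map<String, String> headers) {
        String key = headers.get("key");
        if (key == null) {
            return ThreadLocalRandom.current().nextInt(partitions);
        }
        return Math.floorMod(key.hashCode(), partitions); // floorMod keeps the result non-negative
    }
}
```

Keying events by merchant name, for example, would send all orders of one merchant to the same partition and so preserve their relative order.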

Kafka messaging system
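This part covers configuring and using Kafka. In our system Kafka works as a data buffer, somewhat like a message queue such as ActiveMQ or a cache such as Redis, but more reliable.
- It is a messaging system built on log files, so data is persisted and hard to lose.
- It records how far each consumer has read, and consumers can choose the position they start from, so messages can be consumed again.
- It supports both queuing and publish-subscribe consumption, which makes it very flexible.
- It integrates well with Storm.
- It makes full use of Linux I/O (sequential disk access and the OS page cache) for fast reads and writes.

Another point worth mentioning: Kafka consumers are pull-based, whereas Flume is push-based. A push-based system delivers data as fast as it can, so when the producer is faster than the consumer, the consumer is overwhelmed and data can be lost. A pull-based model relieves that pressure. The consumer may consume more slowly than data is produced, and it fetches the messages it has not yet read whenever it has spare capacity.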

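Kafka is a distributed, high-throughput messaging system that supports both point-to-point and publish-subscribe consumption. It consists mainly of producers, consumers and brokers.

Kafka organizes different kinds of messages into topics. Each category of message goes into its own topic, and Kafka persists the messages of every topic to log files on disk. Each topic's log is split into partitions. Every message is appended in order to one partition and labelled with an offset, its position within that partition. Once written, neither the content nor the order of a message can change.

So one difference from other message-queue systems is that messages within a partition are consumed in order. Ordering across a whole topic is only possible if the topic has a single partition. Messages stay in the log whether or not they have been consumed; they are deleted to free space only after they are older than the retention time in the configuration.

The only Kafka-related metadata a consumer has to keep is this offset, which records how far it has consumed in each partition. In older Kafka versions, and in the storm-kafka spout used later in this article, these offsets are stored in ZooKeeper; newer Kafka consumers store them in an internal Kafka topic instead. Kafka brokers also depend on ZooKeeper, so a ZooKeeper ensemble must be running before Kafka starts. If a consumer commits its offset before processing the messages it read, a crash can lose a few messages. Committing after processing avoids that loss but may process some messages twice. Because users can set a consumer's offset freely and messages remain in the log files, messages can also be consumed again.

Partitions and their replicas are spread across the servers of the cluster. For each partition, one server that holds it acts as the leader, and the servers holding its other replicas act as followers. The leader handles all requests for that partition, while the followers keep their replicas in sync. If the leader goes down, one of the followers is elected as the new leader.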
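A partition can be sketched as an append-only list, read by a consumer that remembers nothing except its position. The toy model below uses plain Java with hypothetical names and no Kafka classes. It illustrates three points from the paragraphs above:
- reading does not delete messages;
- a consumer can seek back and re-consume;
- retention removes the oldest messages whether or not anyone has read them.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one Kafka partition: an append-only log in which each message
// gets an increasing offset. Consumers keep only their own position.
final class PartitionLog {
    private final List<String> messages = new ArrayList<>();
    private long firstOffset = 0; // offset of messages.get(0) after retention

    // Append a message and return the offset it was assigned.
    long append(String message) {
        messages.add(message);
        return firstOffset + messages.size() - 1;
    }

    // Read up to max messages starting at offset; nothing is removed.
    List<String> read(long offset, int max) {
        List<String> out = new ArrayList<>();
        long start = Math.max(offset, firstOffset);
        for (long o = start; o < endOffset() && out.size() < max; o++) {
            out.add(messages.get((int) (o - firstOffset)));
        }
        return out;
    }

    // Retention: drop the oldest n messages, consumed or not.
    void expireOldest(int n) {
        int k = Math.min(n, messages.size());
        messages.subList(0, k).clear();
        firstOffset += k;
    }

    // Offset that the next appended message will receive.
    long endOffset() {
        return firstOffset + messages.size();
    }

    // A consumer's only Kafka-related state: the next offset to read.
    static final class Consumer {
        long position;

        Consumer(long start) {
            position = start;
        }

        List<String> poll(PartitionLog log, int max) {
            List<String> batch = log.read(position, max);
            // Skip past expired offsets, then advance by what was read.
            position = Math.max(position, log.firstOffset) + batch.size();
            return batch;
        }

        // Rewind (or fast-forward) to re-consume from any retained offset.
        void seek(long offset) {
            position = offset;
        }
    }
}
```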

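Messaging systems generally follow one of two models: point-to-point (queuing) or publish-subscribe. Kafka introduces the consumer group, which lets it offer both. Every consumer declares which consumer group it belongs to. Each message in a topic is delivered to exactly one consumer instance in every consumer group that subscribes to that topic.
- If all consumers are in the same group, Kafka behaves like a queue: each message is consumed by only one consumer.
- If every consumer is in a different group, it behaves like publish-subscribe: each message is broadcast to all consumer instances.

In an ordinary message queue, several consumers may consume messages at the same time. The messages are handed out in order, but network delays make them reach the consumers out of order, so ordering is lost. The usual fix is to allow only one consumer, which clearly gives up parallelism.

In Kafka, each partition is delivered to only one consumer instance within each consumer group. Only one consumer instance consumes a given partition, so processing within a partition stays in order. This looks like the single-consumer approach, but because Kafka splits each topic into partitions, consumption still runs in parallel. Global ordering requires a topic with a single partition and only one consumer instance per consumer group, which gives up consumption parallelism entirely.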
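Both the queue and the publish-subscribe behaviour follow from one rule: within a group, each partition has exactly one owner. The sketch below assigns the partitions of a topic to the consumers of a single group in round-robin order. It is an illustration with hypothetical consumer names; Kafka's own assignment strategies differ in detail. Calling it once per group shows both modes. A group with more consumers than partitions leaves the extra consumers idle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified consumer-group model: inside one group every partition is owned
// by exactly one consumer; every subscribed group gets all partitions.
final class GroupAssignment {

    // Map each consumer of one group to the partitions it owns (round-robin).
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String c : consumers) {
            owned.put(c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            owned.get(consumers.get(p % consumers.size())).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // Queue semantics: one group, two consumers split three partitions.
        System.out.println(assign(Arrays.asList("c1", "c2"), 3)); // {c1=[0, 2], c2=[1]}
        // Publish-subscribe: two groups with one consumer each both get everything.
        System.out.println(assign(Collections.singletonList("groupA-c1"), 3)); // {groupA-c1=[0, 1, 2]}
        System.out.println(assign(Collections.singletonList("groupB-c1"), 3)); // {groupB-c1=[0, 1, 2]}
    }
}
```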

 

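Configuring and deploying Kafka is very simple:
1. Start the ZooKeeper ensemble first. Kafka brokers rely on ZooKeeper for cluster coordination, and in this setup each consumer's offsets are recorded there as well.
2. Write a separate configuration file for each Kafka server in the cluster. For a two-node Kafka cluster, the minimal configuration of node 1 and node 2 looks like this: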

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/export/data/kafka
    zookeeper.connect=localhost:2181
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9093
    log.dir=/export/data/kafka
    zookeeper.connect=localhost:2181

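- broker.id is the identifier of each node in the Kafka cluster and must be unique.
- listeners is the address and port the Kafka process on that node listens on; the value shown is fine.
- log.dir is the directory where Kafka stores its log files (the files that hold the messages).
- zookeeper.connect is the ZooKeeper address and port.

Both files use port 9093 and the same log.dir. That works only because each broker runs on its own machine; if you run both brokers on one host, give each its own port and log.dir.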
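These files differ in only a few keys, so it is easy to copy one and forget to change something. As an illustration, here is a hypothetical pre-flight check in plain Java; it is not a Kafka tool. It loads each file's contents with java.util.Properties and reports:
- a duplicate broker.id;
- a relative log.dir;
- duplicate listeners or log.dir values, when the brokers share a host.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical pre-flight check (not part of Kafka) for server-N.properties files.
final class BrokerConfigCheck {

    // Parse the text of one properties file.
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    // broker.id must be unique in any cluster; listeners and log.dir only
    // have to differ when several brokers run on the same host.
    static List<String> problems(List<Properties> brokers, boolean sameHost) {
        List<String> found = new ArrayList<>();
        String[] keys = sameHost
                ? new String[] {"broker.id", "listeners", "log.dir"}
                : new String[] {"broker.id"};
        for (String key : keys) {
            Set<String> seen = new HashSet<>();
            for (Properties p : brokers) {
                String v = p.getProperty(key);
                if (v != null && !seen.add(v.trim())) {
                    found.add("duplicate " + key + ": " + v.trim());
                }
            }
        }
        // A log.dir without a leading "/" is resolved against the broker
        // process's working directory, which is rarely what is intended.
        for (Properties p : brokers) {
            String dir = p.getProperty("log.dir");
            if (dir != null && !dir.trim().startsWith("/")) {
                found.add("relative log.dir: " + dir.trim());
            }
        }
        return found;
    }
}
```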
3. Once these files are in place, start the Kafka process on each node with the commands below, and Kafka is ready to use:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...


Storm real-time computing framework

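Next comes the real-time computing framework used in this article: Storm. Storm is a very fast real-time computation framework. How fast? According to the front page of the official site, a benchmark clocked it at over a million tuples processed per second per node.

Hadoop uses the MapReduce model, while Storm uses topologies. A MapReduce job eventually finishes, but a topology runs forever until you explicitly stop it with storm kill <topology-name>.

Like most cluster systems, a Storm cluster has a master node and worker nodes.
- The master node runs a daemon called Nimbus, similar to Hadoop's JobTracker. Nimbus distributes code around the cluster, assigns tasks, and monitors tasks and worker nodes.
- Each worker node runs a daemon called Supervisor, which receives the work Nimbus assigns and runs it. Each worker executes part of a topology, and a running topology consists of many workers across the cluster working together.

All coordination between Nimbus and the Supervisors goes through ZooKeeper. Nimbus and the Supervisors are themselves stateless; their state is kept in ZooKeeper or on local disk. So a node crash or dynamic scaling does not affect the cluster as a whole, and the daemons are designed to be fail-fast.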

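Storm's central data abstraction is the stream: an unbounded sequence of tuples that are independent of one another. To compute in real time on Storm you first need a computation program, the topology, made up of spouts and bolts. Storm runs input streams through the topology with reliable (ack-based) distributed processing to produce the target stream. For example, from the stream of all users who registered on a dating site today, a topology could produce a new stream of users under 30 who earn 10,000 or more a month.

In this article:
- A spout is a Java class implementing a specific interface. It acts as the data source, producing data or receiving it from outside.
- A bolt is a Java class implementing Storm's bolt interface. It consumes the stream emitted by a spout and applies user-defined processing logic. Complex processing can chain several bolts.

Finally, the program builds a Topology object from the spouts and bolts and submits it to the Storm cluster for execution.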
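The division of labour between spout and bolt can be imitated in plain Java to show the data flow of the dating-site example. This is an analogy only:
- no Storm classes are used;
- the User fields and thresholds (age under 30, monthly income of 10,000 or more) simply mirror the example;
- unlike Storm, the sketch processes a finite list stage by stage rather than an endless stream tuple by tuple.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java analogy (no Storm classes): a "spout" supplies tuples and a chain
// of "bolts" processes them one at a time, emitting tuples downstream.
final class MiniTopology {

    // One tuple with the fields (name, age, monthlyIncome).
    static final class User {
        final String name;
        final int age;
        final int monthlyIncome;

        User(String name, int age, int monthlyIncome) {
            this.name = name;
            this.age = age;
            this.monthlyIncome = monthlyIncome;
        }
    }

    // A bolt receives one tuple and may emit zero or more tuples.
    interface Bolt {
        void execute(User tuple, Consumer<User> emit);
    }

    // Filter bolt: users under 30 earning at least 10,000 per month.
    static final Bolt FILTER = (user, emit) -> {
        if (user.age < 30 && user.monthlyIncome >= 10_000) {
            emit.accept(user);
        }
    };

    // Pass the spout's output through each bolt in turn.
    static List<User> run(List<User> spoutOutput, List<Bolt> bolts) {
        List<User> stream = spoutOutput;
        for (Bolt bolt : bolts) {
            List<User> next = new ArrayList<>();
            for (User tuple : stream) {
                bolt.execute(tuple, next::add);
            }
            stream = next;
        }
        return stream;
    }
}
```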

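Tuples are Storm's data model: a list of values, each associated with a field name. For example, if a spout or bolt declares its output fields as (name, age, gender), every tuple it emits looks like ("Guludada", 27, "neutral").

Storm also has the concept of a stream grouping. In the program you specify, for each component, which component's tuples it receives, and the grouping decides which of the receiving component's tasks each tuple goes to. Storm provides several built-in groupings:
- shuffleGrouping distributes tuples randomly across the tasks of the next component.
- fieldsGrouping uses the values of specified fields to choose the task, so tuples with the same field values always reach the same task.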
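The two groupings differ only in how the receiving task is chosen. The following simplified model (a hypothetical class, not Storm's implementation) returns the index of the downstream task a tuple would be sent to. Hashing the grouping values, as sketched here, guarantees that equal field values always land on the same task, which is what per-key aggregation needs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Simplified model of two Storm stream groupings: which downstream task
// index (0..numTasks-1) receives a tuple. Not Storm's own code.
final class Groupings {

    // shuffleGrouping: any task, chosen at random, which spreads load evenly.
    static int shuffle(int numTasks) {
        return ThreadLocalRandom.current().nextInt(numTasks);
    }

    // fieldsGrouping: hash the values of the grouping fields, so equal values
    // always go to the same task.
    static int fields(List<Object> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        List<Object> tuple = Arrays.asList("Guludada", 27, "neutral");
        System.out.println("fields -> task " + fields(tuple, 2));
        System.out.println("shuffle -> task " + shuffle(2));
    }
}
```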
 
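The other concepts to understand are Storm's workers and tasks.
- Each worker is a JVM process running on a physical machine.
- Each worker runs several executor threads, and each executor runs one or more tasks (one by default).
- A task may be a spout task or a bolt task.

Nimbus spreads them evenly across the workers in round-robin fashion. How many spout or bolt threads a topology runs is set by the user through the parallelism hints in the program.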
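The sketch below spreads each component's parallelism over a list of worker ports in round-robin order, to show how executors end up in worker processes. It only approximates Storm's default scheduler. Purely for illustration, the component names and parallelism come from this article's topology (one kafkaSpout, two merchantsSalesBolt executors) and the ports from storm.yaml. The topology later sets conf.setNumWorkers(1), in which case everything shares a single worker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Approximation of even, round-robin placement of executors onto worker slots.
// Not Storm's scheduler code; names and numbers are illustrative.
final class RoundRobinScheduler {

    // parallelism: component name -> number of executors (kept in insertion order)
    static Map<Integer, List<String>> assign(Map<String, Integer> parallelism, List<Integer> workerPorts) {
        Map<Integer, List<String>> byWorker = new LinkedHashMap<>();
        for (int port : workerPorts) {
            byWorker.put(port, new ArrayList<>());
        }
        int next = 0;
        for (Map.Entry<String, Integer> component : parallelism.entrySet()) {
            for (int i = 0; i < component.getValue(); i++) {
                int port = workerPorts.get(next++ % workerPorts.size());
                byWorker.get(port).add(component.getKey() + "#" + i);
            }
        }
        return byWorker;
    }
}
```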

The Storm cluster configuration is as follows.
Storm's configuration file lives in the conf directory of the installation: conf/storm.yaml

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
  - "ymhHadoop"
  - "ymhHadoop2"
  - "ymhHadoop3"    

storm.local.dir: "/export/data/storm/workdir"
 
nimbus.host: "ymhHadoop"

supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
 
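storm.zookeeper.servers lists the hosts of the ZooKeeper ensemble. The ZooKeeper port is set separately with storm.zookeeper.port and defaults to 2181.
storm.local.dir is the directory where each Storm node keeps a small amount of node-specific information on its local disk. Remember to grant read and write permission on this directory.
nimbus.host specifies the Nimbus server (Storm 1.0 and later replace it with nimbus.seeds).
supervisor.slots.ports lists the ports that the workers started by a supervisor listen on. Each port is one worker slot, and each worker is a separate JVM process. These are the basic settings. YAML is strict about layout, so follow the format above exactly: even one missing space, for example between "-" and a port number, causes an error.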

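Next, copy the configuration file to every machine in the cluster. Start the daemons with $bin/storm nimbus on the Nimbus machine and $bin/storm supervisor on each supervisor machine. Finally, run $bin/storm ui on the Nimbus machine to start Storm's web UI. It is quite powerful: it monitors the state of every node in the cluster, lets you manage topologies (activate, deactivate, rebalance, kill), and shows how each topology is running. The UI is reachable at http://{nimbus host}:8080.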



Redis database

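Redis is an in-memory store for a variety of data structures. It is often described as a key-value cache database, but that is imprecise: key-value is only one of the structures it implements. Redis supports strings, hashes, lists, sets, sorted sets and other common data structures. It offers powerful range queries and atomic commands such as INCR, INCRBY, DECR and DECRBY, which keep data consistent under concurrent access.

Although Redis is memory-based, it also offers several disk persistence strategies:
- RDB saves a point-in-time snapshot of the data to disk.
- AOF appends every write command to a log file in order; to restore the data, all the commands in the log are replayed in sequence.

Besides caching a website's hot data, Redis can serve as a database or as the broker of a message queue. This article uses Redis to store the order-analysis results for two reasons: its flexible data structures and powerful commands, and the fact that real-time computation on big data needs a database with fast I/O such as Redis.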
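Why atomic commands matter can be shown without Redis at all. In the plain-Java analogy below:
- unsafeIncrement mimics a separate GET followed by a SET. Two concurrent callers can read the same old value, and one update is lost.
- atomicIncrement uses ConcurrentHashMap.merge, which performs the whole read-modify-write as one step, much as INCRBY or ZINCRBY does inside Redis.

The class is hypothetical, and only the atomic path is exercised.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Analogy in plain Java (no Redis client) for atomic vs non-atomic increments.
final class AtomicCounterDemo {

    // Non-atomic read-modify-write (like GET then SET). Concurrent callers can
    // overwrite each other's updates. Shown for contrast only.
    static void unsafeIncrement(Map<String, Long> store, String key, long delta) {
        long current = store.getOrDefault(key, 0L);
        store.put(key, current + delta);
    }

    // Atomic increment (like INCRBY / ZINCRBY): one indivisible operation.
    static void atomicIncrement(ConcurrentMap<String, Long> store, String key, long delta) {
        store.merge(key, delta, Long::sum);
    }

    // `threads` workers each add 1 to the same key `perThread` times.
    static long runAtomic(int threads, int perThread) {
        ConcurrentMap<String, Long> store = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    atomicIncrement(store, "sales", 1);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store.getOrDefault("sales", 0L);
    }
}
```

No increment is lost: with 8 threads adding 10,000 each, runAtomic returns exactly 80,000.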
 
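In this example, the author uses a sorted set to store each merchant's total sales. A sorted set is made up of a key, scores and member values. Like any set, a sorted set never holds the same member twice under one key, and it orders its members by score. This maps naturally onto the problem: each merchant's total sales becomes the score and the merchant name becomes the member, so the merchants are ranked by total sales. In the Storm program we call Redis through the Jedis API, using the command
$ZINCRBY KEY INCREMENT MEMBER
to accumulate each merchant's total sales. ZINCRBY is atomic, so it correctly increments a merchant's score, that is, its total sales, even under Storm's concurrent processing. Two merchants with the same name should be prevented in the business system rather than handled in our data-analysis layer. One last tip: members with equal scores are ordered lexicographically by member name, so if every merchant had the same score, Redis would sort them alphabetically by name.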
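The sorted-set behaviour relied on here can be modelled in a few lines of Java:
- ZINCRBY creates a missing member at 0 and adds to its score.
- ZRANGE returns members by ascending score, with ties ordered by member name.

This is an in-memory illustration with hypothetical names, not a Redis client. Redis compares member names byte by byte, which matches Java's String ordering for ASCII names like the ones used below.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the sorted-set semantics described above (not Redis code).
final class SortedSetModel {
    private final Map<String, Double> scores = new HashMap<>();

    // Like ZINCRBY key increment member: a missing member starts at 0.
    synchronized double zincrby(double increment, String member) {
        return scores.merge(member, increment, Double::sum);
    }

    // Like ZRANGE key 0 -1 WITHSCORES: ascending by score, then by member name.
    synchronized List<Map.Entry<String, Double>> zrangeAll() {
        List<Map.Entry<String, Double>> entries = new ArrayList<>();
        for (Map.Entry<String, Double> e : scores.entrySet()) {
            entries.add(new AbstractMap.SimpleImmutableEntry<String, Double>(e));
        }
        entries.sort(Map.Entry.<String, Double>comparingByValue()
                .thenComparing(Map.Entry.<String, Double>comparingByKey()));
        return entries;
    }
}
```

For example, after zincrby(100, "shopB"), zincrby(50, "shopA"), zincrby(50, "shopC") and zincrby(25, "shopA"), zrangeAll() lists shopC (50), shopA (75) and shopB (100).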

Kafka+Storm+Redis integration
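Once Flume has pulled the data into Kafka, Storm consumes it and Redis stores the results. Storm works well with Kafka: a KafkaSpout reads data from Kafka, and after a bolt has processed the data, the program stores it in Redis through the Jedis API.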

Below is the code for the KafkaSpout and for building the topology:

BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");
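ZkHosts takes the host:port list of the ZooKeeper ensemble that holds the Kafka brokers' metadata, so the spout can discover brokers and partitions. The spout also records its Kafka consumption offsets in ZooKeeper, under zkRoot.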

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
This converts the byte[] data the spout pulls from Kafka into Storm tuples; StringScheme decodes each message as a UTF-8 string.

package com.guludada.ordersanalysis;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class ordersAnalysisTopology {
	
	private static String topicName = "ordersInfo";
	private static String zkRoot = "/stormKafka/"+topicName;
	// Fixed spout id: offsets are committed to ZooKeeper under zkRoot/<id>.
	// A random id (UUID.randomUUID()) would lose the committed offsets on
	// every resubmission instead of resuming where the last run stopped.
	private static String spoutId = "ordersAnalysisSpout";
	
	public static void main(String[] args) {
		
		// ZooKeeper ensemble that holds the Kafka broker metadata
		BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");
		
		SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName,zkRoot,spoutId);
		// Decode each Kafka message (byte[]) as a UTF-8 string tuple
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
		
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("kafkaSpout",kafkaSpout);
		// Two bolt executors; tuples from the spout are distributed randomly
		builder.setBolt("merchantsSalesBolt", new merchantsSalesAnalysisBolt(), 2).shuffleGrouping("kafkaSpout");

		Config conf = new Config();
		conf.setDebug(true);
		
		if(args != null && args.length > 0) {
			// Cluster mode: args[0] is the topology name
			conf.setNumWorkers(1);
			try {
				StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
			} catch (AlreadyAliveException e) {
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				e.printStackTrace();
			}
			
		} else {
			// Local mode for testing
			conf.setMaxSpoutPending(3);
			
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("ordersAnalysis", conf, builder.createTopology());
		}

	}
}

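Below is the bolt, preceded by its helper classes. The bolt processes the order log lines pulled from Kafka, computes the total order revenue of every merchant, and stores the results in Redis through the Jedis API.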

 

package com.guludada.domain;

import java.util.ArrayList;
import java.util.Date;

public class ordersBean {

	Date createTime = null;
	String number = "";
	String paymentNumber = "";
	Date paymentDate = null;
	String merchantName = "";
	ArrayList skuGroup = null;
	float totalPrice = 0;
	float discount = 0;
	float paymentPrice = 0;
	
	public Date getCreateTime() {
		return createTime;
	}
	public void setCreateTime(Date createTime) {
		this.createTime = createTime;
	}
	public String getNumber() {
		return number;
	}
	public void setNumber(String number) {
		this.number = number;
	}
	public String getPaymentNumber() {
		return paymentNumber;
	}
	public void setPaymentNumber(String paymentNumber) {
		this.paymentNumber = paymentNumber;
	}
	public Date getPaymentDate() {
		return paymentDate;
	}
	public void setPaymentDate(Date paymentDate) {
		this.paymentDate = paymentDate;
	}
	public String getMerchantName() {
		return merchantName;
	}
	public void setMerchantName(String merchantName) {
		this.merchantName = merchantName;
	}
	public ArrayList getSkuGroup() {
		return skuGroup;
	}
	public void setSkuGroup(ArrayList skuGroup) {
		this.skuGroup = skuGroup;
	}
	public float getTotalPrice() {
		return totalPrice;
	}
	public void setTotalPrice(float totalPrice) {
		this.totalPrice = totalPrice;
	}
	public float getDiscount() {
		return discount;
	}
	public void setDiscount(float discount) {
		this.discount = discount;
	}
	public float getPaymentPrice() {
		return paymentPrice;
	}
	public void setPaymentPrice(float paymentPrice) {
		this.paymentPrice = paymentPrice;
	}
	
	
}

skusBean is not used in this example, so the author took a small shortcut and left it unwritten.

package com.guludada.domain;

public class skusBean {
      ………………
}

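logInfoHandler extracts the order information from each log line and stores it in ordersBean (and skusBean), so the bolt can easily access each attribute of the log record.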

package com.guludada.common;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.guludada.domain.ordersBean;

public class logInfoHandler {
	
	SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	
	public ordersBean getOrdersBean(String orderInfo) {
		
		ordersBean order = new ordersBean();
		
		// Extract the order section from the log line
		Pattern orderPattern = Pattern.compile("orderNumber:.+");
		Matcher orderMatcher = orderPattern.matcher(orderInfo);
		if(orderMatcher.find()) {
			
			String orderInfoStr = orderMatcher.group(0);
			String[] orderInfoGroup = orderInfoStr.trim().split("\\|");
			
			// Order number
			String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
			order.setNumber(orderNum);
						
			// Order creation time
			String orderCreateTime = orderInfoGroup[1].trim().split(" ")[1] + " " + orderInfoGroup[1].trim().split(" ")[2];
			try {
				order.setCreateTime(sdf_final.parse(orderCreateTime));
			} catch (ParseException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
			// Merchant name
			String merchantName = (orderInfoGroup[4].split(":"))[1].trim();
			order.setMerchantName(merchantName);
			
			// Order total price
			String orderPriceInfo = (orderInfoGroup[6].split("price:"))[1].trim();
			String totalPrice = (orderPriceInfo.substring(2, orderPriceInfo.length()-3).trim().split(" "))[1];
			order.setTotalPrice(Float.parseFloat(totalPrice));
						
			return order;
						
		} else {
			return order;
		}
	}
}

 

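package com.guludada.ordersanalysis;

import java.util.Map;

import com.guludada.common.logInfoHandler;
import com.guludada.domain.ordersBean;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class merchantsSalesAnalysisBolt extends BaseRichBolt {
	
	private OutputCollector _collector;
	// Created in prepare(): JedisPool is not serializable, so it must not be
	// initialised when the topology is built and shipped to Nimbus.
	private transient logInfoHandler loginfohandler;
	private transient JedisPool pool;

	public void execute(Tuple tuple) {
		String orderInfo = tuple.getString(0);
		ordersBean order = loginfohandler.getOrdersBean(orderInfo);
		
		// Lines that are not order records come back as an empty bean; skip them
		if (!order.getMerchantName().isEmpty()) {
			// Store the sales-by-merchant information in Redis. try-with-resources
			// calls Jedis.close(), which returns the connection to the pool.
			try (Jedis jedis = pool.getResource()) {
				jedis.zincrby("orderAna:topSalesByMerchant", order.getTotalPrice(), order.getMerchantName());
			} catch (Exception e) {
				// Redis failure: report it and let the spout replay the tuple
				_collector.reportError(e);
				_collector.fail(tuple);
				return;
			}
		}
		// BaseRichBolt does not ack automatically; unacked tuples time out and are replayed
		_collector.ack(tuple);
	}

	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this._collector = collector;
		this.loginfohandler = new logInfoHandler();
		// host, port, timeout in ms, password
		this.pool = new JedisPool(new JedisPoolConfig(), "ymhHadoop", 6379, 2 * 60000, "12345");
	}

	public void cleanup() {
		if (pool != null) {
			pool.destroy();
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// Terminal bolt: nothing is emitted downstream
	}

}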

Maven configuration file (pom.xml) of the Topology project

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.guludada</groupId>
  <artifactId>Storm_OrdersAnalysis</artifactId>
  <packaging>war</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>Storm_OrdersAnalysis Maven Webapp</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.6</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.9.0.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.8.1</version>
    </dependency>
  </dependencies>
  <build>
    <finalName>Storm_OrdersAnalysis</finalName>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.guludada.ordersanalysis.ordersAnalysisTopology</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

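The POM configures the officially recommended maven-assembly-plugin, which makes it easy to package the topology. Go to the project root and run
$mvn assembly:assembly
to build the topology jar. With newer versions of the plugin, which no longer have the assembly:assembly goal, run mvn package assembly:single instead.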

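Finally, here is the deployment procedure for the whole project:
1. Start ZooKeeper
2. Start Kafka
3. Start Flume to pull the logs into Kafka
4. Start the Storm cluster
5. Start the Redis server with the command
$ src/redis-server
6. Submit the packaged topology jar to the Storm cluster with the command $storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3. For this project, the main class is com.guludada.ordersanalysis.ordersAnalysisTopology, and the first argument is used as the topology name.
7. Start the Redis CLI client and view the results with the commands
$ src/redis-cli --raw
$  zrange key 0 -1 withscores
Here key is the sorted-set key the bolt writes to, orderAna:topSalesByMerchant. If Redis has a password, run AUTH with that password first.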

The result is shown in the figure below:

 

Troubleshooting
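  1. When storm-core, storm-kafka and kafka are imported together through Maven, jar conflicts may prevent Log4jLoggerFactory from being initialized, and the Storm program will not start. The fix is simple: follow the error message (printed in red) and remove the redundant jar. To do this, add the following exclusion to the kafka dependency in the POM: <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
  2. The first time I ran the Storm topology, I hit a very basic problem: the Kafka topic clearly contained data, but the Storm program could not read any of it. The following article explained the cause: http://m.blog.csdn.net/article/details?id=18615761 Before the topology's first start, no offset information exists under zkRoot in ZooKeeper. When Storm finds no offset, it falls back to the default and starts reading from the end of the log, so the data already in Kafka is never read. Data written to the broker after Storm has started is processed correctly.
  3. When the topology is uploaded to Nimbus, or just as the Storm program starts, you may get an error because JedisPool cannot be serialized: java.lang.RuntimeException:java.io.NotSerializableException: redis.clients.jedis.JedisPool. The solution is to move the JedisPool initialization out of the bolt's field declarations and into the bolt's prepare() method, as in this article's code.
  4. When Storm starts and connects to Redis, the connection may be refused with an error saying Redis is running in protected mode. The Storm program connects to the Redis server remotely, and a Redis server without a password refuses remote connections in protected mode. The fix is simple: either disable protected mode (strongly discouraged) or set a password for Redis with the command below. The password must match the one the bolt passes to JedisPool (12345 in this article's code). $config set requirepass 123
  5. After a topology is submitted to Storm, the supervisor may keep reporting "Kill XXXX No Such process". Usually this means the submitted topology is not running correctly, and errors inside the topology program may not show up in Storm's daemon logs. The solution is to open the Storm UI to check how the topology is running. Errors from your program are also displayed there, which makes them easy to find.
  6. A small issue with Kafka: after starting a Kafka producer client on one machine, I could not start a Kafka consumer client on the same machine, possibly because the two processes tried to use the same port. Starting the consumer on another machine showed the correct results.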

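Finally, thanks to everyone who patiently read this far. I know my technical skills and writing still have plenty of room for improvement, and I hope we can learn and improve together, so comments and suggestions are very welcome! One last gripe: CSDN's article editor is extremely, extremely hard to use in Firefox on my Mac. Fonts and formatting are completely out of control, with all sorts of inexplicable bugs…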

from:http://blog.csdn.net/ymh198816/article/details/51998085

Flume+Spark+Hive+Spark SQL offline analysis system

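I recently learned Scala and Spark together, so I'm taking the opportunity to summarize them here and share what is currently the hottest distributed computing technology, Spark. Spark is not limited to offline computation; it also provides many powerful components:
- Spark Streaming for real-time computation, which also works well with messaging systems such as Kafka;
- Spark SQL, which lets users work with structured data from different sources using standard SQL statements;
- the rich MLlib library for machine learning.

Spark is written in Scala, a functional and object-oriented language that runs on the JVM. Its learning curve is brutal, and in my personal opinion its readability will not win over the general public. But it is powerful: once you are fluent it greatly speeds up development, and the same functionality takes far less code than in Java, thanks to Scala's language features.

This article builds on my earlier article about Hadoop offline computation and reuses its clickstream-analysis case, except that Spark offline computation now replaces the MapReduce part. You will see that the identical log-cleaning task takes far less code than in that article, which is all down to Scala. The Flume part is essentially the same as in the earlier Hadoop offline-analysis article. The Hive part adds a brief explanation of the Hive data warehouse, and there is new material on HDFS and its configuration plus a large amount of new, detailed material on the Spark framework. As usual, the article ends with a Troubleshooting section about the problems I ran into during deployment; read it selectively as you see fit.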

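PS: The last paragraph of this article's Spark section is very important. It summarizes characteristics of Spark in a cluster environment that must not be overlooked, and every Spark user should understand them thoroughly. Alternatively, read the official documentation to deepen your understanding: http://spark.apache.org/docs/latest/programming-guide.html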

Spark offline analysis system architecture diagram

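In the overall architecture, Flume collects log files from an FTP server and stores them in Hadoop HDFS. Spark RDD operations then clean the log files, and finally Spark SQL together with Hive builds a data warehouse for offline analysis. Job scheduling is done with shell scripts; you can also try automated schedulers such as Azkaban or Oozie.
The clickstream logs come mainly from Nginx's access.log. Note that Flume does not pull the nginx logs directly from the production environment. Instead, an extra FTP server layer buffers all the log files, and Flume watches a specified directory on the FTP server and pulls the log files in it into HDFS (the reason is explained below). Pushing log files from production to the FTP server can be done with a shell script scheduled by crontab.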

Website clickstream data


Image source: http://webdataanalysis.net/data-collection-and-preprocessing/weblog-to-clickstream/#comments

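In a web system, a user's page views, clicks and other actions on a site are usually recorded in logs, and each log record is one data point in the figure above. Clickstream data connects all these points into a complete record of a user's browsing on the site, which can be thought of as the user's browsing session. It covers, for example, which external site the user came from, which pages of the current site they viewed next, and which images, links and buttons they clicked. Taken as a whole, this information is called the user's clickstream. The offline analysis system in this article collects these logs from the web system, cleans them, stores them in the distributed HDFS file system, and then uses the offline analysis tool Hive to compute clickstream statistics for all users.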
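Grouping individual log records into sessions is the key step of clickstream analysis. The sketch below shows one common way to do it in plain Java. Its rules are assumptions for illustration, not taken from this system:
- visitors are identified by IP address;
- a gap of more than 30 minutes between two hits from the same visitor starts a new session.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sessionization sketch. Assumptions: visitor = IP address, and a session
// ends after 30 minutes of inactivity (neither is specified by the system).
final class Sessionizer {
    static final long TIMEOUT_MILLIS = 30L * 60 * 1000;

    // One log hit: visitor IP, request time (epoch millis) and requested URL.
    static final class Hit {
        final String ip;
        final long timeMillis;
        final String url;

        Hit(String ip, long timeMillis, String url) {
            this.ip = ip;
            this.timeMillis = timeMillis;
            this.url = url;
        }
    }

    // Per visitor, the list of sessions; each session is its URLs in time order.
    static Map<String, List<List<String>>> sessions(List<Hit> hits) {
        List<Hit> sorted = new ArrayList<>(hits);
        sorted.sort(Comparator.comparingLong((Hit h) -> h.timeMillis));
        Map<String, List<List<String>>> result = new TreeMap<>();
        Map<String, Long> lastSeen = new HashMap<>();
        for (Hit h : sorted) {
            List<List<String>> visitor = result.computeIfAbsent(h.ip, ip -> new ArrayList<>());
            Long previous = lastSeen.get(h.ip);
            if (previous == null || h.timeMillis - previous > TIMEOUT_MILLIS) {
                visitor.add(new ArrayList<>()); // start a new session
            }
            visitor.get(visitor.size() - 1).add(h.url);
            lastSeen.put(h.ip, h.timeMillis);
        }
        return result;
    }
}
```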
This system uses Nginx's access.log as the log file for clickstream analysis. The access.log format is as follows:

Sample record:
124.42.13.230 - - [18/Sep/2013:06:57:50 +0000] "GET /shoppingMall?ver=1.2.1 HTTP/1.1" 200 7200 "http://www.baidu.com.cn" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)"

Field breakdown:
1. Visitor IP address: 124.42.13.230
2. Visitor user information ("-" when not available): - -
3. Request time: [18/Sep/2013:06:57:50 +0000]
4. Request method: GET
5. Requested URL: /shoppingMall?ver=1.2.1
6. Request protocol: HTTP/1.1
7. Response status code: 200
8. Bytes returned: 7200
9. Referrer URL: http://www.baidu.com.cn
10. Visitor's browser (user agent): Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)
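The ten fields above can be extracted with a single regular expression. The parser below is a sketch that assumes nginx's default combined format, with plain ASCII double quotes and "-" for empty values; the class and field names are hypothetical. If a line does not match, or its time or byte count cannot be parsed, the parser returns an empty Optional so that dirty records can be dropped during cleaning.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Locale;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Parser sketch for one access.log line in nginx's "combined" format.
final class AccessLogParser {

    static final class Entry {
        String ip, remoteUser, method, url, protocol, referer, userAgent;
        OffsetDateTime time;
        int status;
        long bytes;
    }

    // ip ident user [time] "method url protocol" status bytes "referer" "agent"
    private static final Pattern LINE = Pattern.compile(
            "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\S+) \"([^\"]*)\" \"([^\"]*)\"$");

    private static final DateTimeFormatter TIME =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    static Optional<Entry> parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) {
            return Optional.empty();
        }
        try {
            Entry e = new Entry();
            e.ip = m.group(1);
            e.remoteUser = m.group(3);
            e.time = OffsetDateTime.parse(m.group(4), TIME);
            e.method = m.group(5);
            e.url = m.group(6);
            e.protocol = m.group(7);
            e.status = Integer.parseInt(m.group(8));
            e.bytes = "-".equals(m.group(9)) ? 0 : Long.parseLong(m.group(9));
            e.referer = m.group(10);
            e.userAgent = m.group(11);
            return Optional.of(e);
        } catch (DateTimeParseException | NumberFormatException ex) {
            return Optional.empty(); // malformed time or byte count
        }
    }
}
```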

HDFS

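Apache Hadoop is a software framework for distributed computation over massive data sets. It is highly reliable and stable, scales dynamically, runs distributed computations on a cluster with a simple programming model (MapReduce), and supports storage of massive data. Apache Hadoop has four main modules:
- Hadoop Common, the common utilities that support the other modules;
- the Hadoop Distributed File System (HDFS), a distributed file system;
- Hadoop YARN, which schedules jobs and manages cluster resources;
- Hadoop MapReduce, a distributed computation framework built on YARN.

In this article we mainly use HDFS to store the clickstream data, and Spark RDD operations replace MapReduce as the distributed computing framework.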

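To configure a Hadoop cluster, first configure the environment of the Hadoop daemons, which every other Hadoop component needs in order to run. Its configuration file is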

etc/hadoop/hadoop-env.sh

# set to the root of your Java installation
export JAVA_HOME=/usr/java/latest

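Hadoop needs a Java environment. Set the JDK installation directory explicitly on every machine in the cluster, even if JAVA_HOME is already configured in your own environment. Hadoop starts its daemons over SSH, even when the NameNode starts the daemons on its own machine. If the JDK directory is not set explicitly, the daemons started over SSH cannot find Java and fail.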

This article only uses the HDFS component of Hadoop, so we only need to configure the HDFS daemons: the NameNode, SecondaryNameNode and DataNode daemons. Their main configuration files are core-site.xml and hdfs-site.xml:

etc/hadoop/core-site.xml



<configuration>
   <property>
      <name>fs.defaultFS</name>
      <value>hdfs://ymhHadoop:9000</value>
   </property>
   <property>
       <name>hadoop.tmp.dir</name>
       <value>/root/apps/hadoop/tmp</value>
   </property>
</configuration>

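fs.defaultFS specifies the URI of the host that acts as the NameNode. hadoop.tmp.dir sets the directory for files that Hadoop generates at run time. It defaults to a directory under /tmp (/tmp/hadoop-${user.name}), whose contents are wiped when the system reboots, so we point it at a different directory.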

etc/hadoop/hdfs-site.xml




<configuration>
   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
   <property>
      <name>dfs.namenode.name.dir</name>
      <value>/your/path</value>
   </property>
   <property>
      <name>dfs.blocksize</name>
      <value>268435456</value>
   </property>
   <property>
      <name>dfs.datanode.data.dir</name>
      <value>/your/path</value>
   </property>
</configuration>

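dfs.replication sets how many replicas HDFS keeps of each file. dfs.namenode.name.dir sets a user-defined directory where the NameNode stores the namespace image and the edit log (the operation log), which can be used to recover the file system metadata after a failure. dfs.blocksize sets the HDFS block size, that is, the maximum size of each chunk a file is split into; here it is 268435456 bytes (256 MB), whereas the Hadoop 2.x default is 134217728 bytes (128 MB). dfs.datanode.data.dir sets the local directories in which a DataNode stores its data blocks.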
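A small worked example of what dfs.blocksize means for a file, assuming the 268435456-byte (256 MB) value configured above. `HdfsLayout` is a hypothetical helper, not a Hadoop API.

```scala
// Hypothetical helper: how a file of a given size is cut into HDFS blocks.
object HdfsLayout {
  // Number of blocks = ceil(fileSize / blockSize).
  def blockCount(fileSize: Long, blockSize: Long): Long =
    if (fileSize == 0) 0 else (fileSize + blockSize - 1) / blockSize

  // The last block holds only the remainder; HDFS does not pad it to the full block size.
  def lastBlockBytes(fileSize: Long, blockSize: Long): Long =
    if (fileSize == 0) 0 else fileSize - (blockCount(fileSize, blockSize) - 1) * blockSize
}
```

For example, a 1 GB file becomes exactly 4 full 256 MB blocks, while a 300 MB file becomes one full block plus a 44 MB block.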

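Finally, copy the configuration files to every machine in the cluster and start the HDFS cluster. If this is the first start, be sure to format the HDFS file system first: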

$HADOOP_PREFIX/bin/hdfs namenode -format 

Then start the NameNode and the DataNodes with the following commands:

$HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start namenode
$HADOOP_PREFIX/sbin/hadoop-daemons.sh --config $HADOOP_CONF_DIR --script hdfs start datanode

Collecting User Data

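A website collects users' browsing data through front-end JavaScript or server-side code and stores it on the web servers. Operations staff usually place an FTP server between the offline analysis system and the live production environment and send the production user data to the FTP server on a daily schedule; the offline analysis system then collects the data from the FTP server without affecting production.
There are several ways to collect the data. One is to write your own shell scripts or Java programs, but that takes a lot of work and is hard to maintain. The other is to use a third-party log collection framework, which is usually robust, fault tolerant, easy to use, and easy to maintain. This article uses Flume, an efficient distributed log collection system that gathers large volumes of log data spread across many servers into one centralized store. Flume is a top-level Apache project and works well with Hadoop. Note, however, that Flume does not provide high availability out of the box; users have to take care of that themselves.
A Flume agent runs in a JVM, so every server needs a Java runtime. One Flume agent is deployed per server. Flume collects the log data produced by the web server and wraps it into events that are delivered to the agent's source. The source puts these events into the agent's channel, and the agent's sink takes the events from the channel and either writes them to an external store such as HDFS or forwards them to the Flume agent on the next server. Flume provides reliable point-to-point delivery: an event is removed from an agent's channel only after it has been stored in the channel of the next agent or written to the final store.
In this system a Flume agent is deployed on each FTP server and on the Hadoop NameNode server. The agents on the FTP servers collect the web server logs and forward them to the agent on the NameNode server, which finally sinks all the log data into HDFS.
Note that the Flume source chosen here is the Spooling Directory Source rather than the Exec Source. If the Flume service goes down, the Spooling Directory Source still knows which files it has already read, whereas the Exec Source does not; users would have to handle that themselves, and if they get it wrong, restarting Flume produces duplicate data. The Spooling Directory Source has drawbacks too. It renames the files it has finished reading (by default it appends the suffix .COMPLETED), which is another reason for the extra FTP layer: it keeps Flume from "polluting" the production environment. Another significant drawback is that it cannot flexibly pick up content newly appended to files in the subdirectories of the watched directory. There are many workarounds, such as switching to another log collector like Logstash.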
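The delivery rule described above can be sketched as a toy model. This is not Flume's API: `ToyChannel` is hypothetical and single-threaded, and it only shows that an event leaves the channel once the sink's delivery succeeds.

```scala
import scala.collection.mutable

// Toy, single-threaded model of a memory channel (not Flume code): a bounded buffer whose
// events are removed only after the sink has delivered them successfully.
final class ToyChannel(capacity: Int) {
  private val buffer = mutable.Queue.empty[String]

  // Source side: refuse the event when the channel is full (Flume reports a ChannelException).
  def put(event: String): Boolean =
    if (buffer.size >= capacity) false
    else { buffer.enqueue(event); true }

  // Sink side: try to deliver the head event and dequeue it only on success ("commit");
  // on failure the event stays in the channel ("rollback") and can be retried later.
  def deliverOne(deliver: String => Boolean): Boolean =
    buffer.headOption match {
      case Some(event) if deliver(event) =>
        buffer.dequeue()
        true
      case _ => false
    }

  def size: Int = buffer.size
}
```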

The Flume configuration on the FTP server is as follows:

    agent.sources = origin  
    agent.channels = memorychannel  
    agent.sinks = target  

    agent.sources.origin.type = spooldir  
    agent.sources.origin.spoolDir = /export/data/trivial/weblogs  
    agent.sources.origin.channels = memorychannel  
    agent.sources.origin.deserializer.maxLineLength = 2048  

    agent.sources.origin.interceptors = i2  
    agent.sources.origin.interceptors.i2.type = host  
    agent.sources.origin.interceptors.i2.hostHeader = hostname  

    agent.sinks.loggerSink.type = logger  
    agent.sinks.loggerSink.channel = memorychannel  

    agent.channels.memorychannel.type = memory  
    agent.channels.memorychannel.capacity = 10000  

    agent.sinks.target.type = avro  
    agent.sinks.target.channel = memorychannel  
    agent.sinks.target.hostname = 172.16.124.130  
    agent.sinks.target.port = 4545  

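A few parameters deserve explanation. For the Flume source, deserializer.maxLineLength sets the maximum number of characters per line, that is, per event (the default is 2048). The memory channel's byte capacity defaults to 80% of the maximum heap available to the local JVM, and can be tuned with the byteCapacity and byteCapacityBufferPercentage parameters.
Pay special attention to file names: log files placed in the directory that Flume watches on the FTP server must not reuse a name, or Flume reports an error and stops working. The best solution is to append a timestamp to each log file name.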
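A minimal naming sketch for the files pushed into the spooling directory, assuming a host-plus-timestamp suffix. `SpoolFileNames` is hypothetical; in the article this step belongs to the shell script run by crontab.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper: make every file dropped into the spooling directory unique by
// appending the source host (several web servers may push) and a timestamp.
object SpoolFileNames {
  private val Stamp = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

  def timestamped(baseName: String, host: String, at: LocalDateTime): String =
    s"$baseName.$host.${at.format(Stamp)}"
}
```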

The configuration file on the Hadoop server is as follows:

    agent.sources = origin  
    agent.channels = memorychannel  
    agent.sinks = target  

    agent.sources.origin.type = avro  
    agent.sources.origin.channels = memorychannel  
    agent.sources.origin.bind = 0.0.0.0  
    agent.sources.origin.port = 4545  

    agent.sinks.loggerSink.type = logger  
    agent.sinks.loggerSink.channel = memorychannel  

    agent.channels.memorychannel.type = memory  
    agent.channels.memorychannel.capacity = 5000000  
    agent.channels.memorychannel.transactionCapacity = 1000000  

    agent.sinks.target.type = hdfs  
    agent.sinks.target.channel = memorychannel  
    agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S  
    agent.sinks.target.hdfs.filePrefix = data-%{hostname}  
    agent.sinks.target.hdfs.rollInterval = 60  
    agent.sinks.target.hdfs.rollSize = 1073741824  
    agent.sinks.target.hdfs.rollCount = 1000000  
    agent.sinks.target.hdfs.round = true  
    agent.sinks.target.hdfs.roundValue = 10  
    agent.sinks.target.hdfs.roundUnit = minute  
    agent.sinks.target.hdfs.useLocalTimeStamp = true  
    agent.sinks.target.hdfs.minBlockReplicas=1  
    agent.sinks.target.hdfs.writeFormat=Text  
    agent.sinks.target.hdfs.fileType=DataStream  

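The round, roundValue, and roundUnit parameters round the event timestamp down to 10 minutes, so a new folder is created in HDFS every 10 minutes for the data pulled from the FTP servers. Run the following command on both the log-file server and the HDFS server, and Flume will keep watching for new logs and ship them into HDFS: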

$ nohup bin/flume-ng agent -n $your_agent_name -c conf -f conf/$your_conf_name &
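To see which HDFS folder an event lands in, here is a sketch of the path resolution for the sink settings above (round = true, roundValue = 10, roundUnit = minute, useLocalTimeStamp = true). It imitates Flume's escape sequences with java.time and is not Flume code; `FlumeBucketPath` is a hypothetical name.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Sketch of how hdfs.path = /flume/events/%y-%m-%d/%H%M%S resolves when the timestamp is
// rounded DOWN to the nearest 10 minutes before the escape sequences are filled in.
object FlumeBucketPath {
  private val Day  = DateTimeFormatter.ofPattern("yy-MM-dd") // %y-%m-%d
  private val Time = DateTimeFormatter.ofPattern("HHmmss")   // %H%M%S

  def resolve(ts: LocalDateTime, roundMinutes: Int = 10): String = {
    val rounded = ts.withSecond(0).withNano(0)
      .withMinute(ts.getMinute / roundMinutes * roundMinutes)
    s"/flume/events/${rounded.format(Day)}/${rounded.format(Time)}"
  }
}
```

All events received between 10:30:00 and 10:39:59 therefore share one folder, and the day part (yy-MM-dd) is what the Spark cleaning job later globs on.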

Spark

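Spark is one of the hottest distributed computing frameworks right now, and the main reason is that it is fast! Unlike with men, in big data, whether a framework takes off depends, after reliability, above all on speed: almost every new distributed framework, and every upcoming version of MapReduce, stresses that it is fast. The Spark website states that when the program and its intermediate data are in memory, Spark computes up to 100 times faster than Hadoop MapReduce, and even on disk it is 10 times faster.
Every Spark application has a driver process that runs the user's program. The driver creates a SparkContext, which talks to the cluster manager (either Spark's own standalone cluster manager or Hadoop's resource scheduler, YARN). The cluster manager coordinates and allocates resources on the worker nodes. Once the driver has obtained worker resources, it sends the application code (as JAR or Python files) to the executors on the worker nodes and then sends them tasks to run. Each executor runs multiple tasks in parallel in separate threads and can cache intermediate data on its worker node as needed. Note that although a worker node can host several JVMs running executors for different Spark applications, different Spark applications cannot communicate or share data with each other except through external storage. On the other hand, the cluster manager does not need to know anything about the driver's internals: as long as the driver and the cluster manager can communicate and the driver can obtain compute resources, they can work together, so the Spark driver integrates fairly easily with various resource schedulers such as YARN and Mesos.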
[Image: Spark cluster overview. Source: http://spark.apache.org/docs/latest/cluster-overview.html]

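Spark uses the driver to ship the user's code to the worker nodes in the cluster, which then process the data in parallel. Central to this is Spark's own data abstraction, the RDD (Resilient Distributed Dataset), which represents the data set at each step of the computation. An RDD's data can stay on the worker nodes where it already is, or be redistributed across the cluster by a "shuffle", before the next parallel computation produces a new RDD. The most important property of RDDs is that they can be processed in parallel. RDDs are initially created in one of two ways: from a Scala collection (or a collection in another language) in the driver program, which is converted into an RDD and processed in parallel on the workers; or from an external distributed data source such as HDFS, HBase, or anything that provides a Hadoop InputFormat.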

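For a collection in the driver program, parallelize() splits the data into partitions and sends them to the cluster for parallel processing, with one task per partition. By default the number of partitions depends on the cluster (spark.default.parallelism), and users can also set it explicitly. For external data, Spark can create RDDs from any Hadoop-supported data source; by default it creates one RDD partition per file block, and the partitions are then processed in parallel on the cluster.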
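How a driver-side collection can be cut into partitions is easy to sketch in plain Scala. This assumes contiguous, nearly equal slices, which is the idea behind parallelize(); `Slicing.slice` is a hypothetical helper, not Spark's implementation.

```scala
// Sketch: cut a local collection into numSlices contiguous partitions of nearly equal size.
object Slicing {
  def slice[T](data: IndexedSeq[T], numSlices: Int): Seq[IndexedSeq[T]] = {
    require(numSlices >= 1, "numSlices must be positive")
    val n = data.length.toLong
    (0 until numSlices).map { i =>
      val start = (i * n / numSlices).toInt
      val end   = ((i + 1) * n / numSlices).toInt
      data.slice(start, end) // partition i would be processed by one task
    }
  }
}
```

With 10 elements and 3 slices, the partition sizes are 3, 3, and 4; asking for more slices than elements simply yields some empty partitions.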

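Spark has two kinds of operations on RDDs: transformations and actions. Transformations are roughly analogous to Hadoop's Map phase: they compute over RDDs in parallel and return new RDDs. Actions are roughly analogous to the Reduce phase: they apply a function that aggregates the RDDs produced by the preceding transformations into a final result and return it to the driver program. Importantly, all transformations are lazy. At first they only record which RDD they apply to and do not compute anything; the computation actually runs only when an action needs a result returned to the driver. That is why every Spark program must end with an action; otherwise it triggers no computation at all. This design makes Spark more efficient, because the RDDs created by transformations are only computed when an action uses them, and only the final result, rather than a large amount of intermediate data, is returned to the driver. Sometimes an RDD's result is used many times, which would trigger repeated recomputation. You can call persist() or cache() to keep such an RDD in memory across the cluster, so that later computations read it directly from memory instead of recomputing it.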
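Laziness is not unique to Spark. As a plain-Scala analogy (no Spark involved), an Iterator's map and filter only record work, and a terminal call such as toList plays the role of the action. `LazyDemo` is a hypothetical name.

```scala
// Plain-Scala analogy for lazy transformations (not Spark code).
object LazyDemo {
  var evaluations = 0 // how many times the "transformation" function actually ran

  def pipeline(data: Seq[Int]): Iterator[Int] =
    data.iterator
      .map { x => evaluations += 1; x * 2 } // lazy: recorded, not executed yet
      .filter(_ > 4)                         // another lazy step
}
```

Building the pipeline runs nothing; calling `.toList` on it (the "action") runs the mapping function once per element.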

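Another concept to understand in Spark is the shuffle. When an operation such as reduceByKey (a transformation) needs all the values for each key, Spark reads the data from every partition on the cluster, copies it between machines, and brings together all the values for each key, so that it computes an overall result rather than a per-partition one; the result is then redistributed across the cluster or sent back to the driver. During a shuffle Spark creates two kinds of tasks: map tasks, which select the local data that needs to be shuffled and write it to files, and reduce tasks, which read those files and combine the data. A shuffle therefore consumes a lot of local disk I/O, memory, and network I/O, and involves a lot of serialization. Operations that can cause a shuffle include repartition operations such as repartition and coalesce, 'ByKey operations such as reduceByKey and groupByKey, and join operations such as cogroup and join.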
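A sketch of the routing behind a shuffle, assuming hash partitioning as in Spark's default HashPartitioner (the key's hashCode taken modulo the number of partitions and made non-negative). `ShuffleSketch` is hypothetical and works on in-memory sequences rather than a cluster; it also shows the map-side combine that makes reduceByKey cheaper than groupByKey.

```scala
// Hypothetical sketch of a reduceByKey-style shuffle on in-memory "partitions".
object ShuffleSketch {
  // Non-negative modulo, so keys with negative hashCodes still map into [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  private def combine(pairs: Seq[(String, Int)]): Map[String, Int] =
    pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  def reduceByKey(partitions: Seq[Seq[(String, Int)]], numReducers: Int): Map[String, Int] = {
    // Map side: combine within each partition before anything crosses the "network".
    val mapSide: Seq[Map[String, Int]] = partitions.map(combine)
    // Shuffle: route each partial (key, sum) to the reducer that owns the key.
    val byReducer: Map[Int, Seq[(String, Int)]] =
      mapSide.flatMap(_.toSeq).groupBy { case (k, _) => partitionFor(k, numReducers) }
    // Reduce side: each reducer merges the partial sums for the keys it owns.
    byReducer.values.flatMap(combine).toMap
  }
}
```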

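Next, some characteristics of Spark in a cluster environment. This part is extremely important, so make sure you understand it. First, remember that the driver serializes the Spark program the user packaged and submitted and distributes it to the worker nodes to run, and that results are collected back at the driver; in other words, users normally get the final results from the driver machine. With this in mind, let's look at the following issues:
1. There are two recommended ways to pass functions into RDD operations correctly: pass an anonymous function (a function literal) directly, or define a method in a global singleton object and pass it as ObjectName.methodName, as in the following code: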

object DateHandler {
  def parseDate(s: String): String = { ... }
}

rdd.map(DateHandler.parseDate)

The wrong way to pass a function:

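class MySpark {
 def parseDate(s: String): String = { ... }
 def rddOperation(rdd: RDD[String]): RDD[String] = { rdd.map(x => this.parseDate(x)) }
}
...
val myspark = new MySpark
myspark.rddOperation(rdd) // rdd: an existing RDD[String]
Passing the function this way serializes the whole myspark object and ships it to the cluster, which wastes memory (and fails with a "Task not serializable" error if MySpark is not serializable),
because the function passed to map, "this.parseDate(x)", refers to an object instance and its method.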

Accessing a field of the class inside an RDD operation also ships the whole object, for example:

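class MySpark {
 val myVariable = "some value"
 def rddOperation(rdd: RDD[String]): RDD[String] = { rdd.map(x => x + myVariable) }
}
This is equivalent to x => x + this.myVariable, so the closure again references the object instance.
The fix is to copy the field into a local variable inside the method,
so that the closure reads a local value instead of a field of the object: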
def rddOperation(rdd:RDD[String]):RDD[String] = {val _variable = this.myVariable;rdd.map(x => x + _variable)}
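The effect of the local-variable fix can be checked without Spark, assuming (as Spark does by default for closures) that functions are shipped with Java serialization. The `Holder` class below is a hypothetical stand-in for MySpark and is deliberately not Serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCapture {
  // Hypothetical stand-in for MySpark: deliberately NOT Serializable.
  class Holder {
    val myVariable = "!"

    // Reads this.myVariable, so the closure captures the whole Holder instance.
    def capturingThis: String => String = x => x + myVariable

    // Copies the field into a local first, so the closure captures only a String.
    def capturingLocal: String => String = {
      val v = myVariable
      x => x + v
    }
  }

  // Try to Java-serialize a function value, the way a closure is shipped to executors.
  def isSerializable(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Only the closure that captures `this` drags the non-serializable Holder along, which is the same failure Spark reports as "Task not serializable".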

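2. The second thing to watch out for is modifying a variable defined in the driver from inside an RDD operation,
which easily goes wrong in a cluster. Consider the following code: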

var counter = 0
val rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

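In cluster mode this code still prints 0. The closure, together with counter, is serialized and shipped to every node in the cluster; each node works on its own copy of counter rather than a reference to the counter in the driver, the copies accumulate different values, and none of them is sent back to the driver. The counter on the driver keeps its original value. (In local mode the closure may happen to run in the driver's JVM and update the original counter, which is why such code can seem to work during testing.) To update a shared variable from RDD operations, use an accumulator (sc.accumulator() in Spark 1.x, sc.longAccumulator() from Spark 2.0 on): tasks can only add to it, and Spark merges their updates into the value the driver reads.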
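A plain-Scala simulation of the counter pitfall (no Spark; copying the counter stands in for serializing the closure to each executor). `CounterSimulation` and its methods are hypothetical; the accumulator-style function shows the idea behind accumulators, merging per-task partial values on the driver, not their API.

```scala
object CounterSimulation {
  final class Counter(var value: Int)

  // Stand-in for shipping the closure: every task works on its own copy of the counter
  // (Spark produces such a copy by serializing and deserializing the closure).
  private def shipCopy(c: Counter): Counter = new Counter(c.value)

  // The "wrong" pattern: each partition increments its own copy; the driver's counter is untouched.
  def naiveSum(driverCounter: Counter, partitions: Seq[Seq[Int]]): Unit =
    partitions.foreach { part =>
      val local = shipCopy(driverCounter)
      part.foreach(x => local.value += x)
    }

  // Accumulator-style: each partition produces a partial sum, and the driver merges them.
  def accumulatedSum(partitions: Seq[Seq[Int]]): Int =
    partitions.map(_.sum).sum
}
```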

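3. The third important point for Spark on a cluster is how to print the elements of an RDD properly. Using rdd.foreach(println) or rdd.map(println) does not work. Remember that the program is sent to the worker nodes and runs there, so println writes to the workers' standard output, while users get data and results on the driver, so they never see this output (and rdd.map(println) on its own would not even run, since map is a lazy transformation). One solution is to bring all the data back to the driver before printing, as in rdd.collect().foreach(println), but this can exhaust the driver's memory instantly, because collect returns all the data in the cluster to the driver at once. A more reasonable approach is to fetch part of the data with take() and print that, for example rdd.take(100).foreach(println).
4. A further note on the foreach(func) action: it applies func to each element of the dataset, and func runs separately on each worker node. Although foreach is an action, it does not first return all the data to the driver and then run func there; what it returns to the driver is Unit. Keep this in mind: anything func does to driver variables, and any printing it does, has no effect as far as the driver is concerned, because func runs only on the worker nodes.
5. Finally, Spark's shared variables. One kind is the accumulator; the other is the broadcast variable. To understand broadcast variables you need the concept of a stage: Spark executes an action as a set of stages separated by shuffle operations, so all the RDD operations before a shuffle belong to the same stage. Within each stage, Spark automatically broadcasts the common data that the tasks need, caching it on each node in serialized, read-only form and deserializing it before each task runs. Explicitly creating a broadcast variable with sc.broadcast() is therefore mainly useful when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.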

Log Cleaning

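Once Flume has pulled the Nginx access logs from the log servers into HDFS, the next step is to clean the logs with Spark.
First, start the Spark cluster. Spark currently supports three main cluster deployment modes: Spark's own Standalone mode as the cluster manager, or YARN or Mesos as the cluster manager. On YARN there are two ways to submit a Spark application. In cluster mode, the driver runs inside the YARN ApplicationMaster and is managed directly by YARN, and the client process exits once the application has been initialized. In client mode, the driver keeps running in the client process after submission and communicates with the YARN ApplicationMaster to obtain resources on the worker nodes. Standalone deployment likewise offers cluster and client submission modes: in cluster mode the driver runs in a process on one of the worker nodes, and the client process exits once it has finished submitting the application; in client mode the driver keeps running in the client process and prints its progress to the console. In this case study we deploy the Spark cluster in Standalone mode and start it manually: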

# Start the master node; afterwards Spark's built-in web UI is available at localhost:8080
./sbin/start-master.sh

# Start a worker node
./sbin/start-slave.sh spark://HOST:PORT

# Then submit the Spark application with the spark-submit script
# Client mode is the default; to run in cluster mode instead, add
# --deploy-mode cluster
bin/spark-submit --class com.guludada.Spark_ClickStream.VisitsInfo --master spark://ymhHadoop:7077 --executor-memory 1G --total-executor-cores 2 /export/data/spark/sparkclickstream.jar

Below is the Spark code that cleans the logs, mainly by filtering out invalid access log records:

package com.guludada.Spark_ClickStream

import scala.io.Source
import java.text.SimpleDateFormat;
import java.util.Locale;
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.util.Date;

class WebLogClean extends Serializable {

  def weblogParser(logLine:String):String =  {

      //Filter out records with missing fields or a malformed layout
      val isStandardLogInfo = logLine.split(" ").length >= 12;

      if(isStandardLogInfo) {

        //Strip the redundant characters ("- - ", "[" and " +0000]")
        val newLogLine:String = logLine.replace("- - ", "").replaceFirst("""\[""", "").replace(" +0000]", "");
        //Convert the timestamp to the standard format
        val logInfoGroup:Array[String] = newLogLine.split(" ");
        val oldDateFormat = logInfoGroup(1);
        //A record without an access time is also malformed
        if(oldDateFormat == "-") return ""
        val newDateFormat = WebLogClean.sdf_standard.format(WebLogClean.sdf_origin.parse(oldDateFormat)) 
        return newLogLine.replace(oldDateFormat, newDateFormat)

      } else {

        return ""

      }
  }
}

object WebLogClean {

   val sdf_origin = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.ENGLISH);
   val sdf_standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
   val sdf_hdfsfolder = new SimpleDateFormat("yy-MM-dd");

   def main(args: Array[String]) {

    val curDate = new Date(); 
    val weblogclean = new WebLogClean
    val logFile = "hdfs://ymhHadoop:9000/flume/events/"+WebLogClean.sdf_hdfsfolder.format(curDate)+"/*" // Should be some file on your system
    //The master URL comes from spark-submit --master; a hard-coded setMaster("local") would override it
    val conf = new SparkConf().setAppName("WebLogCleaner")
    val sc = new SparkContext(conf)
    val logFileSource = sc.textFile(logFile,1).cache()

    val logLinesMapRDD = logFileSource.map(x => weblogclean.weblogParser(x)).filter(line => line != "");
    logLinesMapRDD.saveAsTextFile("hdfs://ymhHadoop:9000/spark_clickstream/cleaned_log/"+WebLogClean.sdf_hdfsfolder.format(curDate)) 

  }

}

The cleaned log looks like this:
[Image: sample of the cleaned log]
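To test the cleaning rules on single lines without a cluster, here is a Spark-free copy of the logic in `WebLogClean.weblogParser`. The object name `LocalLogCleaner` is hypothetical; the rules are the same as in the code above.

```scala
import java.text.SimpleDateFormat
import java.util.Locale

// Spark-free copy of WebLogClean.weblogParser, for unit-testing the rules on single lines.
object LocalLogCleaner {
  private val origin   = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
  private val standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss")

  def clean(logLine: String): String = {
    if (logLine.split(" ").length < 12) return "" // incomplete record
    // Strip "- - ", the opening "[" and the " +0000]" suffix
    val line = logLine.replace("- - ", "").replaceFirst("""\[""", "").replace(" +0000]", "")
    val oldDate = line.split(" ")(1)
    if (oldDate == "-") return "" // no access time
    line.replace(oldDate, standard.format(origin.parse(oldDate)))
  }
}
```

After cleaning, field 0 is the IP, field 1 the reformatted time, field 3 the URL, and field 7 the quoted referrer, which is the layout the session code below relies on.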

Next, prefix every access record with a session ID:

package com.guludada.Spark_ClickStream

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.UUID;
import java.util.Date;

class WebLogSession {

}

object WebLogSession {

   val sdf_standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
   val sdf_hdfsfolder = new SimpleDateFormat("yy-MM-dd");

   //Custom comparator: sort log records by creation time, ascending
   def dateComparator(elementA:String ,elementB:String):Boolean = {     
     WebLogSession.sdf_standard.parse(elementA.split(" ")(1)).getTime < WebLogSession.sdf_standard.parse(elementB.split(" ")(1)).getTime
   }

   import scala.collection.mutable.ListBuffer
   def distinctLogInfoBySession(logInfoGroup:List[String]):List[String] = {

       val logInfoBySession:ListBuffer[String] = new ListBuffer[String]
       var lastRequestTime:Long = 0;
       var lastSessionID:String = "";

       for(logInfo <- logInfoGroup) {

         //The first record of this IP starts the user's first session
         if(lastRequestTime == 0) {

           lastSessionID = UUID.randomUUID().toString();
           //Prefix this record with its sessionID and add it to the per-session list
           logInfoBySession += lastSessionID + " " +logInfo
           //Remember this time as the session start; later records are compared with it, and a gap of 30 minutes or more starts a new session
           lastRequestTime = sdf_standard.parse(logInfo.split(" ")(1)).getTime

         } else {

           //30 minutes or more have passed since the current session started, so this is a new session: generate a new sessionID
           if(sdf_standard.parse(logInfo.split(" ")(1)).getTime - lastRequestTime >= 30 * 60 * 1000) {
               //At least 30 minutes after the session start, so begin a new session with a new sessionID
               lastSessionID = UUID.randomUUID().toString();
               logInfoBySession += lastSessionID + " " +logInfo
               //Record this time as the start of the new session; the following records are compared with it
               lastRequestTime = sdf_standard.parse(logInfo.split(" ")(1)).getTime

           } else { //Less than 30 minutes since the current session started: same session, keep the current sessionID

               logInfoBySession += lastSessionID + " " +logInfo
           }           
         }         
       }
       return logInfoBySession.toList
   }

   def main(args: Array[String]) {



      val curDate = new Date(); 
      val logFile = "hdfs://ymhHadoop:9000/spark_clickstream/cleaned_log/"+WebLogSession.sdf_hdfsfolder.format(curDate) // Should be some file on your system
      //The master URL comes from spark-submit --master; a hard-coded setMaster("local") would override it
      val conf = new SparkConf().setAppName("WebLogSession")
      val sc = new SparkContext(conf)
      val logFileSource = sc.textFile(logFile, 1).cache()

      //Turn each log line into an (IP, log line) tuple, i.e. group the logs by IP address
      val logLinesKVMapRDD = logFileSource.map(line => (line.split(" ")(0),line)).groupByKey();
      //Sort the logs in each (IP[String], log lines[Iterable]) pair by time, ascending
      //(Nginx writes records in access order, but groupByKey does not guarantee the order of the values in a group, so sorting keeps the session logic correct; it also shows how to sort with a custom comparator in Scala)
      //After sorting, each (IP[String], log lines[Iterable]) pair becomes the sorted List of that IP's log lines
      val sortedLogRDD = logLinesKVMapRDD.map(_._2.toList.sortWith((A,B) => WebLogSession.dateComparator(A,B)))

      //Split each IP's records into 30-minute sessions and prefix them with their sessionID
      val logInfoBySessionRDD = sortedLogRDD.map(WebLogSession.distinctLogInfoBySession(_))
      //Flatten the per-IP lists back into single log lines and save them
      logInfoBySessionRDD.flatMap(line => line).saveAsTextFile("hdfs://ymhHadoop:9000/spark_clickstream/session_log/"+WebLogSession.sdf_hdfsfolder.format(curDate))

   } 
}

The log with session IDs looks like this:
[Image: sample of the log with session IDs]
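The 30-minute rule can be checked on plain timestamps. In the code above, the gap is measured from the first request of the current session; web analytics tools commonly use an inactivity timeout measured from the previous request instead. This sketch (a hypothetical `Sessionize` object, with integer session ids instead of UUIDs so the results are reproducible) implements both rules so the difference is visible.

```scala
// Hypothetical sketch: assign session numbers to one IP's request times (milliseconds, sorted).
object Sessionize {
  val Timeout: Long = 30 * 60 * 1000L

  def assign(times: Seq[Long], fromSessionStart: Boolean): Seq[Int] = {
    var sessionId = 0
    var reference = Long.MinValue // the time the next request is compared with
    times.map { t =>
      if (reference == Long.MinValue || t - reference >= Timeout) {
        sessionId += 1
        reference = t             // a new session starts here
      } else if (!fromSessionStart) {
        reference = t             // inactivity rule: compare with the previous request
      }
      sessionId
    }
  }
}
```

For requests at minutes 0, 20, and 40, the session-start rule (as in the code above) splits them into two sessions, while the inactivity rule keeps all three in one session.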

The last step is to summarize each user's browsing information by session ID. The code is as follows:

package com.guludada.Spark_ClickStream

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.Date;

class VisitsInfo {

}

object VisitsInfo {

  val sdf_standard = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
  val sdf_hdfsfolder = new SimpleDateFormat("yy-MM-dd");

   //Custom comparator: sort log records by creation time, ascending
   def dateComparator(elementA:String ,elementB:String):Boolean = {     
     WebLogSession.sdf_standard.parse(elementA.split(" ")(2)).getTime < WebLogSession.sdf_standard.parse(elementB.split(" ")(2)).getTime
   }

   import scala.collection.mutable.ListBuffer
   def getVisitsInfo(logInfoGroup:List[String]):String = {

     //Number of distinct pages visited in this session:
     //map each record to a (url, logInfo) tuple, group by url, and count the groups
    val visitPageNum = logInfoGroup.map(log => (log.split(" ")(4),log)).groupBy(x => x._1).count(p => true)

    //ID of this session
    val sessionID = logInfoGroup(0).split(" ")(0)

    //Start time of this session
    val startTime = logInfoGroup(0).split(" ")(2)

    //End time of this session
    val endTime = logInfoGroup(logInfoGroup.length-1).split(" ")(2)

    //First URL visited in this session (entry page)
    val entryPage = logInfoGroup(0).split(" ")(4)

    //Last URL visited in this session (exit page)
    val leavePage = logInfoGroup(logInfoGroup.length-1).split(" ")(4)

    //IP address of the user in this session
    val IP = logInfoGroup(0).split(" ")(1)

    //Site the user came from (referrer)
    val referal = logInfoGroup(0).split(" ")(8)

     return sessionID + " " + startTime + " " + endTime + " " + entryPage + " " + leavePage + " " + visitPageNum + " " + IP + " " + referal;

   }

   def main(args: Array[String]) {

      val curDate = new Date();      
      val logFile = "hdfs://ymhHadoop:9000/spark_clickstream/session_log/"+WebLogSession.sdf_hdfsfolder.format(curDate) // Should be some file on your system
      //The master URL comes from spark-submit --master; a hard-coded setMaster("local") would override it
      val conf = new SparkConf().setAppName("VisitsInfo")
      val sc = new SparkContext(conf)
      val logFileSource = sc.textFile(logFile,1).cache()

      //Turn each log line into a (session, log line) tuple, i.e. group the logs by session
      val logLinesKVMapRDD = logFileSource.map(line => (line.split(" ")(0),line)).groupByKey();
      //Sort the logs in each (session[String], log lines[Iterable]) pair by time, ascending
      //After sorting, each (session[String], log lines[Iterable]) pair becomes the sorted List of that session's log lines
      val sortedLogRDD = logLinesKVMapRDD.map(_._2.toList.sortWith((A,B) => VisitsInfo.dateComparator(A,B)))

      //Compute the summary of each individual session
      sortedLogRDD.map(VisitsInfo.getVisitsInfo(_)).saveAsTextFile("hdfs://ymhHadoop:9000/spark_clickstream/visits_log/"+WebLogSession.sdf_hdfsfolder.format(curDate))

   }
}

The final records have the following format (example rows):
SessionID | Start time | End time | Entry page | Exit page | Pages visited | IP | Referal
Session1 | 2016-05-30 15:17:00 | 2016-05-30 15:19:00 | /blog/me | /blog/others | 5 | 192.168.12.130 | www.baidu.com
Session2 | 2016-05-30 14:17:00 | 2016-05-30 15:19:38 | /home | /profile | 10 | 192.168.12.140 | www.178.com
Session3 | 2016-05-30 12:17:00 | 2016-05-30 15:40:00 | /products | /detail | 6 | 192.168.12.150 | www.78dm.com


Hive

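Hive is a data warehouse that lets users work with data in distributed storage using SQL. On the client side, users create warehouse tables with CREATE TABLE statements just as in a relational database, load data from HDFS into those tables, and then query and manipulate the HDFS data conveniently with Hive SQL. Under the hood, when a user submits a Hive SQL statement, Hive passes it to the semantic analyzer in its driver, which translates it into the corresponding Hadoop MapReduce jobs that operate on the data in HDFS. Hive also stores metadata, such as table names, column names, partitions, properties, and the HDFS locations of the table data, in an external database such as MySQL or the bundled Derby database.
Hive's data model consists mainly of the following:
1. Databases act as namespaces that prevent name clashes between tables, views, and columns; a database groups a set of related tables. Physically, a database is a directory under /user/hive/warehouse/ in HDFS.
2. Tables are abstractions over data that shares the same schema; put simply, they are tables as in a traditional relational database. Physically, a table is a subdirectory under its database that holds the table's data files. These files come from the cleaned source data files (cleaned with MapReduce in the author's earlier article) via Hive SQL's LOAD statement, which moves the data from a path in HDFS into the /user/hive/warehouse/ path; the metadata stored in MySQL then maps the data to the Hive table.
3. Partitions: when creating a table, users can specify a key by which to partition its data. At the table level, a partition is an extra virtual column used to categorize the data; in HDFS, the table's data is split by key value into separate directories. However, Hive does not guarantee that the rows belonging to a key actually end up in the corresponding partition, because it cannot tell; when inserting data, users must route the data to the partition matching its key themselves, and doing so is what makes later queries efficient.
4. Buckets (clusters): within each partition, rows are grouped by the hash value of a chosen column, so rows with the same hash result go into the same bucket, which makes queries more efficient. The main benefit of bucketing is faster joins. For example, in select user.id, user.name, admin.tele from user join admin on user.id=admin.id, if both tables are already bucketed by id, the join only has to match up buckets with the same hash result. For the best join performance the two tables should usually have the same number of buckets; if one count is a multiple of the other, joins still benefit, though less than with equal counts; if the counts differ and neither is a multiple of the other, the join is no faster than without bucketing.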
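A worked example of why bucket counts that are multiples of each other still line up in a join. It assumes a simplified bucketing function, (hash & Int.MaxValue) % numBuckets, with the hash of an int key being the key itself; Hive's actual hash depends on the column type, and `Bucketing` is a hypothetical helper.

```scala
// Simplified bucketing model (assumption, not Hive's exact hash function).
object Bucketing {
  def bucketOf(key: Int, numBuckets: Int): Int = (key & Int.MaxValue) % numBuckets

  // Joining a `small`-bucket table with a `large`-bucket table (large a multiple of small):
  // bucket b of the small table only meets buckets b, b + small, b + 2*small, ...
  // of the large table, because (h % large) % small == h % small.
  def matchingBuckets(b: Int, small: Int, large: Int): Seq[Int] =
    b until large by small
}
```

With 4 and 8 buckets, bucket 1 of the first table only needs buckets 1 and 5 of the second. With 4 and 6 buckets the identity no longer holds, so every bucket may have to meet every other, which is why unrelated counts bring no benefit.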

Spark SQL

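In the author's earlier Hadoop article, the logs were cleaned with MapReduce, and then the data warehouse was built and analyzed with Hive SQL in the Hive client. This article instead uses Spark SQL to work with the Hive data warehouse. Space is limited, so Spark SQL is introduced only briefly below; see the official documentation for details.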

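Spark SQL is the Spark module for processing structured data. Users can interact with it through SQL, the DataFrame API, and the Dataset API. Spark SQL can run standard SQL over data from various sources, such as JSON and Parquet, and can also query data in Hive with HiveQL. A DataFrame is a collection of data organized into named columns, equivalent to a table in a relational database; DataFrames can be created from structured data files such as JSON and Parquet, from Hive tables, from external databases, or from existing RDDs. Datasets are an API added in Spark 1.6. Like RDDs, they can be manipulated with transformations and actions, but they also benefit from many execution optimizations and use Encoders instead of Java serialization.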

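DataFrames can be created from RDDs. One way is to define the DataFrame's columns, that is, the table schema, with a case class, convert the RDD's data into case class objects, and let Spark infer the schema from the case class by reflection and produce a DataFrame. Once you have a DataFrame, you can manipulate its data with the domain-specific language that DataFrames provide, or register the DataFrame as a table and query it with standard SQL. In short, Spark SQL offers a variety of data structures and operations that make it easy to work with data in SQL and reduce operations and development costs. It is both convenient and powerful!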

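In this case study we use a star schema to build the ODS (Operational Data Store) layer of the data warehouse.
Visits Data Analysis
Schemas of the Visits fact table (per-session page visit records) and its dimension tables:
[Image: Visits fact and dimension table schemas]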

Next, start spark-shell and use Spark SQL to work with the Hive data warehouse:

bin/spark-shell --jars lib/mysql-connector-java-5.0.5.jar

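Run the following commands in order in spark-shell to work with the Hive data warehouse. You will notice they run much faster than in the Hive client, because Spark SQL processes the HDFS data with Spark RDDs underneath rather than with Hadoop MapReduce.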

//Create a HiveContext object (HiveContext extends SQLContext)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

//Create the staging (source-aligned) table for the Visits data in the data warehouse:
sqlContext.sql("create table visitsinfo_spark(session string,startdate string,enddate string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(startdate) into 4 buckets row format delimited fields terminated by ' '")

//Load the data from HDFS into the Visits staging table in Hive
sqlContext.sql("load data inpath '/spark_clickstream/visits_log/16-07-18' overwrite into table visitsinfo_spark partition(inputDate='2016-07-27')")


//Create the ODS-layer Visits fact table according to the business analysis logic, and populate it from the visitsinfo_spark staging table
sqlContext.sql("create table ods_visits_spark(session string,entrytime string,leavetime string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(entrytime) into 4 buckets row format delimited fields terminated by ' '")

sqlContext.sql("insert into table ods_visits_spark partition(inputDate='2016-07-27') select vi.session,vi.startdate,vi.enddate,vi.entrypage,vi.leavepage,vi.viewpagenum,vi.ip,vi.referal from visitsinfo_spark as vi where vi.inputDate='2016-07-27'")

//Create the time dimension table for the Visits fact table and populate it from the day's fact data
sqlContext.sql("create table ods_dim_visits_time_spark(time string,year string,month string,day string,hour string,minutes string,seconds string) partitioned by(inputDate String) clustered by(year,month,day) sorted by(time) into 4 buckets row format delimited fields terminated by ' '")

//Merge the values of the "entry time" and "leave time" columns before inserting them into the time dimension table, to reduce redundancy
sqlContext.sql("insert overwrite table ods_dim_visits_time_spark partition(inputDate='2016-07-27') select distinct ov.timeparam, substring(ov.timeparam,0,4),substring(ov.timeparam,6,2),substring(ov.timeparam,9,2),substring(ov.timeparam,12,2),substring(ov.timeparam,15,2),substring(ov.timeparam,18,2) from (select ov1.entrytime as timeparam from ods_visits_spark as ov1 union select ov2.leavetime as timeparam from ods_visits_spark as ov2) as ov")


//Create the URL dimension table for the Visits fact table and populate it from the day's fact data
sqlContext.sql("create table ods_dim_visits_url_spark(pageurl string,host string,path string,query string) partitioned by(inputDate string) clustered by(pageurl) sorted by(pageurl) into 4 buckets row format delimited fields terminated by ' '")

//Merge each session's entry-page and exit-page URLs and store them in the URL dimension table
sqlContext.sql("insert into table ods_dim_visits_url_spark partition(inputDate='2016-07-27') select distinct ov.pageurl,b.host,b.path,b.query from (select ov1.entrypage as pageurl from ods_visits_spark as ov1 union select ov2.leavepage as pageurl from ods_visits_spark as ov2 ) as ov lateral view parse_url_tuple(concat('https://localhost',ov.pageurl),'HOST','PATH','QUERY') b as host,path,query")

//Store the external site each session came from (the referrer) in the URL dimension table
sqlContext.sql("insert into table ods_dim_visits_url_spark partition(inputDate='2016-07-27') select distinct ov.referal,b.host,b.path,b.query from ods_visits_spark as ov lateral view parse_url_tuple(substr(ov.referal,2,length(ov.referal)-2),'HOST','PATH','QUERY') b as host,path,query")
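For reference, the HOST, PATH, and QUERY parts that parse_url_tuple extracts in the statements above can be reproduced with java.net.URI. This is an analogy, not Hive's implementation, and `UrlParts` is hypothetical. The 'https://localhost' prefix is needed because entry and exit pages are stored as relative paths, and referrers are stored with surrounding quotes, which the substr(...) call strips.

```scala
import java.net.URI

// Analogy for parse_url_tuple(url, 'HOST', 'PATH', 'QUERY') using java.net.URI.
object UrlParts {
  def split(url: String): (String, String, String) = {
    val u = new URI(url)
    (u.getHost, u.getPath, u.getQuery) // getQuery returns null when there is no query string
  }

  // Entry/exit pages are relative paths, so a dummy host is prepended (as in the SQL above).
  def fromPage(path: String): (String, String, String) = split("https://localhost" + path)

  // Referrers keep their surrounding quotes; drop the first and last character first.
  def fromReferer(quoted: String): (String, String, String) =
    split(quoted.substring(1, quoted.length - 1))
}
```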


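//Query the 20 sessions that viewed the most pages; viewpagenum is a string column, so cast it to sort numerically, and use order by (sort by only sorts within each reducer)
sqlContext.sql("select * from ods_visits_spark as ov order by cast(ov.viewpagenum as int) desc limit 20").show()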


Troubleshooting

Flume splits the files it pulls into HDFS into many small files of 1 KB to 5 KB

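If Flume stores the files it pulls into HDFS as many small files of 1 KB to 5 KB, the HDFS sink is most likely misconfigured, so the defaults are being used. The spooldir source wraps each line of the files in the watched directory into an event and puts it into the channel; by default at most 2048 characters are read per line. On the HDFS sink side, three properties determine the size of the files written to HDFS: rollInterval (default 30 seconds), rollSize (default 1024 bytes), and rollCount (default 10 events). rollInterval is the number of seconds after which the current .tmp file (which receives the events coming from the channel) is rolled into a finished file in HDFS; rollSize rolls the .tmp file once it reaches the given size in bytes; and rollCount rolls it once the given number of events has been written to it. Setting any of them to 0 disables that trigger.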
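The roll rules above can be summarized as one predicate. This is a sketch of the documented behavior, not Flume's code: the open file is rolled as soon as any enabled trigger fires, and 0 disables a trigger. `RollPolicy` is hypothetical; its two settings are Flume's defaults and the values from this article's Hadoop-side configuration.

```scala
// Sketch of the HDFS sink's roll decision (documented behavior, not Flume code).
object RollPolicy {
  final case class Settings(rollIntervalSec: Long, rollSizeBytes: Long, rollCount: Long)

  val Defaults = Settings(rollIntervalSec = 30, rollSizeBytes = 1024, rollCount = 10)
  // Values from the Hadoop-side agent configuration in this article.
  val Article = Settings(rollIntervalSec = 60, rollSizeBytes = 1073741824L, rollCount = 1000000)

  // Roll as soon as ANY enabled trigger fires; a value of 0 disables that trigger.
  def shouldRoll(s: Settings, openSec: Long, bytesWritten: Long, eventsWritten: Long): Boolean =
    (s.rollIntervalSec > 0 && openSec >= s.rollIntervalSec) ||
      (s.rollSizeBytes > 0 && bytesWritten >= s.rollSizeBytes) ||
      (s.rollCount > 0 && eventsWritten >= s.rollCount)
}
```

With the defaults, a handful of access-log lines already exceeds 1024 bytes, which is why the HDFS files end up only a few kilobytes each.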

Files that Flume pulls into HDFS come out in a garbled format

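This happens because in the HDFS sink configuration, hdfs.writeFormat defaults to "Writable", which serializes the original content as Hadoop Writable objects; set hdfs.writeFormat=Text manually. In addition, hdfs.fileType defaults to "SequenceFile", a binary key/value container format rather than plain text; set hdfs.fileType=DataStream manually, so that each event is written as one line and the output keeps the original file's format.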

Starting a Spark job fails with a "Task not serializable" error

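The main cause of this error is that a task the driver sends to the workers over RPC cannot be serialized. Most likely, a function passed to a transformation or action references a non-serializable object. For example, in the program above, the function passed to map in logFileSource.map(x => weblogclean.weblogParser(x)) references the object weblogclean, so its class must be serializable; that is why WebLogClean extends Serializable.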

How to assign each user a session ID in a distributed environment

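Use a UUID, an identifier designed to be unique even across a distributed system. The code above calls UUID.randomUUID(), which produces a version 4 UUID built from 122 random bits, so collisions are vanishingly unlikely even when many machines generate IDs independently. (The layout of date and time, clock sequence, and machine identifier, usually the network card's MAC address, describes version 1 UUIDs.)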
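A quick check of what UUID.randomUUID() (the call used in `WebLogSession`) returns; `SessionIds` is a hypothetical wrapper.

```scala
import java.util.UUID

// Hypothetical wrapper around the JDK call used for session IDs.
object SessionIds {
  def newSessionId(): String = UUID.randomUUID().toString

  // (version, variant): randomUUID() yields version 4 (random) with the IETF variant 2.
  def describe(id: String): (Int, Int) = {
    val u = UUID.fromString(id)
    (u.version(), u.variant())
  }
}
```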

Compiling the Spark program with Maven fails

Compiling the Spark program with Maven fails with: [ERROR] error: error while loading CharSequence, class file '/Library/Java/JavaVirtualMachines/jdk1.8.0_77.jdk/Contents/Home/jre/lib/rt.jar(java/lang/CharSequence.class)' is broken
The main cause is a version conflict between Scala 2.10 and JDK 1.8; the author's only solution was to compile with JDK 1.7 instead.

spark-shell fails to start after configuring Spark to use HiveContext

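To use HiveContext in Spark, the author copied the required Hive configuration file into Spark's conf directory and put the database driver JAR into Spark's lib directory, but spark-shell then failed to start, mainly because the database driver JAR could not be found on the CLASSPATH.
The author's current workaround is somewhat clumsy: tell spark-shell explicitly where the driver JAR is when starting it:

bin/spark-shell --jars lib/mysql-connector-java-5.0.5.jar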

from:http://blog.csdn.net/ymh198816/article/details/52014315

BigData

Author: Xiaoyu Ma
Link: https://www.zhihu.com/question/27974418/answer/38965760
Source: Zhihu
Copyright belongs to the author. Contact the author for permission before reposting.

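Big data is a very broad concept. The Hadoop ecosystem (or the broader ecosystem around it) exists basically to handle data processing beyond the scale of a single machine. You can compare it to all the tools a kitchen needs: pots, pans, bowls, and ladles each have their own use, and they overlap. You can eat straight out of the soup pot, and you can peel with a paring knife or a peeler. But each tool has its own strengths, and although odd combinations can work, they are not necessarily the best choice. With big data, first of all you need to be able to store it.
Traditional file systems live on a single machine and cannot span multiple machines. HDFS (Hadoop Distributed FileSystem) is fundamentally designed so that large amounts of data can span hundreds or thousands of machines while you see one file system rather than many. For example, when you ask for the data in /hdfs/tmp/file1, you refer to a single file path, but the actual data is stored on many different machines. As a user you don't need to know that, just as on a single machine you don't care which tracks and sectors a file is scattered across. HDFS manages the data for you.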

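Once you can store the data, you start thinking about how to process it. HDFS can manage data across machines for you, but the data is huge. Reading terabytes or petabytes on one machine (that's a lot of data, say bigger than the entire HD catalog of the adult studio Tokyo Hot) could take days or even weeks. For many companies single-machine processing is intolerable: Weibo, for example, must finish its processing within 24 hours to update its 24-hour trending posts. If I use many machines instead, I have to decide how to distribute the work, how to restart tasks when a machine dies, how machines communicate and exchange data to complete complex computations, and so on. That is what MapReduce, Tez, and Spark do. MapReduce is the first-generation compute engine; Tez and Spark are the second generation. MapReduce uses a greatly simplified computation model with just two steps, Map and Reduce (connected by a shuffle), and this model can already handle a large share of big data problems.
So what are Map and Reduce?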
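Suppose you want the frequency of each word in a huge text file stored on something like HDFS. You launch a MapReduce program. In the Map phase, hundreds of machines read different parts of the file at the same time, and each counts word frequencies for its own part, producing pairs such as (hello, 12100), (world, 15214), and so on (here I fold Map and Combine together to keep things simple). Each of these hundreds of machines produces such a set, and then hundreds of machines start the Reduce phase. Reducer A receives from the mappers all counts for words starting with A, reducer B receives the counts for words starting with B, and so on (in practice the split is not really by first letter but by a hash function, to avoid data skew: words starting with X are far rarer than others, and you don't want the workloads of the machines to differ wildly). The reducers then aggregate again: (hello, 12100) + (hello, 12311) + (hello, 345881) = (hello, 370292). With every reducer doing this, you get the word frequencies for the whole file.
This looks like a very simple model, yet many algorithms can be expressed with it.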
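The word-count walkthrough translates almost line by line into plain Scala, with in-memory "splits" in place of HDFS blocks and machines. `WordCountMR` is a hypothetical illustration of the model, not Hadoop's API; as the text suggests, words are routed to reducers by hash rather than by first letter.

```scala
// Word count in the Map -> (Combine) -> Shuffle -> Reduce shape, on in-memory splits.
object WordCountMR {
  // Map + Combine: count the words within one split, e.g. ("hello", 2).
  def mapAndCombine(split: String): Map[String, Int] =
    split.split("\\s+").filter(_.nonEmpty)
      .groupBy(identity).map { case (w, occ) => w -> occ.length }

  // Route a word to a reducer by hash to spread the load evenly.
  def reducerFor(word: String, numReducers: Int): Int =
    Math.floorMod(word.hashCode, numReducers)

  def run(splits: Seq[String], numReducers: Int): Map[String, Int] = {
    val mapped = splits.map(mapAndCombine) // one "mapper" per split
    val shuffled = mapped.flatMap(_.toSeq).groupBy { case (w, _) => reducerFor(w, numReducers) }
    shuffled.values.flatMap { pairs =>     // one "reducer" per group: sum the partial counts
      pairs.groupBy(_._1).map { case (w, counts) => w -> counts.map(_._2).sum }
    }.toMap
  }
}
```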
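The simple Map+Reduce model is crude and brutal: easy to use, but clumsy. Apart from new features such as in-memory caching, the second-generation Tez and Spark essentially make the Map/Reduce model more general: the boundary between Map and Reduce is blurrier, data exchange is more flexible, and there is less disk I/O, so complex algorithms are easier to express and throughput is higher.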

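With MapReduce, Tez, and Spark in hand, programmers found MapReduce programs really tedious to write and wanted to simplify the process. It's like having assembly language: you can do almost anything with it, but it still feels tedious, and you want a higher-level, more abstract language to describe algorithms and data processing flows. Hence Pig and Hive. Pig describes MapReduce in a scripting style, while Hive uses SQL. They translate scripts and SQL into MapReduce programs and hand them to the compute engine, freeing you from tedious MapReduce code so you can write programs in a simpler, more intuitive language.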

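Once Hive existed, people found that SQL has huge advantages over Java. One is that it is very easy to write: the word count above takes only a line or two of SQL versus dozens or hundreds of lines of MapReduce. More importantly, users without a computer science background finally felt the love: I can write SQL too! Data analysts were freed from begging engineers for help, and engineers were freed from writing odd one-off processing programs. Everyone was happy. Hive gradually grew into the core component of big data warehouses. Many companies even describe their entire sets of pipeline jobs in SQL, because it is easy to write and change, easy to understand at a glance, and easy to maintain.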

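Once data analysts started analyzing data with Hive, they found that Hive on MapReduce is damn slow! For pipeline jobs that may not matter much; a recommendation list updated every 24 hours just has to finish within 24 hours. But for data analysis, people always want things to run faster. Say I want to know how many people lingered on the inflatable-doll page in the past hour and how long each of them stayed; on a huge website with massive data, that might take tens of minutes or even hours. And that analysis might be only the first step of a long march: you still need to see how many people browsed vibrators and how many looked at Rachmaninoff CDs, so you can report to the boss whether our users are mostly creepy guys and repressed girls or arty young men and women. You can't stand the agony of waiting, so you tell the handsome engineer bro: faster, faster, a little faster!
So Impala, Presto, and Drill were born (along with countless lesser-known interactive SQL engines I won't list). The core idea of all three is that the MapReduce engine is too slow because it is too general, too robust, and too conservative; SQL needs something lighter that grabs resources more aggressively, is optimized specifically for SQL, and doesn't need as many fault-tolerance guarantees (if the system fails, just restart the task, which is acceptable when the whole job takes only a few minutes). These systems let users run SQL tasks much faster at the cost of generality and stability. If MapReduce is a machete that can hack through anything, these three are boning knives: nimble and sharp, but not for anything too big or too hard.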

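Honestly, these systems never became as popular as people expected, because two more oddballs were built around then: Hive on Tez / Spark and SparkSQL. Their idea is that MapReduce is slow, but if I run SQL on a new-generation general-purpose engine like Tez or Spark, it runs faster, and users don't have to maintain two systems. It's like this: if your kitchen is small, you're lazy, and you're not too picky about food, you can buy a rice cooker that steams, stews, and boils, and save yourself a lot of cookware.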

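What I've described so far is basically the architecture of a data warehouse: HDFS at the bottom, MapReduce/Tez/Spark on top of it, and Hive and Pig on top of those; or Impala, Drill, and Presto running directly on HDFS. This covers low- to medium-speed data processing needs.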

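What if I need faster processing?
If I'm a Weibo-like company and want to show not a 24-hour trending list but a constantly changing one updated within a minute, none of the approaches above will do. So another computation model was developed: streaming. Storm is the most popular stream processing platform. The idea of stream processing is: to get more real-time updates, why not process the data as it flows in? Take word counting again: my data stream is words arriving one at a time, and I count them as they flow past. Stream processing is awesome, with essentially no latency, but its weakness is inflexibility: you must know in advance what you want to count, because once the data has flowed past it is gone, and whatever you didn't compute can't be computed afterward. So it's a great tool, but it can't replace the data warehouse and batch processing systems above.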
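A minimal illustration of the streaming idea in plain Scala (not Storm's API): the running counts are updated as each word arrives, so an answer is always current, but only for the metric chosen up front. `RunningWordCount` is hypothetical.

```scala
import scala.collection.mutable

// Streaming-style word count: state is updated per incoming word; the raw words are not kept,
// so questions that were not planned for (e.g. "which word came first?") cannot be answered later.
final class RunningWordCount {
  private val counts = mutable.Map.empty[String, Long].withDefaultValue(0L)

  def onWord(word: String): Unit = counts(word) += 1

  // Current top-n words, most frequent first (ties broken alphabetically).
  def top(n: Int): Seq[(String, Long)] =
    counts.toSeq.sortBy { case (w, c) => (-c, w) }.take(n)
}
```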

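Another somewhat separate module is the KV store, such as Cassandra, HBase, MongoDB, and many, many others (more than you can imagine). A KV store means: I have a bunch of keys, and I can very quickly fetch the data bound to a key. For example, with your ID card number I can retrieve your identity data. MapReduce can also do this, but it would probably have to scan the entire data set, whereas a KV store is built specifically for this operation, with storage and retrieval optimized for it. Finding one ID number in several petabytes of data might take a fraction of a second. This greatly speeds up some specialized operations at big data companies. For example, if my website has a page that looks up an order by its order number, and the site has too many orders to store in a single-machine database, I would consider storing them in a KV store. The trade-off of KV stores is that they basically can't handle complex computations: most can't JOIN, many can't aggregate, and there are no strong consistency guarantees (data is spread over different machines, so each read may return a different result, and they can't handle operations that need strong consistency, like bank transfers). But damn, they are fast. Extremely fast.
Every KV store makes different trade-offs: some are faster, some have higher capacity, and some support more complex operations. There's bound to be one that suits you.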

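Beyond these there are more specialized systems and components: Mahout is a distributed machine learning library, Protobuf is an encoding and library for data interchange, ZooKeeper is a highly consistent distributed coordination system, and so on.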

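With so many assorted tools all running on the same cluster, everyone has to respect each other and work in an orderly way. So another important component is the scheduler, and the most popular one today is YARN. Think of it as central management, like your mom supervising the kitchen: hey, your sister is done chopping vegetables, you can take the knife and go kill the chicken. As long as everyone follows mom's assignments, everyone can cook happily.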

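You can think of the big data ecosystem as an ecosystem of kitchen tools. To cook different dishes, whether Chinese, Japanese, or French, you need all kinds of different tools. And the guests' demands keep getting more complex, new kitchen tools keep being invented, and no single universal tool can handle every situation, so the ecosystem keeps getting more complex.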