Category Archives: BigData

Sugar Rush versión rápida: más giros en menos tiempo



Pragmatic Play ha revolucionado la experiencia de juego en línea con Sugar Rush, un tragamonedas de temática dulce que ahora llega en una versión rápida diseñada especialmente para jugadores de México que buscan emoción instantánea y múltiples oportunidades en menor tiempo. En este análisis, descubriremos qué hace tan especial a esta variante de Sugar Rush y qué debes saber para disfrutar al máximo de este entretenido juego.

¿Qué es Sugar Rush?

Sugar Rush es un tragamonedas de video con gráficos vibrantes, basado en un mundo colorido lleno de caramelos, gomitas y todo tipo de dulces atractivos que encantan a cualquier jugador. El juego original cuenta con 6 rodillos y hasta 4096 formas de ganar, mientras que su versión rápida conserva la esencia pero acelera el ritmo con más giros por minuto.

Características principales

  • Alta volatilidad
  • Multiplicadores que pueden aumentar las ganancias
  • Funciones de giros gratis y símbolos especiales
  • Interfaz amigable y optimizada para móviles

Review de Sugar Rush versión rápida

Sugar Rush versión rápida: más giros en menos tiempo

Esta versión rápida de Sugar Rush ofrece una jugabilidad similar al Sugar Rush clásico, pero con una ventaja fundamental: las rondas de juego se desarrollan mucho más rápido, ideal para jugadores que prefieren una experiencia intensa sin largas esperas. El ritmo acelerado no compromete la calidad gráfica ni el diseño de sonido, manteniendo la atmósfera divertida y dulce que ha cautivado a la comunidad.

Interfaz y usabilidad

El interfaz sigue siendo intuitivo y sencillo de dominar. Los botones para girar, apostar y acceder a la tabla de pagos son accesibles sin distraer, mientras que las animaciones mantienen la energía y la diversión del juego.

¿Dónde jugar Sugar Rush versión rápida en México?

Muchos casinos en línea confiables ofrecen esta versión, especialmente aquellos que cuentan con licencia para operar en México, garantizando seguridad y pagos rápidos:

  • Caliente Casino
  • PlayUZU
  • Betway

Estos casinos proporcionan además promociones exclusivas que pueden aumentar el saldo inicial y prolongar la diversión.

Preguntas frecuentes sobre Sugar Rush versión rápida

¿Necesito descargar algún software para jugar?

No, Sugar Rush versión rápida se puede disfrutar directamente en el navegador, ya sea en computadora o dispositivos móviles.

¿Cuál es la apuesta mínima y máxima en esta versión?

Las apuestas suelen variar entre 0.20 MXN y 1000 MXN, adaptándose tanto a jugadores casuales como a los más experimentados.

¿Puede jugarse gratis?

Sí, la modalidad demo está disponible para que puedas familiarizarte con el juego sin riesgo antes de apostar con dinero real.

Entrevista con un jugador ganador de Sugar Rush

José Miguel, residente en Ciudad de México y aficionado a los tragamonedas, nos compartió su experiencia:

“Empecé a jugar Sugar Rush versión rápida hace tres meses y rápidamente me enganché. Me gusta la rapidez con la que giran los rodillos porque hace el juego más emocionante y puedo probar más estrategias. La última vez, gané un premio de 15,000 MXN y desde entonces recomiendo el juego a mis amigos.”

Tabla: Parámetros principales de Sugar Rush (versión rápida)

Parámetro Detalle
Provedor Pragmatic Play
Volatilidad Alta
RTP (Retorno al jugador) 96.50%
Formato 6 rodillos, hasta 4096 maneras de ganar
Apuesta mínima 0.20 MXN
Apuesta máxima 1000 MXN

Análisis de la popularidad de Sugar Rush en México

El mercado mexicano ha mostrado una creciente preferencia por juegos que combinan gráficos llamativos y mecánicas dinámicas, como es el caso de Sugar Rush. Su versión rápida ha ampliado esta demanda al ofrecer partidas intensas, perfectas para los jugadores que disponen de poco tiempo pero quieren optimizar su diversión. Además, el creciente acceso a dispositivos móviles y la regulación más clara del juego en línea en México hacen que Sugar Rush sea una opción atractiva y segura para los apostadores nacionales.

Comentarios sobre la demo

Muchos jugadores valoran la opción de jugar la demo para familiarizarse con los símbolos, funciones especiales y velocidad de esta versión. La demo funciona sin registro y permite ajustar apuestas, lo que facilita entender mejor la dinámica antes de apostar dinero real.

Consejos para disfrutar Sugar Rush rápido y responsablemente

  1. Establece un presupuesto antes de jugar y respétalo.
  2. Prueba primero la versión demo para conocer mejor el juego.
  3. Aprovecha las promociones ofrecidas por los casinos para maximizar tu saldo.
  4. Juega pausadamente y evita dejarte llevar por rachas rápidas de pérdida.
  5. Consulta con el soporte del casino ante cualquier duda para obtener ayuda inmediata.

Con estas recomendaciones y el conocimiento sobre Sugar Rush versión rápida, los jugadores mexicanos tienen en sus manos una opción divertida, moderna y segura para vivir la emoción de los tragamonedas en línea.

Sweet Bonanza Megaways – Der deutsche Testbericht



Sweet Bonanza Megaways von Pragmatic Play hat sich in der deutschen Online-Casino-Szene schnell einen Namen gemacht. Die Kombination aus dem bekannten Sweet Bonanza-Thema und der Megaways-Mechanik verspricht Spannung‚ große Gewinnchancen und abwechslungsreiches Gameplay. In diesem Artikel erfahren Sie alles Wichtige über diesen Slot‚ warum er in Deutschland immer beliebter wird und in welchen Casinos Sie ihn am besten spielen können.

Überblick und Gameplay

Sweet Bonanza Megaways ist die Megaways-Variante des beliebten Sweet Bonanza Slots. Statt fester Gewinnlinien bietet die Megaways-Mechanik tausende Möglichkeiten‚ Gewinne zu erzielen – in diesem Fall bis zu 117.649 Gewinnwege pro Runde. Das Spielfeld ist variabel‚ mit 6 Walzen und zwischen 2 und 7 Symbolen pro Walze.

Grundregeln des Spiels

  • Mit zehn oder mehr gleichen Symbolen irgendwo auf den Walzen kann ein Gewinn entstehen.
  • Der Tumble-Mechanismus lässt Gewinnsymbole verschwinden und neue an deren Stelle fallen‚ was aufeinanderfolgende Gewinne in einer Runde ermöglichen kann.
  • Das Spiel bietet Scatter-Symbole‚ mit denen Freispiele ausgelöst werden.
  • Multiplikatoren während der Freispiele erhöhen die Gewinnsumme erheblich.

Wo Sie Sweet Bonanza Megaways in Deutschland spielen können

Sweet Bonanza Megaways – Der deutsche Testbericht

Dank der regulierten Online-Casino-Landschaft in Deutschland ist Sweet Bonanza Megaways in zahlreichen lizenzierten Casinos verfügbar. Einige der besten Optionen sind:

Casino Bonus für neue Spieler Zahlungsmethoden Boni-Umsetzung
LeoVegas 100% bis zu 150 € PayPal‚ Kreditkarte‚ Sofortüberweisung 20x Umsatz
Mr Green 100 € + 100 Freispiele Klarna‚ Skrill‚ Neteller 35x Umsatz
Unibet 150 € + 50 Freispiele Banküberweisung‚ Trustly‚ Paysafecard 25x Umsatz

Die Popularität von Sweet Bonanza Megaways in Deutschland

In den letzten Jahren ist das Interesse an Megaways-Slots enorm gewachsen‚ und Sweet Bonanza Megaways profitiert stark davon. Die Mischung aus einem süßen‚ bunten Design mit vielen Frucht- und Bonbon-Symbolen und gleichzeitig dynamischen Gewinnmöglichkeiten zieht Spieler unterschiedlichster Erfahrungsstufen an.

Deutsche Spieler schätzen besonders:

  • Hohe Volatilität mit der Chance auf große Gewinne
  • Einsteigerfreundliches‚ intuitives Interface und mobile Optimierung
  • Die spannende Freispielrunde mit Multiplikatoren

Interview mit einem Gewinner

Wir sprachen mit Markus aus München‚ der vor Kurzem bei einem deutschen Online-Casino einen dreistelligen Euro-Betrag an Echtgeld mit Sweet Bonanza Megaways gewann.

Markus: „Das Spiel macht einfach Spaß‚ besonders mit dem Tumble-Feature. Ich hatte mein bestes Ergebnis in einer Freispielrunde mit einem Multiplikator von x10. Es ist aufregend‚ wie schnell die Gewinne hier ansteigen können. Ich empfehle allen deutschen Spielern‚ es mal auszuprobieren‚ dabei aber verantwortungsvoll zu spielen.“

Häufige Fragen zu Sweet Bonanza Megaways

Wie funktioniert der Tumble-Effekt?

Der Tumble-Effekt sorgt dafür‚ dass nach jedem Gewinn alle Gewinnsymbole verschwinden und neue von oben nachfallen‚ um freie Felder zu füllen. Das ermöglicht Kettengewinne in einer einzigen Drehung.

Kann ich Sweet Bonanza Megaways kostenlos ausprobieren?

Ja‚ viele Online-Casinos bieten eine kostenlose Demoversion an‚ in der Sie das Spiel ohne Risiko mit Spielgeld testen können. Dies ist perfekt‚ um die Regeln und Features kennenzulernen‚ bevor man echtes Geld setzt.

Was ist die RTP (Return to Player) des Slots?

Die theoretische Auszahlungsquote von Sweet Bonanza Megaways liegt bei etwa 96‚48 %‚ was im Vergleich zu anderen Slots als relativ fair angesehen wird.

Expertenerfahrungen

Feedback eines erfahrenen Spielers

Nicole‚ eine Online-Casinospielerin aus Hamburg mit mehreren Jahren Erfahrung‚ erklärt:

„Sweet Bonanza Megaways ist eine hervorragende Kombination aus unterhaltsamem Gameplay und attraktiven Gewinnchancen. Besonders gut gefällt mir der variable Walzenaufbau‚ der das Spielerlebnis jedes Mal anders gestaltet. Die Möglichkeit‚ Freispiele zu erreichen‚ macht jede Sitzung spannend.“

Fazit

Für deutsche Spieler ist Sweet Bonanza Megaways eine exzellente Wahl‚ wenn es um Slots mit farbenfrohem Design und hoher Gewinnpotenz geht. Die zugängliche Spielmechanik‚ kombiniert mit den Megaways-Gewinnmöglichkeiten‚ sorgt für dauerhaften Spielspaß und die Aussicht auf richtig große Auszahlungen.

Wer in Deutschland legal und sicher Sweet Bonanza Megaways spielen möchte‚ sollte sich an die empfohlenen Online-Casinos halten‚ die über gültige deutsche Lizenzen verfügen. So steht einem spannenden Spielvergnügen nichts im Weg.

40 Interview Questions asked at Startups in Machine Learning / Data Science

Machine learning and data science are being looked as the drivers of the next industrial revolution happening in the world today. This also means that there are numerous exciting startups looking for data scientists.  What could be a better start for your aspiring career!

However, still, getting into these roles is not easy. You obviously need to get excited about the idea, team and the vision of the company. You might also find some real difficult techincal questions on your way. The set of questions asked depend on what does the startup do. Do they provide consulting? Do they build ML products ? You should always find this out prior to beginning your interview preparation.

To help you prepare for your next interview, I’ve prepared a list of 40 plausible & tricky questions which are likely to come across your way in interviews. If you can answer and understand these question, rest assured, you will give a tough fight in your job interview.

Note: A key to answer these questions is to have concrete practical understanding on ML and related statistical concepts.

40 interview questions asked at startups in machine learning

 

Interview Questions on Machine Learning

Q1. You are given a train data set having 1000 columns and 1 million rows. The data set is based on a classification problem. Your manager has asked you to reduce the dimension of this data so that model computation time can be reduced. Your machine has memory constraints. What would you do? (You are free to make practical assumptions.)

Answer: Processing a high dimensional data on a limited memory machine is a strenuous task, your interviewer would be fully aware of that. Following are the methods you can use to tackle such situation:

  1. Since we have lower RAM, we should close all other applications in our machine, including the web browser, so that most of the memory can be put to use.
  2. We can randomly sample the data set. This means, we can create a smaller data set, let’s say, having 1000 variables and 300000 rows and do the computations.
  3. To reduce dimensionality, we can separate the numerical and categorical variables and remove the correlated variables. For numerical variables, we’ll use correlation. For categorical variables, we’ll use chi-square test.
  4. Also, we can use PCA and pick the components which can explain the maximum variance in the data set.
  5. Using online learning algorithms like Vowpal Wabbit (available in Python) is a possible option.
  6. Building a linear model using Stochastic Gradient Descent is also helpful.
  7. We can also apply our business understanding to estimate which all predictors can impact the response variable. But, this is an intuitive approach, failing to identify useful predictors might result in significant loss of information.

Note: For point 4 & 5, make sure you read about online learning algorithms & Stochastic Gradient Descent. These are advanced methods.

Q2. Is rotation necessary in PCA? If yes, Why? What will happen if you don’t rotate the components?

Answer: Yes, rotation (orthogonal) is necessary because it maximizes the difference between variance captured by the component. This makes the components easier to interpret. Not to forget, that’s the motive of doing PCA where, we aim to select fewer components (than features) which can explain the maximum variance in the data set. By doing rotation, the relative location of the components doesn’t change, it only changes the actual coordinates of the points.

If we don’t rotate the components, the effect of PCA will diminish and we’ll have to select more number of components to explain variance in the data set.

Know more: PCA

 

Q3. You are given a data set. The data set has missing values which spread along 1 standard deviation from the median. What percentage of data would remain unaffected? Why?

Answer: This question has enough hints for you to start thinking! Since, the data is spread across median, let’s assume it’s a normal distribution. We know, in a normal distribution, ~68% of the data lies in 1 standard deviation from mean (or mode, median), which leaves ~32% of the data unaffected. Therefore, ~32% of the data would remain unaffected by missing values.

 

Q4. You are given a data set on cancer detection. You’ve build a classification model and achieved an accuracy of 96%. Why shouldn’t you be happy with your model performance? What can you do about it?

Answer: If you have worked on enough data sets, you should deduce that cancer detection results in imbalanced data. In an imbalanced data set, accuracy should not be used as a measure of performance because 96% (as given) might only be predicting majority class correctly, but our class of interest is minority class (4%) which is the people who actually got diagnosed with cancer. Hence, in order to evaluate model performance, we should use Sensitivity (True Positive Rate), Specificity (True Negative Rate), F measure to determine class wise performance of the classifier. If the minority class performance is found to to be poor, we can undertake the following steps:

  1. We can use undersampling, oversampling or SMOTE to make the data balanced.
  2. We can alter the prediction threshold value by doing probability caliberation and finding a optimal threshold using AUC-ROC curve.
  3. We can assign weight to classes such that the minority classes gets larger weight.
  4. We can also use anomaly detection.

Know more: Imbalanced Classification

 

Q5. Why is naive Bayes so ‘naive’ ?

Answer: naive Bayes is so ‘naive’ because it assumes that all of the features in a data set are equally important and independent. As we know, these assumption are rarely true in real world scenario.

 

Q6. Explain prior probability, likelihood and marginal likelihood in context of naiveBayes algorithm?

Answer: Prior probability is nothing but, the proportion of dependent (binary) variable in the data set. It is the closest guess you can make about a class, without any further information. For example: In a data set, the dependent variable is binary (1 and 0). The proportion of 1 (spam) is 70% and 0 (not spam) is 30%. Hence, we can estimate that there are 70% chances that any new email would  be classified as spam.

Likelihood is the probability of classifying a given observation as 1 in presence of some other variable. For example: The probability that the word ‘FREE’ is used in previous spam message is likelihood. Marginal likelihood is, the probability that the word ‘FREE’ is used in any message.

 

Q7. You are working on a time series data set. You manager has asked you to build a high accuracy model. You start with the decision tree algorithm, since you know it works fairly well on all kinds of data. Later, you tried a time series regression model and got higher accuracy than decision tree model. Can this happen? Why?

Answer: Time series data is known to posses linearity. On the other hand, a decision tree algorithm is known to work best to detect non – linear interactions. The reason why decision tree failed to provide robust predictions because it couldn’t map the linear relationship as good as a regression model did. Therefore, we learned that, a linear regression model can provide robust prediction given the data set satisfies its linearity assumptions.

 

Q8. You are assigned a new project which involves helping a food delivery company save more money. The problem is, company’s delivery team aren’t able to deliver food on time. As a result, their customers get unhappy. And, to keep them happy, they end up delivering food for free. Which machine learning algorithm can save them?

Answer: You might have started hopping through the list of ML algorithms in your mind. But, wait! Such questions are asked to test your machine learning fundamentals.

This is not a machine learning problem. This is a route optimization problem. A machine learning problem consist of three things:

  1. There exist a pattern.
  2. You cannot solve it mathematically (even by writing exponential equations).
  3. You have data on it.

Always look for these three factors to decide if machine learning is a tool to solve a particular problem.

 

Q9. You came to know that your model is suffering from low bias and high variance. Which algorithm should you use to tackle it? Why?

Answer:  Low bias occurs when the model’s predicted values are near to actual values. In other words, the model becomes flexible enough to mimic the training data distribution. While it sounds like great achievement, but not to forget, a flexible model has no generalization capabilities. It means, when this model is tested on an unseen data, it gives disappointing results.

In such situations, we can use bagging algorithm (like random forest) to tackle high variance problem. Bagging algorithms divides a data set into subsets made with repeated randomized sampling. Then, these samples are used to generate  a set of models using a single learning algorithm. Later, the model predictions are combined using voting (classification) or averaging (regression).

Also, to combat high variance, we can:

  1. Use regularization technique, where higher model coefficients get penalized, hence lowering model complexity.
  2. Use top n features from variable importance chart. May be, with all the variable in the data set, the algorithm is having difficulty in finding the meaningful signal.

 

Q10. You are given a data set. The data set contains many variables, some of which are highly correlated and you know about it. Your manager has asked you to run PCA. Would you remove correlated variables first? Why?

Answer: Chances are, you might be tempted to say No, but that would be incorrect. Discarding correlated variables have a substantial effect on PCA because, in presence of correlated variables, the variance explained by a particular component gets inflated.

For example: You have 3 variables in a data set, of which 2 are correlated. If you run PCA on this data set, the first principal component would exhibit twice the variance than it would exhibit with uncorrelated variables. Also, adding correlated variables lets PCA put more importance on those variable, which is misleading.

 

Q11. After spending several hours, you are now anxious to build a high accuracy model. As a result, you build 5 GBM models, thinking a boosting algorithm would do the magic. Unfortunately, neither of models could perform better than benchmark score. Finally, you decided to combine those models. Though, ensembled models are known to return high accuracy, but you are unfortunate. Where did you miss?

Answer: As we know, ensemble learners are based on the idea of combining weak learners to create strong learners. But, these learners provide superior result when the combined models are uncorrelated. Since, we have used 5 GBM models and got no accuracy improvement, suggests that the models are correlated. The problem with correlated models is, all the models provide same information.

For example: If model 1 has classified User1122 as 1, there are high chances model 2 and model 3 would have done the same, even if its actual value is 0. Therefore, ensemble learners are built on the premise of combining weak uncorrelated models to obtain better predictions.

 

Q12. How is kNN different from kmeans clustering?

Answer: Don’t get mislead by ‘k’ in their names. You should know that the fundamental difference between both these algorithms is, kmeans is unsupervised in nature and kNN is supervised in nature. kmeans is a clustering algorithm. kNN is a classification (or regression) algorithm.

kmeans algorithm partitions a data set into clusters such that a cluster formed is homogeneous and the points in each cluster are close to each other. The algorithm tries to maintain enough separability between these clusters. Due to unsupervised nature, the clusters have no labels.

kNN algorithm tries to classify an unlabeled observation based on its k (can be any number ) surrounding neighbors. It is also known as lazy learner because it involves minimal training of model. Hence, it doesn’t use training data to make generalization on unseen data set.

 

Q13. How is True Positive Rate and Recall related? Write the equation.

Answer: True Positive Rate = Recall. Yes, they are equal having the formula (TP/TP + FN).

Know more: Evaluation Metrics

 

Q14. You have built a multiple regression model. Your model R² isn’t as good as you wanted. For improvement, your remove the intercept term, your model R² becomes 0.8 from 0.3. Is it possible? How?

Answer: Yes, it is possible. We need to understand the significance of intercept term in a regression model. The intercept term shows model prediction without any independent variable i.e. mean prediction. The formula of R² = 1 – ∑(y – y´)²/∑(y – ymean)² where y´ is predicted value.   

When intercept term is present, R² value evaluates your model wrt. to the mean model. In absence of intercept term (ymean), the model can make no such evaluation, with large denominator, ∑(y - y´)²/∑(y)² equation’s value becomes smaller than actual, resulting in higher R².

 

Q15. After analyzing the model, your manager has informed that your regression model is suffering from multicollinearity. How would you check if he’s true? Without losing any information, can you still build a better model?

Answer: To check multicollinearity, we can create a correlation matrix to identify & remove variables having correlation above 75% (deciding a threshold is subjective). In addition, we can use calculate VIF (variance inflation factor) to check the presence of multicollinearity. VIF value <= 4 suggests no multicollinearity whereas a value of >= 10 implies serious multicollinearity. Also, we can use tolerance as an indicator of multicollinearity.

But, removing correlated variables might lead to loss of information. In order to retain those variables, we can use penalized regression models like ridge or lasso regression. Also, we can add some random noise in correlated variable so that the variables become different from each other. But, adding noise might affect the prediction accuracy, hence this approach should be carefully used.

Know more: Regression

 

Q16. When is Ridge regression favorable over Lasso regression?

Answer: You can quote ISLR’s authors Hastie, Tibshirani who asserted that, in presence of few variables with medium / large sized effect, use lasso regression. In presence of many variables with small / medium sized effect, use ridge regression.

Conceptually, we can say, lasso regression (L1) does both variable selection and parameter shrinkage, whereas Ridge regression only does parameter shrinkage and end up including all the coefficients in the model. In presence of correlated variables, ridge regression might be the preferred choice. Also, ridge regression works best in situations where the least square estimates have higher variance. Therefore, it depends on our model objective.

Know more: Ridge and Lasso Regression

 

Q17. Rise in global average temperature led to decrease in number of pirates around the world. Does that mean that decrease in number of pirates caused the climate change?

Answer: After reading this question, you should have understood that this is a classic case of “causation and correlation”. No, we can’t conclude that decrease in number of pirates caused the climate change because there might be other factors (lurking or confounding variables) influencing this phenomenon.

Therefore, there might be a correlation between global average temperature and number of pirates, but based on this information we can’t say that pirated died because of rise in global average temperature.

Know more: Causation and Correlation

 

Q18. While working on a data set, how do you select important variables? Explain your methods.

Answer: Following are the methods of variable selection you can use:

  1. Remove the correlated variables prior to selecting important variables
  2. Use linear regression and select variables based on p values
  3. Use Forward Selection, Backward Selection, Stepwise Selection
  4. Use Random Forest, Xgboost and plot variable importance chart
  5. Use Lasso Regression
  6. Measure information gain for the available set of features and select top n features accordingly.

 

Q19. What is the difference between covariance and correlation?

Answer: Correlation is the standardized form of covariance.

Covariances are difficult to compare. For example: if we calculate the covariances of salary ($) and age (years), we’ll get different covariances which can’t be compared because of having unequal scales. To combat such situation, we calculate correlation to get a value between -1 and 1, irrespective of their respective scale.

 

Q20. Is it possible capture the correlation between continuous and categorical variable? If yes, how?

Answer: Yes, we can use ANCOVA (analysis of covariance) technique to capture association between continuous and categorical variables.

 

Q21. Both being tree based algorithm, how is random forest different from Gradient boosting algorithm (GBM)?

Answer: The fundamental difference is, random forest uses bagging technique to make predictions. GBM uses boosting techniques to make predictions.

In bagging technique, a data set is divided into n samples using randomized sampling. Then, using a single learning algorithm a model is build on all samples. Later, the resultant predictions are combined using voting or averaging. Bagging is done is parallel. In boosting, after the first round of predictions, the algorithm weighs misclassified predictions higher, such that they can be corrected in the succeeding round. This sequential process of giving higher weights to misclassified predictions continue until a stopping criterion is reached.

Random forest improves model accuracy by reducing variance (mainly). The trees grown are uncorrelated to maximize the decrease in variance. On the other hand, GBM improves accuracy my reducing both bias and variance in a model.

Know more: Tree based modeling

 

Q22. Running a binary classification tree algorithm is the easy part. Do you know how does a tree splitting takes place i.e. how does the tree decide which variable to split at the root node and succeeding nodes?

Answer: A classification trees makes decision based on Gini Index and Node Entropy. In simple words, the tree algorithm find the best possible feature which can divide the data set into purest possible children nodes.

Gini index says, if we select two items from a population at random then they must be of same class and probability for this is 1 if population is pure. We can calculate Gini as following:

  1. Calculate Gini for sub-nodes, using formula sum of square of probability for success and failure (p^2+q^2).
  2. Calculate Gini for split using weighted Gini score of each node of that split

Entropy is the measure of impurity as given by (for binary class):

Entropy, Decision Tree

Here p and q is probability of success and failure respectively in that node. Entropy is zero when a node is homogeneous. It is maximum when a both the classes are present in a node at 50% – 50%.  Lower entropy is desirable.

 

Q23. You’ve built a random forest model with 10000 trees. You got delighted after getting training error as 0.00. But, the validation error is 34.23. What is going on? Haven’t you trained your model perfectly?

Answer: The model has overfitted. Training error 0.00 means the classifier has mimiced the training data patterns to an extent, that they are not available in the unseen data. Hence, when this classifier was run on unseen sample, it couldn’t find those patterns and returned prediction with higher error. In random forest, it happens when we use larger number of trees than necessary. Hence, to avoid these situation, we should tune number of trees using cross validation.

 

Q24. You’ve got a data set to work having p (no. of variable) > n (no. of observation). Why is OLS as bad option to work with? Which techniques would be best to use? Why?

Answer: In such high dimensional data sets, we can’t use classical regression techniques, since their assumptions tend to fail. When p > n, we can no longer calculate a unique least square coefficient estimate, the variances become infinite, so OLS cannot be used at all.

To combat this situation, we can use penalized regression methods like lasso, LARS, ridge which can shrink the coefficients to reduce variance. Precisely, ridge regression works best in situations where the least square estimates have higher variance.

Among other methods include subset regression, forward stepwise regression.

 

11222Q25. What is convex hull ? (Hint: Think SVM)

Answer: In case of linearly separable data, convex hull represents the outer boundaries of the two group of data points. Once convex hull is created, we get maximum margin hyperplane (MMH) as a perpendicular bisector between two convex hulls. MMH is the line which attempts to create greatest separation between two groups.

 

Q26. We know that one hot encoding increasing the dimensionality of a data set. But, label encoding doesn’t. How ?

Answer: Don’t get baffled at this question. It’s a simple question asking the difference between the two.

Using one hot encoding, the dimensionality (a.k.a features) in a data set get increased because it creates a new variable for each level present in categorical variables. For example: let’s say we have a variable ‘color’. The variable has 3 levels namely Red, Blue and Green. One hot encoding ‘color’ variable will generate three new variables as Color.Red, Color.Blue and Color.Green containing 0 and 1 value.

In label encoding, the levels of a categorical variables gets encoded as 0 and 1, so no new variable is created. Label encoding is majorly used for binary variables.

 

Q27. What cross validation technique would you use on time series data set? Is it k-fold or LOOCV?

Answer: Neither.

In time series problem, k fold can be troublesome because there might be some pattern in year 4 or 5 which is not in year 3. Resampling the data set will separate these trends, and we might end up validation on past years, which is incorrect. Instead, we can use forward chaining strategy with 5 fold as shown below:

  • fold 1 : training [1], test [2]
  • fold 2 : training [1 2], test [3]
  • fold 3 : training [1 2 3], test [4]
  • fold 4 : training [1 2 3 4], test [5]
  • fold 5 : training [1 2 3 4 5], test [6]

where 1,2,3,4,5,6 represents “year”.

 

Q28. You are given a data set consisting of variables having more than 30% missing values? Let’s say, out of 50 variables, 8 variables have missing values higher than 30%. How will you deal with them?

Answer: We can deal with them in the following ways:

  1. Assign a unique category to missing values, who knows the missing values might decipher some trend
  2. We can remove them blatantly.
  3. Or, we can sensibly check their distribution with the target variable, and if found any pattern we’ll keep those missing values and assign them a new category while removing others.

 

29. ‘People who bought this, also bought…’ recommendations seen on amazon is a result of which algorithm?

Answer: The basic idea for this kind of recommendation engine comes from collaborative filtering.

Collaborative Filtering algorithm considers “User Behavior” for recommending items. They exploit behavior of other users and items in terms of transaction history, ratings, selection and purchase information. Other users behaviour and preferences over the items are used to recommend items to the new users. In this case, features of the items are not known.

Know more: Recommender System

 

Q30. What do you understand by Type I vs Type II error ?

Answer: Type I error is committed when the null hypothesis is true and we reject it, also known as a ‘False Positive’. Type II error is committed when the null hypothesis is false and we accept it, also known as ‘False Negative’.

In the context of confusion matrix, we can say Type I error occurs when we classify a value as positive (1) when it is actually negative (0). Type II error occurs when we classify a value as negative (0) when it is actually positive(1).

 

Q31. You are working on a classification problem. For validation purposes, you’ve randomly sampled the training data set into train and validation. You are confident that your model will work incredibly well on unseen data since your validation accuracy is high. However, you get shocked after getting poor test accuracy. What went wrong?

Answer: In case of classification problem, we should always use stratified sampling instead of random sampling. A random sampling doesn’t takes into consideration the proportion of target classes. On the contrary, stratified sampling helps to maintain the distribution of target variable in the resultant distributed samples also.

 

Q32. You have been asked to evaluate a regression model based on R², adjusted R² and tolerance. What will be your criteria?

Answer: Tolerance (1 / VIF) is used as an indicator of multicollinearity. It is an indicator of percent of variance in a predictor which cannot be accounted by other predictors. Large values of tolerance is desirable.

We will consider adjusted R² as opposed to R² to evaluate model fit because R² increases irrespective of improvement in prediction accuracy as we add more variables. But, adjusted R² would only increase if an additional variable improves the accuracy of model, otherwise stays same. It is difficult to commit a general threshold value for adjusted R² because it varies between data sets. For example: a gene mutation data set might result in lower adjusted R² and still provide fairly good predictions, as compared to a stock market data where lower adjusted R² implies that model is not good.

 

Q33. In k-means or kNN, we use euclidean distance to calculate the distance between nearest neighbors. Why not manhattan distance ?

Answer: We don’t use manhattan distance because it calculates distance horizontally or vertically only. It has dimension restrictions. On the other hand, euclidean metric can be used in any space to calculate distance. Since, the data points can be present in any dimension, euclidean distance is a more viable option.

Example: Think of a chess board, the movement made by a bishop or a rook is calculated by manhattan distance because of their respective vertical & horizontal movements.

 

Q34. Explain machine learning to me like a 5 year old.

Answer: It’s simple. It’s just like how babies learn to walk. Every time they fall down, they learn (unconsciously) & realize that their legs should be straight and not in a bend position. The next time they fall down, they feel pain. They cry. But, they learn ‘not to stand like that again’. In order to avoid that pain, they try harder. To succeed, they even seek support from the door or wall or anything near them, which helps them stand firm.

This is how a machine works & develops intuition from its environment.

Note: The interview is only trying to test if have the ability of explain complex concepts in simple terms.

 

Q35. I know that a linear regression model is generally evaluated using Adjusted R² or F value. How would you evaluate a logistic regression model?

Answer: We can use the following methods:

  1. Since logistic regression is used to predict probabilities, we can use AUC-ROC curve along with confusion matrix to determine its performance.
  2. Also, the analogous metric of adjusted R² in logistic regression is AIC. AIC is the measure of fit which penalizes model for the number of model coefficients. Therefore, we always prefer model with minimum AIC value.
  3. Null Deviance indicates the response predicted by a model with nothing but an intercept. Lower the value, better the model. Residual deviance indicates the response predicted by a model on adding independent variables. Lower the value, better the model.

Know more: Logistic Regression

 

Q36. Considering the long list of machine learning algorithm, given a data set, how do you decide which one to use?

Answer: You should say, the choice of machine learning algorithm solely depends of the type of data. If you are given a data set which is exhibits linearity, then linear regression would be the best algorithm to use. If you given to work on images, audios, then neural network would help you to build a robust model.

If the data comprises of non linear interactions, then a boosting or bagging algorithm should be the choice. If the business requirement is to build a model which can be deployed, then we’ll use regression or a decision tree model (easy to interpret and explain) instead of black box algorithms like SVM, GBM etc.

In short, there is no one master algorithm for all situations. We must be scrupulous enough to understand which algorithm to use.

 

Q37. Do you suggest that treating a categorical variable as continuous variable would result in a better predictive model?

Answer: For better predictions, categorical variable can be considered as a continuous variable only when the variable is ordinal in nature.

 

Q38. When does regularization becomes necessary in Machine Learning?

Answer: Regularization becomes necessary when the model begins to ovefit / underfit. This technique introduces a cost term for bringing in more features with the objective function. Hence, it tries to push the coefficients for many variables to zero and hence reduce cost term. This helps to reduce model complexity so that the model can become better at predicting (generalizing).

 

Q39. What do you understand by Bias Variance trade off?

Answer:  The error emerging from any model can be broken down into three components mathematically. Following are these component :

error of a model

Bias error is useful to quantify how much on an average are the predicted values different from the actual value. A high bias error means we have a under-performing model which keeps on missing important trends. Variance on the other side quantifies how are the prediction made on same observation different from each other. A high variance model will over-fit on your training population and perform badly on any observation beyond training.

 

Q40. OLS is to linear regression. Maximum likelihood is to logistic regression. Explain the statement.

Answer: OLS and Maximum likelihood are the methods used by the respective regression methods to approximate the unknown parameter (coefficient) value. In simple words,

Ordinary least square(OLS) is a method used in linear regression which approximates the parameters resulting in minimum distance between actual and predicted values. Maximum Likelihood helps in choosing the the values of parameters which maximizes the likelihood that the parameters are most likely to produce observed data.

 

End Notes

You might have been able to answer all the questions, but the real value is in understanding them and generalizing your knowledge on similar questions. If you have struggled at these questions, no worries, now is the time to learn and not perform. You should right now focus on learning these topics scrupulously.

These questions are meant to give you a wide exposure on the types of questions asked at startups in machine learning. I’m sure these questions would leave you curious enough to do deeper topic research at your end. If you are planning for it, that’s a good sign.

Did you like reading this article? Have you appeared in any startup interview recently for data scientist profile? Do share your experience in comments below. I’d love to know your experience.

from:https://www.analyticsvidhya.com/blog/2016/09/40-interview-questions-asked-at-startups-in-machine-learning-data-science/

Data science Python notebooks

 

data-science-ipython-notebooks

Index

 

deep-learning

IPython Notebook(s) demonstrating deep learning functionality.

 

tensor-flow-tutorials

Additional TensorFlow tutorials:

Notebook Description
tsf-basics Learn basic operations in TensorFlow, a library for various kinds of perceptual and language understanding tasks from Google.
tsf-linear Implement linear regression in TensorFlow.
tsf-logistic Implement logistic regression in TensorFlow.
tsf-nn Implement nearest neighboars in TensorFlow.
tsf-alex Implement AlexNet in TensorFlow.
tsf-cnn Implement convolutional neural networks in TensorFlow.
tsf-mlp Implement multilayer perceptrons in TensorFlow.
tsf-rnn Implement recurrent neural networks in TensorFlow.
tsf-gpu Learn about basic multi-GPU computation in TensorFlow.
tsf-gviz Learn about graph visualization in TensorFlow.
tsf-lviz Learn about loss visualization in TensorFlow.

tensor-flow-exercises

Notebook Description
tsf-not-mnist Learn simple data curation by creating a pickle with formatted datasets for training, development and testing in TensorFlow.
tsf-fully-connected Progressively train deeper and more accurate models using logistic regression and neural networks in TensorFlow.
tsf-regularization Explore regularization techniques by training fully connected networks to classify notMNIST characters in TensorFlow.
tsf-convolutions Create convolutional neural networks in TensorFlow.
tsf-word2vec Train a skip-gram model over Text8 data in TensorFlow.
tsf-lstm Train a LSTM character model over Text8 data in TensorFlow.

 

theano-tutorials

Notebook Description
theano-intro Intro to Theano, which allows you to define, optimize, and evaluate mathematical expressions involving multi-dimensional arrays efficiently. It can use GPUs and perform efficient symbolic differentiation.
theano-scan Learn scans, a mechanism to perform loops in a Theano graph.
theano-logistic Implement logistic regression in Theano.
theano-rnn Implement recurrent neural networks in Theano.
theano-mlp Implement multilayer perceptrons in Theano.

 

keras-tutorials

Notebook Description
keras Keras is an open source neural network library written in Python. It is capable of running on top of either Tensorflow or Theano.
setup Learn about the tutorial goals and how to set up your Keras environment.
intro-deep-learning-ann Get an intro to deep learning with Keras and Artificial Neural Networks (ANN).
theano Learn about Theano by working with weights matrices and gradients.
keras-otto Learn about Keras by looking at the Kaggle Otto challenge.
ann-mnist Review a simple implementation of ANN for MNIST using Keras.
conv-nets Learn about Convolutional Neural Networks (CNNs) with Keras.
conv-net-1 Recognize handwritten digits from MNIST using Keras – Part 1.
conv-net-2 Recognize handwritten digits from MNIST using Keras – Part 2.
keras-models Use pre-trained models such as VGG16, VGG19, ResNet50, and Inception v3 with Keras.
auto-encoders Learn about Autoencoders with Keras.
rnn-lstm Learn about Recurrent Neural Networks (RNNs) with Keras.
lstm-sentence-gen Learn about RNNs using Long Short Term Memory (LSTM) networks with Keras.

deep-learning-misc

Notebook Description
deep-dream Caffe-based computer vision program which uses a convolutional neural network to find and enhance patterns in images.

 

scikit-learn

IPython Notebook(s) demonstrating scikit-learn functionality.

Notebook Description
intro Intro notebook to scikit-learn. Scikit-learn adds Python support for large, multi-dimensional arrays and matrices, along with a large library of high-level mathematical functions to operate on these arrays.
knn Implement k-nearest neighbors in scikit-learn.
linear-reg Implement linear regression in scikit-learn.
svm Implement support vector machine classifiers with and without kernels in scikit-learn.
random-forest Implement random forest classifiers and regressors in scikit-learn.
k-means Implement k-means clustering in scikit-learn.
pca Implement principal component analysis in scikit-learn.
gmm Implement Gaussian mixture models in scikit-learn.
validation Implement validation and model selection in scikit-learn.

 

statistical-inference-scipy

IPython Notebook(s) demonstrating statistical inference with SciPy functionality.

Notebook Description
scipy SciPy is a collection of mathematical algorithms and convenience functions built on the Numpy extension of Python. It adds significant power to the interactive Python session by providing the user with high-level commands and classes for manipulating and visualizing data.
effect-size Explore statistics that quantify effect size by analyzing the difference in height between men and women. Uses data from the Behavioral Risk Factor Surveillance System (BRFSS) to estimate the mean and standard deviation of height for adult women and men in the United States.
sampling Explore random sampling by analyzing the average weight of men and women in the United States using BRFSS data.
hypothesis Explore hypothesis testing by analyzing the difference of first-born babies compared with others.

 

pandas

IPython Notebook(s) demonstrating pandas functionality.

Notebook Description
pandas Software library written for data manipulation and analysis in Python. Offers data structures and operations for manipulating numerical tables and time series.
github-data-wrangling Learn how to load, clean, merge, and feature engineer by analyzing GitHub data from the Viz repo.
Introduction-to-Pandas Introduction to Pandas.
Introducing-Pandas-Objects Learn about Pandas objects.
Data Indexing and Selection Learn about data indexing and selection in Pandas.
Operations-in-Pandas Learn about operating on data in Pandas.
Missing-Values Learn about handling missing data in Pandas.
Hierarchical-Indexing Learn about hierarchical indexing in Pandas.
Concat-And-Append Learn about combining datasets: concat and append in Pandas.
Merge-and-Join Learn about combining datasets: merge and join in Pandas.
Aggregation-and-Grouping Learn about aggregation and grouping in Pandas.
Pivot-Tables Learn about pivot tables in Pandas.
Working-With-Strings Learn about vectorized string operations in Pandas.
Working-with-Time-Series Learn about working with time series in pandas.
Performance-Eval-and-Query Learn about high-performance Pandas: eval() and query() in Pandas.

 

matplotlib

IPython Notebook(s) demonstrating matplotlib functionality.

Notebook Description
matplotlib Python 2D plotting library which produces publication quality figures in a variety of hardcopy formats and interactive environments across platforms.
matplotlib-applied Apply matplotlib visualizations to Kaggle competitions for exploratory data analysis. Learn how to create bar plots, histograms, subplot2grid, normalized plots, scatter plots, subplots, and kernel density estimation plots.
Introduction-To-Matplotlib Introduction to Matplotlib.
Simple-Line-Plots Learn about simple line plots in Matplotlib.
Simple-Scatter-Plots Learn about simple scatter plots in Matplotlib.
Errorbars.ipynb Learn about visualizing errors in Matplotlib.
Density-and-Contour-Plots Learn about density and contour plots in Matplotlib.
Histograms-and-Binnings Learn about histograms, binnings, and density in Matplotlib.
Customizing-Legends Learn about customizing plot legends in Matplotlib.
Customizing-Colorbars Learn about customizing colorbars in Matplotlib.
Multiple-Subplots Learn about multiple subplots in Matplotlib.
Text-and-Annotation Learn about text and annotation in Matplotlib.
Customizing-Ticks Learn about customizing ticks in Matplotlib.
Settings-and-Stylesheets Learn about customizing Matplotlib: configurations and stylesheets.
Three-Dimensional-Plotting Learn about three-dimensional plotting in Matplotlib.
Geographic-Data-With-Basemap Learn about geographic data with basemap in Matplotlib.
Visualization-With-Seaborn Learn about visualization with Seaborn.

 

numpy

IPython Notebook(s) demonstrating NumPy functionality.

Notebook Description
numpy Adds Python support for large, multi-dimensional arrays and matrices, along with a large library of high-level mathematical functions to operate on these arrays.
Introduction-to-NumPy Introduction to NumPy.
Understanding-Data-Types Learn about data types in Python.
The-Basics-Of-NumPy-Arrays Learn about the basics of NumPy arrays.
Computation-on-arrays-ufuncs Learn about computations on NumPy arrays: universal functions.
Computation-on-arrays-aggregates Learn about aggregations: min, max, and everything in between in NumPy.
Computation-on-arrays-broadcasting Learn about computation on arrays: broadcasting in NumPy.
Boolean-Arrays-and-Masks Learn about comparisons, masks, and boolean logic in NumPy.
Fancy-Indexing Learn about fancy indexing in NumPy.
Sorting Learn about sorting arrays in NumPy.
Structured-Data-NumPy Learn about structured data: NumPy’s structured arrays.

 

python-data

IPython Notebook(s) demonstrating Python functionality geared towards data analysis.

Notebook Description
data structures Learn Python basics with tuples, lists, dicts, sets.
data structure utilities Learn Python operations such as slice, range, xrange, bisect, sort, sorted, reversed, enumerate, zip, list comprehensions.
functions Learn about more advanced Python features: Functions as objects, lambda functions, closures, *args, **kwargs currying, generators, generator expressions, itertools.
datetime Learn how to work with Python dates and times: datetime, strftime, strptime, timedelta.
logging Learn about Python logging with RotatingFileHandler and TimedRotatingFileHandler.
pdb Learn how to debug in Python with the interactive source code debugger.
unit tests Learn how to test in Python with Nose unit tests.

 

kaggle-and-business-analyses

IPython Notebook(s) used in kaggle competitions and business analyses.

Notebook Description
titanic Predict survival on the Titanic. Learn data cleaning, exploratory data analysis, and machine learning.
churn-analysis Predict customer churn. Exercise logistic regression, gradient boosting classifers, support vector machines, random forests, and k-nearest-neighbors. Includes discussions of confusion matrices, ROC plots, feature importances, prediction probabilities, and calibration/descrimination.

 

spark

IPython Notebook(s) demonstrating spark and HDFS functionality.

Notebook Description
spark In-memory cluster computing framework, up to 100 times faster for certain applications and is well suited for machine learning algorithms.
hdfs Reliably stores very large files across machines in a large cluster.

 

mapreduce-python

IPython Notebook(s) demonstrating Hadoop MapReduce with mrjob functionality.

Notebook Description
mapreduce-python Runs MapReduce jobs in Python, executing jobs locally or on Hadoop clusters. Demonstrates Hadoop Streaming in Python code with unit test and mrjob config file to analyze Amazon S3 bucket logs on Elastic MapReduce. Disco is another python-based alternative.

 

aws

IPython Notebook(s) demonstrating Amazon Web Services (AWS) and AWS tools functionality.

Also check out:

  • SAWS: A Supercharged AWS command line interface (CLI).
  • Awesome AWS: A curated list of libraries, open source repos, guides, blogs, and other resources.
Notebook Description
boto Official AWS SDK for Python.
s3cmd Interacts with S3 through the command line.
s3distcp Combines smaller files and aggregates them together by taking in a pattern and target file. S3DistCp can also be used to transfer large volumes of data from S3 to your Hadoop cluster.
s3-parallel-put Uploads multiple files to S3 in parallel.
redshift Acts as a fast data warehouse built on top of technology from massive parallel processing (MPP).
kinesis Streams data in real time with the ability to process thousands of data streams per second.
lambda Runs code in response to events, automatically managing compute resources.

 

commands

IPython Notebook(s) demonstrating various command lines for Linux, Git, etc.

Notebook Description
linux Unix-like and mostly POSIX-compliant computer operating system. Disk usage, splitting files, grep, sed, curl, viewing running processes, terminal syntax highlighting, and Vim.
anaconda Distribution of the Python programming language for large-scale data processing, predictive analytics, and scientific computing, that aims to simplify package management and deployment.
ipython notebook Web-based interactive computational environment where you can combine code execution, text, mathematics, plots and rich media into a single document.
git Distributed revision control system with an emphasis on speed, data integrity, and support for distributed, non-linear workflows.
ruby Used to interact with the AWS command line and for Jekyll, a blog framework that can be hosted on GitHub Pages.
jekyll Simple, blog-aware, static site generator for personal, project, or organization sites. Renders Markdown or Textile and Liquid templates, and produces a complete, static website ready to be served by Apache HTTP Server, Nginx or another web server.
pelican Python-based alternative to Jekyll.
django High-level Python Web framework that encourages rapid development and clean, pragmatic design. It can be useful to share reports/analyses and for blogging. Lighter-weight alternatives include Pyramid, Flask, Tornado, and Bottle.

misc

IPython Notebook(s) demonstrating miscellaneous functionality.

Notebook Description
regex Regular expression cheat sheet useful in data wrangling.
algorithmia Algorithmia is a marketplace for algorithms. This notebook showcases 4 different algorithms: Face Detection, Content Summarizer, Latent Dirichlet Allocation and Optical Character Recognition.

notebook-installation

anaconda

Anaconda is a free distribution of the Python programming language for large-scale data processing, predictive analytics, and scientific computing that aims to simplify package management and deployment.

Follow instructions to install Anaconda or the more lightweight miniconda.

dev-setup

For detailed instructions, scripts, and tools to set up your development environment for data analysis, check out the dev-setup repo.

running-notebooks

To view interactive content or to modify elements within the IPython notebooks, you must first clone or download the repository then run the notebook. More information on IPython Notebooks can be found here.

$ git clone https://github.com/donnemartin/data-science-ipython-notebooks.git
$ cd data-science-ipython-notebooks
$ jupyter notebook

Notebooks tested with Python 2.7.x.

credits

contributing

Contributions are welcome! For bug reports or requests please submit an issue.

contact-info

Feel free to contact me to discuss any issues, questions, or comments.

license

This repository contains a variety of content; some developed by Donne Martin, and some from third-parties. The third-party content is distributed under the license provided by those parties.

The content developed by Donne Martin is distributed under the following license:

I am providing code and resources in this repository to you under an open source license. Because this is my personal repository, the license you receive to my code and resources is from me and not my employer (Facebook).

Copyright 2015 Donne Martin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License

Flume+Kafka+Storm+Redis实时分析系统基本架构

今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型。当然这个架构模型只是实时分析技术的一 个简单的入门级架构,实际生产环境中的大数据实时分析技术还涉及到很多细节的处理, 比如使用Storm的ACK机制保证数据都能被正确处理, 集群的高可用架构, 消费数据时如何处理重复数据或者丢失数据等问题,根据不同的业务场景,对数据的可靠性要求以及系统的复杂度的要求也会不同。这篇文章的目的只是带大家入个门,让大家对实时分析技术有一个简单的认识,并和大家一起做学习交流。
文章的最后还有Troubleshooting,分享了作者在部署本文示例程序过程中所遇到的各种问题和解决方案。

系统基本架构

整个实时分析系统的架构就是先由电商系统的订单服务器产生订单日志, 然后使用Flume去监听订单日志,并实时把每一条日志信息抓取下来并存进Kafka消息系统中, 接着由Storm系统消费Kafka中的消息,同时消费记录由Zookeeper集群管理,这样即使Kafka宕机重启后也能找到上次的消费记录,接着从上次宕机点继续从Kafka的Broker中进行消费。但是由于存在先消费后记录日志或者先记录后消费的非原子操作,如果出现刚好消费完一条消息并还没将信息记录到Zookeeper的时候就宕机的类似问题,或多或少都会存在少量数据丢失或重复消费的问题, 其中一个解决方案就是Kafka的Broker和Zookeeper都部署在同一台机子上。接下来就是使用用户定义好的Storm Topology去进行日志信息的分析并输出到Redis缓存数据库中(也可以进行持久化),最后用Web APP去读取Redis中分析后的订单信息并展示给用户。之所以在Flume和Storm中间加入一层Kafka消息系统,就是因为在高并发的条件下, 订单日志的数据会井喷式增长,如果Storm的消费速度(Storm的实时计算能力那是最快之一,但是也有例外, 而且据说现在Twitter的开源实时计算框架Heron比Storm还要快)慢于日志的产生速度,加上Flume自身的局限性,必然会导致大量数据滞后并丢失,所以加了Kafka消息系统作为数据缓冲区,而且Kafka是基于log File的消息系统,也就是说消息能够持久化在硬盘中,再加上其充分利用Linux的I/O特性,提供了可观的吞吐量。架构中使用Redis作为数据库也是因为在实时的环境下,Redis具有很高的读写速度。

业务背景
各大电商网站在合适的时间进行各种促销活动已是常态,在能为网站带来大量的流量和订单的同时,对于用户也有不小的让利,必然是大家伙儿喜闻乐见的。在促销活动期间,老板和运营希望能实时看到订单情况,老板开心,运营也能根据实时的订单数据调整运营策略,而让用户能实时看到网站的订单数据,也会勾起用户的购买欲。但是普通的离线计算系统已然不能满足在高并发环境下的实时计算要求,所以我们得使用专门实时计算系统,如:Storm, Heron, Spark Stream等,去满足类似的需求。
既然要分析订单数据,那必然在订单产生的时候要把订单信息记录在日志文件中。本文中,作者通过使用log4j2,以及结合自己之前开发电商系统的经验,写了一个订单日志生成模拟器,代码如下,能帮助大家随机产生订单日志。下面所展示的订单日志文件格式和数据就是我们本文中的分析目标,本文的案例中用来分析所有商家的订单总销售额并找出销售额钱20名的商家。

订单数据格式:
orderNumber: XX | orderDate: XX | paymentNumber: XX | paymentDate: XX | merchantName: XX | sku: [ skuName: XX skuNum: XX skuCode: XX skuPrice: XX totalSkuPrice: XX;skuName: XX skuNum: XX skuCode: XX skuPrice: XX totalSkuPrice: XX;] | price: [ totalPrice: XX discount: XX paymentPrice: XX ]

订单日志生成程序:
使用log4j2将日志信息写入文件中,每小时滚动一次日志文件

  1.       
  2.         
  3.           
  4.         
  5.         
  6.           filePattern=”/Users/guludada/Desktop/logs/app-%d{yyyy-MM-dd-HH}.log”>
  7.             
  8.             
  9.                 
  10.             
  11.         
  12.       
  13.       
  14.         
  15.           
  16.           
  17.         
  18.       


      
        
          
        
    	
        	
       		
        		
      		
                  		
      
      
        
          
          
        
      

生成器代码:

  1. package com.guludada.ordersInfo;
  2. import java.text.SimpleDateFormat;
  3. import java.util.Date;
  4. import java.util.Random;
  5. // Import log4j classes.
  6. import org.apache.logging.log4j.LogManager;
  7. import org.apache.logging.log4j.Logger;
  8. public class ordersInfoGenerator {
  9.     public enum paymentWays {
  10.         Wechat,Alipay,Paypal
  11.     }
  12.     public enum merchantNames {
  13.         优衣库,天猫,淘宝,咕噜大大,快乐宝贝,守望先峰,哈毒妇,Storm,Oracle,Java,CSDN,跑男,路易斯威登,
  14.         暴雪公司,Apple,Sumsam,Nissan,Benz,BMW,Maserati
  15.     }
  16.     public enum productNames {
  17.         黑色连衣裙, 灰色连衣裙, 棕色衬衫, 性感牛仔裤, 圆脚牛仔裤,塑身牛仔裤, 朋克卫衣,高腰阔腿休闲裤,人字拖鞋,
  18.         沙滩拖鞋
  19.     }
  20.     float[] skuPriceGroup = {299,399,699,899,1000,2000};
  21.     float[] discountGroup = {10,20,50,100};
  22.     float totalPrice = 0;
  23.     float discount = 0;
  24.     float paymentPrice = 0;
  25.     private static final Logger logger = LogManager.getLogger(ordersInfoGenerator.class);
  26.     private int logsNumber = 1000;
  27.     public void generate() {
  28.         for(int i = 0; i <= logsNumber; i++) {
  29.             logger.info(randomOrderInfo());
  30.         }
  31.     }
  32.     public String randomOrderInfo() {
  33.         SimpleDateFormat sdf = new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”);
  34.         Date date = new Date();
  35.         String orderNumber = randomNumbers(5) + date.getTime();
  36.         String orderDate = sdf.format(date);
  37.         String paymentNumber = randomPaymentWays() + “-” + randomNumbers(8);
  38.         String paymentDate = sdf.format(date);
  39.         String merchantName = randomMerchantNames();
  40.         String skuInfo = randomSkus();
  41.         String priceInfo = calculateOrderPrice();
  42.         return “orderNumber: ” + orderNumber + ” | orderDate: ” + orderDate + ” | paymentNumber: ” +
  43.             paymentNumber + ” | paymentDate: ” + paymentDate + ” | merchantName: ” + merchantName +
  44.             ” | sku: ” + skuInfo + ” | price: ” + priceInfo;
  45.     }
  46.     private String randomPaymentWays() {
  47.         paymentWays[] paymentWayGroup = paymentWays.values();
  48.         Random random = new Random();
  49.         return paymentWayGroup[random.nextInt(paymentWayGroup.length)].name();
  50.     }
  51.     private String randomMerchantNames() {
  52.         merchantNames[] merchantNameGroup = merchantNames.values();
  53.         Random random = new Random();
  54.         return merchantNameGroup[random.nextInt(merchantNameGroup.length)].name();
  55.     }
  56.     private String randomProductNames() {
  57.         productNames[] productNameGroup = productNames.values();
  58.         Random random = new Random();
  59.         return productNameGroup[random.nextInt(productNameGroup.length)].name();
  60.     }
  61.     private String randomSkus() {
  62.         Random random = new Random();
  63.         int skuCategoryNum = random.nextInt(3);
  64.         String skuInfo =”[“;
  65.         totalPrice = 0;
  66.         for(int i = 1; i <= 3; i++) {
  67.             int skuNum = random.nextInt(3)+1;
  68.             float skuPrice = skuPriceGroup[random.nextInt(skuPriceGroup.length)];
  69.             float totalSkuPrice = skuPrice * skuNum;
  70.             String skuName = randomProductNames();
  71.             String skuCode = randomCharactersAndNumbers(10);
  72.             skuInfo += ” skuName: ” + skuName + ” skuNum: ” + skuNum + ” skuCode: ” + skuCode
  73.                     + ” skuPrice: ” + skuPrice + ” totalSkuPrice: ” + totalSkuPrice + “;”;
  74.             totalPrice += totalSkuPrice;
  75.         }
  76.         skuInfo += ” ]”;
  77.         return skuInfo;
  78.     }
  79.     private String calculateOrderPrice() {
  80.         Random random = new Random();
  81.         discount = discountGroup[random.nextInt(discountGroup.length)];
  82.         paymentPrice = totalPrice – discount;
  83.         String priceInfo = “[ totalPrice: ” + totalPrice + ” discount: ” + discount + ” paymentPrice: ” + paymentPrice +” ]”;
  84.         return priceInfo;
  85.     }
  86.     private String randomCharactersAndNumbers(int length) {
  87.         String characters = “abcdefghijklmnopqrstuvwxyz0123456789”;
  88.         String randomCharacters = “”;
  89.                 Random random = new Random();
  90.                 for (int i = 0; i < length; i++) {
  91.               randomCharacters += characters.charAt(random.nextInt(characters.length()));
  92.                 }
  93.                 return randomCharacters;
  94.     }
  95.     private String randomNumbers(int length) {
  96.         String characters = “0123456789”;
  97.         String randomNumbers = “”;
  98.                 Random random = new Random();
  99.                 for (int i = 0; i < length; i++) {
  100.              randomNumbers += characters.charAt(random.nextInt(characters.length()));
  101.                 }
  102.                return randomNumbers;
  103.     }
  104.     public static void main(String[] args) {
  105.         ordersInfoGenerator generator = new ordersInfoGenerator();
  106.         generator.generate();
  107.     }
  108. }
package com.guludada.ordersInfo;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

// Import log4j classes.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;



public class ordersInfoGenerator {
	
	public enum paymentWays {
		Wechat,Alipay,Paypal
	}
	public enum merchantNames {
		优衣库,天猫,淘宝,咕噜大大,快乐宝贝,守望先峰,哈毒妇,Storm,Oracle,Java,CSDN,跑男,路易斯威登,
		暴雪公司,Apple,Sumsam,Nissan,Benz,BMW,Maserati
	}
	
	public enum productNames {
		黑色连衣裙, 灰色连衣裙, 棕色衬衫, 性感牛仔裤, 圆脚牛仔裤,塑身牛仔裤, 朋克卫衣,高腰阔腿休闲裤,人字拖鞋,
		沙滩拖鞋
	}
	
	float[] skuPriceGroup = {299,399,699,899,1000,2000};
	float[] discountGroup = {10,20,50,100};
	float totalPrice = 0;
	float discount = 0;
	float paymentPrice = 0;
	
	private static final Logger logger = LogManager.getLogger(ordersInfoGenerator.class);
	private int logsNumber = 1000;
	
	public void generate() {
				
		for(int i = 0; i <= logsNumber; i++) {			
			logger.info(randomOrderInfo());			
		}
	}
	
	public String randomOrderInfo() {
		
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");		
		Date date = new Date();		
		
		String orderNumber = randomNumbers(5) + date.getTime();
		
		String orderDate = sdf.format(date);
		
		String paymentNumber = randomPaymentWays() + "-" + randomNumbers(8);
		
		String paymentDate = sdf.format(date);
		
		String merchantName = randomMerchantNames();
		
		String skuInfo = randomSkus();
		
		String priceInfo = calculateOrderPrice();
		
		return "orderNumber: " + orderNumber + " | orderDate: " + orderDate + " | paymentNumber: " +
			paymentNumber + " | paymentDate: " + paymentDate + " | merchantName: " + merchantName + 
			" | sku: " + skuInfo + " | price: " + priceInfo;
	}
		
	private String randomPaymentWays() {
		
		paymentWays[] paymentWayGroup = paymentWays.values();
		Random random = new Random();
		return paymentWayGroup[random.nextInt(paymentWayGroup.length)].name();
	}
	
	private String randomMerchantNames() {
		
		merchantNames[] merchantNameGroup = merchantNames.values();
		Random random = new Random();
		return merchantNameGroup[random.nextInt(merchantNameGroup.length)].name();
	}
	
	private String randomProductNames() {
		
		productNames[] productNameGroup = productNames.values();
		Random random = new Random();
		return productNameGroup[random.nextInt(productNameGroup.length)].name();
	}
	
	
	private String randomSkus() {
		
		Random random = new Random();
		int skuCategoryNum = random.nextInt(3);
		
		String skuInfo ="[";
		
		totalPrice = 0;
		for(int i = 1; i <= 3; i++) {
			
			int skuNum = random.nextInt(3)+1;
			float skuPrice = skuPriceGroup[random.nextInt(skuPriceGroup.length)];
			float totalSkuPrice = skuPrice * skuNum;			
			String skuName = randomProductNames();
			String skuCode = randomCharactersAndNumbers(10);
			skuInfo += " skuName: " + skuName + " skuNum: " + skuNum + " skuCode: " + skuCode
					+ " skuPrice: " + skuPrice + " totalSkuPrice: " + totalSkuPrice + ";";		
			totalPrice += totalSkuPrice;
		}
		
		
		skuInfo += " ]";
		
		return skuInfo;
	}
	
	private String calculateOrderPrice() {
		
		Random random = new Random();
		discount = discountGroup[random.nextInt(discountGroup.length)];
		paymentPrice = totalPrice - discount;
		
		String priceInfo = "[ totalPrice: " + totalPrice + " discount: " + discount + " paymentPrice: " + paymentPrice +" ]";
		
		return priceInfo;
	}
	
	private String randomCharactersAndNumbers(int length) {
		
		String characters = "abcdefghijklmnopqrstuvwxyz0123456789";
		String randomCharacters = "";  
                Random random = new Random();  
                for (int i = 0; i < length; i++) {  
        	  randomCharacters += characters.charAt(random.nextInt(characters.length()));  
                }  
                return randomCharacters;  
	}
	
	private String randomNumbers(int length) {
		
		String characters = "0123456789";
		String randomNumbers = "";   
                Random random = new Random();  
                for (int i = 0; i < length; i++) {  
        	 randomNumbers += characters.charAt(random.nextInt(characters.length()));  
                }  
               return randomNumbers;		
	}
	
	public static void main(String[] args) {
		
		ordersInfoGenerator generator = new ordersInfoGenerator();
		generator.generate();
	}
}

收集日志数据
采集数据的方式有多种,一种是通过自己编写shell脚本或Java编程采集数据,但是工作量大,不方便维护,另一种就是直接使用第三方框架去进行日志的采集,一般第三方框架的健壮性,容错性和易用性都做得很好也易于维护。本文采用第三方框架Flume进行日志采集,Flume是一个分布式的高效的日志采集系统,它能把分布在不同服务器上的海量日志文件数据统一收集到一个集中的存储资源中,FlumeApache的一个顶级项目,与Kafka也有很好的兼容性。不过需要注意的是Flume并不是一个高可用的框架,这方面的优化得用户自己去维护。

Flume的agent是运行在JVM上的,所以各个服务器上的JVM环境必不可少。每一个Flume agent部署在一台服务器上,Flume会收集web server产生的日志数据,并封装成一个个的事件发送给Flume Agent的Source,Flume Agent Source会消费这些收集来的数据事件(Flume Event)并放在Flume Agent Channel,Flume Agent Sink会从Channel中收集这些采集过来的数据,要么存储在本地的文件系统中要么作为一个消费资源分给下一个装在分布式系统中其它服务器上的Flume Agent进行处理。Flume提供了点对点的高可用的保障,某个服务器上的Flume Agent Channel中的数据只有确保传输到了另一个服务器上的Flume Agent Channel里或者正确保存到了本地的文件存储系统中,才会被移除。

在本文中,Flume的Source我们选择的是Exec Source,因为是实时系统,直接通过tail 命令来监听日志文件,而在Kafka的Broker集群端的Flume我们选择Kafka Sink 来把数据下沉到Kafka消息系统中。

下图是来自Flume官网里的Flume拉取数据的架构图:

图片来源:http://flume.apache.org/FlumeUserGuide.html

订单日志产生端的Flume配置文件如下:

  1. agent.sources = origin
  2. agent.channels = memorychannel
  3. agent.sinks = target
  4. agent.sources.origin.type = exec
  5. agent.sources.origin.command = tail -F /export/data/trivial/app.log
  6. agent.sources.origin.channels = memorychannel
  7. agent.sources.origin.interceptors = i1
  8. agent.sources.origin.interceptors.i1.type = static
  9. agent.sources.origin.interceptors.i1.key = topic
  10. agent.sources.origin.interceptors.i1.value = ordersInfo
  11. agent.sinks.loggerSink.type = logger
  12. agent.sinks.loggerSink.channel = memorychannel
  13. agent.channels.memorychannel.type = memory
  14. agent.channels.memorychannel.capacity = 10000
  15. agent.sinks.target.type = avro
  16. agent.sinks.target.channel = memorychannel
  17. agent.sinks.target.hostname = 172.16.124.130
  18. agent.sinks.target.port = 4545
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = exec
agent.sources.origin.command = tail -F /export/data/trivial/app.log
agent.sources.origin.channels = memorychannel

agent.sources.origin.interceptors = i1
agent.sources.origin.interceptors.i1.type = static
agent.sources.origin.interceptors.i1.key = topic
agent.sources.origin.interceptors.i1.value = ordersInfo

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000

agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = 172.16.124.130
agent.sinks.target.port = 4545

Kafka消息系统端Flume配置文件

  1. agent.sources = origin
  2. agent.channels = memorychannel
  3. agent.sinks = target
  4. agent.sources.origin.type = avro
  5. agent.sources.origin.channels = memorychannel
  6. agent.sources.origin.bind = 0.0.0.0
  7. agent.sources.origin.port = 4545
  8. agent.sinks.loggerSink.type = logger
  9. agent.sinks.loggerSink.channel = memorychannel
  10. agent.channels.memorychannel.type = memory
  11. agent.channels.memorychannel.capacity = 5000000
  12. agent.channels.memorychannel.transactionCapacity = 1000000
  13. agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
  14. #agent.sinks.target.topic = bigdata
  15. agent.sinks.target.brokerList=localhost:9092
  16. agent.sinks.target.requiredAcks=1
  17. agent.sinks.target.batchSize=100
  18. agent.sinks.target.channel = memorychannel
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = 0.0.0.0
agent.sources.origin.port = 4545

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000

agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
#agent.sinks.target.topic = bigdata
agent.sinks.target.brokerList=localhost:9092
agent.sinks.target.requiredAcks=1
agent.sinks.target.batchSize=100
agent.sinks.target.channel = memorychannel

这里需要注意的是,在日志服务器端的Flume agent中我们配置了一个interceptors,这个是用来为Flume Event(Flume Event就是拉取到的一行行的日志信息)的头部添加key为“topic”的K-V键值对,这样这条抓取到的日志信息就会根据topic的值去到Kafka中指定的topic消息池中,当然还可以为Flume Event额外配置一个key为“Key”的键值对,Kafka Sink会根据key“Key”的值将这条日志信息下沉到不同的Kafka分片上,否则就是随机分配。在Kafka集群端的Flume配置里,有几个重要的参数需要注意,“topic”是指定抓取到的日志信息下沉到Kafka哪一个topic池中,如果之前Flume发送端为Flume Event添加了带有topic的头信息,则这里可以不用配置;brokerList就是配置Kafka集群的主机地址和端口;requireAcks=1是配置当下沉到Kafka的消息储存到特定partition的leader中成功后就返回确认消息,requireAcks=0是不需要确认消息成功写入Kafka中,requireAcks=-1是指不光需要确认消息被写入partition的leander中,还要确认完成该条消息的所有备份;batchSize配置每次下沉多少条消息,每次下沉的数量越多延迟也高。

Kafka消息系统
这一部分我们将谈谈Kafka的配置和使用,Kafka在我们的系统中实际上就相当于起到一个数据缓冲池的作用, 有点类似于ActiveQ的消息队列和Redis这样的缓存区的作用,但是更可靠,因为是基于log File的消息系统,数据不容易丢失,以及能记录数据的消费位置并且用户还可以自定义消息消费的起始位置,这就使得重复消费消息也可以得以实现,而且同时具有队列和发布订阅两种消息消费模式,十分灵活,并且与Storm的契合度很高,充分利用Linux系统的I/O提高读写速度等等。另一个要提的方面就是Kafka的Consumer是pull-based模型的,而Flume是push-based模型。push-based模型是尽可能大的消费数据,但是当生产者速度大于消费者时数据会被覆盖。而pull-based模型可以缓解这个压力,消费速度可以慢于生产速度,有空余时再拉取那些没拉取到的数据。

Kafka是一个分布式的高吞吐量的消息系统,同时兼有点对点和发布订阅两种消息消费模式Kafka主要由Producer,Consumer和Broker组成。Kafka中引入了一个叫“topic”的概念,用来管理不同种类的消息,不同类别的消息会记录在到其对应的topic池中,而这些进入到topic中的消息会被Kafka写入磁盘的log文件中进行持久化处理。Kafka会把消息写入磁盘的log file中进行持久化对于每一个topic里的消息log文件,Kafka都会对其进行分片处理,而每一个消息都会顺序写入中log分片中,并且被标上“offset”的标量来代表这条消息在这个分片中的顺序,并且这些写入的消息无论是内容还是顺序都是不可变的。所以Kafka和其它消息队列系统的一个区别就是它能做到分片中的消息是能顺序被消费的,但是要做到全局有序还是有局限性的,除非整个topic只有一个log分片。并且无论消息是否有被消费,这条消息会一直保存在log文件中,当留存时间足够长到配置文件中指定的retention的时间后,这条消息才会被删除以释放空间。对于每一个Kafka的Consumer,它们唯一要存的Kafka相关的元数据就是这个“offset”值,记录着Consumer在分片上消费到了哪一个位置。通常Kafka是使用Zookeeper来为每一个Consumer保存它们的offset信息,所以在启动Kafka之前需要有一个Zookeeper集群;而且Kafka默认采用的是先记录offset再读取数据的策略,这种策略会存在少量数据丢失的可能。不过用户可以灵活设置Consumer的“offset”的位置,在加上消息记录在log文件中,所以是可以重复消费消息的。log的分片和它们的备份分散保存在集群的服务器上,对于每一个partition,在集群上都会有一台这个partition存在服务器作为leader,而这个partitionpartition的其它备份所在的服务器做为follower,leader负责处理关于这个partition所有请求,而follower负责这个partition的其它备份的同步工作,当leader服务器宕机时,其中一个follower服务器就会被选举为新的leader。

一般的消息系统分为两种模式,一种是点对点的消费模式,也就是queuing模式,另一种是发布订阅模式,也就是publish-subscribe模式,而Kafka引入了一个Consumer Group的概念,使得其能兼有两种模式。在Kafka中,每一个consumer都会标明自己属于哪个consumer group,每个topic的消息都会分发给每一个subscribe了这个topic的所有consumer group中的一个consumer实例。所以当所有的consumers都在同一个consumer group中,那么就像queuing的消息系统,一个message一次只被一个consumer消费。如果每一个consumer都有不同consumer group,那么就像public-subscribe消息系统一样,一个消息分发给所有的consumer实例。对于普通的消息队列系统,可能存在多个consumer去同时消费message,虽然message是有序地分发出去的,但是由于网络延迟的时候到达不同的consumer的时间不是顺序的,这时就失去了顺序性,解决方案是只用一个consumer去消费message,但显然不太合适。而对于Kafka来说,一个partiton只分发给每一个consumer group中的一个consumer实例,也就是说这个partition只有一个consumer实例在消费,所以可以保证在一个partition内部数据的处理是有序的,不同之处就在于Kafka内部消息进行了分片处理,虽然看上去也是单consumer的做法,但是分片机制保证了并发消费。如果要做到全局有序,那么整个topic中的消息只有一个分片,并且每一个consumer group中只能有一个consumer实例。这实际上就是彻底牺牲了消息消费时的并发度。

 

Kafka的配置和部署十分简单
1. 首先启动Zookeeper集群,Kafka需要Zookeeper集群来帮助记录每一个Consumer的offset
2. 为集群上的每一台Kafka服务器单独配置配置文件,比如我们需要设置有两个节点的Kafka集群,那么节点1和节点2的最基本的配置如下:

  1. config/server-1.properties:
  2.     broker.id=1
  3.     listeners=PLAINTEXT://:9093
  4.     log.dir=export/data/kafka
  5.     zookeeper.connect=localhost:2181
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=export/data/kafka
    zookeeper.connect=localhost:2181
  1. config/server-2.properties:
  2.     broker.id=2
  3.     listeners=PLAINTEXT://:9093
  4.     log.dir=/export/data/kafka
  5.     zookeeper.connect=localhost:2181
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9093
    log.dir=/export/data/kafka
    zookeeper.connect=localhost:2181

broker.id是kafka集群上每一个节点的单独标识,不能重复;listeners可以理解为每一个节点上Kafka进程要监听的端口,使用默认的就行; log.dir是Kafka的log文件(记录消息的log file)存放目录; zookeeper.connect就是Zookeeper的URI地址和端口。
3. 配置完上面的配置文件后,只要分别在节点上
输入下面命令启动Kafka进程就可以使用了

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...


Storm实时计算框架

接下来开始介绍本篇文章要使用的实时计算框架StormStrom是一个非常快的实时计算框架,至于快到什么程度呢?官网首页给出的数据是每一个Storm集群上的节点每一秒能处理一百万条数据。相比Hadoop“Mapreduce”计算框架,Storm使用的是"Topology"Mapreduce程序在计算完成后最终会停下来,而Topology则是会永远运行下去除非你显式地使用“kill -9 XXX”命令停掉它。和大多数的集群系统一样,Storm集群也存在着Master节点和Worker节点,在Master节点上运行的一个守护进程叫“Nimbus”,类似于Hadoop“JobTracker”的功能,负责集群中计算程序的分发,任务的分发,监控任务和工作节点的运行情况等;Worker节点上运行的守护进程叫“Supervisor”,负责接收Nimbus分发的任务并运行,每一个Worker上都会运行着Topology程序的一部分,而一个Topology程序的运行就是由集群上多个Worker一起协同工作的。值得注意的是NimubsSupervisor之间的协调工作也是通过Zookeeper来管理的,NimbusSupervisor自己本身在集群上都是无状态的,它们的状态都保存在Zookeeper上,所以任何节点的宕机和动态扩容都不会影响整个集群的工作运行,并支持fast-fail机制。

Storm有一个很重要的对数据的抽象概念,叫做“Stream”,我们姑且称之为数据流,数据流Stream就是由之间没有任何关系的松散的一个一个的数据元组“tuples”所组成的序列。要在Storm上做实时计算,首先你得有一个计算程序,这就是“Topology”,一个Topology程序由“Spout”“Bolt”共同组成。Storm就是通过Topology程序将数据流Stream通过可靠(ACK机制)的分布式计算生成我们的目标数据流Stream,就比如说把婚恋网站上当日注册的所有用户信息数据流Stream通过Topology程序计算出月收入上万年龄在30岁以下的新的用户信息流Stream。在我们的文章中,Spout就是实现了特定接口的Java类,它相当于数据源,用于产生数据或者从外部接收数据;而Bolt就是实现了Storm Bolt接口的Java类,用于消费从Spout发送出来的数据流并实现用户自定义的数据处理逻辑;对于复杂的数据处理,可以定义多个连续的Bolt去协同处理。最后在程序中通过SpoutBolt生成Topology对象并提交到Storm集群上执行。

tuplesStorm的数据模型,,由值和其所对应的field所组成,比如说在SpoutBolt中定义了发出的元组的field为:(name,age,gender),那么从这个SpoutBolt中发出的数据流的每一个元组值就类似于(''咕噜大大",27,"中性")Storm中还有一个Stream Group的概念,它用来决定从Spout或或或Bolt组件中发出的tuples接下来应该传到哪一个组件中或者更准确地说在程序里设置某个组件应该接收来自哪一个组件的tuples; 并且在Storm中提供了多个用于数据流分组的机制,比如说shuffleGrouping,用来将当前组件产生的tuples随机分发到下一个组件中,或者 fieldsGrouping,根据tuplesfield值来决定当前组件产生的tuples应该分发到哪一个组件中。
 
另一部分需要了解的就是Stormtasksworkers的概念。每一个worker都是一个运行在物理机器上的JVM进程,每个worker中又运行着多个task线程,这些task线程可能是Spout任务也可能是Bolt任务,由Nimbus根据RoundRobin负载均衡策略来分配,而至于在整个Topology程序里要起几个Spout线程或Bolt线程,也就是tasks,由用户在程序中设置并发度来决定。

Storm集群的配置文件如下:
Storm的配置文件在项目的conf目录下,也就是:conf/storm.yaml

  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements.  See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership.  The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License.  You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. ########### These MUST be filled in for a storm configuration
  17. storm.zookeeper.servers:
  18.   - "ymhHadoop"
  19.   - "ymhHadoop2"
  20.   - "ymhHadoop3"
  21. storm.local.dir: "/export/data/storm/workdir"
  22. nimbus.host: "ymhHadoop"
  23. supervisor.slots.ports:
  24.   -6700
  25.   -6701
  26.   -6702
  27.   -6703
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
  - "ymhHadoop"
  - "ymhHadoop2"
  - "ymhHadoop3"    

storm.local.dir: "/export/data/storm/workdir"
 
nimbus.host: "ymhHadoop"

supervisor.slots.ports:
  -6700
  -6701
  -6702
  -6703 
 
storm.zookeeper.servers自然就是用来配置我们熟悉的Zookeeper集群中各个节点的URI地址和端口的
storm.local.dir 是用来配置storm节点相关文件的存储目录的,每一个storm集群的节点在本地服务器上都要有一个目录存储少量的和该节点有关的一些信息。记得要开发这个目录的读写权限哦
nimbus.host 自然就是用来指定nimbus服务器的URI
supervisor.slots.ports 这个是用来配置supervisor服务器启动的worker所监听的端口,每一个worker就是一个物理的JVM进程。上面这些是基本配置,并且要严格按照上面的格式来,少一个空格都会报错。

接下来就是将配置文件拷贝到集群的各个机器上,然后在分别在nimbussupervisor机器上通过$bin/storm nimbus $bin/storm supervisor命令来启动集群上的机子。最后在nimbus上通过$bin/storm UI 命令可以启动Storm提供的UI界面,功能十分强大,可以监控集群上各个节点的运行状态,提交Topology任务,监控Topology任务的运行情况等。这个UI界面可以通过http://{nimbus host}:8080的地址访问到。



Redis数据库

Redis是一个基于内存的多种数据结构的存储工具,经常有人说Redis是一个基于key-value数据结构的缓存数据库,这种说法必然是不准确的,Key-Value只是其中的一种数据结构的实现,Redis支持Stringshasheslistssetssorted sets等多种常见的数据结构,并提供了功能强大的范围查询,以及提供了INCRINCRBY,DECR,DECRBY等多种原子命令操作,保证在并发的环境下不会出现脏数据。虽然Redis是基于内存的数据库,但也提供了多种硬盘持久化策略,比如说RDB策略,用来将某个时间点的Redis的数据快照存储在硬盘中,或者是AOF策略,将每一个Redis操作命令都不可变的顺序记录在log文件中,恢复数据时就将log文件中的所有命令顺序执行一遍等等。Redis不光可以作为网站热点数据的缓存服务器,还可以用来做数据库,或者消息队列服务器的broker等。在本文中选择Redis作为订单分析结果的存储工具,一方面是其灵活的数据结构和强大的数据操作命令,另一方面就是在大数据的实时计算环境下,需要Redis这样的具备高速I/O的数据库。
 
在本文的例子中,作者使用Sorted Sets数据结构来存储各个商家的总订单销售额,Sorted Sets数据结构由Key, Scoreelement value 三部分组成,Set的数据结构保证同一个key中的元素值不会重复,而在Sorted Sets结构中是通过 Score来为元素值排序,这很自然地就能将各个商家的总订单销售额设置为Score,然后商家名称为element value,这样就能根据总订单销售额来为商家排序。在Storm程序中,我们通过Jedis API来调用Redis
$ZINCRBY KEY INCREMENT MEMBER
的命令来统计商家总销售额, ZINCRBY是一个原子命令,能保证在Storm的并发计算的环境下,正确地增加某个商家的Score的值,也就是它们的订单总销售额。而对于两个商家同名这种情况应该在业务系统中去避免而不应该由我们的数据分析层来处理。最后提一个小trips,就是如果所有商家的Score都设置成相同的分数,那么Redis就会默认使用商家名的字母字典序来排序。

Kafka+Storm+Redis的整合
当数据被Flume拉取进Kafka消息系统中,我们就可以使用Storm来进行消费,Redis来对结果进行存储。StormKafka有很好的兼容性,我们可以通过Kafka Spout来从Kafka中获取数据;在Bolt处理完数据后,通过Jedis API在程序中将数据存储在Redis数据库中。

下面就是Kafka Spout和创建Topology的程序代码:

BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");
zkHosts是用来指定Zookeeper集群的节点的URI和端口,而Zookeeper集群是用来记录SpoutKafka消息消费的offset位置

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
主要是用来将SpoutKafka拉取来的byte[]数组格式的数据转化为Stormtuples

  1. package com.guludada.ordersanalysis;
  2. import java.util.UUID;
  3. import backtype.storm.Config;
  4. import backtype.storm.LocalCluster;
  5. import backtype.storm.StormSubmitter;
  6. import backtype.storm.generated.AlreadyAliveException;
  7. import backtype.storm.generated.InvalidTopologyException;
  8. import backtype.storm.spout.SchemeAsMultiScheme;
  9. import backtype.storm.topology.TopologyBuilder;
  10. import backtype.storm.tuple.Fields;
  11. import storm.kafka.Broker;
  12. import storm.kafka.BrokerHosts;
  13. import storm.kafka.KafkaSpout;
  14. import storm.kafka.SpoutConfig;
  15. import storm.kafka.StaticHosts;
  16. import storm.kafka.StringScheme;
  17. import storm.kafka.ZkHosts;
  18. import storm.kafka.trident.GlobalPartitionInformation;
  19. public class ordersAnalysisTopology {
  20.     private static String topicName = "ordersInfo";
  21.     private static String zkRoot = "/stormKafka/"+topicName;
  22.     public static void main(String[] args) {
  23.         BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");
  24.         SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName,zkRoot,UUID.randomUUID().toString());
  25.         spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
  26.         KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
  27.         TopologyBuilder builder = new TopologyBuilder();
  28.         builder.setSpout("kafkaSpout",kafkaSpout);
  29.         builder.setBolt("merchantsSalesBolt", new merchantsSalesAnalysisBolt(), 2).shuffleGrouping("kafkaSpout");
  30.         Config conf = new Config();
  31.         conf.setDebug(true);
  32.         if(args != null && args.length > 0) {
  33.             conf.setNumWorkers(1);
  34.             try {
  35.                 StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
  36.             } catch (AlreadyAliveException e) {
  37.                 // TODO Auto-generated catch block
  38.                 e.printStackTrace();
  39.             } catch (InvalidTopologyException e) {
  40.                 // TODO Auto-generated catch block
  41.                 e.printStackTrace();
  42.             }
  43.         } else {
  44.             conf.setMaxSpoutPending(3);
  45.             LocalCluster cluster = new LocalCluster();
  46.             cluster.submitTopology("ordersAnalysis", conf, builder.createTopology());
  47.         }
  48.     }
  49. }
package com.guludada.ordersanalysis;

import java.util.UUID;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.Broker;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StaticHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.GlobalPartitionInformation;

public class ordersAnalysisTopology {
	
	private static String topicName = "ordersInfo";
	private static String zkRoot = "/stormKafka/"+topicName;
	
	public static void main(String[] args) {
		
		BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");

		
		SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName,zkRoot,UUID.randomUUID().toString());
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
		
		TopologyBuilder builder = new TopologyBuilder();        
		builder.setSpout("kafkaSpout",kafkaSpout);        
		builder.setBolt("merchantsSalesBolt", new merchantsSalesAnalysisBolt(), 2).shuffleGrouping("kafkaSpout");

		Config conf = new Config();
		conf.setDebug(true);
		
		if(args != null && args.length > 0) {
			conf.setNumWorkers(1);
			try {
				StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
			} catch (AlreadyAliveException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
		} else {
			
			conf.setMaxSpoutPending(3);
			
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("ordersAnalysis", conf, builder.createTopology());
			
			
		}

	}
}

下面是Bolt程序,主要是用来处理从Kafka拉取到的订单日志信息, 并计算出所有商家的总订单收入,然后使用Jedis API将计算结果存入到Redis数据库中。

 

  1. package com.guludada.domain;
  2. import java.util.ArrayList;
  3. import java.util.Date;
  4. public class ordersBean {
  5.     Date createTime = null;
  6.     String number = "";
  7.     String paymentNumber = "";
  8.     Date paymentDate = null;
  9.     String merchantName = "";
  10.     ArrayList skuGroup = null;
  11.     float totalPrice = 0;
  12.     float discount = 0;
  13.     float paymentPrice = 0;
  14.     public Date getCreateTime() {
  15.         return createTime;
  16.     }
  17.     public void setCreateTime(Date createTime) {
  18.         this.createTime = createTime;
  19.     }
  20.     public String getNumber() {
  21.         return number;
  22.     }
  23.     public void setNumber(String number) {
  24.         this.number = number;
  25.     }
  26.     public String getPaymentNumber() {
  27.         return paymentNumber;
  28.     }
  29.     public void setPaymentNumber(String paymentNumber) {
  30.         this.paymentNumber = paymentNumber;
  31.     }
  32.     public Date getPaymentDate() {
  33.         return paymentDate;
  34.     }
  35.     public void setPaymentDate(Date paymentDate) {
  36.         this.paymentDate = paymentDate;
  37.     }
  38.     public String getMerchantName() {
  39.         return merchantName;
  40.     }
  41.     public void setMerchantName(String merchantName) {
  42.         this.merchantName = merchantName;
  43.     }
  44.     public ArrayList getSkuGroup() {
  45.         return skuGroup;
  46.     }
  47.     public void setSkuGroup(ArrayList skuGroup) {
  48.         this.skuGroup = skuGroup;
  49.     }
  50.     public float getTotalPrice() {
  51.         return totalPrice;
  52.     }
  53.     public void setTotalPrice(float totalPrice) {
  54.         this.totalPrice = totalPrice;
  55.     }
  56.     public float getDiscount() {
  57.         return discount;
  58.     }
  59.     public void setDiscount(float discount) {
  60.         this.discount = discount;
  61.     }
  62.     public float getPaymentPrice() {
  63.         return paymentPrice;
  64.     }
  65.     public void setPaymentPrice(float paymentPrice) {
  66.         this.paymentPrice = paymentPrice;
  67.     }
  68. }
package com.guludada.domain;

import java.util.ArrayList;
import java.util.Date;

public class ordersBean {

	Date createTime = null;
	String number = "";
	String paymentNumber = "";
	Date paymentDate = null;
	String merchantName = "";
	ArrayList skuGroup = null;
	float totalPrice = 0;
	float discount = 0;
	float paymentPrice = 0;
	
	public Date getCreateTime() {
		return createTime;
	}
	public void setCreateTime(Date createTime) {
		this.createTime = createTime;
	}
	public String getNumber() {
		return number;
	}
	public void setNumber(String number) {
		this.number = number;
	}
	public String getPaymentNumber() {
		return paymentNumber;
	}
	public void setPaymentNumber(String paymentNumber) {
		this.paymentNumber = paymentNumber;
	}
	public Date getPaymentDate() {
		return paymentDate;
	}
	public void setPaymentDate(Date paymentDate) {
		this.paymentDate = paymentDate;
	}
	public String getMerchantName() {
		return merchantName;
	}
	public void setMerchantName(String merchantName) {
		this.merchantName = merchantName;
	}
	public ArrayList getSkuGroup() {
		return skuGroup;
	}
	public void setSkuGroup(ArrayList skuGroup) {
		this.skuGroup = skuGroup;
	}
	public float getTotalPrice() {
		return totalPrice;
	}
	public void setTotalPrice(float totalPrice) {
		this.totalPrice = totalPrice;
	}
	public float getDiscount() {
		return discount;
	}
	public void setDiscount(float discount) {
		this.discount = discount;
	}
	public float getPaymentPrice() {
		return paymentPrice;
	}
	public void setPaymentPrice(float paymentPrice) {
		this.paymentPrice = paymentPrice;
	}
	
	
}

本文例子中用不到skusbean,所以这里作者就没有写委屈偷懒一下下

  1. package com.guludada.domain;
  2. public class skusBean {
  3.       ………………
  4. }
package com.guludada.domain;

public class skusBean {
      ………………
}

logInfoHandler用来过滤订单的日志信息,并保存到ordersBean和skusBean中,方便Bolt获取日志数据的各项属性进行处理

  1. package com.guludada.common;
  2. import java.text.ParseException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.regex.Matcher;
  5. import java.util.regex.Pattern;
  6. import com.guludada.domain.ordersBean;
  7. public class logInfoHandler {
  8.     SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  9.     public ordersBean getOrdersBean(String orderInfo) {
  10.         ordersBean order = new ordersBean();
  11.         //从日志信息中过滤出订单信息
  12.         Pattern orderPattern = Pattern.compile("orderNumber:.+");
  13.         Matcher orderMatcher = orderPattern.matcher(orderInfo);
  14.         if(orderMatcher.find()) {
  15.             String orderInfoStr = orderMatcher.group(0);
  16.             String[] orderInfoGroup = orderInfoStr.trim().split("\\|");
  17.             //获取订单号
  18.             String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
  19.             order.setNumber(orderNum);
  20.             //获取创建时间
  21.             String orderCreateTime = orderInfoGroup[1].trim().split(" ")[1] + " " + orderInfoGroup[1].trim().split(" ")[2];
  22.             try {
  23.                 order.setCreateTime(sdf_final.parse(orderCreateTime));
  24.             } catch (ParseException e) {
  25.                 // TODO Auto-generated catch block
  26.                 e.printStackTrace();
  27.             }
  28.             //获取商家名称
  29.             String merchantName = (orderInfoGroup[4].split(":"))[1].trim();
  30.             order.setMerchantName(merchantName);
  31.             //获取订单总额
  32.             String orderPriceInfo = (orderInfoGroup[6].split("price:"))[1].trim();
  33.             String totalPrice = (orderPriceInfo.substring(2, orderPriceInfo.length()-3).trim().split(" "))[1];
  34.             order.setTotalPrice(Float.parseFloat(totalPrice));
  35.             return order;
  36.         } else {
  37.             return order;
  38.         }
  39.     }
  40. }
package com.guludada.common;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.guludada.domain.ordersBean;

public class logInfoHandler {
	
	SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	
	public ordersBean getOrdersBean(String orderInfo) {
		
		ordersBean order = new ordersBean();
		
		//从日志信息中过滤出订单信息
		Pattern orderPattern = Pattern.compile("orderNumber:.+");
		Matcher orderMatcher = orderPattern.matcher(orderInfo);
		if(orderMatcher.find()) {
			
			String orderInfoStr = orderMatcher.group(0);
			String[] orderInfoGroup = orderInfoStr.trim().split("\\|");
			
			//获取订单号
			String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
			order.setNumber(orderNum);
						
			//获取创建时间
			String orderCreateTime = orderInfoGroup[1].trim().split(" ")[1] + " " + orderInfoGroup[1].trim().split(" ")[2];
			try {
				order.setCreateTime(sdf_final.parse(orderCreateTime));
			} catch (ParseException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
			//获取商家名称
			String merchantName = (orderInfoGroup[4].split(":"))[1].trim();
			order.setMerchantName(merchantName);
			
			//获取订单总额
			String orderPriceInfo = (orderInfoGroup[6].split("price:"))[1].trim();
			String totalPrice = (orderPriceInfo.substring(2, orderPriceInfo.length()-3).trim().split(" "))[1];
			order.setTotalPrice(Float.parseFloat(totalPrice));
						
			return order;
						
		} else {
			return order;
		}
	}
}

 

  1. package com.guludada.ordersanalysis;
  2. import java.util.Map;
  3. import com.guludada.common.logInfoHandler;
  4. import com.guludada.domain.ordersBean;
  5. import backtype.storm.task.OutputCollector;
  6. import backtype.storm.task.TopologyContext;
  7. import backtype.storm.topology.OutputFieldsDeclarer;
  8. import backtype.storm.topology.base.BaseRichBolt;
  9. import backtype.storm.tuple.Tuple;
  10. import redis.clients.jedis.Jedis;
  11. import redis.clients.jedis.JedisPool;
  12. import redis.clients.jedis.JedisPoolConfig;
  13. public class merchantsSalesAnalysisBolt extends BaseRichBolt {
  14.     private OutputCollector _collector;
  15.     logInfoHandler loginfohandler;
  16.     JedisPool pool;
  17.     public void execute(Tuple tuple) {
  18.         String orderInfo = tuple.getString(0);
  19.         ordersBean order = loginfohandler.getOrdersBean(orderInfo);
  20.         //store the salesByMerchant infomation into Redis
  21.         Jedis jedis = pool.getResource();
  22.         jedis.zincrby("orderAna:topSalesByMerchant", order.getTotalPrice(), order.getMerchantName());
  23.     }
  24.     public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
  25.         this._collector = collector;
  26.         this.loginfohandler = new logInfoHandler();
  27.         this.pool = new JedisPool(new JedisPoolConfig(), "ymhHadoop",6379,2 * 60000,"12345");
  28.     }
  29.     public void declareOutputFields(OutputFieldsDeclarer arg0) {
  30.         // TODO Auto-generated method stub
  31.     }
  32. }
package com.guludada.ordersanalysis;

import java.util.Map;

import com.guludada.common.logInfoHandler;
import com.guludada.domain.ordersBean;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class merchantsSalesAnalysisBolt extends BaseRichBolt {
	
	private OutputCollector _collector;
	logInfoHandler loginfohandler;
	JedisPool pool;

	public void execute(Tuple tuple) {
		String orderInfo = tuple.getString(0);
		ordersBean order = loginfohandler.getOrdersBean(orderInfo);
		
		//store the salesByMerchant infomation into Redis
		Jedis jedis = pool.getResource();
		jedis.zincrby("orderAna:topSalesByMerchant", order.getTotalPrice(), order.getMerchantName());
	}

	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
		this._collector = collector;
		this.loginfohandler = new logInfoHandler();
		this.pool = new JedisPool(new JedisPoolConfig(), "ymhHadoop",6379,2 * 60000,"12345");
		
	}

	public void declareOutputFields(OutputFieldsDeclarer arg0) {
		// TODO Auto-generated method stub
		
	}

}

Topology项目的Maven配置文件

  1.   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  2.   4.0.0
  3.   com.guludada
  4.   Storm_OrdersAnalysis
  5.   war
  6.   0.0.1-SNAPSHOT
  7.   Storm_OrdersAnalysis Maven Webapp
  8.   http://maven.apache.org
  9.   
  10.     
  11.         org.apache.storm
  12.         storm-core
  13.         0.9.6
  14.         provided
  15.     
  16.     
  17.         org.apache.storm
  18.         storm-kafka
  19.         0.9.6
  20.     
  21.     
  22.         org.apache.kafka
  23.         kafka_2.10
  24.         0.9.0.1
  25.             
  26.                 
  27.                     org.apache.zookeeper
  28.                     zookeeper
  29.                 
  30.                 
  31.                     log4j
  32.                     log4j
  33.                 
  34.                 
  35.                     org.slf4j
  36.                     slf4j-log4j12
  37.                 
  38.             
  39.     
  40.     
  41.         redis.clients
  42.         jedis
  43.         2.8.1
  44.     
  45.   
  46.   
  47.     Storm_OrdersAnalysis
  48.     
  49.         
  50.             maven-assembly-plugin
  51.             
  52.                 
  53.                     jar-with-dependencies
  54.                 
  55.                 
  56.                    
  57.                      com.guludada.ordersanalysis.ordersAnalysisTopology
  58.                    
  59.                  
  60.              
  61.           
  62.       
  63.   

  4.0.0
  com.guludada
  Storm_OrdersAnalysis
  war
  0.0.1-SNAPSHOT
  Storm_OrdersAnalysis Maven Webapp
  http://maven.apache.org
  
    
		org.apache.storm
		storm-core
		0.9.6
		provided
	
	
        org.apache.storm
        storm-kafka
        0.9.6
    
    
    	org.apache.kafka
        kafka_2.10
        0.9.0.1
            
                
                    org.apache.zookeeper
                    zookeeper
                
                
                    log4j
                    log4j
                
                
                    org.slf4j
    				slf4j-log4j12
                
            
    
    
	    redis.clients
	    jedis
	    2.8.1
		
  
  
    Storm_OrdersAnalysis
    
		
			maven-assembly-plugin
			
				  
			    	jar-with-dependencies
			    
			    
			       
			         com.guludada.ordersanalysis.ordersAnalysisTopology
			       
			     
			 
		  
	  
  

maven配置文件中配置了一个官方推荐的maven-assembly-plugin插件,用来帮助用户方便地打包Topology程序的。只需要进入到项目的根路径,然后运行
$mvn assembly:assembly
命令就可以打包好Topologyjar包了。

最后我带大家梳理一下整个项目的部署流程
1.  启动Zookeeper
2. 启动Kafka
3. 启动Flume将程序拉取到Kafka
4. 启动Storm集群
5. 启动Redis服务端  通过命令
$ src/redis-server
6. 提交打包好的Topology程序到Storm集群中通过Storm UI 或者命令$storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3
7. 启动RedisCLI客户端查看结果通过命令
$ src/redis-cli --raw
$  zrange key 0 -1 withscores

如下图:

 

Troubleshooting
  1. 在使用maven同时导入storm-core, storm-kaka和kafka的依赖包的时候可能会出现jar包冲突导致无法初始化Log4jLoggerFactory,并无法启动Storm程序.解决方法也很简单,按照红字提示,把多余的jar包移除就行了,通过在maven的pom文件中kafka的依赖设置部分加入下面的设置org.slf4jslf4j-log4j12
  2. 第一次执行Storm建立Topology时,作者遇到了一个十分低级的问题,就是发现明明Kafka的topic里有数据,可是Storm程序怎么都无法读取到数据,后来才从下面的文章中明白了问题的所在 http://m.blog.csdn.net/article/details?id=18615761  原因就在于Topology第一次启动前还没有在zookeeper中的zkRoot创建offset信息,Storm取不到offset信息就会使用默认的offset,也就是log文件中从最后一个元素开始读取信息,所以之前在kafka中的数据都无法读出来。Storm启动后,再往broker中写数据,这些后写的数据就能正确被Storm处理。
  3. 当Storm的topology传到Nimbus的时候,或者说你的Storm程序刚开始启动的时候可能会报关于JedisPool是一个无法序列化的对象而导致的错误:java.lang.RuntimeException:java.io.NotSerializableException: redis.clients.jedis.JedisPool 解决方案就是将Bolt类中外部的JedisPool初始化代码放入Bolt的prepare()方法中,如本文的代码示例所示
  4. 在Storm启动并开始连接Redis的时候,会报出连接被拒绝,因为Redis运行在protect mode模式下的错误。这是因为Storm程序是远程连接Redis的服务器端,如果Redis服务器端没有设置密码的话是拒绝远程连接的。解决方法也十分简单,关闭protect mode模式(强烈不推荐),或者使用下面命令为Redis设置密码就可以了$config set requirepass 123
  5. 向Storm提交Topology以后, Supervisor端会一直报“Kill XXXX No Such process”的错误,多数原因是提交的topology没有正确被执行,而Storm的日记中不会显示topology程序里的错误。解决方法就是启动Storm UI, 通过这个Storm自带的UI界面查看topology的运行情况,并且程序中的错误也会在UI界面中显示出来,能方便地查看topology程序的错误。

    6.kafka使用的时候的小问题:
        当在一台机子上启动kafka producer客户端的时候,是无法在同一台机子上继续启动kafka的consumer客户端的,因为这两个进程可能占用的同一个端口,需要在另外一台机子上启动kafka consumer程序,这样就能看见正确的结果了

最后,感谢所有耐心看完这篇文章的人,楼主也深感自己的技术水平和语言表达还有很多需要提高的地方,希望能和大家一起交流学习共同进步,欢迎大家留下宝贵的意见和评论!还有再最后吐槽一下,CSDN的文章编辑器在我的MAC系统的火狐浏览器下十分十分十分十分难用,字体格式等根本不受控制,各种莫名其妙的BUG…………

from:http://blog.csdn.net/ymh198816/article/details/51998085