你好,游客 登录
背景:
阅读新闻

sparkmllib朴素贝叶斯分类算法

[日期:2017-09-29] 来源:csdn  作者:fuck_prometheus [字体: ]

贝叶斯分类是一类分类算法的总称,这类算法均以贝叶斯定理为基础,故统称为贝叶斯分类。本文作为分类算法的第一篇,将首先介绍分类问题,对分类问题 进行一个正式的定义。然后,介绍贝叶斯分类算法的基础——贝叶斯定理。最后,通过实例讨论贝叶斯分类中最简单的一种:朴素贝叶斯分类。

1. 基本概念

1)P(A) 是A的先验概率或边缘概率。之所以称为”先验”是因为它不考虑任何B方面的因素。
2)P(A|B) 是已知B发生后A的条件概率,也由于得自B的取值而被称作A的后验概率。
3)P(B|A)是已知A发生后B的条件概率,也由于得自A的取值而被称作B的后验概率。
4)P(B) 是B的先验概率或边缘概率,也被称作标准化常量 (normalized constant)。
这里写图片描述
这里写图片描述
这里写图片描述
这里写图片描述
MLlib的贝叶斯分类模型采用朴素贝叶斯分类算法,其模型主要是计算每个类别的先验概率 、各类别下各个特征属性的条件概率,其分布式实现方法是:对样本进行聚合操作,统计所有标签出现的次数、对应特征之和;对(label, features)格式样本采用combineByKey聚合函数,对同一标签数据进行聚合统计操作。通过聚合操作后,可以通过聚合结果计算先验概率、条 件概率,得到朴素贝叶斯分类模型。对于预测,根据模型的先验概率、条件概率,计算每个样本属于每个类别的概率,最后取最大项作为样本的类别。
通俗点讲,以文本分类来说,我们给定有m个分类,有n个特征,首先我计算出现在某一类的先验概率为p(yi),即出现在yi中的样本数除以总样本数。n个 特征我们用向量a表示,我们的目的是要计算数据属于那个分类,即取p(y|xi),…….,p(y|xn)中的最大值,对于其中的p(yi|a)计算为 p(a|yi)*p(yi)。p(yi)上面已经求出,p(xi|yi)为各类别下各个特征属性的条件概率估计,公式(3)类似于文本分类中的tf idf值。

源码分析

NaiveBayes //贝叶斯分类伴生对象
train //train是NaiveBayes对象的静态方法,该方法是根据设置朴素贝叶斯分类参数 新建朴素贝叶斯分类类,并执行run方法进行训练
NaiveBayes //
run //NaiveBayes类的run方法,开始训练贝叶斯模型,该方法主要是计算各类别的先验概率和各个特征在各个类别中的条件概率。
aggregated //对所有样本进行聚合统计,统计每个类别下的每一个特征值之和及次数 ,该聚合统计是先验概率和条件概率计算的基础。
pi //根据aggregated 函数,计算各类别的先验概率
theta //根据aggregated 函数,计算各个特征在各个类别中的条件概率
NaiveBayesModel 
predict //根据模型的先验概率、条件概率,计算样本属于每个类别的概率,取最大项作为样本的类别

多项式和伯努力方程:

private def multinomialCalculation(testData: Vector) = {
    val prob = thetaMatrix.multiply(testData)
    BLAS.axpy(1.0, piVector, prob)
    prob
  }

  private def bernoulliCalculation(testData: Vector) = {
    testData.foreachActive((_, value) =>
      if (value != 0.0 && value != 1.0) {
        throw new SparkException(
          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
      }
    )
    val prob = thetaMinusNegTheta.get.multiply(testData)
    BLAS.axpy(1.0, piVector, prob)
    BLAS.axpy(1.0, negThetaSum.get, prob)
    prob
  }

NaiveBayes伴生对象

 //输入数据,平滑因子,模型类别(是多项式还是伯努力方程)
  def train(input: RDD[LabeledPoint], lambda: Double, modelType: String): NaiveBayesModel = {
    require(supportedModelTypes.contains(modelType),
      s"NaiveBayes was created with an unknown modelType: $modelType.")
    new NaiveBayes(lambda, modelType).run(input)
  }

NaiveBayes类

 def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
    val spark = SparkSession
      .builder()
      .sparkContext(data.context)
      .getOrCreate()

    import spark.implicits._

    val nb = new NewNaiveBayes()
      .setModelType(modelType)
      .setSmoothing(lambda)
      .setIsML(false)

    val dataset = data.map { case LabeledPoint(label, features) => (label, features.asML) }
      .toDF("label", "features")

    val newModel = nb.fit(dataset)

    val pi = newModel.pi.toArray
    val theta = Array.fill[Double](newModel.numClasses, newModel.numFeatures)(0.0)
    newModel.theta.foreachActive {
      case (i, j, v) =>
        theta(i)(j) = v
    }

    require(newModel.oldLabels != null,
      "The underlying ML NaiveBayes training does not produce labels.")
    new NaiveBayesModel(newModel.oldLabels, pi, theta, modelType)
  }

核心算法

override protected def train(dataset: Dataset[_]): NaiveBayesModel = {
    if (isML) {
      val numClasses = getNumClasses(dataset)
      if (isDefined(thresholds)) {
        require($(thresholds).length == numClasses, this.getClass.getSimpleName +
          ".train() called with non-matching numClasses and thresholds.length." +
          s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
      }
    }

    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
      val values = v match {
        case sv: SparseVector => sv.values
        case dv: DenseVector => dv.values
      }

      require(values.forall(_ >= 0.0),
        s"Naive Bayes requires nonnegative feature values but found $v.")
    }

    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
      val values = v match {
        case sv: SparseVector => sv.values
        case dv: DenseVector => dv.values
      }

      require(values.forall(v => v == 0.0 || v == 1.0),
        s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
    }

    val requireValues: Vector => Unit = {
      $(modelType) match {
        case Multinomial =>
          requireNonnegativeValues
        case Bernoulli =>
          requireZeroOneBernoulliValues
        case _ =>
          // This should never happen.
          throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
      }
    }

    val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
    val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))

    // Aggregates term frequencies per label.
    // TODO: Calling aggregateByKey and collect creates two stages, we can implement something
    // TODO: similar to reduceByKeyLocally to save one stage.
    val aggregated = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd
      .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2)))
      }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))(
      seqOp = {
         case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
           requireValues(features)
           BLAS.axpy(weight, features, featureSum)
           (weightSum + weight, featureSum)
      },
      combOp = {
         case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
           BLAS.axpy(1.0, featureSum2, featureSum1)
           (weightSum1 + weightSum2, featureSum1)
      }).collect().sortBy(_._1)

    val numLabels = aggregated.length
    val numDocuments = aggregated.map(_._2._1).sum

    val labelArray = new Array[Double](numLabels)
    val piArray = new Array[Double](numLabels)
    val thetaArray = new Array[Double](numLabels * numFeatures)

    val lambda = $(smoothing)
    val piLogDenom = math.log(numDocuments + numLabels * lambda)
    var i = 0
    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
      labelArray(i) = label
      piArray(i) = math.log(n + lambda) - piLogDenom
      val thetaLogDenom = $(modelType) match {
        case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
        case Bernoulli => math.log(n + 2.0 * lambda)
        case _ =>
          // This should never happen.
          throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
      }
      var j = 0
      while (j < numFeatures) {
        thetaArray(i * numFeatures + j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
        j += 1
      }
      i += 1
    }

    val pi = Vectors.dense(piArray)
    val theta = new DenseMatrix(numLabels, numFeatures, thetaArray, true)
    new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
  }

预测

NaiveBayesModel 类,NaiveBayesModel伴生对象

 override def predict(testData: Vector): Double = {
    modelType match {
      case Multinomial =>
        labels(multinomialCalculation(testData).argmax)
      case Bernoulli =>
        labels(bernoulliCalculation(testData).argmax) //max()
    }
  }

private def multinomialCalculation(features: Vector) = {
    val prob = theta.multiply(features)
    BLAS.axpy(1.0, pi, prob)
    prob
  }

  private def bernoulliCalculation(features: Vector) = {
    features.foreachActive((_, value) =>
      require(value == 0.0 || value == 1.0,
        s"Bernoulli naive Bayes requires 0 or 1 feature values but found $features.")
    )
    val prob = thetaMinusNegTheta.get.multiply(features)
    BLAS.axpy(1.0, pi, prob)
    BLAS.axpy(1.0, negThetaSum.get, prob)
    prob
  }

实例

import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val Array(training, test) = data.randomSplit(Array(0.6, 0.4))

val model = NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")

val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()

// Save and load model
model.save(sc, "target/tmp/myNaiveBayesModel")
val sameModel = NaiveBayesModel.load(sc, "target/tmp/myNaiveBayesModel")
收藏 推荐 打印 | 阅读: