跳转至

PredictionIO实现论坛广告贴过滤

关注 Apache PredictionIO 已有一段时间,最近需要做一个过滤论坛广告贴的功能,调研了一番市面上提供文本检测的API服务,发现靠谱的公司都比较贵,并且准确率不高(一个可能性是他们使用的训练数据集是互联网上的通用信息,而我们是一个垂直行业论坛,攻击来源也可能是固定的那么若干个),于是尝试自己实现个文本分类器来过滤广告。

关于文本分类,首先想到的就是朴素贝叶斯分类器,其理论清晰简单,并且各大机器学习框架均支持。

朴素贝叶斯分类器 关于贝叶斯分类器的的原理可以参考以下文档:算法杂货铺——分类算法之朴素贝叶斯分类(Naive Bayesian classification)

Spark实现

下面是 Spark 贝叶斯分类器的实现,主要流程为:

  1. 读入数据集
  2. 使用 ansj 分词,生成 (label, words) 结构的 dataframe
  3. 分割训练集与测试集
  4. 创建 HashingTF,IDF,NaiveBayes 的 Pipeline 模型
  5. 设置参数矩阵,然后使用 TrainValidationSplit 进行模型训练与参数选择
  6. 训练出 bestModal,保存到指定路径
  7. 通过最佳模型与测试集得出模型评估

实现代码如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.yplam

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}
import scopt.OptionParser
import org.ansj.recognition.impl.StopRecognition
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.mllib.evaluation.MulticlassMetrics

object SpamFilterTrainer {

  /**
    * 运行参数
    * @param trainInput 训练数据集路径,格式为 label;content
    */
  case class SpamFilterParams(
               trainInput: String = null,
               modelOutput: String = null
             )

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    val defaultParam = SpamFilterParams()
    val parser = new OptionParser[SpamFilterParams](this.getClass.getSimpleName) {
      head(s"${this.getClass.getSimpleName}: BBS Spam filter.")
      opt[String]("train")
        .text("train input")
        .action((x, c) => c.copy(trainInput = x))
        .required()
      opt[String]("output")
        .text("model output")
        .action((x, c) => c.copy(modelOutput = x))
        .optional()
    }
    parser.parse(args, defaultParam).map { params =>
      run(spark, params.trainInput, params.modelOutput)
      sys.exit(0)
    } getOrElse {
      sys.exit(1)
    }
  }

  def run(spark: SparkSession, trainInput: String, modelOutput: String) : Unit = {
    import spark.implicits._
    import scala.collection.JavaConversions._

    val logger = Logger.getLogger("app")
    logger.warn("Start:" + System.currentTimeMillis().toString)

    val filter = new StopRecognition()
    filter.insertStopWords(" ", ",")

    // 读取数据集,使用 ansj 分词,生成 (label, words) 结构的 dataframe
    val inputData = spark.sparkContext.textFile(trainInput).map{ line =>
      val splits = line.split(";", 2)
      val content = if(splits(1).length > 0){
        ToAnalysis.parse(splits(1)).recognition(filter).getTerms.map(_.getRealName)
      }
      else List("")
      (splits(0).toInt, content)
    }.toDF("label", "words")
    inputData.show()

    // 分割训练集与测试集
    val Array(train, test) = inputData.randomSplit(Array(0.9, 0.1), seed = 12345).map(_.cache())

    // 创建 HashingTF,IDF,NaiveBayes 的 Pipeline 模型
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(5000)
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    val nb = new NaiveBayes()
      .setModelType("multinomial")
      .setFeaturesCol("features")
      .setLabelCol("label")
    val pipeline = new Pipeline()
      .setStages(Array(hashingTF, idf, nb))

    // 设置参数矩阵,然后使用 TrainValidationSplit 进行模型训练与参数选择
    val paramGrid = new ParamGridBuilder()
      .addGrid(hashingTF.numFeatures, Array(50000, 100000))
      .addGrid(nb.smoothing, Array(1.0))
      .build()

    val trainValidationSplit = new TrainValidationSplit()
      .setEstimator(pipeline)
      .setEvaluator(new MulticlassClassificationEvaluator)
      .setEstimatorParamMaps(paramGrid)
      .setTrainRatio(0.8)


    val model = trainValidationSplit.fit(train)

    logger.warn("Train end:" + System.currentTimeMillis().toString)

    // 训练出 bestModal,保存到指定路径
    if(modelOutput != null)
      model.bestModel.asInstanceOf[PipelineModel].write.overwrite().save(modelOutput)

    // 将参数与评估结果打印出来
    model.getEstimatorParamMaps.zip(model.validationMetrics).foreach(println)

    val testResult = model.transform(test)
      .select("features", "label", "prediction")
    testResult.show()
    val testPredictionAndLabels = testResult.select("prediction", "label").rdd.map{
      case Row(prediction: Double, label: Int) => (prediction, label.toDouble)
    }
    val metrics = new MulticlassMetrics(testPredictionAndLabels)
    println(metrics.confusionMatrix)
    println("accuracy:" + metrics.accuracy)
    println("weightedPrecision:" + metrics.weightedPrecision)
    println("weightedRecall:" + metrics.weightedRecall)

    logger.warn("Finish:" + System.currentTimeMillis().toString)

    spark.stop()
  }
}

打包,然后在 Spark 中训练:

1
spark-submit --class com.yplam.SpamFilterTrainer spark-scala-playground.jar --train /home/yplam/spark/work/data/bbs --output /home/yplam/spark/work/model/bbs

结果还算可以,约 93% 的准确率。由于我们之前已经保存好最佳模型,可以编写一个简单的json API接口对外提供服务,而另一个可能更加适合的方式就是将模型移到 PredictionIO 上。

PredictionIO 简介

PredictionIO 是一个开源的机器学习平台,用于构建、评估与发布机器学习算法引擎。PredictionIO 提供事件服务器( Event Server ) 用于接收与保存数据,用于生成训练数据集;引擎模板( engine template )提供各种不同场景应用所需的机器学习算法模板。模型训练出来后 PredictionIO 提供保存与发布服务,发布后的模型可以通过接收 REST API 提供预测结果。对开发者而言,只需要按照 PredictionIO 提供的 DASE 模式实现数据源与预处理(D)、算法(A)、服务(S)、评估指标(E)。PredictionIO 使开发者专注于模型本身,其他的事情它都已经帮你实现。

PredictionIO

上面为 PredictionIO 的单模型引擎框架图。

事件服务器接收与存储事件数据,数据可以有多种格式,譬如对于文本分类器可以是 { "entityId" : 123, "entityType" : "content", "event" : "$set", "properties" : { "label" : "spam", "text" : "哈哈哈"} },对于推荐引擎可以是 { "entityId" : 1, "entityType" : "user", "event" : "buy", "targetEntityType":"item", "targetEntityId":"123" } 等。关于事件模型的更多内容可以参考 Events Modeling 。 PredictionIO 提供 REST 接口用于接收事件 多种存储后端,包括mysql,pgsql 跟 hbase。

PredictionIO 实现

由于 PredictionIO 社区提供了大量的引擎模板,因此实现自己模型的最简单方式就是在官方模板的基础上进行修改,譬如 Text Classification中就包含基于贝叶斯算法的文本分类器实现,得益于 PredictionIO 的 DASE 模型,我们只需要拷贝此项目,根据项目实际情况做小修改,然后加入 ansj 分词功能,完成。

针对 DataSource,我们需要根据我们模型实际数据结构,设定 entityType,eventNames,以及对 Observation 结构的处理。

针对 Preparator,我们用 ansj 替换原有的分词方式,同时限制了内容长度。

相关代码地址:https://github.com/yplam/predictionio-spam-detection-cn

后记:通过分析“漏网之鱼”的内容,发现已经有针对贝叶斯算法的攻击,也就是在长篇内容中夹入少量垃圾信息,通过只截取标题+内容头尾各500个字的方式,可以有一定的效果提升。