diff --git a/README.md b/README.md index 64c4f7b..bf52f5f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ 本项目是作者多年学习与工作实践的总结,绝大部分代码都经过实际运行保证准确无误。 -作者非CS专业科班出身,在校期间并未系统学习过数据结构,操作系统,设计模式等课程,相关的知识都是工作以后再进行系统学习,所以该项目特别适合非CS专业同学。 +作者非CS专业科班出身,在校期间并未系统学习过数据结构,操作系统,设计模式等课程,相关的知识都是工作以后再进行系统学习,所以该项目特别适合非CS专业同学参考。 结合作者工作实践发现,算法并不只是离线train model,甚至可以说离线train model只是工作很小的一部分。相反对应的工程能力,代码能力,数据能力非常重要。 diff --git a/deep-learning/numpy广播机制小结 .md b/deep-learning/numpy广播机制小结 .md new file mode 100644 index 0000000..c5afde4 --- /dev/null +++ b/deep-learning/numpy广播机制小结 .md @@ -0,0 +1,248 @@ +## 1.Broadcast(广播)机制 +numpy中的广播很常见,其用法是针对不同的shape的ndarray进行对应的数值计算的时候,将较小的ndarray广播变成更大的ndarray进行对应的shape匹配,从而使两个看起来shape不匹配的数组能进行相应的数值运算。 + +## 2.广播的规则 +1.All input arrays with ndim smaller than the input array of largest ndim, have 1’s prepended to their shapes. + +2.The size in each dimension of the output shape is the maximum of all the input sizes in that dimension. + +3.An input can be used in the calculation if its size in a particular dimension either matches the output size in that dimension, or has value exactly 1. + +4.If an input has a dimension size of 1 in its shape, the first data entry in that dimension will be used for all calculations along that dimension. In other words, the stepping machinery of the ufunc will simply not step along that dimension (the stride will be 0 for that dimension). + +翻译过来就是 +1.让所有输入数组都向其中shape最长的数组看齐,shape中不足的部分都通过在前面加1补齐 +2.输出数组的shape是输入数组shape的各个轴上的最大值 +3.如果输入数组的某个轴和输出数组的对应轴的长度相同或者其长度为1时,这个数组能够用来计算,否则出错 +4.当输入数组的某个轴的长度为1时,沿着此轴运算时都用此轴上的第一组值 + +上面的解释其实还是比较抽象,下面我们通过一些例子来理解。 + +## 3.实例分析 +首先我们看最简单的一种广播方式,向量对标量的运算。 + +``` +import numpy as np + + +def t1(): + array = np.arange(5) + print("array is: ", array) + array = array * 4 + print("after broadcast, array is: ", array) +``` + +运行的结果为 + +``` +array is: [0 1 2 3 4] +after broadcast, array is: [ 0 4 8 12 16] +``` + +这个没啥好讲的,比较典型的element-wise运算方式,本质上是array每个位置都乘4,运算过程可以理解为将4这个标量广播成了5*1的维度。 + +``` +def t2(): + array = np.arange(12).reshape(4, 3) + print("array is: ", array) + + print(array.mean(0)) + print(array.mean(0).shape) + print(array.mean(1)) + print(array.mean(1).shape) + array = array - array.mean(0) + print("after broadcast, array is: ", array) + array = array - array.mean(1) + print("after broadcast, array is: ", array) +``` + +输出的结果为 + +``` +array is: [[ 0 1 2] + [ 3 4 5] + [ 6 7 8] + [ 9 10 11]] +[4.5 5.5 6.5] +(3,) +[ 1. 4. 7. 10.] +(4,) +after broadcast, array is: [[-4.5 -4.5 -4.5] + [-1.5 -1.5 -1.5] + [ 1.5 1.5 1.5] + [ 4.5 4.5 4.5]] +Traceback (most recent call last): + File "/Users/wanglei/wanglei/code/python/tfpractice/basic/Broadcast.py", line 81, in + t2() + File "/Users/wanglei/wanglei/code/python/tfpractice/basic/Broadcast.py", line 29, in t2 + array = array - array.mean(1) +ValueError: operands could not be broadcast together with shapes (4,3) (4,) +``` + +上面的代码中,mean(0)的维度为(3,),与(4, 3)维度的array运算时,mean(0)最后一维为3,与(4,3)的最后一维相等,所以能被顺利广播。但是mean(1)的维度为(4, ),与(4,3)的最后一维不相等,所以无法被广播。 + +``` +def t3(): + array = np.arange(12).reshape(4, 3) + print("array is: ", array) + print(array.mean(0)) + print(array.mean(0).shape) + print(array.mean(1)) + print(array.mean(1).shape) + array = array - array.mean(0).reshape(1, 3) + print("after broadcast, array is: ", array) +``` + +运行结果如下 + +``` +array is: [[ 0 1 2] + [ 3 4 5] + [ 6 7 8] + [ 9 10 11]] +[4.5 5.5 6.5] +(3,) +[ 1. 4. 7. 10.] 
+(4,) +after broadcast, array is: [[-4.5 -4.5 -4.5] + [-1.5 -1.5 -1.5] + [ 1.5 1.5 1.5] + [ 4.5 4.5 4.5]] +``` + +``` +def t4(): + array = np.arange(12).reshape(4, 3) + print("array is: ", array) + print(array.mean(1).reshape(4, 1)) + array = array - array.mean(1).reshape(4, 1) + print("after broadcast, array is: ", array) +``` + +运行结果如下 + +``` +array is: [[ 0 1 2] + [ 3 4 5] + [ 6 7 8] + [ 9 10 11]] +[[ 1.] + [ 4.] + [ 7.] + [10.]] +after broadcast, array is: [[-1. 0. 1.] + [-1. 0. 1.] + [-1. 0. 1.] + [-1. 0. 1.]] +``` + +当mean(1)被reshape成(4,1)以后,与array的维度(4, 3)相比,第一维相同,第二维是1,所以能顺利广播。 + + +``` +def t5(): + array = np.arange(24).reshape(2, 3, 4) + print("in the beginning, array is: ", array) + arrayb = np.arange(12).reshape(3, 4) + print("arrayb is: ", arrayb) + array = array - arrayb + print("after broadcast, array is: ", array) +``` + +运行结果 + +``` +in the beginning, array is: [[[ 0 1 2 3] + [ 4 5 6 7] + [ 8 9 10 11]] + + [[12 13 14 15] + [16 17 18 19] + [20 21 22 23]]] +arrayb is: [[ 0 1 2 3] + [ 4 5 6 7] + [ 8 9 10 11]] +after broadcast, array is: [[[ 0 0 0 0] + [ 0 0 0 0] + [ 0 0 0 0]] + + [[12 12 12 12] + [12 12 12 12] + [12 12 12 12]]] +``` + + +上述array的维度是(2, 3, 4),arrayb的维度是(3,4),因此广播的时候,是相当于是在shape[0]的维度上进行复制。 + +``` +def t6(): + array = np.arange(24).reshape(2, 3, 4) + print("in the beginning, array is: ", array) + arrayb = np.arange(8).reshape(2, 1, 4) + print("arrayb is: ", arrayb) + array = array - arrayb + print("after broadcast, array is: ", array) + + +def t7(): + array = np.arange(24).reshape(2, 3, 4) + print("in the beginning, array is: ", array) + arrayb = np.arange(6).reshape(2, 3, 1) + print("arrayb is: ", arrayb) + array = array - arrayb + print("after broadcast, array is: ", array) +``` + +运行结果为 + +``` +in the beginning, array is: [[[ 0 1 2 3] + [ 4 5 6 7] + [ 8 9 10 11]] + + [[12 13 14 15] + [16 17 18 19] + [20 21 22 23]]] +arrayb is: [[[0 1 2 3]] + + [[4 5 6 7]]] +after broadcast, array is: [[[ 0 0 0 0] + [ 4 4 4 4] + [ 8 8 8 8]] + + [[ 8 8 8 8] + [12 12 12 12] + [16 16 16 16]]] +in the beginning, array is: [[[ 0 1 2 3] + [ 4 5 6 7] + [ 8 9 10 11]] + + [[12 13 14 15] + [16 17 18 19] + [20 21 22 23]]] +arrayb is: [[[0] + [1] + [2]] + + [[3] + [4] + [5]]] +after broadcast, array is: [[[ 0 1 2 3] + [ 3 4 5 6] + [ 6 7 8 9]] + + [[ 9 10 11 12] + [12 13 14 15] + [15 16 17 18]]] +``` + +上面的两个方法,则分别是在shape[1]与shape[2]的维度上进行广播 + + +## 4.总结 +结合上面的实例,我们总结一下广播的基本原则: +1.两个数组从末尾开始进行维度的比较。 +2.如果维度相等或者其中有一个值为1,则认为可以广播。 +3.维度如果缺失,可以忽略。 +4.广播是在缺失的维度或者为1的维度上进行的。 + diff --git a/recommend/ALS分解实现.md b/recommend/ALS分解实现.md new file mode 100644 index 0000000..42f3851 --- /dev/null +++ b/recommend/ALS分解实现.md @@ -0,0 +1,424 @@ +## 1.ALS简介 +在前面相关的文章中,已经详细介绍了ALS的原理。用最简单的一句话总结就是:ALS是通过将user与item分别表示为一个低维稠密向量来进行后续的使用。 + +## 2.基于spark的ALS实现 +首先看一部分辅助代码 + +``` +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Set; +import java.util.TreeSet; + +/** + * Created by WangLei on 20-1-10. 
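+ * <p>
+ * 工具类:封装 yyyyMMdd 日期字符串与时间戳的互转、日期区间生成、日期差计算等方法。
+ * 一个简单的使用示意(仅作说明,具体输出取决于运行时的系统时间与时区):
+ * <pre>
+ *     String today = TimeUtils.timestamp2Ymd(System.currentTimeMillis()); // 形如 "20200110"
+ *     String yesterday = TimeUtils.genLastDayStr(today);                  // 前一天,形如 "20200109"
+ *     int interval = TimeUtils.getIntervalBetweenTwoDays(yesterday, today); // 1
+ * </pre>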
+ */ +public class TimeUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(TimeUtils.class); + + public static final String DATE_FORMAT = "yyyyMMdd"; + public static final String TIME_FORMAT = "yyyyMMdd HH:mm:ss"; + public static final String HOUR_TIME_FORMAT = "yyyyMMdd HH"; + + public static final long TIME_DAY_MILLISECOND = 86400000; + + /** + * timestamp -> ymd + * @param timestamp + * @return + */ + public static String timestamp2Ymd(long timestamp) { + String format = "yyyyMMdd"; + return timestamp2Ymd(timestamp, format); + } + + public static String timestamp2Ymd(long timestamp, String format) { + SimpleDateFormat sdf; + try { + //支持输入10位的时间戳 + if(String.valueOf(timestamp).length() == 10) { + timestamp *= 1000; + } + sdf = new SimpleDateFormat(format); + return sdf.format(new Date(timestamp)); + } catch(Exception ex) { + sdf = new SimpleDateFormat(DATE_FORMAT); + try { + return sdf.format(new Date(timestamp)); + } catch (Exception e){} + } + return null; + } + + public static String timestamp2Hour(long timestamp) { + String time = timestamp2Ymd(timestamp, TIME_FORMAT); + return time.substring(9, 11); + } + + /** + * ymd -> Date + * @param ymd + * @return + */ + public static Date ymd2Date(String ymd) { + return ymd2Date(ymd, "yyyyMMdd"); + } + + public static Date ymd2Date(String ymd, String format) { + try { + SimpleDateFormat sdf = new SimpleDateFormat(format); + return sdf.parse(ymd); + } catch(ParseException ex) { + LOGGER.error("parse ymd to timestamp error!", ex); + } catch (Exception ex) { + LOGGER.error("there is some problem when transfer ymd2Date!", ex); + } + return null; + } + + /** + * ymd -> timestamp + * @param ymd + * @return + */ + public static long ymd2timestamp(String ymd) { + return ymd2Date(ymd).getTime(); + } + + + public static String genLastDayStr() { + return timestamp2Ymd(System.currentTimeMillis() + TIME_DAY_MILLISECOND * (-1)); + } + + + /** + * get the datestr before or after the given datestr + * attention transfer the num from int to long + * @param ymd + * @param num + * @return + */ + public static String genDateAfterInterval(String ymd, int num) { + long timestamp = ymd2timestamp(ymd); + long resTimeStamp = timestamp + TIME_DAY_MILLISECOND * Long.valueOf(num); + return timestamp2Ymd(resTimeStamp); + } + + public static String genLastDayStr(String ymd) { + return genDateAfterInterval(ymd, -1); + } + + + public static Set genYmdSet(long beginTs, long endTs) { + TreeSet ymdSet = new TreeSet(); + for(long ts = beginTs; ts <= endTs; ts += 86400000L) { + ymdSet.add(timestamp2Ymd(ts)); + } + return ymdSet; + } + + public static Set genYmdSet(String beginYmd, String endYmd) { + long beginTs = ymd2timestamp(beginYmd); + long endTs = ymd2timestamp(endYmd); + return genYmdSet(beginTs, endTs); + } + + /** + * end between begin days + * if begin or end is not number format or end < begin, return Integer.MIN_VALUE + * @param begin + * @param end + * @return + */ + public static int getIntervalBetweenTwoDays(String begin, String end) { + try { + int begintmp = Integer.valueOf(begin), endtmp = Integer.valueOf(end); + if(begintmp > endtmp) { + LOGGER.error("we need end no smaller than end!"); + return Integer.MIN_VALUE; + } + Date d1 = ymd2Date(begin); + Date d2 = ymd2Date(end); + Long mils = (d2.getTime() - d1.getTime()) / TIME_DAY_MILLISECOND; + return mils.intValue(); + } catch (NumberFormatException numformatex) { + numformatex.printStackTrace(); + return Integer.MIN_VALUE; + } + } +} +``` + +里面包含了很多时间的处理方法,可以直接加入代码库。 + +``` +/** + * 
Created by WangLei on 20-1-13. + */ +object DateSpec extends Enumeration { + type DateSpec = Value + + val YMD , Y_M_D, YMD2 = Value +} +``` + + +HDFS相关的工具类 + +``` +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.SparkContext +import org.joda.time.DateTime + +/** + * Created by WangLei on 20-1-10. + */ +object HDFSUtils { + + val conf = new Configuration() + + def delete(sc: SparkContext, path: String) = { + FileSystem.get(sc.hadoopConfiguration).delete(new Path(path), true) + } + + def isExist(sc: SparkContext, path: String) = { + FileSystem.get(sc.hadoopConfiguration).exists(new Path(path)) + } + + def checkFileExist(conf: Configuration = conf, FileName: String): Boolean = { + var isExist = false + + try { + val hdfs = FileSystem.get(conf) + val path = new Path(FileName) + isExist = hdfs.exists(path) + } catch { + case e: Exception => e.printStackTrace() + } + + isExist + } + + def latestMidPath(conf: Configuration, basePath: String): Option[String] = { + val today = new Date + latestMidPath(conf, basePath, new DateTime(today.getTime), 7) + } + + def latestMidPath(conf: Configuration, basePath: String, ymd: String) : Option[String] = { + val timestamp = TimeUtils.ymd2timestamp(ymd) + latestMidPath(conf, basePath, new DateTime(timestamp), 7, false, DateSpec.YMD2) + } + + def latestMidPath(conf: Configuration = conf, basePath: String, date: DateTime, limit: Int,with_success_file:Boolean = true,dateSpec: DateSpec = DateSpec.YMD): Option[String] = { + for (i <- 0 to limit) { + val day = date.minusDays(i) + val path = dateSpec match { + case DateSpec.YMD => basePath + "/date=%04d%02d%02d".format(day.getYear, day.getMonthOfYear, day.getDayOfMonth) + case DateSpec.Y_M_D => basePath + "/year=%04d/month=%02d/day=%02d".format(day.getYear, day.getMonthOfYear, day.getDayOfMonth) + case DateSpec.YMD2 => basePath + "/%04d%02d%02d".format(day.getYear, day.getMonthOfYear, day.getDayOfMonth) + } + + if (checkFileExist(conf, if(with_success_file) path + "/_SUCCESS" else path)) + return Some(path) + } + None + } + +} +``` + + +ALS训练相关代码 + +``` +import org.apache.spark.SparkConf +import org.apache.spark.ml.evaluation.RegressionEvaluator +import org.apache.spark.ml.recommendation.ALS +import org.apache.spark.sql.SparkSession +import org.slf4j.LoggerFactory + +import scala.collection.JavaConversions._ + +object AlsTraining { + + val logger = LoggerFactory.getLogger(this.getClass) + val separator = "\t" + + def genUserItemRdd(spark: SparkSession, ymd: String) = { + val baseinput = PathUtils.user_item_click_path + val (yesterday, daybegin) = (TimeUtils.genLastDayStr(ymd), TimeUtils.genDateAfterInterval(ymd, -29)) + val days = TimeUtils.genYmdSet(daybegin, yesterday) + + // userid itemid clicknum + var rdd = spark.sparkContext.textFile(baseinput + ymd) + .map(x => { + val l = x.split("\t") + (l(0), l(1), l(2)) + }) + + for (day <- days) { + val path = baseinput + day + if (HDFSUtils.isExist(spark.sparkContext, path)) { + val tmp = spark.sparkContext.textFile(path) + .map(x => { + val l = x.split("\t") + (l(0), l(1), l(2)) + }) + rdd = rdd.union(tmp) + } + } + rdd.cache + } + + def genUserItemIndex(spark: SparkSession, ymd: String) = { + val rdd = genUserItemRdd(spark, ymd) + val userindex = rdd.map(x => x._1).distinct().sortBy(x => x).zipWithIndex().map(x => (x._1, x._2 + 1)) + val itemindex = rdd.map(x => x._2).distinct().sortBy(x => x).zipWithIndex().map(x => (x._1, x._2 + 1)) + + (userindex, itemindex) + } + + case class Rating(userid: Int, 
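/* userid、itemid 为编码后的整数索引(从 1 开始),rating 为聚合后的点击次数 */ 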
itemid: Int, rating: Float) + + def trainmodel(spark: SparkSession, ymd: String) = { + import spark.implicits._ + + val rdd = genUserItemRdd(spark, ymd) + + val userindexrdd = rdd.map(x => x._1).distinct().sortBy(x => x).zipWithIndex().map(x => (x._1, x._2 + 1)) + val itemindexrdd = rdd.map(x => x._2).distinct().sortBy(x => x).zipWithIndex().map(x => (x._1, x._2 + 1)) + + val data = rdd.map(x => { + val (userid, itemid, count) = (x._1, x._2, x._3.toInt) + (userid + separator + itemid, count) + }) + .reduceByKey(_ + _) + .map(x => { + val (userid, itemid, count) = (x._1.split(separator)(0), x._1.split(separator)(1), x._2) + (userid, itemid + separator + count) + }) + .join(userindexrdd) + .map(x => { + val (itemandcount, userindex) = (x._2._1, x._2._2) + val (itemid, count) = (itemandcount.split(separator)(0), itemandcount.split(separator)(1)) + (itemid, userindex + separator + count) + }) + .join(itemindexrdd) + .map(x => { + val (userandcount, itemindex) = (x._2._1, x._2._2) + val (userindex, count) = (userandcount.split(separator)(0), userandcount.split(separator)(1)) + Rating(userindex.toInt, itemindex.toInt, count.toFloat) + }).toDF() + + val Array(training, test) = data.randomSplit(Array(0.8, 0.2)) + val als = new ALS().setRank(128).setMaxIter(8).setRegParam(0.01). + setUserCol("userid").setItemCol("itemid").setRatingCol("rating") + val model = als.fit(training) + + model.setColdStartStrategy("drop") + + val predictions = model.transform(test) + val evaluator = new RegressionEvaluator() + .setMetricName("rmse") + .setLabelCol("rating") + .setPredictionCol("prediction") + val rmse = evaluator.evaluate(predictions) + + logger.error("root-mean-square error is: {}", rmse) + + val userindex2userid = userindexrdd.map(x => (x._2, x._1)) + val userfactors = model.userFactors.rdd.map(x => { + val (userid, userfactor) = (x.getInt(0).toLong, x.getList(1).toArray().mkString(",")) + (userid, userfactor) + }).join(userindex2userid) + .map(x => { + val (userindex, userfactor, userid) = (x._1, x._2._1, x._2._2) + (userindex, userid, userfactor) + }) + .repartition(1) + .sortBy(x => x._1) + .map(x => "%s\t%s\t%s".format(x._1, x._2, x._3)) + + val itemindex2itemid = itemindexrdd.map(x => (x._2, x._1)) + val itemfactors = model.itemFactors.rdd.map(x => { + val (itemid, itemfactor) = (x.getInt(0).toLong, x.getList(1).toArray().mkString(",")) + (itemid, itemfactor) + }).join(itemindex2itemid) + .map(x => { + val (itemindex, itemfactor, itemid) = (x._1, x._2._1, x._2._2) + (itemindex, itemid, itemfactor) + }) + .repartition(1) + .sortBy(x => x._1) + .map(x => "%s\t%s\t%s".format(x._1, x._2, x._3)) + + (userfactors, itemfactors) + } + + def main(args: Array[String]): Unit = { + val (ymd, operation) = (args(0), args(1)) + val sparkConf = new SparkConf() + sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + sparkConf.setAppName("user-item-als-training" + ymd) + + val spark = SparkSession.builder().config(sparkConf).getOrCreate() + + operation match { + case "index" => { + val useroutput = PathUtils.user_index_path + ymd + val itemoutput = PathUtils.item_index_path + ymd + val (userindex, itemindex) = genUserItemIndex(spark, ymd) + userindex.repartition(1).sortBy(_._2).map(x => "%s\t%s".format(x._2, x._2)).saveAsTextFile(useroutput) + itemindex.repartition(1).sortBy(_._2).map(x => "%s\t%s".format(x._2, x._2)).saveAsTextFile(itemoutput) + } + case "model" => { + val (userfactors, itemfactors) = trainmodel(spark, ymd) + val user_embedding_path = PathUtils.user_factor_path + ymd + 
val item_embedding_path = PathUtils.item_factor_path + ymd
+        HDFSUtils.delete(spark.sparkContext, user_embedding_path)
+        HDFSUtils.delete(spark.sparkContext, item_embedding_path)
+
+        userfactors.saveAsTextFile(user_embedding_path)
+        itemfactors.saveAsTextFile(item_embedding_path)
+      }
+    }
+    spark.stop()
+  }
+}
+
+```
+
+
+## 3.代码分析
+
+```
+PathUtils.user_item_click_path
+```
+
+这是输入数据集所在的路径,数据每行包含三个字段:userid、itemid、点击次数。
+
+```
+genUserItemIndex
+```
+
+该方法分别对 userid 与 itemid 做从 1 开始的整数编码,注意两者是分开编码、互不影响的。
+
+
+```
+trainmodel
+```
+
+该方法的具体步骤如下:
+
+1.构造训练集:将近 30 天的点击数据聚合后,把 userid、itemid 替换为对应的整数编码,得到 Rating(userid, itemid, rating)
+2.构造 ALS 对象,设置 rank、maxIter、regParam 等参数
+3.调用 als.fit 在训练集上训练模型
+4.用训练好的模型在测试集上预测,并以 RMSE 评估效果
+5.从模型中分别取出 user 向量与 item 向量,关联回原始 id 后输出
+
diff --git a/traditional-algorithm/tree/CART树.md b/traditional-algorithm/tree/CART树.md
index 3b8acb4..b2abf38 100644
--- a/traditional-algorithm/tree/CART树.md
+++ b/traditional-algorithm/tree/CART树.md
@@ -34,7 +34,7 @@ $$\min _ { j , s } \left[ \min _ { c _ { 1 } } \sum _ { x _ { i } \in R _ { 1 }
 2.用选定的(j,s),划分区域,并决定相应的输出
 $$\hat{c}\_{m}=average(y_{i}|x_{i} \in R_{m}(j,s))$$
 
-3.对两个子区域重复1,2步骤,直到满足终止条件
+3.对两个子区域重复1,2步骤,直到满足终止条件
 
 4.将输入的空间划分为M个区域,$R_1, R_2, ..., R_M$,在每个单元上有固定的输出$c_m$,最终生成决策树
 $$f(x) = \sum_{m=1}^{M} c_m I(x \in R_m)$$
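+
+结合上面的最小化目标,下面给出一个简化的示意实现(非本文原有代码,函数名 best_split 为示例自拟),用 numpy 暴力枚举特征 j 与切分点 s,直观展示按平方误差选择最优划分的过程:
+
+```
+import numpy as np
+
+
+def best_split(X, y):
+    """遍历所有特征 j 与切分点 s,返回平方误差最小的划分 (j, s, loss)。"""
+    best_j, best_s, best_loss = None, None, np.inf
+    n_features = X.shape[1]
+    for j in range(n_features):
+        # 候选切分点直接取该特征出现过的取值
+        for s in np.unique(X[:, j]):
+            left, right = y[X[:, j] <= s], y[X[:, j] > s]
+            if len(left) == 0 or len(right) == 0:
+                continue
+            # 每个区域的最优输出 c_m 就是该区域内 y 的均值
+            loss = np.sum((left - left.mean()) ** 2) + np.sum((right - right.mean()) ** 2)
+            if loss < best_loss:
+                best_j, best_s, best_loss = j, s, loss
+    return best_j, best_s, best_loss
+
+
+if __name__ == "__main__":
+    X = np.array([[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]])
+    y = np.array([1.0, 1.2, 1.1, 5.0, 5.2, 5.1])
+    print(best_split(X, y))  # 预期在 x <= 3 处切分
+```
+
+真正的 CART 还会对划分出的子区域递归地重复上述过程,并配合终止条件与剪枝,这里仅演示单次划分的选择。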