add numpy and als

pull/1/head
wanglei 2020-09-26 21:06:58 +08:00
parent 6635c235ee
commit 94069858eb
4 changed files with 674 additions and 2 deletions


@@ -1,6 +1,6 @@
This project is a summary of the author's years of study and hands-on work; the vast majority of the code has actually been run to make sure it is correct.
The author does not come from a formal CS background and never systematically studied courses such as data structures, operating systems, or design patterns at school; that knowledge was learned systematically only after starting work, so this project is especially well suited to readers who did not major in CS.
The author does not come from a formal CS background and never systematically studied courses such as data structures, operating systems, or design patterns at school; that knowledge was learned systematically only after starting work, so this project is especially well suited as a reference for readers who did not major in CS.
From the author's work experience, algorithm work is not just offline model training; one could even say offline training is only a small part of the job. The corresponding engineering skills, coding skills, and data skills matter just as much.


@@ -0,0 +1,248 @@
## 1.The broadcast mechanism
Broadcasting is used constantly in numpy. When an element-wise numeric operation involves ndarrays of different shapes, the smaller ndarray is "broadcast" up to match the shape of the larger one, so that two arrays whose shapes look incompatible can still be combined in arithmetic.
## 2.The broadcasting rules
1.All input arrays with ndim smaller than the input array of largest ndim, have 1s prepended to their shapes.
2.The size in each dimension of the output shape is the maximum of all the input sizes in that dimension.
3.An input can be used in the calculation if its size in a particular dimension either matches the output size in that dimension, or has value exactly 1.
4.If an input has a dimension size of 1 in its shape, the first data entry in that dimension will be used for all calculations along that dimension. In other words, the stepping machinery of the ufunc will simply not step along that dimension (the stride will be 0 for that dimension).
Translated, the rules say:
1.All input arrays are aligned with the array that has the longest shape; missing leading dimensions are padded with 1s.
2.The shape of the output array is the maximum of the input shapes along each axis.
3.An input array can take part in the computation only if, on every axis, its length either equals the output length or is exactly 1; otherwise an error is raised.
4.When an input array has length 1 along some axis, the single set of values on that axis is reused for every computation along that axis.
The explanation above is still fairly abstract, so let's work through some examples.
## 3.Worked examples
First, the simplest form of broadcasting: an operation between an array and a scalar.
```
import numpy as np
def t1():
    array = np.arange(5)
    print("array is: ", array)
    # the scalar 4 is broadcast against every element of the array
    array = array * 4
    print("after broadcast, array is: ", array)
```
The output is:
```
array is: [0 1 2 3 4]
after broadcast, array is: [ 0 4 8 12 16]
```
There is not much to explain here: this is a typical element-wise operation in which every position of the array is multiplied by 4. Conceptually, the scalar 4 is broadcast to the array's shape (5,) before the multiplication.
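As a quick check (a small illustration that is not in the original code, using numpy's public np.broadcast helper), we can ask numpy directly what shape this pairing produces:
```
pair = np.broadcast(np.arange(5), 4)
print(pair.shape)  # (5,): the scalar participates as if it had shape (5,)
```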
```
def t2():
    array = np.arange(12).reshape(4, 3)
    print("array is: ", array)
    print(array.mean(0))
    print(array.mean(0).shape)
    print(array.mean(1))
    print(array.mean(1).shape)
    # mean(0) has shape (3,), which is compatible with (4, 3)
    array = array - array.mean(0)
    print("after broadcast, array is: ", array)
    # mean(1) has shape (4,), which is NOT compatible with (4, 3): this raises a ValueError
    array = array - array.mean(1)
    print("after broadcast, array is: ", array)
```
The output is:
```
array is: [[ 0 1 2]
[ 3 4 5]
[ 6 7 8]
[ 9 10 11]]
[4.5 5.5 6.5]
(3,)
[ 1. 4. 7. 10.]
(4,)
after broadcast, array is: [[-4.5 -4.5 -4.5]
[-1.5 -1.5 -1.5]
[ 1.5 1.5 1.5]
[ 4.5 4.5 4.5]]
Traceback (most recent call last):
File "/Users/wanglei/wanglei/code/python/tfpractice/basic/Broadcast.py", line 81, in <module>
t2()
File "/Users/wanglei/wanglei/code/python/tfpractice/basic/Broadcast.py", line 29, in t2
array = array - array.mean(1)
ValueError: operands could not be broadcast together with shapes (4,3) (4,)
```
In the code above, mean(0) has shape (3,). When it is combined with the (4, 3) array, its last dimension is 3, which matches the last dimension of (4, 3), so it broadcasts without trouble. mean(1), however, has shape (4,), which does not match the last dimension of (4, 3), so it cannot be broadcast and a ValueError is raised.
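As an aside (a minimal sketch that is not in the original code): numpy's mean also accepts a keepdims argument, which keeps the reduced axis with length 1 so the result broadcasts back against the original array without an explicit reshape:
```
def t2_keepdims():
    array = np.arange(12).reshape(4, 3)
    # keepdims=True keeps the reduced axis, so the row means have shape (4, 1)
    row_mean = array.mean(1, keepdims=True)
    print(array - row_mean)  # broadcasts cleanly against (4, 3)
```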
```
def t3():
    array = np.arange(12).reshape(4, 3)
    print("array is: ", array)
    print(array.mean(0))
    print(array.mean(0).shape)
    print(array.mean(1))
    print(array.mean(1).shape)
    # reshaping the column means to (1, 3) makes the broadcast explicit
    array = array - array.mean(0).reshape(1, 3)
    print("after broadcast, array is: ", array)
```
The output is as follows:
```
array is: [[ 0 1 2]
[ 3 4 5]
[ 6 7 8]
[ 9 10 11]]
[4.5 5.5 6.5]
(3,)
[ 1. 4. 7. 10.]
(4,)
after broadcast, array is: [[-4.5 -4.5 -4.5]
[-1.5 -1.5 -1.5]
[ 1.5 1.5 1.5]
[ 4.5 4.5 4.5]]
```
```
def t4():
    array = np.arange(12).reshape(4, 3)
    print("array is: ", array)
    print(array.mean(1).reshape(4, 1))
    # (4, 1) against (4, 3): the length-1 axis is stretched to 3
    array = array - array.mean(1).reshape(4, 1)
    print("after broadcast, array is: ", array)
```
The output is as follows:
```
array is: [[ 0 1 2]
[ 3 4 5]
[ 6 7 8]
[ 9 10 11]]
[[ 1.]
[ 4.]
[ 7.]
[10.]]
after broadcast, array is: [[-1. 0. 1.]
[-1. 0. 1.]
[-1. 0. 1.]
[-1. 0. 1.]]
```
Once mean(1) has been reshaped to (4, 1), comparing it with the array's shape (4, 3), the first dimensions are equal and the second dimension is 1, so it broadcasts without trouble.
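An equivalent way to write this (an alternative sketch over the same data, not in the original code) is to insert the missing axis with np.newaxis instead of calling reshape:
```
def t4_newaxis():
    array = np.arange(12).reshape(4, 3)
    # [:, np.newaxis] turns the (4,) row means into shape (4, 1)
    print(array - array.mean(1)[:, np.newaxis])
```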
```
def t5():
    array = np.arange(24).reshape(2, 3, 4)
    print("in the beginning, array is: ", array)
    arrayb = np.arange(12).reshape(3, 4)
    print("arrayb is: ", arrayb)
    # (3, 4) is padded to (1, 3, 4) and then repeated along axis 0
    array = array - arrayb
    print("after broadcast, array is: ", array)
```
The output:
```
in the beginning, array is: [[[ 0 1 2 3]
[ 4 5 6 7]
[ 8 9 10 11]]
[[12 13 14 15]
[16 17 18 19]
[20 21 22 23]]]
arrayb is: [[ 0 1 2 3]
[ 4 5 6 7]
[ 8 9 10 11]]
after broadcast, array is: [[[ 0 0 0 0]
[ 0 0 0 0]
[ 0 0 0 0]]
[[12 12 12 12]
[12 12 12 12]
[12 12 12 12]]]
```
Here array has shape (2, 3, 4) and arrayb has shape (3, 4), so broadcasting effectively copies arrayb along axis 0 (shape[0]).
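To make that implicit copy explicit (a small illustration, not part of the original code), np.broadcast_to materializes arrayb at the broadcast shape:
```
arrayb = np.arange(12).reshape(3, 4)
# the (3, 4) block is repeated along a new leading axis of length 2
print(np.broadcast_to(arrayb, (2, 3, 4)).shape)  # (2, 3, 4)
```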
```
def t6():
    array = np.arange(24).reshape(2, 3, 4)
    print("in the beginning, array is: ", array)
    arrayb = np.arange(8).reshape(2, 1, 4)
    print("arrayb is: ", arrayb)
    # the length-1 axis 1 of arrayb is repeated 3 times
    array = array - arrayb
    print("after broadcast, array is: ", array)

def t7():
    array = np.arange(24).reshape(2, 3, 4)
    print("in the beginning, array is: ", array)
    arrayb = np.arange(6).reshape(2, 3, 1)
    print("arrayb is: ", arrayb)
    # the length-1 axis 2 of arrayb is repeated 4 times
    array = array - arrayb
    print("after broadcast, array is: ", array)
```
The output is:
```
in the beginning, array is: [[[ 0 1 2 3]
[ 4 5 6 7]
[ 8 9 10 11]]
[[12 13 14 15]
[16 17 18 19]
[20 21 22 23]]]
arrayb is: [[[0 1 2 3]]
[[4 5 6 7]]]
after broadcast, array is: [[[ 0 0 0 0]
[ 4 4 4 4]
[ 8 8 8 8]]
[[ 8 8 8 8]
[12 12 12 12]
[16 16 16 16]]]
in the beginning, array is: [[[ 0 1 2 3]
[ 4 5 6 7]
[ 8 9 10 11]]
[[12 13 14 15]
[16 17 18 19]
[20 21 22 23]]]
arrayb is: [[[0]
[1]
[2]]
[[3]
[4]
[5]]]
after broadcast, array is: [[[ 0 1 2 3]
[ 3 4 5 6]
[ 6 7 8 9]]
[[ 9 10 11 12]
[12 13 14 15]
[15 16 17 18]]]
```
The two functions above broadcast along shape[1] and shape[2] respectively.
## 4.Summary
Putting the examples together, the basic principles of broadcasting are as follows (a small code sketch verifying them follows the list):
1.The two arrays are compared dimension by dimension, starting from the trailing dimensions.
2.Two dimensions are compatible when they are equal or when one of them is 1.
3.Missing dimensions can be ignored (they are treated as length 1).
4.Broadcasting takes place along the missing dimensions and the dimensions of length 1.
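These rules can also be checked in code. Below is a minimal sketch (the broadcast_shape helper is ours, not part of numpy) that mirrors the trailing-dimension comparison, alongside numpy's built-in np.broadcast_shapes (available in recent numpy versions):
```
def broadcast_shape(a, b):
    """Compute the broadcast shape of two shapes, raising if they are incompatible."""
    # rule 1: pad the shorter shape with 1s at the front
    n = max(len(a), len(b))
    a = (1,) * (n - len(a)) + tuple(a)
    b = (1,) * (n - len(b)) + tuple(b)
    out = []
    for x, y in zip(a, b):
        # rules 2/3: each axis must match or be 1; the output takes the larger length
        if x != y and 1 not in (x, y):
            raise ValueError("shapes %s and %s cannot be broadcast" % (a, b))
        out.append(max(x, y))
    return tuple(out)

print(broadcast_shape((4, 3), (3,)))           # (4, 3)
print(np.broadcast_shapes((2, 3, 4), (3, 1)))  # (2, 3, 4), numpy's built-in equivalent
```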


@@ -0,0 +1,424 @@
## 1.A brief introduction to ALS
The principles behind ALS were covered in detail in earlier articles. Summarized in one sentence: ALS represents every user and every item as a low-dimensional dense vector, and those vectors are what get used downstream.
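As a quick refresher (this is the standard ALS objective, stated here for convenience rather than quoted from the earlier articles): given observed ratings $r_{ui}$, ALS learns user vectors $\mathbf{x}_u$ and item vectors $\mathbf{y}_i$ by minimizing a regularized squared error, alternating between solving for all user vectors with the item vectors held fixed and vice versa, since each sub-problem is then an ordinary least-squares problem:
$$\min_{X,Y} \sum_{(u,i) \in \mathcal{K}} \left(r_{ui} - \mathbf{x}_u^{\top}\mathbf{y}_i\right)^2 + \lambda\left(\sum_u \lVert\mathbf{x}_u\rVert^2 + \sum_i \lVert\mathbf{y}_i\rVert^2\right)$$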
## 2.A Spark-based ALS implementation
First, some helper code.
```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;
import java.util.TreeSet;

/**
 * Created by WangLei on 20-1-10.
 */
public class TimeUtils {

    private static final Logger LOGGER = LoggerFactory.getLogger(TimeUtils.class);

    public static final String DATE_FORMAT = "yyyyMMdd";
    public static final String TIME_FORMAT = "yyyyMMdd HH:mm:ss";
    public static final String HOUR_TIME_FORMAT = "yyyyMMdd HH";
    public static final long TIME_DAY_MILLISECOND = 86400000;

    /**
     * timestamp -> ymd
     * @param timestamp
     * @return
     */
    public static String timestamp2Ymd(long timestamp) {
        String format = "yyyyMMdd";
        return timestamp2Ymd(timestamp, format);
    }

    public static String timestamp2Ymd(long timestamp, String format) {
        SimpleDateFormat sdf;
        try {
            // also accept 10-digit (second-resolution) timestamps
            if (String.valueOf(timestamp).length() == 10) {
                timestamp *= 1000;
            }
            sdf = new SimpleDateFormat(format);
            return sdf.format(new Date(timestamp));
        } catch (Exception ex) {
            sdf = new SimpleDateFormat(DATE_FORMAT);
            try {
                return sdf.format(new Date(timestamp));
            } catch (Exception e) {}
        }
        return null;
    }

    public static String timestamp2Hour(long timestamp) {
        String time = timestamp2Ymd(timestamp, TIME_FORMAT);
        return time.substring(9, 11);
    }

    /**
     * ymd -> Date
     * @param ymd
     * @return
     */
    public static Date ymd2Date(String ymd) {
        return ymd2Date(ymd, "yyyyMMdd");
    }

    public static Date ymd2Date(String ymd, String format) {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat(format);
            return sdf.parse(ymd);
        } catch (ParseException ex) {
            LOGGER.error("parse ymd to timestamp error!", ex);
        } catch (Exception ex) {
            LOGGER.error("there is some problem when transfer ymd2Date!", ex);
        }
        return null;
    }

    /**
     * ymd -> timestamp
     * @param ymd
     * @return
     */
    public static long ymd2timestamp(String ymd) {
        return ymd2Date(ymd).getTime();
    }

    public static String genLastDayStr() {
        return timestamp2Ymd(System.currentTimeMillis() + TIME_DAY_MILLISECOND * (-1));
    }

    /**
     * get the date string num days before or after the given date string
     * note: the day count is converted from int to long before multiplying
     * @param ymd
     * @param num
     * @return
     */
    public static String genDateAfterInterval(String ymd, int num) {
        long timestamp = ymd2timestamp(ymd);
        long resTimeStamp = timestamp + TIME_DAY_MILLISECOND * Long.valueOf(num);
        return timestamp2Ymd(resTimeStamp);
    }

    public static String genLastDayStr(String ymd) {
        return genDateAfterInterval(ymd, -1);
    }

    public static Set<String> genYmdSet(long beginTs, long endTs) {
        TreeSet ymdSet = new TreeSet();
        for (long ts = beginTs; ts <= endTs; ts += 86400000L) {
            ymdSet.add(timestamp2Ymd(ts));
        }
        return ymdSet;
    }

    public static Set<String> genYmdSet(String beginYmd, String endYmd) {
        long beginTs = ymd2timestamp(beginYmd);
        long endTs = ymd2timestamp(endYmd);
        return genYmdSet(beginTs, endTs);
    }

    /**
     * number of days between begin and end
     * if begin or end is not in number format, or end < begin, return Integer.MIN_VALUE
     * @param begin
     * @param end
     * @return
     */
    public static int getIntervalBetweenTwoDays(String begin, String end) {
        try {
            int begintmp = Integer.valueOf(begin), endtmp = Integer.valueOf(end);
            if (begintmp > endtmp) {
                LOGGER.error("we need end no smaller than begin!");
                return Integer.MIN_VALUE;
            }
            Date d1 = ymd2Date(begin);
            Date d2 = ymd2Date(end);
            Long mils = (d2.getTime() - d1.getTime()) / TIME_DAY_MILLISECOND;
            return mils.intValue();
        } catch (NumberFormatException numformatex) {
            numformatex.printStackTrace();
            return Integer.MIN_VALUE;
        }
    }
}
```
It contains a good number of date/time handling utilities and can be dropped straight into a code library.
```
/**
 * Created by WangLei on 20-1-13.
 */
object DateSpec extends Enumeration {
  type DateSpec = Value
  val YMD, Y_M_D, YMD2 = Value
}
```
An HDFS-related utility class:
```
import java.util.Date                  // needed for `new Date` below (not shown in the original snippet)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.joda.time.DateTime

import DateSpec.DateSpec               // brings the enum's type alias into scope (assumed same package)

/**
 * Created by WangLei on 20-1-10.
 */
object HDFSUtils {

  val conf = new Configuration()

  def delete(sc: SparkContext, path: String) = {
    FileSystem.get(sc.hadoopConfiguration).delete(new Path(path), true)
  }

  def isExist(sc: SparkContext, path: String) = {
    FileSystem.get(sc.hadoopConfiguration).exists(new Path(path))
  }

  def checkFileExist(conf: Configuration = conf, FileName: String): Boolean = {
    var isExist = false
    try {
      val hdfs = FileSystem.get(conf)
      val path = new Path(FileName)
      isExist = hdfs.exists(path)
    } catch {
      case e: Exception => e.printStackTrace()
    }
    isExist
  }

  def latestMidPath(conf: Configuration, basePath: String): Option[String] = {
    val today = new Date
    latestMidPath(conf, basePath, new DateTime(today.getTime), 7)
  }

  def latestMidPath(conf: Configuration, basePath: String, ymd: String): Option[String] = {
    val timestamp = TimeUtils.ymd2timestamp(ymd)
    latestMidPath(conf, basePath, new DateTime(timestamp), 7, false, DateSpec.YMD2)
  }

  def latestMidPath(conf: Configuration = conf, basePath: String, date: DateTime, limit: Int,
                    with_success_file: Boolean = true, dateSpec: DateSpec = DateSpec.YMD): Option[String] = {
    for (i <- 0 to limit) {
      val day = date.minusDays(i)
      val path = dateSpec match {
        case DateSpec.YMD => basePath + "/date=%04d%02d%02d".format(day.getYear, day.getMonthOfYear, day.getDayOfMonth)
        case DateSpec.Y_M_D => basePath + "/year=%04d/month=%02d/day=%02d".format(day.getYear, day.getMonthOfYear, day.getDayOfMonth)
        case DateSpec.YMD2 => basePath + "/%04d%02d%02d".format(day.getYear, day.getMonthOfYear, day.getDayOfMonth)
      }
      if (checkFileExist(conf, if (with_success_file) path + "/_SUCCESS" else path))
        return Some(path)
    }
    None
  }
}
```
The ALS training code:
```
import org.apache.spark.SparkConf
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._

object AlsTraining {

  val logger = LoggerFactory.getLogger(this.getClass)
  val separator = "\t"

  def genUserItemRdd(spark: SparkSession, ymd: String) = {
    val baseinput = PathUtils.user_item_click_path
    val (yesterday, daybegin) = (TimeUtils.genLastDayStr(ymd), TimeUtils.genDateAfterInterval(ymd, -29))
    val days = TimeUtils.genYmdSet(daybegin, yesterday)

    // each line: userid \t itemid \t clicknum
    var rdd = spark.sparkContext.textFile(baseinput + ymd)
      .map(x => {
        val l = x.split("\t")
        (l(0), l(1), l(2))
      })
    for (day <- days) {
      val path = baseinput + day
      if (HDFSUtils.isExist(spark.sparkContext, path)) {
        val tmp = spark.sparkContext.textFile(path)
          .map(x => {
            val l = x.split("\t")
            (l(0), l(1), l(2))
          })
        rdd = rdd.union(tmp)
      }
    }
    rdd.cache
  }

  def genUserItemIndex(spark: SparkSession, ymd: String) = {
    val rdd = genUserItemRdd(spark, ymd)
    // index users and items separately, starting from 1
    val userindex = rdd.map(x => x._1).distinct().sortBy(x => x).zipWithIndex().map(x => (x._1, x._2 + 1))
    val itemindex = rdd.map(x => x._2).distinct().sortBy(x => x).zipWithIndex().map(x => (x._1, x._2 + 1))
    (userindex, itemindex)
  }

  case class Rating(userid: Int, itemid: Int, rating: Float)

  def trainmodel(spark: SparkSession, ymd: String) = {
    import spark.implicits._
    val rdd = genUserItemRdd(spark, ymd)
    val userindexrdd = rdd.map(x => x._1).distinct().sortBy(x => x).zipWithIndex().map(x => (x._1, x._2 + 1))
    val itemindexrdd = rdd.map(x => x._2).distinct().sortBy(x => x).zipWithIndex().map(x => (x._1, x._2 + 1))

    // aggregate clicks per (user, item) pair and replace the raw ids with their integer indexes
    val data = rdd.map(x => {
        val (userid, itemid, count) = (x._1, x._2, x._3.toInt)
        (userid + separator + itemid, count)
      })
      .reduceByKey(_ + _)
      .map(x => {
        val (userid, itemid, count) = (x._1.split(separator)(0), x._1.split(separator)(1), x._2)
        (userid, itemid + separator + count)
      })
      .join(userindexrdd)
      .map(x => {
        val (itemandcount, userindex) = (x._2._1, x._2._2)
        val (itemid, count) = (itemandcount.split(separator)(0), itemandcount.split(separator)(1))
        (itemid, userindex + separator + count)
      })
      .join(itemindexrdd)
      .map(x => {
        val (userandcount, itemindex) = (x._2._1, x._2._2)
        val (userindex, count) = (userandcount.split(separator)(0), userandcount.split(separator)(1))
        Rating(userindex.toInt, itemindex.toInt, count.toFloat)
      }).toDF()

    val Array(training, test) = data.randomSplit(Array(0.8, 0.2))
    val als = new ALS().setRank(128).setMaxIter(8).setRegParam(0.01).
      setUserCol("userid").setItemCol("itemid").setRatingCol("rating")
    val model = als.fit(training)
    model.setColdStartStrategy("drop")

    val predictions = model.transform(test)
    val evaluator = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("rating")
      .setPredictionCol("prediction")
    val rmse = evaluator.evaluate(predictions)
    logger.error("root-mean-square error is: {}", rmse)

    // map the factor matrices back from integer indexes to the original ids
    val userindex2userid = userindexrdd.map(x => (x._2, x._1))
    val userfactors = model.userFactors.rdd.map(x => {
        val (userid, userfactor) = (x.getInt(0).toLong, x.getList(1).toArray().mkString(","))
        (userid, userfactor)
      }).join(userindex2userid)
      .map(x => {
        val (userindex, userfactor, userid) = (x._1, x._2._1, x._2._2)
        (userindex, userid, userfactor)
      })
      .repartition(1)
      .sortBy(x => x._1)
      .map(x => "%s\t%s\t%s".format(x._1, x._2, x._3))

    val itemindex2itemid = itemindexrdd.map(x => (x._2, x._1))
    val itemfactors = model.itemFactors.rdd.map(x => {
        val (itemid, itemfactor) = (x.getInt(0).toLong, x.getList(1).toArray().mkString(","))
        (itemid, itemfactor)
      }).join(itemindex2itemid)
      .map(x => {
        val (itemindex, itemfactor, itemid) = (x._1, x._2._1, x._2._2)
        (itemindex, itemid, itemfactor)
      })
      .repartition(1)
      .sortBy(x => x._1)
      .map(x => "%s\t%s\t%s".format(x._1, x._2, x._3))

    (userfactors, itemfactors)
  }

  def main(args: Array[String]): Unit = {
    val (ymd, operation) = (args(0), args(1))
    val sparkConf = new SparkConf()
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.setAppName("user-item-als-training" + ymd)
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    operation match {
      case "index" => {
        val useroutput = PathUtils.user_index_path + ymd
        val itemoutput = PathUtils.item_index_path + ymd
        val (userindex, itemindex) = genUserItemIndex(spark, ymd)
        // write each line as "index \t original id"
        userindex.repartition(1).sortBy(_._2).map(x => "%s\t%s".format(x._2, x._1)).saveAsTextFile(useroutput)
        itemindex.repartition(1).sortBy(_._2).map(x => "%s\t%s".format(x._2, x._1)).saveAsTextFile(itemoutput)
      }
      case "model" => {
        val (userfactors, itemfactors) = trainmodel(spark, ymd)
        val user_embedding_path = PathUtils.user_factor_path + ymd
        val item_embedding_path = PathUtils.item_factor_path + ymd
        HDFSUtils.delete(spark.sparkContext, user_embedding_path)
        HDFSUtils.delete(spark.sparkContext, item_embedding_path)
        userfactors.saveAsTextFile(user_embedding_path)
        itemfactors.saveAsTextFile(item_embedding_path)
      }
    }
    spark.stop()
  }
}
```
## 3.Code walkthrough
```
PathUtils.user_item_click_path
```
This is the input dataset; it contains three fields: userid, itemid, and click count.
```
genUserItemIndex
```
This method assigns integer indexes to userid and itemid; note that users and items are indexed separately.
```
trainmodel
```
This method proceeds in the following steps (a PySpark sketch of the same flow is shown after the list):
1.Build the training set.
2.Create the ALS estimator.
3.Train the model with als.fit.
4.Make predictions on the test split with the trained model and evaluate the RMSE.
5.Extract the user vectors and the item vectors.
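For reference, roughly the same train/evaluate flow in PySpark (a sketch that assumes a ratings DataFrame with userid, itemid and rating columns has already been built; it is not part of the original project):
```
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

def train_als(ratings):
    # ratings: DataFrame with integer userid/itemid and float rating columns (assumed)
    training, test = ratings.randomSplit([0.8, 0.2])
    als = ALS(rank=128, maxIter=8, regParam=0.01,
              userCol="userid", itemCol="itemid", ratingCol="rating",
              coldStartStrategy="drop")
    model = als.fit(training)
    predictions = model.transform(test)
    evaluator = RegressionEvaluator(metricName="rmse",
                                    labelCol="rating", predictionCol="prediction")
    print("root-mean-square error is:", evaluator.evaluate(predictions))
    # model.userFactors / model.itemFactors hold the learned embeddings
    return model
```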


@@ -34,7 +34,7 @@ $$\min_{j,s}\left[\min_{c_{1}}\sum_{x_{i} \in R_{1}(j,s)}\left(y_{i}-c_{1}\right)^{2}+\min_{c_{2}}\sum_{x_{i} \in R_{2}(j,s)}\left(y_{i}-c_{2}\right)^{2}\right]$$
2.Use the selected pair (j, s) to split the region and determine the corresponding outputs
$$\hat{c}\_{m}=average(y_{i} \mid x_{i} \in R_{m}(j,s))$$
3.Repeat steps 1 and 2 for the two resulting sub-regions until the stopping condition is met
3.Repeat steps 1 and 2 for the two resulting sub-regions until the stopping condition is met
4.Partition the input space into M regions $R_1, R_2, ..., R_M$, each with a fixed output $c_m$ on its cell, which yields the final decision tree
$$f(x) = \sum_{m=1}^{M} c_{m} I(x \in R_{m})$$