add files

master
wanglei 2021-01-12 22:31:30 +08:00
parent 5385394405
commit c95e3dd2dd
5 changed files with 478 additions and 0 deletions

View File

@ -0,0 +1,15 @@
hive sql 中有时间戳转日期的函数
from_unixtime(BIGINT unixtime [, STRING format])
这里面的unixtime输入单位是秒也就一个十位的BIGINT。但是我们实际中用的时间戳一般都是13位的到毫秒时间戳如果直接将该时间戳输入方法中会有错误。
如果是13位时间戳可以这么使用
```
from_unixtime(cast(timestamp/1000 as bigint)) as xxx_time
```
timestamp/1000 是一个double类型直接cast变成bigint就可以了。
当然还可以在方法后面填入可选参数为日期格式。

View File

@ -0,0 +1,85 @@
## 1.遇到的问题
在实际分析数据过程中,需要拉取最近一年的数据进行统计,并且一年的数据按天分区。
```
val ymdSet = TimeUtils.genYmdSet(beginYmd, endYmd) // 获取过去一年时间的日期
var rdd = SparkIo.readThriftParquetFile(spark.sparkContext, pathxxx, classOf[xxx])
for(eachYmd <- ymdSet) {
val tmppath = PathUtils.xxx + eachYmd
val tmprdd = SparkIo.readThriftParquetFile(spark.sparkContext, tmppath, classOf[xxx])
rdd = rdd.union(tmprdd)
}
rdd
```
上面的代码逻辑比较清晰按照每天的数据生成一个临时的rdd然后将该rdd不停union到最初的rdd上得到最终一年的数据。
当只选择过去7天的数据进行分析的时候上面的代码没有问题可以正常运行。当代码读取的数据变为过去一整年时会抛出异常
```
ERROR executor.Executor: Exception in task 28.0 in stage 0.0 (TID 28)
java.lang.StackOverflowError
at java.lang.Exception.<init>(Exception.java:102)
at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:89)
at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
at sun.reflect.GeneratedSerializationConstructorAccessor13.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.io.ObjectStreamClass.newInstance(ObjectStreamClass.java:967)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
......
```
## 2.原因分析
从异常来看是使用java.io.ObjectInputStream序列化的时候出现了死循环导致。
结合前面的现象7天数据的时候没问题而一年的数据会有异常主要是一年的数据文件量太大导致栈空间不足。不停的union过程导致了rdd的lineage太长最终导致栈空间的不足。因为每执行一次union操作就会给lineage的步长加1。
## 3.解决方案
既然定位到了问题,那解决方案就出来了,无非是两种方式
1.加大栈空间。
2.减少lineage的长度。
加大栈空间是个治标不治本的方案因为集群的资源始终是有限的而且一次处理太大的数据始终是个隐患所以最终采取了第二种方案减少lineage长度。
具体实施也比较简单
```
def genrdd(startYmd: String, endYmd: String) = {
val ymdSet = TimeUtils.genYmdSet(beginYmd, endYmd) // 获取过去一段时间的日期
var rdd = SparkIo.readThriftParquetFile(spark.sparkContext, pathxxx, classOf[xxx])
for(eachYmd <- ymdSet) {
val tmppath = PathUtils.xxx + eachYmd
val tmprdd = SparkIo.readThriftParquetFile(spark.sparkContext,tmppath,classOf[xxx])
rdd = rdd.union(tmprdd)
}
rdd
}
```
首先将生成rdd的逻辑封装成一个方法方法的参数为起止时间。
然后将一年的时间段拆开比如拆成4段每段3个月分别得到起止时间。
最后将该方法调用4次最后union到一起就可以成功将一年的数据合并。

View File

@ -0,0 +1,169 @@
## 1.loc用法
loc是基于行的index可以选择特定的行同时还可以根据列名称选定指定列。
iloc是基于行/列的位置(position)来进行选择
```
def select_test():
a = [i for i in range(10)]
b = [2*x + 0.1 for x in a]
data = {"x": a, "y": b}
tmp = pd.DataFrame(data, index=["r1", "r2", "r3", "r4", "r5", "r6", "r7", "r8", "r9", "r10"])
print(tmp.index)
print(tmp.columns)
print()
return tmp
```
方法的输出结果为
```
Index(['r1', 'r2', 'r3', 'r4', 'r5', 'r6', 'r7', 'r8', 'r9', 'r10'], dtype='object')
Index(['x', 'y'], dtype='object')
```
用loc方法选择第一行
```
tmp = select_test()
print(tmp.loc["r1"])
```
输出结果
```
x 0.0
y 0.1
Name: r1, dtype: float64
```
用loc方法选择前三行并只选择x列
```
print(tmp.loc[["r1", "r2", "r3"], "x"])
```
```
r1 0
r2 1
r3 2
Name: x, dtype: int64
```
如果用loc[1]方法,会报错
```
TypeError: cannot do label indexing on <class 'pandas.core.indexes.base.Index'> with these indexers [1] of <class 'int'>
```
## 2.iloc用法
选择前五行
```
print(tmp.iloc[0:5])
```
```
x y
r1 0 0.1
r2 1 2.1
r3 2 4.1
r4 3 6.1
r5 4 8.1
```
选择前五行的第二列(第一列的索引为0):
```
print(tmp.iloc[0:5, 1:2])
```
```
y
r1 0.1
r2 2.1
r3 4.1
r4 6.1
r5 8.1
```
```
print(tmp.iloc[0:5, "x"])
```
上面这行代码会报错:
```
ValueError: Location based indexing can only have [integer, integer slice (START point is INCLUDED, END point is EXCLUDED), listlike of integers, boolean array] types
```
原因很简单iloc只能用行列的起始位置进行选择不能使用行列名。
## 3.ix
ix在旧版本中是loc与iloc的混合体既支持位置选择也支持列名选择。
在新版本中该方法已经被废弃。个人觉得也应该被废弃。API太灵活带来的必然后果就是代码不规范可读性差。
## 4.索引快速选择
还有快速选择行/列的方式
```
print(tmp[0:5])
print(tmp[['x', 'y']])
```
```
x y
r1 0 0.1
r2 1 2.1
r3 2 4.1
r4 3 6.1
r5 4 8.1
x y
r1 0 0.1
r2 1 2.1
r3 2 4.1
r4 3 6.1
r5 4 8.1
r6 5 10.1
r7 6 12.1
r8 7 14.1
r9 8 16.1
r10 9 18.1
```
其中第一行代码选择前5行数据第二行代码选择x,y两列数据。
## 5.at/iat方法
at可以根据行index及列名快速选择dataframe中的某个元素
```
print(tmp.at["r3", "x"])
```
输出为
```
2
```
如果使用如下代码会报错
```
print(tmp.at[3, "x"])
```
```
ValueError: At based indexing on an non-integer index can only have non-integer indexers
```
与iloc类似iat也是通过位置(position)定位元素
```
print(tmp.iat[0, 0])
```
输出为
```
0
```

View File

@ -0,0 +1,65 @@
## 1.计算均值
```
import numpy as np
a = [5, 6, 16, 9]
print(np.mean(a))
```
最后结果
```
9.0
```
np.mean方法即可求均值
## 2.计算方差
```
var = np.var(a)
print(var)
```
输出结果
```
18.5
```
如果我们模拟一下计算方差的过程
```
var2 = [math.pow(x-np.mean(a), 2) for x in a]
print(np.mean(var2))
```
输出结果
```
18.5
```
np.var计算的是整体方差如果想要计算样本方差即除数的分母为N-1可以指定ddof参数
```
sample_var = np.var(a, ddof=1)
print(sample_var)
```
输出结果为
```
24.666666666666668
```
## 3.计算标准差
```
std = np.std(a)
std2 = np.std(a, ddof=1)
print(std)
print(std2)
```
std函数计算的是整体标准差。跟var函数一样如果指定ddof=1计算的是样本标准差。

View File

@ -0,0 +1,144 @@
## 1.均值,方差
假设有个数组一共n个元素那么均值方差的计算公式为
均值:
$$\bar x = \frac{1}{n} \sum_{i=1}^n x_i$$
方差(整体方差):
$$var = \frac{1}{n} \sum_{i=1}^n (x_i - \bar x) ^ 2$$
方差(样本方差):
$$S = \frac{1}{n-1} \sum_{i=1}^n (x_i - \bar x) ^ 2$$
其中整体方差除的是n样本方差除的是n-1。
## 2.协方差与皮尔逊系数
协方差Covariance用于衡量两个随机变量的联合变化程度。所以计算协方差的时候需要输入两个变量。而方差是协方差的一种特殊情况即变量与自身的协方差。
$$
cov(X, Y) = \frac{\sum_{i=1}^n(x_i - \bar x)(y_i - \bar y )}{n-1}
$$
如果分母上除以的是n-1那么计算的是样本的协方差。如果除以的是n计算的是整体协方差。
两个变量之间的皮尔逊相关系数为两个变量之间的协方差与标准差的商:
$$\rho_{x,y} = \frac{cov(X,Y)}{\sigma_x \sigma_y} = \frac{E[(X-\mu_x)(X-\mu_y)]}{\sigma_x \sigma_y}$$
样本的皮尔逊系数,经常用字母$r$表示:
$${\displaystyle r={\frac {\sum _{i=1}^{n}(X_{i}-{\overline {X}})(Y_{i}-{\overline {Y}})}{{\sqrt {\sum _{i=1}^{n}(X_{i}-{\overline {X}})^{2}}}{\sqrt {\sum _{i=1}^{n}(Y_{i}-{\overline {Y}})^{2}}}}}}$$
皮尔逊系数r是一个衡量线性独立的无量纲数其取值在[1, 1]之间。如果r=1表示完全线性相关。如果r=-1表示完全线性负相关。r=0即cov值为0说明两个变量不相关或者更准确地说叫作“线性无关”、“线性不相关”表明X 与Y 两随机变量之间没有线性相关性并非表示它们之间一定没有任何内在的非线性函数关系X、Y二者并不一定是统计独立。
## 3.代码实现
前面说了这么多理论,接下来我们看具体实现。
```
import numpy as np
import math
import pandas as pd
def t1():
a = [5, 6, 16, 9]
print(np.mean(a))
print()
# 整体方差除以n
var = np.var(a)
print(var)
# 自己计算方差
var2 = [math.pow(x-np.mean(a), 2) for x in a]
print(np.mean(var2))
print()
# 样本方差除以n-1
sample_var = np.var(a, ddof=1)
print(sample_var)
print()
# 标准差
std = np.std(a)
std2 = np.std(a, ddof=1)
print(std)
print(std2)
t1()
```
输出结果:
```
9.0
18.5
18.5
24.666666666666668
4.301162633521313
4.96655480858378
```
下面再看看皮尔逊系数的计算
```
def t2():
a = [i for i in range(10)]
b = [2*x + 0.1 for x in a]
dic = {"x": a, "y": b}
df = pd.DataFrame(dic)
print(df)
print()
print("dfcorr is: ", df.corr())
# 手动计算pearson系数
n = len(a)
abar = sum(a)/float(len(a))
bbar = sum(b)/float(len(b))
covab = sum([(x-abar)*(y-bbar) for (x, y) in zip(a, b)]) / n
stda = math.sqrt(sum([math.pow(x-abar, 2) for x in a]) / n)
stdb = math.sqrt(sum([math.pow(y-bbar, 2) for y in b]) / n)
print()
print(covab)
print(stda)
print(stdb)
print()
corrnum = covab / (stda * stdb)
print("corrnum is: ", corrnum)
t2()
```
输出结果
```
x y
0 0 0.1
1 1 2.1
2 2 4.1
3 3 6.1
4 4 8.1
5 5 10.1
6 6 12.1
7 7 14.1
8 8 16.1
9 9 18.1
dfcorr is: x y
x 1.0 1.0
y 1.0 1.0
16.5
2.8722813232690143
5.7445626465380295
corrnum is: 0.9999999999999998
```
x, y分别是两个变量 y = 2x + 0.1两者具有完全的线性相关性所以最后计算出来的pearson系数为1.0。