Paper background

This paper is about jointly training slot filling and intent detection. Traditionally the two tasks are trained separately, which causes error propagation.
More recent approaches therefore model them jointly, for example:
joint attention-based RNN (2016)
RNN-LSTM (2016)
slot-gated model (2018)
This paper likewise exploits the correlation between the two tasks and proposes the SF-ID network.

Model structure

Integration of context

slot filling:

The weighting here follows the 2016 attention-based RNN paper: the encoder and decoder jointly produce a context vector, which is then used to weight (attend over) the encoder hidden states; see that paper for the details.
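As a rough sketch of that attention step (my notation, not the paper's exact symbols): for slot position $ i $, the attention weights over the encoder hidden states $ h_j $ and the resulting context vector are

$ \alpha_{i,j} = \frac{\exp(e_{i,j})}{\Sigma_k \exp(e_{i,k})} $

$ c_i = \Sigma_j \alpha_{i,j} h_j $

where $ e_{i,j} $ is a learned score between the decoder state and $ h_j $.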
intent detection:

The intent context vector is computed in the same way as for slot filling.

SF-ID Network

SF-ID Network有SF-First和ID-First两种形式。下面就说其中一种SF-first,另外一种只是把subnet的先后顺序互换而已。

SF subnet
ID subnet
Iteration Mechanism

向量表示的交互和耦合把context对应的向量换成reinforce对应的向量,最后输出softmax分类。

CRF layer

Adding transition probabilities constrains the label predictions and makes them more robust; this part is standard.
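As a reminder of the standard linear-chain CRF formulation (my notation, not tied to this paper's symbols): given emission scores $ P_{i,y_i} $ and a transition matrix $ A $, the score of a label sequence $ y $ is

$ s(x,y) = \Sigma_i (A_{y_{i-1},y_i} + P_{i,y_i}) $

and the probability of a sequence is a softmax of this score over all possible label sequences.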

Experiment

Two of the analyses are particularly interesting:

Analysis of Model mode:

ID-First mode performs better on slot filling, while SF-First mode performs better on intent detection. Put simply, whichever subnet runs first sets the task bias of the whole model.

Iteration Mechanism:

A moderate number of iterations improves training, but too many iterations starts to hurt performance.

counting common friends pseudocode without graphframe

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql import Row

sparkSession = SparkSession.builder.enableHiveSupport().master('local').getOrCreate()

# friendship edges; extend the list with more (src, dst) pairs as needed
Edgelist = [(1, 2), (1, 3), (1, 4), (2, 3), (2, 4)]
graphData = sparkSession.sparkContext.parallelize(Edgelist).map(lambda edge: Row(edge[0], edge[1]))

graphSchemaAB = StructType([StructField('A', IntegerType(), nullable=False), StructField('B', IntegerType(), nullable=False)])
abDF = sparkSession.createDataFrame(graphData, graphSchemaAB)

graphSchemaBC = StructType([StructField('B', IntegerType(), nullable=False), StructField('C', IntegerType(), nullable=False)])
bcDF = sparkSession.createDataFrame(graphData, graphSchemaBC)

abDF.show()

# join A-B with B-C on the shared friend B, then count common friends per (A, C) pair
joinDF = abDF.join(bcDF, abDF.B == bcDF.B)
joinDF.show()
joinDF.drop('B').filter('A != C').groupBy('A', 'C').count().filter('A = 1').show()

graphframe

from graphframes import GraphFrame

# vertices must be a DataFrame with an 'id' column; placeholders kept from the original notes
vertices = sparkSession.createDataFrame([('xx',), ('xx',)], ['id'])
edges = sparkSession.createDataFrame([('xx', 'xx', 'friend'), ('xx', 'xx', 'friend')], ['src', 'dst', 'relationship'])
g = GraphFrame(vertices, edges)

DSL

  1. Edge
  2. Union of Edges
  3. Names:
    identify common elements
    identify names of columns in the result DataFrame
  4. Anonymous edges and vertices
  5. Negation
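A minimal motif sketch that exercises these DSL pieces, assuming the GraphFrame g built above (the negation clause keeps only pairs that are not already directly connected):

# common friends of a and c, counted via the motif-finding DSL
motifs = g.find("(a)-[e1]->(b); (b)-[e2]->(c); !(a)-[]->(c)")
motifs.filter("a.id != c.id").groupBy("a.id", "c.id").count().show()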

triangles
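GraphFrames also ships a built-in triangle count; a minimal sketch, assuming the same g:

# number of triangles each vertex participates in
g.triangleCount().show()

The same result can be expressed with a motif such as "(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)", at the cost of counting each triangle several times.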

spark.sql

pipeline

from pyspark.sql import SparkSession
spark_session = SparkSession.builder.enableHiveSupport().appName('spark sql').master('local').getOrCreate()
spark_session.sql("""
show databases
""").toPandas()

query and execution

It depends on the query: the Spark optimizer (Catalyst) decides what the execution plan will be.

tempView
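A temporary view exposes a DataFrame to Spark SQL; a minimal sketch with a hypothetical access_log DataFrame:

access_log.createOrReplaceTempView('access_log')
spark_session.sql('select ip, count(*) as cnt from access_log group by ip').limit(5).toPandas()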

RDD vs. DF vs. SQL

The DataFrame API is typically faster than plain RDD code because queries go through the Catalyst optimizer.
With DataFrames, many errors are caught at analysis/compile time rather than at run time.

functions

mapping functions

generating functions

aggregating functions

user defined functions

time processing

import pyspark.sql.functions as f

xxx.withColumn('unixtime', f.unix_timestamp('time')).limit(5).toPandas()

window functions

from pyspark.sql import Window

user_window = Window.partitionBy("ip").orderBy("unixtime")
access_log_ts.select("ip", "unixtime",
                     f.row_number().over(user_window).alias("count"),
                     f.lag("unixtime").over(user_window).alias("lag"),
                     f.lead("unixtime").over(user_window).alias("lead")).limit(5).toPandas()

DDL

MetaStore

The metastore holds the metadata describing how data is laid out in HDFS.
HDFS is designed for sequential scans, while the metastore is designed for random reads and updates.
The Hive metastore can be used independently of the Hive framework.

create

create table tablename (col1 type1, col2 type2, ...);

describe

describe tablename;  -- or: describe formatted tablename;

delimiter

Hadoop MapReduce uses the tab character as its default delimiter.
Hive uses Ctrl+A (\001) as the default field delimiter,
Ctrl+B (\002) to separate items of an array/collection, and
Ctrl+C (\003) to separate map keys from values.
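To make these defaults explicit in DDL, a sketch with hypothetical columns (issued through the Hive-enabled spark_session from the pipeline section; the same statement works in the Hive CLI):

spark_session.sql(r"""
CREATE TABLE employees_raw (
  name STRING,
  subordinates ARRAY<STRING>,
  deductions MAP<STRING, FLOAT>
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\001'
  COLLECTION ITEMS TERMINATED BY '\002'
  MAP KEYS TERMINATED BY '\003'
""")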

external table

For an external table, the data in HDFS is left unchanged when the table is dropped; only the metadata is removed from the metastore.
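A minimal external-table sketch (hypothetical table name and path):

spark_session.sql("""
CREATE EXTERNAL TABLE employees_ext (name STRING, salary FLOAT)
LOCATION '/data/employees'
""")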

DML

import

load data inpath '/local/path/employees-data' into table employees;
-- with LOCAL (load data local inpath ...) the file is read from the local filesystem instead of HDFS

multiple-insert

from employees insert overwrite directory 'path' select name, salary, address where ...

ctas

create table xxx as select xxx, xxx from employees where xx = 'xx'

Phase

select ... from: choosing from existing columns, can be done in a map phase
where: a map phase
group by: shuffle/sort phase; having: a reduce phase
join: map side or reduce side
order by / sort by: reduce phase

Hive optimization

multi-level partitioning

create table ... partitioned by (xx, xx, xx)

  1. partitioned columns go at the end (of the select in a dynamic-partition insert)
  2. the order of the partition columns matters
  3. use the relevant configuration parameters
  4. control empty partitions

set hive.enforce.bucketing=true;
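A dynamic-partition sketch tying the notes above together (hypothetical tables; note the partition columns come last in the select):

spark_session.sql('set hive.exec.dynamic.partition.mode=nonstrict')
spark_session.sql("""
CREATE TABLE logs (ip STRING, request STRING)
PARTITIONED BY (year INT, month INT)
""")
spark_session.sql("""
INSERT OVERWRITE TABLE logs PARTITION (year, month)
SELECT ip, request, year, month FROM raw_logs
""")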

map-side join

Use a power of two for the number of buckets, so that the bucket counts of the two joined tables always divide one another and the bucketed map-side join can be applied.

skewed data problem

create table ... skewed by (user_id) on ('user_id1', 'unknown')

REGEX

regex serde

create table xx (xx, xx) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' WITH SERDEPROPERTIES ("input.regex" = 'xxxx') LOCATION 'xxxx'

regexp_extract

regex optional

  1. re.match: matches from the start of a string
  2. re.search: does not have to match from the start of a string
  3. re.IGNORECASE: case-insensitive matching
  4. capture group: parentheses have a special meaning in the regular-expression language; they let you retrieve the matched content later on (see the example after this list)
  5. groups(): returns only the capturing groups
  6. non-capturing groups:
    example: re.search(r"really (?:good|nice)", …)
  7. some special regex expressions:
    \d any digit
    \D any non-digit
    \w any word (alphanumeric) character
    \W any non-word character
    \s any whitespace character
    \S any non-whitespace character
    . any single character
    ^ beginning of string
    $ end of string
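A short runnable example of capturing vs. non-capturing groups (hypothetical log line):

import re

log_line = "127.0.0.1 - - [20/Dec/2025] GET /index.html"
m = re.search(r"(\d+\.\d+\.\d+\.\d+).*?(?:GET|POST) (\S+)", log_line)
if m:
    print(m.group(1))   # first capturing group: the ip address
    print(m.groups())   # all capturing groups: ('127.0.0.1', '/index.html')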

view

A view is a read-only virtual table that is generated on the fly only when it is needed.
create view xxx (
xxx,
xxx
)
as select xxx, xxx from xxx;

view limitations:
  1. read-only
  2. extra CPU cycles, though some optimizations exist
  3. meta-information fluctuation (the view can break when the underlying tables change)

function

functions: udf = user defined functions
aggregate functions: udaf
table-generating functions: udtf

show functions

describe functions

udf: map phase

udaf: reduce phase

udtf: both scenarios

explode:

select explode(direct_reports) as employee from management

lateral view: joins the output of a UDTF back to the original input rows

select manager_name, employee from management lateral view explode(direct_reports) lateral_table as employee

streaming

hive window functions

SELECT xx, ROW_NUMBER() OVER (PARTITION BY column_C) from table_name;
SELECT xx, RANK() OVER (PARTITION BY column_C) from table_name;
SELECT xx, DENSE_RANK() OVER (PARTITION BY column_C) from table_name;

same windows

The following two queries are equivalent:

SELECT column_A, ROW_NUMBER() OVER (PARTITION BY column_C), RANK() OVER (PARTITION BY column_C), DENSE_RANK() OVER (PARTITION BY column_C) FROM table_name;

SELECT column_A, ROW_NUMBER() OVER w, RANK() OVER w, DENSE_RANK() OVER w FROM table_name WINDOW w AS (PARTITION BY column_C);

different windows

SELECT column_A, ROW_NUMBER() OVER (PARTITION BY column_C), RANK() OVER (PARTITION BY column_D), DENSE_RANK() OVER (PARTITION BY column_E) FROM table_name;

Paper background

In the real world, many data features are missing. This paper therefore proposes a semi-supervised approach that imputes the missing attribute values while performing the click-through-rate prediction task.

Model structure

Graph learning module

Embedding Fusion Layer

The notation here is fairly heavy, so a few definitions first (it was a bit confusing for me).

$ P \in R^{d \times M} $ is the embedding matrix of user ids.

$ Q \in R^{d \times N} $ is the embedding matrix of item ids.

At iteration $ l $, the approximate user and item attribute matrices are $ X^l \in R^{d_x \times M} $ and $ Y^l \in R^{d_y \times N} $.
The vectors at layer $ l $ are updated from the previous layer. When used, the attribute vector and the id embedding are concatenated:
$ u_a^{l,0}=[p_a, x_a^l \times W_u] $
$ v_i^{l,0}=[q_i, y_i^l \times W_v] $
In the first round, the attribute vectors are initialized by weighted-average imputation.

Embedding propagation Layer

Attribute update module

Prediction Part
Attribute Update Part
Loss

inference loss + recommendation loss

Experimental results

Summary

While reading this paper I kept looking for an analysis of why the inference part improves the results, but the authors do not explain this very clearly. Overall the approach feels somewhat EM-like, and the problem it addresses is a genuinely thorny one in industry.

Paper background

I have always felt that the background of a paper matters much more than the model architecture that follows, because it reflects the value of the work: what was done and why. This paper mainly addresses the controllability of multi-interest vectors.

Model structure

The building blocks of the model are all very common, so there is not much worth elaborating on.

Multi-Interest Extraction

Dynamic Routing

Self Attentive Method

Aggregation Module

$ f(u,i)=\max_{1 \le k \le K} {e_i^T} v_u^{(k)} $

$ Q(u,S) = \Sigma_{i \in S} f(u,i) + \lambda \Sigma_{i \in S}\Sigma_{j \in S} g(i,j) $
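A minimal greedy sketch of this aggregation (my own code, not the paper's exact procedure; f, g, lam and candidates stand for the relevance score, the diversity term, λ and the retrieved item set):

def aggregate_controllable(candidates, f, g, u, N, lam):
    # greedily add the item with the largest marginal gain of Q(u, S)
    S = []
    while len(S) < N and len(S) < len(candidates):
        remaining = [i for i in candidates if i not in S]
        best = max(remaining, key=lambda i: f(u, i) + lam * sum(g(i, j) for j in S))
        S.append(best)
    return S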

Evaluation metrics

Recall
Hitrate
NDCG

Controllable Study

Summary

Explicitly modifying the loss makes the trade-off between accuracy and diversity controllable. The idea is simple; whether it works needs to be verified empirically.

Paper background

The authors review previous negative-sampling techniques and propose a new framework, KGPolicy. Broadly there are two kinds of negative sampling: static sampling and adaptive sampling.

Static sampling

Static sampling usually draws negatives uniformly or according to popularity. The drawback is that the sampled negatives are independent of the model, so they have little influence on it.

Adaptive sampling

Adaptive sampling focuses more on hard samples, because these bring more value to the model. However, these assumptions are all based on history: a selected negative item may actually be consumed in the future, which can hurt model performance.

Recent methods

Recent methods usually use richer feedback, such as impressions without clicks or clicks without conversions, to strengthen negative sampling. Even so, more effective negative-sampling strategies are still needed.

Model structure

The usual modelling target is

$ y_{ui}=f_R(u,i)=r_u^Tr_i $,

and, following earlier work, this paper uses the BPR loss.

The paper also measures how informative a negative sample is by the gradient magnitude $ \nabla_{u,i,j} = 1-\sigma(f_R(u,i)-f_R(u,j)) $. A good negative sample should make this value as large as possible.
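For reference, the BPR objective is roughly of the form (my notation, consistent with the NGCF loss written out further below):

$ L_{BPR} = \Sigma_{(u,i,j) \in O} -\ln \sigma(f_R(u,i) - f_R(u,j)) + \lambda ||\Theta||^2_2 $

where $ j $ is the sampled negative item.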

Negative-sample selection rule

First define an atomic path:
$ i \rightarrow e' \rightarrow j $
where $ i $ is an item the user $ u $ has interacted with and $ j $ is an item $ u $ has not interacted with yet.
This has two benefits:

  1. Since $ i $ and $ j $ are connected to the same entity, they are likely to be similar, so $ j $ is a more informative negative sample.
  2. It reflects the user's real interests: there is good reason to believe that the user is indeed not interested in $ j $. The path can also be extended further to increase the confidence in the negative sample.

Reinforcement learning

The reinforcement-learning formulation involves a few key ingredients:

  1. Action:
    $ a_t = (e_t \rightarrow e_t' \rightarrow e_{t+1}) $
  2. State transition:
    $ P(s_{t+1} = (u,e_{t+1}) | s_t = (u,e_t), a_t = (e_t \rightarrow e_t' \rightarrow e_{t+1})) = 1 $
  3. Reward:
    prediction reward $ f_R(u,e_t) $, used to obtain more informative samples;
    similarity reward $ g_R(i,e_t) $, used to obtain samples that reflect the user's real interests, i.e. items the user genuinely does not like.
    The total reward is the sum of the two.
  4. Objective function:

Knowledge-graph policy network

GraphSage to obtain node features
Neighbor attention module

$ P(a_t|s_t) = P((e_t,e_t')|s_t) \cdot P((e_t',e_{t+1}) | s_t, (e_t,e_t')) $
This module consists of two attention parts: (1) attention over knowledge-graph neighbors and (2) attention over item neighbors.

Neighbor pruning

To cut down unnecessary exploration while keeping the quality of the samples, a pruning strategy is proposed (see the sketch after this list):

  1. first down-sample or over-sample the node's neighbors to obtain a subset;
  2. then sample some extra nodes from the whole space to ensure diversity;
  3. finally, score the candidates with an inner product and return the nodes most similar to the current node.
    This reduces the time complexity while preserving effectiveness.
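A minimal Python sketch of this pruning step (my own code; emb, neighbors, all_nodes, n_sample and k are hypothetical names):

import numpy as np

def prune_candidates(node, neighbors, all_nodes, emb, n_sample, k):
    # (1) down-/over-sample the node's neighbors, (2) add random nodes for diversity
    local = np.random.choice(neighbors, size=n_sample, replace=len(neighbors) < n_sample)
    rand = np.random.choice(all_nodes, size=n_sample)
    candidates = np.concatenate([local, rand])
    # (3) inner-product scoring, keep the k most similar nodes
    scores = emb[candidates] @ emb[node]
    return candidates[np.argsort(-scores)[:k]]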

Optimization

Optimizing the recommender

Fix the sampler's parameters and optimize the recommender.

Optimizing the sampler

The usual recipe: policy gradients via the REINFORCE algorithm.
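For reference, the REINFORCE gradient estimator has roughly the form (my notation, not the paper's exact objective):

$ \nabla_{\Theta} J \approx \Sigma_t R(s_t,a_t) \nabla_{\Theta} \log \pi_{\Theta}(a_t|s_t) $

i.e. the reward-weighted gradient of the log policy.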

False-negative problem

The paper argues that the knowledge graph makes the sampler more robust to false negatives, although the problem cannot be avoided entirely.

KGPolicy experiments

The experiments are designed to answer several questions:

  1. How does it compare with current mainstream methods?
  2. How do KGPolicy's parameters affect the results?
  3. A deeper analysis of the negative samples.

Summary

This paper does have some real substance, and reading work like this is genuinely useful.

For the next while I will mainly focus on graph learning for recommender systems, starting with NGCF from Xiangnan He's group.

Paper background

Traditional collaborative filtering mainly learns two things:

  1. representations of users and items;
  2. the interaction between users and items.
    However, both lack an explicit collaborative signal. The remedy explored here is high-order connectivity. Although Hop-Rec recently proposed a similar idea, it only uses high-order connections to enrich the training data and still relies on MF for the final interaction, so the two approaches are essentially different.

Model structure

The model structure is fairly straightforward.

user embeddings and item embeddings

Model structure diagram

First order propagation

$ m_{u \leftarrow i} = f(e_i,e_u,p_{ui}) $

where the paper's choice of $ f $ is:

$ m_{u \leftarrow i} = \frac{1}{\sqrt{|N_u|}\sqrt{|N_i|}}(W_1e_i+W_2(e_i \odot e_u)) $

Message Aggregation

$ e_u^{(1)} = LeakyRelu(m_{u \leftarrow u} + \Sigma_{i \in N_u} m_{u \leftarrow i}) $

Higher order propagation

Similar to first-order propagation, applied layer by layer; see the matrix form below.
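In matrix form the layer-wise rule is roughly (my paraphrase of the paper, with $ \mathcal{L} $ the normalized Laplacian of the user-item graph and $ I $ the identity):

$ E^{(l)} = LeakyRelu( (\mathcal{L}+I) E^{(l-1)} W_1^{(l)} + \mathcal{L} E^{(l-1)} \odot E^{(l-1)} W_2^{(l)} ) $

Compare this with the GNNLayer in the code below.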

Model prediction

$ y_{NGCF}(u,i) = e_u ^T e_i $

Optimization

$ Loss = \Sigma_{(u,i,j) \in O} -\ln \sigma(\hat{y}_{u,i} - \hat{y}_{u,j}) + \lambda ||\Theta||^2_2 $

Model size

High-order connectivity is captured with very little additional parameter space.

Message and Node dropout

Node dropout improves the model's generalization ability.

Experimental results

Overall, the experimental results are quite good, but judging from how the field has developed there was clearly still room for improvement; otherwise LightGCN would not have come out later, haha.

Code

Github

import torch
import torch.nn as nn
from torch.nn import Module
from scipy.sparse import coo_matrix
from scipy.sparse import vstack
from scipy import sparse
import numpy as np


class SVD(Module):
    # plain matrix factorization with user/item biases

    def __init__(self, userNum, itemNum, dim):
        super(SVD, self).__init__()
        self.uEmbd = nn.Embedding(userNum, dim)
        self.iEmbd = nn.Embedding(itemNum, dim)
        self.uBias = nn.Embedding(userNum, 1)
        self.iBias = nn.Embedding(itemNum, 1)
        self.overAllBias = nn.Parameter(torch.Tensor([0]))

    def forward(self, userIdx, itemIdx):
        uembd = self.uEmbd(userIdx)
        iembd = self.iEmbd(itemIdx)
        ubias = self.uBias(userIdx)
        ibias = self.iBias(itemIdx)

        biases = ubias + ibias + self.overAllBias
        prediction = torch.sum(torch.mul(uembd, iembd), dim=1) + biases.flatten()

        return prediction


class NCF(Module):
    # neural collaborative filtering; note: expects 2*dim == layers[0]

    def __init__(self, userNum, itemNum, dim, layers=[128, 64, 32, 8]):
        super(NCF, self).__init__()
        self.uEmbd = nn.Embedding(userNum, dim)
        self.iEmbd = nn.Embedding(itemNum, dim)
        self.fc_layers = torch.nn.ModuleList()
        self.finalLayer = torch.nn.Linear(layers[-1], 1)

        for From, To in zip(layers[:-1], layers[1:]):
            self.fc_layers.append(nn.Linear(From, To))

    def forward(self, userIdx, itemIdx):
        uembd = self.uEmbd(userIdx)
        iembd = self.iEmbd(itemIdx)
        embd = torch.cat([uembd, iembd], dim=1)
        x = embd
        for l in self.fc_layers:
            x = l(x)
            x = nn.ReLU()(x)

        prediction = self.finalLayer(x)
        return prediction.flatten()


class GNNLayer(Module):

    def __init__(self, inF, outF):
        super(GNNLayer, self).__init__()
        self.inF = inF
        self.outF = outF
        self.linear = torch.nn.Linear(in_features=inF, out_features=outF)
        self.interActTransform = torch.nn.Linear(in_features=inF, out_features=outF)

    def forward(self, laplacianMat, selfLoop, features):
        # for GCF the adjacency matrix is an (N+M) by (N+M) matrix
        # laplacianMat is the normalized Laplacian L = D^-1/2 A D^-1/2 built in buildLaplacianMat
        L1 = laplacianMat + selfLoop
        L2 = laplacianMat.cuda()
        L1 = L1.cuda()
        inter_feature = torch.sparse.mm(L2, features)
        inter_feature = torch.mul(inter_feature, features)

        inter_part1 = self.linear(torch.sparse.mm(L1, features))
        inter_part2 = self.interActTransform(torch.sparse.mm(L2, inter_feature))

        return inter_part1 + inter_part2


class GCF(Module):

    def __init__(self, userNum, itemNum, rt, embedSize=100, layers=[100, 80, 50], useCuda=True):
        # note: layers[0] is expected to equal embedSize
        super(GCF, self).__init__()
        self.useCuda = useCuda
        self.userNum = userNum
        self.itemNum = itemNum
        self.uEmbd = nn.Embedding(userNum, embedSize)
        self.iEmbd = nn.Embedding(itemNum, embedSize)
        self.GNNlayers = torch.nn.ModuleList()
        self.LaplacianMat = self.buildLaplacianMat(rt)  # sparse format
        self.leakyRelu = nn.LeakyReLU()
        self.selfLoop = self.getSparseEye(self.userNum + self.itemNum)

        # input size: concatenation of the initial embedding and every GNN layer output,
        # for both the user and the item
        self.transForm1 = nn.Linear(in_features=(embedSize + sum(layers[1:])) * 2, out_features=64)
        self.transForm2 = nn.Linear(in_features=64, out_features=32)
        self.transForm3 = nn.Linear(in_features=32, out_features=1)

        for From, To in zip(layers[:-1], layers[1:]):
            self.GNNlayers.append(GNNLayer(From, To))

    def getSparseEye(self, num):
        i = torch.LongTensor([[k for k in range(0, num)], [j for j in range(0, num)]])
        val = torch.FloatTensor([1] * num)
        return torch.sparse.FloatTensor(i, val)

    def buildLaplacianMat(self, rt):
        # build the symmetrically normalized Laplacian of the joint (user + item) graph
        rt_item = rt['itemId'] + self.userNum
        uiMat = coo_matrix((rt['rating'], (rt['userId'], rt['itemId'])))

        uiMat_upperPart = coo_matrix((rt['rating'], (rt['userId'], rt_item)))
        uiMat = uiMat.transpose()
        uiMat.resize((self.itemNum, self.userNum + self.itemNum))

        A = sparse.vstack([uiMat_upperPart, uiMat])
        selfLoop = sparse.eye(self.userNum + self.itemNum)
        sumArr = (A > 0).sum(axis=1)
        diag = list(np.array(sumArr.flatten())[0])
        diag = np.power(diag, -0.5)
        D = sparse.diags(diag)
        L = D * A * D
        L = sparse.coo_matrix(L)
        row = L.row
        col = L.col
        i = torch.LongTensor([row, col])
        data = torch.FloatTensor(L.data)
        SparseL = torch.sparse.FloatTensor(i, data)
        return SparseL

    def getFeatureMat(self):
        uidx = torch.LongTensor([i for i in range(self.userNum)])
        iidx = torch.LongTensor([i for i in range(self.itemNum)])
        if self.useCuda == True:
            uidx = uidx.cuda()
            iidx = iidx.cuda()

        userEmbd = self.uEmbd(uidx)
        itemEmbd = self.iEmbd(iidx)
        features = torch.cat([userEmbd, itemEmbd], dim=0)
        return features

    def forward(self, userIdx, itemIdx):
        # item indices are shifted so users and items share one node index space
        itemIdx = itemIdx + self.userNum
        userIdx = list(userIdx.cpu().data)
        itemIdx = list(itemIdx.cpu().data)
        # gcf data propagation: keep the embeddings of every layer and concatenate them
        features = self.getFeatureMat()
        finalEmbd = features.clone()
        for gnn in self.GNNlayers:
            features = gnn(self.LaplacianMat, self.selfLoop, features)
            features = nn.ReLU()(features)
            finalEmbd = torch.cat([finalEmbd, features.clone()], dim=1)

        userEmbd = finalEmbd[userIdx]
        itemEmbd = finalEmbd[itemIdx]
        embd = torch.cat([userEmbd, itemEmbd], dim=1)

        embd = nn.ReLU()(self.transForm1(embd))
        embd = self.transForm2(embd)
        embd = self.transForm3(embd)
        prediction = embd.flatten()

        return prediction