当今机器学习整套理论都是建立在概率论的基础上,但凡涉及到概率的地方就有随机采样,这篇文章就来介绍一下日常写程序的时候用到的随机采样实现。
最基本的随机采样
如果是不带权重的随机,一般的想法是等概率地将数组打乱即可:
| 1 2 3 4 | import random def sample(in_l: list) -> list:     scored_l = [[item, random.random()] for item in in_l]       return [item for item, _ in sorted(scored_l, key=lambda x:-x[1])] | 
以上是一种等概率打散的实现,就是随机给每个item打一个分,最后按随机给分排序输出,时间复杂度是\(O(nlogn)\).
除此之外也可以采用洗牌算法(Fisher-Yates Shuffle)来降时间复杂度降低到\(O(n)\):
| 1 2 3 4 5 6 | def sample_shuffle(in_l: list) -> list:     for i in range(len(in_l)):         cur_idx = i         swp_idx = i + random.randint(0, len(in_l[i:])-1) # 从当前item开始一直到最后一个item 随机挑选一个来做交换         in_l[cur_idx], in_l[swp_idx] = in_l[swp_idx], in_l[cur_idx]     return in_l | 
加权随机采样
加权随机(Weighted Random Sampling)是一个在机器学习里经常会用到的东西,比如在推荐系统当中,对一些item完成打分过后,如果直接按照打分由高到低的顺序出,就容易导致曝光过于集中于头部,尾部item鲜有曝光机会,这样推荐系统的推荐结果会集中于头部,不利于长期的用户体验。如果基于权重来进行一个随机打散,那么在保证推荐准确率的同时,还能提高推荐效果的多样性。在Efraimidis & Spirakis的Weighted Random Sampling这篇论文里面给出了一个算法,对于每一个item,首先生成一个[0, 1)之间的随机数\(u\),然后每个item的score为\(u^w\), \(w\)就是item对应的权重
| 1 2 3 4 5 6 | from typing import Any, List, Tuple import random import math def weighted_random_sample(in_l: List[List[Tuple[Any, float]]]) -> List[List[Tuple[Any, float]]]:   rescored_l = [[x[0], math.pow(random.random(), 1/x[1])] for x in in_l]   return sorted(rescored_l, key=lambda x:x[1], reverse=True) | 
首先可以用一个简单粗暴的方法来测一下,假设每次只取采样结果的第一个:
| 1 2 3 4 5 6 7 | test_l = [["a", 0.5], ["b", 0.2], ["c", 0.3]] stat = {"a": 0, "b": 0, "c": 0} for i in range(1000):   chosen_one = weighted_random_sample(test_l)[0][0]   stat[chosen_one] += 1 print(stat) | 
输出结果是
| 1 | {'a': 496, 'b': 195, 'c': 309} | 
可以看见 “a”, “b”, “c” 被采样到概率比等于\(496:195:309 \approx 0.5:0.2:0.3\)
以下是这个算法的证明:
首先假设只有两个item,权重分别为\(w_1, w_2\),则我们期望生成两个随机数\(M_1, M_2\),其中的\(M_1,M_2 \sim D_w\),\(D_w\)则由\(w_1, w_2\)来定义,用公式可以描述成\(p(M_1<M_2)=\frac{w_2}{w_1+w_2}\)。
对于概率分布\(D_w\),其对应累计分布函数等于\(F_w(m)=m^w\),至于这个等式为什么可以成立,可以看接下来的证明。概率密度函数\(f_w(m)=\frac{\mathrm{d}}{\mathrm{d}m}F_w(m)=wm^{w-1}\),再定义指示函数\(\mathbb I_{m_1<m_2}\)在\(m_1 < m_2\)的时候等于1否则为0,则:
\(\begin{aligned}p(M_1<M_2) &= \int_{m_2=0}^{1}\int_{m_1=0}^{1}\mathbb I_{m_1<m_2}f_{w_1}(m_1)f_{w_2}(m_1)\mathrm{d}m_1\mathrm{d}m_2 \\ &= \int_{m_2=0}^{1}\int_{m_1=0}^{m_2}f_{w_2}(m_2)f_{w_1}(m_1)\mathrm{d}m_1\mathrm{d}m_2 \\ &= \int_{m_2=0}^{1}f_{w_2}(m_2) \bigg [m_1^{w_1} \bigg ]^{m_2}_{m_1=0}\mathrm{d}m_2 \\ &= \int_{m_2=0}^{1}w_2m_2^{w_2-1} \mathrm{d}m_2 \\ &=\frac{w_2 \cdot m_2^{w_1+w_2}}{w_1+w_2} \bigg |^1_{m_2=0} \\ &= \frac{w_2}{w_1+w_2} \end{aligned}\)得证,故\(F_w(m)=m^w\)。
所以对于任意\(X \sim D_w\)
\(p(X< \alpha) = F_w(\alpha)= \alpha^w\)定义另外一个变量\(Y\) ,\(Y= \sqrt[w]{R}\) 其中\( R \sim U(0, 1)\),\(U\)即正态分布,那么:
\(p(Y < \alpha) = p(\sqrt[w]{R} < \alpha) = p( R < \alpha^w) = F_U^{(\alpha^w)} = \alpha^w\)所以\(Y\)也服从\(D_w\)分布,即对于每一个item我们根据\(m=r^{\frac{1}{w}}\)来进行加权随机采样,每一个item的分布是服从\(D_w\)的。
在我们加权随机采样的时候,通常只关注的是item之间的偏序关系,具体的分值并不重要,在用\(m=r^{\frac{1}{w}}\)这个公式的时候\(\frac{1}{w}\)的值可能非常大,而\(r \sim [0, 1)\),所以最终的权重值可能会非常大从而超出变量的精度范围,如下所示:
| 1 2 3 4 | >>> from random import random >>> m = lambda w: random() ** (1.0/w) >>> m(0.002), m(0.001) (0.0, 0.0) | 
对此我们可以对公式取一个log,即\( m =\frac{1}{w}log(r)\),这样一般就不会出现溢出的情况了,最终实现如下:
| 1 2 3 4 5 6 7 | from typing import Any, List, Tuple import random import math def weighted_random_sample(in_l: List[List[Tuple[Any, float]]]) -> List[List[Tuple[Any, float]]]:   rescored_l = [[x[0], (1/x[1])* math.log(random.random())] for x in in_l]   return sorted(rescored_l, key=lambda x:x[1], reverse=True) | 
时间复杂度为\(O(n)\),空间复杂度也为\(O(n)\).
别名方法采样
如果期望按照指定分布从m个item中随机采样出k(k>1)个结果,那么刚才介绍的算法基本上就是最快的一种实现了,不过如果只需要采样1个结果,最快的时间复杂度是多少?答案是\(O(1)\),接下来就介绍一下更快的别名方法采样(Alias Method Sampling)算法:
- 假设总共有\(N\)个item,第i个item对应的权重是\(w_i\)。对item的权重进行归一化操作,然后用坐标轴的上的矩形来表示,对于每个item转换过后的矩形面积等于\(\frac{w_i*N}{\sum_{k=N}^{k=1}w_k}\),如下图中的1所示;
- 将面积大于1的item多出来的面积补充到面积小于1的item中,确保每一个小方格的面积为1,同时保证每一个方格最多存储两个item,如下图中的2,3,4描述;
- 生成两个字典,一个accept[i]:表示第i个item占第i列矩形的面积比例,另外一个字典alias[i]则表示第i列中不是第i个item的另外一个item的编号;
- 在采样的时候,先生成一个随机正整数\(i \in [0, N)\),再生成一个随机数\( r \sim Unif(0, 1)\) ,若果\(r < accept[i] \) 则返回第i个item,否则的话返回第alias[i]个item。

可以看出第i个item被选中的概率\(p(i)=\frac{1}{N}\sum_{k=1}^{k=N}A_ik = \frac{w_i}{\sum_{k=1}^{k=N}w_k}\),其中\(A_ik\)是第i个item在第k个矩形中占据的面积。因为每一次采样只需要生成两个随机数,所以时间复杂度 \(O(1)\),空间复杂度\(O(n)\)。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | from typing import Any, List, Tuple import random def build_alias_table(p: List[float]) -> (List[float], List[int]):     sum_p = sum(p)     l_sz = len(p)     accept = [0.0]*l_sz     alias = [0]*l_sz     small, large = [], []     area_ratio = [ l_sz*x/sum_p for x in p]     for i, prob in enumerate(area_ratio):         if prob < 1.0:             small.append(i)         else:             large.append(i)     while small and large:         small_idx, large_idx = small.pop(), large.pop()         accept[small_idx] = area_ratio[small_idx]         alias[small_idx] = large_idx         area_ratio[large_idx] = area_ratio[large_idx] - (1-area_ratio[small_idx])         if area_ratio[large_idx] < 1.0:             small.append(large_idx)         else:             large.append(large_idx)     while large:         large_idx = large.pop()         accept[large_idx] = 1     while small:         small_idx = small.pop()         accept[small_idx] = 1     return accept, alias def alias_method_sampling(in_l: List[List[Tuple[Any, float]]], accept: List[float], alias:List[int]) -> List[List[Tuple[Any, float]]]:     N = len(in_l)     i = int(random.random()*N)     r = random.random()     if r < accept[i]:         return in_l[i]     else:         return in_l[alias[i]] if __name__ == "__main__":     test_l = [["a", 0.5], ["b", 0.2], ["c", 0.3]]     accept, alias = build_alias_table([x[1] for x in test_l])     stat = {"a": 0, "b": 0, "c": 0}     for i in range(1000):         chosen_one = alias_method_sampling(test_l, accept, alias)[0]         stat[chosen_one] += 1 |