当今机器学习整套理论都是建立在概率论的基础上,但凡涉及到概率的地方就有随机采样,这篇文章就来介绍一下日常写程序的时候用到的随机采样实现。
最基本的随机采样
如果是不带权重的随机,一般的想法是等概率地将数组打乱即可:
1 2 3 4 |
import random def sample(in_l: list) -> list: scored_l = [[item, random.random()] for item in in_l] return [item for item, _ in sorted(scored_l, key=lambda x:-x[1])] |
以上是一种等概率打散的实现,就是随机给每个item打一个分,最后按随机给分排序输出,时间复杂度是\(O(nlogn)\).
除此之外也可以采用洗牌算法(Fisher-Yates Shuffle)来降时间复杂度降低到\(O(n)\):
1 2 3 4 5 6 |
def sample_shuffle(in_l: list) -> list: for i in range(len(in_l)): cur_idx = i swp_idx = i + random.randint(0, len(in_l[i:])-1) # 从当前item开始一直到最后一个item 随机挑选一个来做交换 in_l[cur_idx], in_l[swp_idx] = in_l[swp_idx], in_l[cur_idx] return in_l |
加权随机采样
加权随机(Weighted Random Sampling)是一个在机器学习里经常会用到的东西,比如在推荐系统当中,对一些item完成打分过后,如果直接按照打分由高到低的顺序出,就容易导致曝光过于集中于头部,尾部item鲜有曝光机会,这样推荐系统的推荐结果会集中于头部,不利于长期的用户体验。如果基于权重来进行一个随机打散,那么在保证推荐准确率的同时,还能提高推荐效果的多样性。在Efraimidis & Spirakis的Weighted Random Sampling这篇论文里面给出了一个算法,对于每一个item,首先生成一个[0, 1)之间的随机数\(u\),然后每个item的score为\(u^w\), \(w\)就是item对应的权重
1 2 3 4 5 6 |
from typing import Any, List, Tuple import random import math def weighted_random_sample(in_l: List[List[Tuple[Any, float]]]) -> List[List[Tuple[Any, float]]]: rescored_l = [[x[0], math.pow(random.random(), 1/x[1])] for x in in_l] return sorted(rescored_l, key=lambda x:x[1], reverse=True) |
首先可以用一个简单粗暴的方法来测一下,假设每次只取采样结果的第一个:
1 2 3 4 5 6 7 |
test_l = [["a", 0.5], ["b", 0.2], ["c", 0.3]] stat = {"a": 0, "b": 0, "c": 0} for i in range(1000): chosen_one = weighted_random_sample(test_l)[0][0] stat[chosen_one] += 1 print(stat) |
输出结果是
1 |
{'a': 496, 'b': 195, 'c': 309} |
可以看见 “a”, “b”, “c” 被采样到概率比等于\(496:195:309 \approx 0.5:0.2:0.3\)
以下是这个算法的证明:
首先假设只有两个item,权重分别为\(w_1, w_2\),则我们期望生成两个随机数\(M_1, M_2\),其中的\(M_1,M_2 \sim D_w\),\(D_w\)则由\(w_1, w_2\)来定义,用公式可以描述成\(p(M_1<M_2)=\frac{w_2}{w_1+w_2}\)。
对于概率分布\(D_w\),其对应累计分布函数等于\(F_w(m)=m^w\),至于这个等式为什么可以成立,可以看接下来的证明。概率密度函数\(f_w(m)=\frac{\mathrm{d}}{\mathrm{d}m}F_w(m)=wm^{w-1}\),再定义指示函数\(\mathbb I_{m_1<m_2}\)在\(m_1 < m_2\)的时候等于1否则为0,则:
\(\begin{aligned}p(M_1<M_2) &= \int_{m_2=0}^{1}\int_{m_1=0}^{1}\mathbb I_{m_1<m_2}f_{w_1}(m_1)f_{w_2}(m_1)\mathrm{d}m_1\mathrm{d}m_2 \\ &= \int_{m_2=0}^{1}\int_{m_1=0}^{m_2}f_{w_2}(m_2)f_{w_1}(m_1)\mathrm{d}m_1\mathrm{d}m_2 \\ &= \int_{m_2=0}^{1}f_{w_2}(m_2) \bigg [m_1^{w_1} \bigg ]^{m_2}_{m_1=0}\mathrm{d}m_2 \\ &= \int_{m_2=0}^{1}w_2m_2^{w_2-1} \mathrm{d}m_2 \\ &=\frac{w_2 \cdot m_2^{w_1+w_2}}{w_1+w_2} \bigg |^1_{m_2=0} \\ &= \frac{w_2}{w_1+w_2} \end{aligned}\)得证,故\(F_w(m)=m^w\)。
所以对于任意\(X \sim D_w\)
\(p(X< \alpha) = F_w(\alpha)= \alpha^w\)定义另外一个变量\(Y\) ,\(Y= \sqrt[w]{R}\) 其中\( R \sim U(0, 1)\),\(U\)即正态分布,那么:
\(p(Y < \alpha) = p(\sqrt[w]{R} < \alpha) = p( R < \alpha^w) = F_U^{(\alpha^w)} = \alpha^w\)所以\(Y\)也服从\(D_w\)分布,即对于每一个item我们根据\(m=r^{\frac{1}{w}}\)来进行加权随机采样,每一个item的分布是服从\(D_w\)的。
在我们加权随机采样的时候,通常只关注的是item之间的偏序关系,具体的分值并不重要,在用\(m=r^{\frac{1}{w}}\)这个公式的时候\(\frac{1}{w}\)的值可能非常大,而\(r \sim [0, 1)\),所以最终的权重值可能会非常大从而超出变量的精度范围,如下所示:
1 2 3 4 |
>>> from random import random >>> m = lambda w: random() ** (1.0/w) >>> m(0.002), m(0.001) (0.0, 0.0) |
对此我们可以对公式取一个log,即\( m =\frac{1}{w}log(r)\),这样一般就不会出现溢出的情况了,最终实现如下:
1 2 3 4 5 6 7 |
from typing import Any, List, Tuple import random import math def weighted_random_sample(in_l: List[List[Tuple[Any, float]]]) -> List[List[Tuple[Any, float]]]: rescored_l = [[x[0], (1/x[1])* math.log(random.random())] for x in in_l] return sorted(rescored_l, key=lambda x:x[1], reverse=True) |
时间复杂度为\(O(n)\),空间复杂度也为\(O(n)\).
别名方法采样
如果期望按照指定分布从m个item中随机采样出k(k>1)个结果,那么刚才介绍的算法基本上就是最快的一种实现了,不过如果只需要采样1个结果,最快的时间复杂度是多少?答案是\(O(1)\),接下来就介绍一下更快的别名方法采样(Alias Method Sampling)算法:
- 假设总共有\(N\)个item,第i个item对应的权重是\(w_i\)。对item的权重进行归一化操作,然后用坐标轴的上的矩形来表示,对于每个item转换过后的矩形面积等于\(\frac{w_i*N}{\sum_{k=N}^{k=1}w_k}\),如下图中的1所示;
- 将面积大于1的item多出来的面积补充到面积小于1的item中,确保每一个小方格的面积为1,同时保证每一个方格最多存储两个item,如下图中的2,3,4描述;
- 生成两个字典,一个accept[i]:表示第i个item占第i列矩形的面积比例,另外一个字典alias[i]则表示第i列中不是第i个item的另外一个item的编号;
- 在采样的时候,先生成一个随机正整数\(i \in [0, N)\),再生成一个随机数\( r \sim Unif(0, 1)\) ,若果\(r < accept[i] \) 则返回第i个item,否则的话返回第alias[i]个item。
可以看出第i个item被选中的概率\(p(i)=\frac{1}{N}\sum_{k=1}^{k=N}A_ik = \frac{w_i}{\sum_{k=1}^{k=N}w_k}\),其中\(A_ik\)是第i个item在第k个矩形中占据的面积。因为每一次采样只需要生成两个随机数,所以时间复杂度 \(O(1)\),空间复杂度\(O(n)\)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
from typing import Any, List, Tuple import random def build_alias_table(p: List[float]) -> (List[float], List[int]): sum_p = sum(p) l_sz = len(p) accept = [0.0]*l_sz alias = [0]*l_sz small, large = [], [] area_ratio = [ l_sz*x/sum_p for x in p] for i, prob in enumerate(area_ratio): if prob < 1.0: small.append(i) else: large.append(i) while small and large: small_idx, large_idx = small.pop(), large.pop() accept[small_idx] = area_ratio[small_idx] alias[small_idx] = large_idx area_ratio[large_idx] = area_ratio[large_idx] - (1-area_ratio[small_idx]) if area_ratio[large_idx] < 1.0: small.append(large_idx) else: large.append(large_idx) while large: large_idx = large.pop() accept[large_idx] = 1 while small: small_idx = small.pop() accept[small_idx] = 1 return accept, alias def alias_method_sampling(in_l: List[List[Tuple[Any, float]]], accept: List[float], alias:List[int]) -> List[List[Tuple[Any, float]]]: N = len(in_l) i = int(random.random()*N) r = random.random() if r < accept[i]: return in_l[i] else: return in_l[alias[i]] if __name__ == "__main__": test_l = [["a", 0.5], ["b", 0.2], ["c", 0.3]] accept, alias = build_alias_table([x[1] for x in test_l]) stat = {"a": 0, "b": 0, "c": 0} for i in range(1000): chosen_one = alias_method_sampling(test_l, accept, alias)[0] stat[chosen_one] += 1 |