0%

[python源码分析] random模块解析

本文将从random seed 入手,分析python中如何实现random的 ## 1、 random seed种子 ### 1.1、示例代码 先看一段代码

1
2
3
4
5
6
7
8
9
import random

random.seed(10)
a = set([random.random() for i in range(1000)])

random.seed(10)
b = set([random.random() for i in range(1000)])

print len(a), len(b), a==b
输出为
1
1000 1000 True

从上面可以看出, random其实产生的是伪随机序列。当seed种子一样时,输出将会一模一样

1.2、 random.py文件中的seed#

我们来看下random.py文件的seed 方法

我们可以看到 - 用户可以自定以a; - 若没有自定义a,则尝试调用 **_urandom(16)生成a - 若调用失败,使用time.time()** * 256 生成

1.3、 _randommodule源码#

接下来, 将a传入Random父类的seed。 _random.Random是什么呢? 我们在cpython源码 **Modules/_randommodule.c能找到答案

1
2
3
4
5
6
7
8
static PyMethodDef random_methods[] = {
_RANDOM_RANDOM_RANDOM_METHODDEF
_RANDOM_RANDOM_SEED_METHODDEF
_RANDOM_RANDOM_GETSTATE_METHODDEF
_RANDOM_RANDOM_SETSTATE_METHODDEF
_RANDOM_RANDOM_GETRANDBITS_METHODDEF
{NULL, NULL} /* sentinel */
};
_RANDOM_RANDOM_RANDOM_METHODDEF**为例,找出其定义如下
1
2
#define _RANDOM_RANDOM_RANDOM_METHODDEF    \
{"random", (PyCFunction)_random_Random_random, METH_NOARGS, _random_Random_random__doc__},
照葫芦画瓢, 将上面的宏逐一替换:
1
2
3
4
5
6
7
8
static PyMethodDef random_methods[] = {
{"random", (PyCFunction)_random_Random_random, METH_NOARGS, _random_Random_random__doc__},
{"seed", (PyCFunction)(void(*)(void))_random_Random_seed, METH_FASTCALL, _random_Random_seed__doc__},
{"getstate", (PyCFunction)_random_Random_getstate, METH_NOARGS, _random_Random_getstate__doc__},
{"setstate", (PyCFunction)_random_Random_setstate, METH_O, _random_Random_setstate__doc__},
{"getrandbits", (PyCFunction)_random_Random_getrandbits, METH_O, _random_Random_getrandbits__doc__},
{NULL, NULL} /* sentinel */
};
random c语言

1.4、 RandomObject#

首先我们来了解下RandomObject对象

1
2
3
4
5
6
7
8
#define N 624


typedef struct {
PyObject_HEAD
int index;
uint32_t state[N];
} RandomObject;
RandomObject由三部分构成: - PyObject_HEAD: 公共头部 - index: 存储当前对应的state索引 - 大小为624的uint32_t数组, 调用random.seed就会初始化一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static PyObject *
_random_Random_seed(RandomObject *self, PyObject *const *args, Py_ssize_t nargs)
{
PyObject *return_value = NULL;
PyObject *n = Py_None;

if (!_PyArg_CheckPositional("seed", nargs, 0, 1)) {
goto exit;
}
if (nargs < 1) {
goto skip_optional;
}
n = args[0];
skip_optional:
return_value = _random_Random_seed_impl(self, n); //调用_random_Random_seed_impl

exit:
return return_value;
}

实际上_random_Random_seed做一些校验后,会调用 **_random_Random_seed_impl**(self, n)

1
2
3
4
5
6
static PyObject *
_random_Random_seed_impl(RandomObject *self, PyObject *n)
/*[clinic end generated code: output=0fad1e16ba883681 input=78d6ef0d52532a54]*/
{
return random_seed(self, n);
}

顺藤摸瓜, 看一下random_seed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
static PyObject *
random_seed(RandomObject *self, PyObject *arg)
{
PyObject *result = NULL; /* guilty until proved innocent */
PyObject *n = NULL;
uint32_t *key = NULL;
size_t bits, keyused;
int res;

if (arg == NULL || arg == Py_None) {
if (random_seed_urandom(self) < 0) {
PyErr_Clear();

/* Reading system entropy failed, fall back on the worst entropy:
use the current time and process identifier. */
random_seed_time_pid(self);
}
Py_RETURN_NONE;
}

/* This algorithm relies on the number being unsigned.
* So: if the arg is a PyLong, use its absolute value.
* Otherwise use its hash value, cast to unsigned.
* 对传入的seed值进行处理,取绝对值 or hash值
*/
if (PyLong_CheckExact(arg)) {
n = PyNumber_Absolute(arg);
} else if (PyLong_Check(arg)) {
/* Calling int.__abs__() prevents calling arg.__abs__(), which might
return an invalid value. See issue #31478. */
n = PyObject_CallOneArg(_randomstate_global->Long___abs__, arg);
}
else {
Py_hash_t hash = PyObject_Hash(arg);
if (hash == -1)
goto Done;
n = PyLong_FromSize_t((size_t)hash);
}
if (n == NULL)
goto Done;

/* Now split n into 32-bit chunks, from the right. */
bits = _PyLong_NumBits(n);
if (bits == (size_t)-1 && PyErr_Occurred())
goto Done;

/* Figure out how many 32-bit chunks this gives us. */
keyused = bits == 0 ? 1 : (bits - 1) / 32 + 1;

/* Convert seed to byte sequence. seed转换为字节流*/
key = (uint32_t *)PyMem_Malloc((size_t)4 * keyused);
if (key == NULL) {
PyErr_NoMemory();
goto Done;
}
res = _PyLong_AsByteArray((PyLongObject *)n,
(unsigned char *)key, keyused * 4,
PY_LITTLE_ENDIAN,
0); /* unsigned */
if (res == -1) {
goto Done;
}

#if PY_BIG_ENDIAN
{
size_t i, j;
/* Reverse an array. */
for (i = 0, j = keyused - 1; i < j; i++, j--) {
uint32_t tmp = key[i];
key[i] = key[j];
key[j] = tmp;
}
}
#endif
init_by_array(self, key, keyused); // 设置seed到self对象中

Py_INCREF(Py_None);
result = Py_None;

Done:
Py_XDECREF(n);
PyMem_Free(key);
return result;
}
上面一大堆校验和计算,最后将传入的seed参数arg 会被转换为 key(字节流形式),然后传入init_by_array函数中。继续看下init_by_array函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static void
init_by_array(RandomObject *self, uint32_t init_key[], size_t key_length)
{
size_t i, j, k; /* was signed in the original code. RDH 12/16/2002 */
uint32_t *mt;

mt = self->state;
init_genrand(self, 19650218U);
i=1; j=0;
k = (N>key_length ? N : key_length);
// 根据传入的init_key数组, 初始化self->state
for (; k; k--) {
mt[i] = (mt[i] ^ ((mt[i-1] ^ (mt[i-1] >> 30)) * 1664525U))
+ init_key[j] + (uint32_t)j; /* non linear */
i++; j++;
if (i>=N) { mt[0] = mt[N-1]; i=1; }
if (j>=key_length) j=0;
}
for (k=N-1; k; k--) {
mt[i] = (mt[i] ^ ((mt[i-1] ^ (mt[i-1] >> 30)) * 1566083941U))
- (uint32_t)i; /* non linear */
i++;
if (i>=N) { mt[0] = mt[N-1]; i=1; }
}

mt[0] = 0x80000000U; /* MSB is 1; assuring non-zero initial array */
}
可以看到, 该函数根据传入的init_key数组,设置RandomObject的state数组(大小为N = 624)。 阅读至此,我们可以得出如下结论: - RandomObject对象由公共头部、index 和 state(大小为624的uint32_t)数组组成。 - 调用py文件中的random.seed(xxx), 经过一系列的校验和计算,最终设置state数组,这个数组使random实现伪随机 - 传入同样的seed,state数组一致,随机的结果也会一致 - 不传默认参数时,通过time.time等机制实现对seed的随机,从而实现state数组不一致。 - seed的参数,必须为Long型可哈希类型

2. random.random#

趁热打铁,我们看一波random.random的原理

1
2
3
4
5
static PyObject *
_random_Random_random(RandomObject *self, PyObject *Py_UNUSED(ignored))
{
return _random_Random_random_impl(self);
}
继续追击_random_Random_random_impl
1
2
3
4
5
6
7
static PyObject *
_random_Random_random_impl(RandomObject *self)
/*[clinic end generated code: output=117ff99ee53d755c input=afb2a59cbbb00349]*/
{
uint32_t a=genrand_uint32(self)>>5, b=genrand_uint32(self)>>6;
return PyFloat_FromDouble((a*67108864.0+b)*(1.0/9007199254740992.0));
}
具体算法就不细究了,这里生成a,b两个数,使用了一个关键的函数genrand_uint32
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/* Period parameters -- These are all magic.  Don't change. */
#define N 624
#define M 397
#define MATRIX_A 0x9908b0dfU /* constant vector a */
#define UPPER_MASK 0x80000000U /* most significant w-r bits */
#define LOWER_MASK 0x7fffffffU /* least significant r bits */

/* generates a random number on [0,0xffffffff]-interval */
static uint32_t
genrand_uint32(RandomObject *self)
{
uint32_t y;
static const uint32_t mag01[2] = {0x0U, MATRIX_A};
/* mag01[x] = x * MATRIX_A for x=0,1 */
uint32_t *mt;

mt = self->state;、
// 越界了,重新计算state数组
if (self->index >= N) { /* generate N words at one time */
int kk;

for (kk=0;kk<N-M;kk++) {
y = (mt[kk]&UPPER_MASK)|(mt[kk+1]&LOWER_MASK);
mt[kk] = mt[kk+M] ^ (y >> 1) ^ mag01[y & 0x1U];
}
for (;kk<N-1;kk++) {
y = (mt[kk]&UPPER_MASK)|(mt[kk+1]&LOWER_MASK);
mt[kk] = mt[kk+(M-N)] ^ (y >> 1) ^ mag01[y & 0x1U];
}
y = (mt[N-1]&UPPER_MASK)|(mt[0]&LOWER_MASK);
mt[N-1] = mt[M-1] ^ (y >> 1) ^ mag01[y & 0x1U];
//初始化index
self->index = 0;
}

y = mt[self->index++];
y ^= (y >> 11);
y ^= (y << 7) & 0x9d2c5680U;
y ^= (y << 15) & 0xefc60000U;
y ^= (y >> 18);
return y;
}
上面函数可以总结为: - 取完state[index]的值后, index 加 1, 并经过一系列固定步骤的计算得到返回值 - 若index大于N时,使用一系列复杂的算法重新计算state数组,并将index置为0

3. 排坑#

如下是一个可指定rand的函数

1
2
3
def my_rand(seed=None):
random.seed(seed)
return random.random()
结合上面的seed函数分析, random其实都是伪随机算法 - 若不同时间多次调用, 传入一样的seed时, state数组就会一致, 且index从0开始, 导致随机输出很可能就会一致 - 对于强随机算法,这个就会出bug。而且random在一个进程中只存在一个object,这样会影响全局的随机概率

修复思路

1
2
3
4
5
def my_rand(seed=None):
random.seed(seed)
result = random.random()
random.seed() //重新随机seed一波
return result