雪花算法

由Twitter提出,用于在分布式环境下生成唯一且有序的64位整型ID,并且可以确保递增性。我在实习中经常见到使用雪花算法作为数据库主键,因此就把这个算法引入到ElectricSearch这个项目中作为倒排索引的文档ID。

算法原理与实现

雪花算法的各个组成部分:

符号位(1位) 时间戳(41位) 机器ID(10位) 序列号(12位)
  1. 时间戳部分:占用了41位,用来记录生成ID的时间,单位为毫秒。时间戳部分可以支持大约68年的时间跨度(从算法开始的时间点算起)。
  2. 机器标识部分:占用了10位,可以用来标识不同的机器。这意味着可以支持最多2^10 = 1024台机器。每台机器在这个集群中都有一个唯一的标识号。
  3. 序列号部分:占用了12位,用来在同一个毫秒内生成多个ID。每个机器每毫秒内可以生成2^12 = 4096个不同的ID。
  4. 标志位:占用了1位,通常设为0,不使用。

生成唯一ID的过程:

  1. 获取当前的时间戳(毫秒级)。
  2. 将当前时间戳减去某个固定的时间基点(可以为编写算法时的时间点),得到相对于起始时间的时间差。
  3. 将时间差左移42位(即41位时间戳 + 1位不用的标志位)。
  4. 将机器ID左移12位(即序列号占用的位数)。
  5. 将上述两个结果相加。
  6. 加上当前毫秒内的序列号。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
const (
workerBits uint8 = 10 // 每台机器(节点)的ID位数 10位最大可以有2^10=1024个节点
numberBits uint8 = 12 // 表示每个集群下的每个节点,1毫秒内可生成的id序号的二进制位数 即每毫秒可生成 2^12-1=4096个唯一ID
workerMax uint64 = -1 ^ (-1 << workerBits) // 节点ID的最大值,用于防止溢出
numberMax uint64 = -1 ^ (-1 << numberBits) // 同上,用来表示生成id序号的最大值
timeShift uint8 = workerBits + numberBits // 时间戳向左的偏移量
workerShift uint8 = numberBits // 节点ID向左的偏移量
// 41位字节作为时间戳数值的话 大约68年就会用完
// 假如你2010年1月1日开始开发系统 如果不减去2010年1月1日的时间戳 那么白白浪费40年的时间戳啊!
// 这个一旦定义且开始生成ID后千万不要改了 不然可能会生成相同的ID
epoch uint64 = 1724896773911 // 在写这个变量时的时间戳
)

type Worker struct {
mu sync.Mutex
timestamp uint64
workerId uint64
number uint64
}

func NewWorker(workerId uint64) (*Worker, error) {
if workerId > workerMax {
return nil, errors.New("Worker ID excess of quantity")
}

return &Worker{
timestamp: 0,
workerId: workerId,
number: 0,
}, nil
}

func (w *Worker) GetId() uint64 {
w.mu.Lock()
defer w.mu.Unlock()

// 获取生成时的时间戳
now := uint64(time.Now().UnixNano() / 1e6) // 纳秒转毫秒
if w.timestamp == now {
w.number++
// 如果当前工作节点在1毫秒内生成的ID已经超过上限 需要等待1毫秒再继续生成
if w.number > numberMax {
for now <= w.timestamp {
now = uint64(time.Now().UnixNano() / 1e6)
}
}
} else {
// 如果当前时间与工作节点上一次生成ID的时间不一致 则需要重置工作节点生成ID的序号
w.number = 0
w.timestamp = now // 将机器上一次生成ID的时间更新为当前时间
}
// 第一段 now - epoch 为该算法目前已经运行了xxx毫秒
// 如果在程序跑了一段时间修改了epoch这个值 可能会导致生成相同的ID
ID := uint64((now-epoch)<<timeShift | (w.workerId << workerShift) | (w.number))
return ID
}

算法的测试与生产者消费者模型

由于雪花算法适用于分布式系统,一般用到的场景都会有不小的并发,这里我使用生产者消费者模式来测试雪花算法。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
workerIds := []uint64{123, 456}
const (
p = 10 //生产者数量
c = 12 //消费者数量
n = 120000 //生成ID总数
)
//创建两个worker节点,模拟两台机器
worker1, _ := util.NewWorker(workerIds[0])
worker2, _ := util.NewWorker(workerIds[1])
ids := sync.Map{} //用于判重的集合,键为雪花算法生成的ID,值为空结构体

repetitions, count := 0, 0
results := make(chan uint64, n/p) //用于接收生产者生成的ID
pwg := sync.WaitGroup{}
cwg := sync.WaitGroup{}

cwg.Add(c)
pwg.Add(p)

start := time.Now() //记录生成ID的开始时间
for i := 0; i < p; i++ { //开启p个生产者并发生成ID发送至results
go func() {
defer pwg.Done()
for j := 0; j < n/p+4; j++ {
if i&1 == 0 {
results <- worker1.GetId()
} else {
results <- worker2.GetId()
}
}
}()
}

for i := 0; i < c; i++ { //开启c个消费者并发读取results并对ID进行判重
go func() {
defer cwg.Done()
for id := range results {
// 如果ID已存在于ids集合中,则说明重复
if _, loaded := ids.LoadOrStore(id, struct{}{}); loaded {
fmt.Printf("id: %d 重复!\n", id)
repetitions++
count++
} else {
count++
}
}
}()
}

pwg.Wait()
end := time.Now() //记录生成ID的结束时间
close(results)
cwg.Wait()

duration := end.Sub(start)
fmt.Printf("生成的ID总数:%d,重复的ID共:%d个,重复率为:%.4f\n", count, repetitions, float64(repetitions)/float64(count)*100)
fmt.Printf("生成ID耗时: %d 毫秒\n", duration.Milliseconds())

测试结果:

1
2
3
4
id: 2656171589087232 重复!
id: 2656171593281536 重复!
生成的ID总数:120023,重复的ID共:2个,重复率为:0.0017
生成ID耗时: 79 毫秒

可见雪花算法不仅效率高,并且在超高并发下也能保证极低的错误率。