ska::flat_hash_map 源码分析
背景
最近在调研各种hashmap.. 发现ska::flat hash map性能优秀。。于是来看看代码。。 发现最大的特点是,ska::flat_hash_map使用了带probe count上限的robin hood hashing
相关概念
Distance_from_desired
对于采用了open addressing的hash实现,当插入发生冲突时,会以一定方式(如线性探测、平方探测等)来探测下一个可以插入的slot. 因而实际插入的slot位置与理想的slot位置通常不相同,这段距离定义为distance_from_desired 在没有冲突的理想情况下,所有distance_from_desired的值应该都为0 distance_from_desired的一种更常见的说法叫做probe sequence lengths(PSL)
robin hood hashing
robin hood hashing的核心思想是"劫富济贫" distance_from_desired小的slot被认为更"富有",distance_from_desired大的slot被认为更"贫穷" 具体来说,当去插入一个新的元素时,如果当前位置的元素的distance_from_desired要比待插入元素的distance_from_desired要小,那么就将待插入元素放入当前位置,将当前位置的元素取出,寻找一个新的位置。
这样做使得所有元素的distance_from_desired的分布更为平均,variance更小。 这样的分布对cache更友好(几乎全部元素distance_from_desired都小于一个cache line的长度,因此在find的时候只需要fetch一次cache line),从而拥有更好的性能。
一般的robin hashing 在find时,一般用一个全局的最大distance_from_desired作为没有找到该元素终止条件。 一种常见的改进是,不维护全局最大distance_from_desired,而是在看到当前位置元素的distance_from_desired比要插入的元素的distance_from_desired小时终止。
1
2 iterator find(const FindKey& key) {
3 size_t index =
4 hash_policy.index_for_hash(hash_object(key), num_slots_minus_one);
5 EntryPointer it = entries + ptrdiff_t(index);
6 for (int8_t distance = 0; it->distance_from_desired >= distance;
7 ++distance, ++it) {
8 if (compares_equal(key, it->value)) return {it};
9 }
10 return end();
11 }
12
带上限的robin hashing
一般的robin hashing在insert时,会不断进行寻找(包括了可能的swap过程),直到找到一个空的slot为止。该过程在hash table较满时可能接近线性的时间复杂度。 ska::flat_hash_map对这一点的改进是,限制了insert时尝试的上限次数,作者给出的经验值为log(N),其中N为slots的个数。 这样保证每个slot的最大distance_from_desired不会超过log(N)
关键实现
emplace
插入一个元素 分析见注释 其中emplace 函数主要是查找是否已经存在该元素+调整到合适的插入位置 emplace_new_key函数执行真正的emplace操作
1
2 template <typename Key, typename... Args>
3 std::pair<iterator, bool> emplace(Key&& key, Args&&... args) {
4 size_t index =
5 hash_policy.index_for_hash(hash_object(key), num_slots_minus_one);
6 EntryPointer current_entry = entries + ptrdiff_t(index);
7 int8_t distance_from_desired = 0;
8 // 插入前先查找是否存在。。。
9 // 只需要查找有限的距离
10
11 // trick在于。。初始 current_entry->distance_from_desired为-1
12 // 此时不会进入for loop,直接进行emplace_new_key。
13 // 该for loop有两层意义: 1.在index位置不为空时找到合适的位置: 空的slot或者更富有的slot(也就是current_entry->distance_from_desired < distance_from_desired的slot)
14 // 2.在该过程中找一下是否已经插入了该值
15 for (; current_entry->distance_from_desired >= distance_from_desired;
16 ++current_entry, ++distance_from_desired) {
17 if (compares_equal(key, current_entry->value))
18 return {{current_entry}, false};
19 }
20 return emplace_new_key(distance_from_desired, current_entry,
21 std::forward<Key>(key), std::forward<Args>(args)...);
22 }
23
1
2 template <typename Key, typename... Args>
3 SKA_NOINLINE(std::pair<iterator, bool>)
4 emplace_new_key(int8_t distance_from_desired, EntryPointer current_entry,
5 Key&& key, Args&&... args) {
6 using std::swap;
7 // num_slots_minus_one初始值为0,表示第一次进行插入。。需要先进行grow..很合理。。
8 // 如果得到max_load_factor,或者查找次数达到max_lookups,就进行rehash
9 if (num_slots_minus_one == 0 || distance_from_desired == max_lookups ||
10 num_elements + 1 >
11 (num_slots_minus_one + 1) * static_cast<double>(_max_load_factor)) {
12 grow();
13 return emplace(std::forward<Key>(key), std::forward<Args>(args)...);
14 } else if (current_entry->is_empty()) {
15 current_entry->emplace(distance_from_desired, std::forward<Key>(key),
16 std::forward<Args>(args)...);
17 ++num_elements;
18 return {{current_entry}, true};
19 }
20
21 // 执行到这里,说明有更富有的slot。于是进行swap,转而为被换出的pair<key,value>找一个新的slot
22 // to_insert是当前要插入的,由于swap的发生,可能并不是最初要插入的那一对值
23 value_type to_insert(std::forward<Key>(key), std::forward<Args>(args)...);
24 swap(distance_from_desired, current_entry->distance_from_desired);
25 swap(to_insert, current_entry->value);
26 iterator result = {current_entry};
27 for (++distance_from_desired, ++current_entry;; ++current_entry) {
28 if (current_entry->is_empty()) {
29 // 如果被换过的slot后面某个slot是空的。。就直接放置了
30 current_entry->emplace(distance_from_desired, std::move(to_insert));
31 ++num_elements;
32 return {result, true};
33 } else if (current_entry->distance_from_desired < distance_from_desired) {
34 // 在找新slot的过程中,仍然进行劫富济贫的操作, 转而为被换出的pair<key,value>找一个新的slot
35 swap(distance_from_desired, current_entry->distance_from_desired);
36 swap(to_insert, current_entry->value);
37 ++distance_from_desired;
38 } else {
39 // 如果没有空的slot,也没有更富有的slot,那就只能继续往前寻找了,直到达到上限
40 ++distance_from_desired;
41 if (distance_from_desired == max_lookups) {
42 // 如果找了max_lookups个位置还没找到,就进行rehash
43 swap(to_insert, result.current->value);
44 grow();
45 return emplace(std::move(to_insert));
46 }
47 }
48 }
49 }
50
rehash
1 void rehash(size_t num_buckets) {
2 num_buckets = std::max(
3 num_buckets,
4 static_cast<size_t>(
5 std::ceil(num_elements / static_cast<double>(_max_load_factor))));
6 if (num_buckets == 0) {
7 reset_to_empty_state();
8 return;
9 }
10 auto new_prime_index = hash_policy.next_size_over(num_buckets);
11 if (num_buckets == bucket_count()) return;
12 int8_t new_max_lookups = compute_max_lookups(num_buckets);
13 // 额外分配了max_lookups个slots,避免了find时bound checking的开销
14 EntryPointer new_buckets(
15 AllocatorTraits::allocate(*this, num_buckets + new_max_lookups));
16 EntryPointer special_end_item =
17 new_buckets + static_cast<ptrdiff_t>(num_buckets + new_max_lookups - 1);
18 for (EntryPointer it = new_buckets; it != special_end_item; ++it)
19 it->distance_from_desired = -1;
20 special_end_item->distance_from_desired = Entry::special_end_value;
21 std::swap(entries, new_buckets);
22 std::swap(num_slots_minus_one, num_buckets);
23 --num_slots_minus_one;
24 hash_policy.commit(new_prime_index);
25 int8_t old_max_lookups = max_lookups;
26 max_lookups = new_max_lookups;
27 num_elements = 0;
28 // new_buckets其实是旧的entries..
29 // num_buckets其实也是旧的值。。因为已经被swap了
30 for (EntryPointer
31 it = new_buckets,
32 end = it + static_cast<ptrdiff_t>(num_buckets + old_max_lookups);
33 it != end; ++it) {
34 if (it->has_value()) {
35 emplace(std::move(it->value));
36 it->destroy_value();
37 }
38 }
39 deallocate_data(new_buckets, num_buckets, old_max_lookups);
40 }
41
一些其他trick
通过多分配log(N)的slot消除bound checking的开销
代码见上面的rehash部分 由于每个元素的最大distance_from_desired不会超过log(N),因此可以保证查找时不需要做Bound checking. 使得find部分的实现非常简洁:
使用素数个slot而不是2的整数次幂个slot
2的整数次幂个slot是一种很常见的实现。 这种实现的主要好处是在将hash转换为index时,避免了代价高昂取模操作,而是用代价很小的按位与(&)替代
但是使用2的整数次幂个slot的缺点是,取模后得到的结果较少,比起使用素数个slot更容易发生冲突。 ska::flat_hash_map的作者借鉴了boost::multi_index中的做法,将变量展开为compile time const(对compile time const做取模运算要远远快于变量的取模运算),从而减小了这部分开销的影响。
1
2struct prime_number_hash_policy {
3 static size_t mod0(size_t) {
4 return 0llu;
5 }
6 static size_t mod2(size_t hash) {
7 return hash % 2llu;
8 }
9 static size_t mod3(size_t hash) {
10 return hash % 3llu;
11 }
12 static size_t mod5(size_t hash) {
13 return hash % 5llu;
14 }
15 static size_t mod7(size_t hash) {
16 return hash % 7llu;
17 }
18 static size_t mod11(size_t hash) {
19 return hash % 11llu;
20 }
21 static size_t mod13(size_t hash) {
22 return hash % 13llu;
23 }
24 static size_t mod17(size_t hash) {
25 return hash % 17llu;
26 }
27 static size_t mod23(size_t hash) {
28 return hash % 23llu;
29 }
30 static size_t mod29(size_t hash) {
31 return hash % 29llu;
32 }
33 static size_t mod37(size_t hash) {
34 return hash % 37llu;
35 }
36 static size_t mod47(size_t hash) {
37 return hash % 47llu;
38 }
39 ...
40