🌞

cache2go源码阅读(一)

cache2go
并发安全的golang缓存库,支持过期功能。

从Example开始

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 待存储的元素的结构体定义
type myStruct struct {
	text     string
	moreData []byte
}

func main() {
	// 创建一个缓存表
	cache := cache2go.Cache("myCache")

  // 准备元素
	val := myStruct{"This is a test!", []byte{}}

  // 缓存表添加元素,设定过期时间
	cache.Add("someKey", 5*time.Second, &val)

	// 从缓存中获取元素
	res, err := cache.Value("someKey")
	if err == nil {
		fmt.Println("Found value in cache:", res.Data().(*myStruct).text)
	} else {
		fmt.Println("Error retrieving value from cache:", err)
	}

	// 等待直至元素过期
	time.Sleep(6 * time.Second)
	res, err = cache.Value("someKey")
	if err != nil {
		fmt.Println("Item is not cached (anymore).")
	}

	// 添加新的永不过期的元素
	cache.Add("someKey", 0, &val)

	// 设置删除元素时给反馈
	cache.SetAboutToDeleteItemCallback(func(e *cache2go.CacheItem) {
		fmt.Println("Deleting:", e.Key(), e.Data().(*myStruct).text, e.CreatedOn())
	})

	// 移除元素
	cache.Delete("someKey")

	// 清空缓存
	cache.Flush()
}

这个example完成了以下事情:

  • 新建缓存表
  • 往表内添加、删除元素
  • 元素过期处理

从这个例子来看,能发现两个重要的数据结构,缓存表CacheTable,和元素CacheItem

主要数据结构:CacheTable和CacheItem

1. 元素CacheItem
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
type CacheItem struct {
	sync.RWMutex

	// 元素的key
	key interface{}
	// 元素内容
	data interface{}
	// 元素生命周期
	lifeSpan time.Duration

	// 创建时间
	createdOn time.Time
	// 最后被访问时间
	accessedOn time.Time
	// 访问次数
	accessCount int64

	// 到期签可触发的方法
	aboutToExpire []func(key interface{})
}

构造函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func NewCacheItem(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
	t := time.Now()
	return &CacheItem{
		key:           key,
		lifeSpan:      lifeSpan,
		createdOn:     t,
		accessedOn:    t,
		accessCount:   0,
		aboutToExpire: nil,
		data:          data,
	}
}

cache2go是并发安全的,CacheItem内有个读写锁,在对可能被其他线程改写的属性进行访问时,都会做上锁操作,通过这个锁实现cache2go的并发安全。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (item *CacheItem) KeepAlive() {
	item.Lock()
	defer item.Unlock()
	item.accessedOn = time.Now()
	item.accessCount++
}

func (item *CacheItem) AccessedOn() time.Time {
	item.RLock()
	defer item.RUnlock()
	return item.accessedOn
}

func (item *CacheItem) AccessCount() int64 {
	item.RLock()
	defer item.RUnlock()
	return item.accessCount
}

func (item *CacheItem) SetAboutToExpireCallback(f func(interface{})) {
	if len(item.aboutToExpire) > 0 {
		item.RemoveAboutToExpireCallback()
	}
	item.Lock()
	defer item.Unlock()
	item.aboutToExpire = append(item.aboutToExpire, f)
}

func (item *CacheItem) AddAboutToExpireCallback(f func(interface{})) {
	item.Lock()
	defer item.Unlock()
	item.aboutToExpire = append(item.aboutToExpire, f)
}

func (item *CacheItem) RemoveAboutToExpireCallback() {
	item.Lock()
	defer item.Unlock()
	item.aboutToExpire = nil
}
2. 缓存表CacheTable
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
type CacheTable struct {
	sync.RWMutex

	// 缓存表命名
	name string
	// 缓存表内存储的所有元素,是个slice
	items map[interface{}]*CacheItem

	// 触发清楚的定时器
	cleanupTimer *time.Timer
	// 当前定时器间隔
	cleanupInterval time.Duration

	logger *log.Logger

	// Callback method triggered when trying to load a non-existing key.
	loadData func(key interface{}, args ...interface{}) *CacheItem
	// Callback method triggered when adding a new item to the cache.
	addedItem []func(item *CacheItem)
	// Callback method triggered before deleting an item from the cache.
	aboutToDeleteItem []func(item *CacheItem)
}

同样的,涉及并发访问的属性,CacheTable的方法都会做上锁操作,比如以下方法,没有一一贴出来

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 获取元素个数
func (table *CacheTable) Count() int {
	table.RLock()
	defer table.RUnlock()
	return len(table.items)
}

// 遍历
func (table *CacheTable) Foreach(trans func(key interface{}, item *CacheItem)) {
	table.RLock()
	defer table.RUnlock()

	for k, v := range table.items {
		trans(k, v)
	}
}
缓存表添加元素

添加新元素

1
2
3
4
5
6
7
8
9
func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
	item := NewCacheItem(key, lifeSpan, data)

	// Add item to cache.
	table.Lock()
	table.addInternal(item)

	return item
}

添加不存在的元素

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (table *CacheTable) NotFoundAdd(key interface{}, lifeSpan time.Duration, data interface{}) bool {
	table.Lock()

	if _, ok := table.items[key]; ok {
		table.Unlock()
		return false
	}

	// 查找不到元素,则新建元素添加
	item := NewCacheItem(key, lifeSpan, data)
	table.addInternal(item)

	return true
}

添加元素均调用addInternal方法,调用此方法的前提是必须上锁,然后添加元素,并做过期检查

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (table *CacheTable) addInternal(item *CacheItem) {
	// Careful: do not run this method unless the table-mutex is locked!
	// It will unlock it for the caller before running the callbacks and checks
	table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name)
	table.items[item.key] = item

	// Cache values so we don't keep blocking the mutex.
	expDur := table.cleanupInterval
	addedItem := table.addedItem
	table.Unlock()

	// Trigger callback after adding an item to cache.
	if addedItem != nil {
		for _, callback := range addedItem {
			callback(item)
		}
	}

	// If we haven't set up any expiration check timer or found a more imminent item.
	if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
		table.expirationCheck()
	}
}
缓存表删除元素
1
2
3
4
5
6
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
	table.Lock()
	defer table.Unlock()

	return table.deleteInternal(key)
}

类似于添加元素方法,加锁后调用deleteInternal方法,先检查元素是否存在,在删除元素前,先确定有没有aboutToDeleteItem回调函数要调用,再对CacheItem上锁删除

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
	r, ok := table.items[key]
	if !ok {
		return nil, ErrKeyNotFound
	}

	// Cache value so we don't keep blocking the mutex.
	aboutToDeleteItem := table.aboutToDeleteItem
	table.Unlock()

	// Trigger callbacks before deleting an item from cache.
	if aboutToDeleteItem != nil {
		for _, callback := range aboutToDeleteItem {
			callback(r)
		}
	}

	r.RLock()
	defer r.RUnlock()
	if r.aboutToExpire != nil {
		for _, callback := range r.aboutToExpire {
			callback(key)
		}
	}

	table.Lock()
	table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
	delete(table.items, key)

	return r, nil
}
元素过期检查机制
  • lifeSpan为0的元素永久保留
  • 通过元素的accessedOn属性判断是否过期,已过期则删除,未过期更新剩余生命时长

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    
    func (table *CacheTable) expirationCheck() {
    	table.Lock()
    	if table.cleanupTimer != nil {
    		table.cleanupTimer.Stop()
    	}
    	if table.cleanupInterval > 0 {
    		table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
    	} else {
    		table.log("Expiration check installed for table", table.name)
    	}
    
    	// To be more accurate with timers, we would need to update 'now' on every
    	// loop iteration. Not sure it's really efficient though.
    	now := time.Now()
    	smallestDuration := 0 * time.Second
    	for key, item := range table.items {
    		// Cache values so we don't keep blocking the mutex.
    		item.RLock()
    		lifeSpan := item.lifeSpan
    		accessedOn := item.accessedOn
    		item.RUnlock()
    
    		// 为0的元素永久保留
    		if lifeSpan == 0 {
    			continue
    		}
    
    		// 通过元素的`accessedOn`属性判断是否过期,已过期则删除
    		if now.Sub(accessedOn) >= lifeSpan {
    			// Item has excessed its lifespan.
    			table.deleteInternal(key)
    		} else {
    			// 未过期更新剩余生命时长
    			// Find the item chronologically closest to its end-of-lifespan.
    			if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
    				smallestDuration = lifeSpan - now.Sub(accessedOn)
    			}
    		}
    	}
    
    	// Setup the interval for the next cleanup run.
    	table.cleanupInterval = smallestDuration
    	if smallestDuration > 0 {
    		table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
    			go table.expirationCheck()
    		})
    	}
    	table.Unlock()
    }
缓存表清空

关闭定时器,存放元素的map指向一个空map即可

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func (table *CacheTable) Flush() {
	table.Lock()
	defer table.Unlock()

	table.log("Flushing table", table.name)

	table.items = make(map[interface{}]*CacheItem)
	table.cleanupInterval = 0
	if table.cleanupTimer != nil {
		table.cleanupTimer.Stop()
	}
}
updatedupdated2020-01-212020-01-21