mirror of
https://gitee.com/juicedata/JuiceFS.git
synced 2025-12-06 09:39:14 +08:00
meta/redis: support client cache (#6495)
Signed-off-by: jiefenghuang <jiefeng@juicedata.io>
This commit is contained in:
90
docs/en/reference/redis-csc.md
Normal file
90
docs/en/reference/redis-csc.md
Normal file
@@ -0,0 +1,90 @@
|
||||
# Redis Client-Side Caching Support in JuiceFS
|
||||
|
||||
Starting with version 6.0, Redis provides [Client-Side Caching](https://redis.io/docs/latest/develop/reference/client-side-caching) which allows clients to maintain local caches of data in a faster and more efficient way. JuiceFS includes full support for this feature, offering significant performance improvements for metadata operations.
|
||||
|
||||
## How it works
|
||||
|
||||
Redis Client-Side Caching (CSC) works by:
|
||||
|
||||
1. The client enables tracking mode with `CLIENT TRACKING ON BCAST`
|
||||
2. The client caches data locally after reading it from Redis
|
||||
3. Redis notifies the client when cached keys are modified by any client
|
||||
4. The client invalidates those keys in its local cache
|
||||
|
||||
This results in reduced network traffic, lower latency, and higher throughput.
|
||||
|
||||
## Configuration
|
||||
|
||||
JuiceFS supports Redis CSC through the following options in the metadata URL:
|
||||
|
||||
```shell
|
||||
--meta-url="redis://localhost/1?client-cache=true" # Enable client-side caching (always BCAST mode)
|
||||
--meta-url="redis://localhost/1?client-cache=true&client-cache-size=500" # Set cache size (default 12800)
|
||||
--meta-url="redis://localhost/1?client-cache=true&client-cache-expire=60s" # Set cache expiration (default: 60s)
|
||||
```
|
||||
|
||||
### Options
|
||||
|
||||
- `client-cache`: Enables client-side caching in BCAST mode (set to any value except "false")
|
||||
- `client-cache-size`: Maximum cache size (default: 12800)
|
||||
- `client-cache-expire`: Cache expiration time (default: 60s)
|
||||
- `client-cache-preload`: Number of file objects under the root directory preloaded after mounting. (default: 0)
|
||||
|
||||
When client-side caching is enabled, JuiceFS caches:
|
||||
|
||||
1. **Inode attributes**: File/directory metadata like permissions, size, timestamps
|
||||
2. **Directory entries**: Name to inode mappings for faster lookups
|
||||
|
||||
> **Note:** Redis Client Side Cache requires Redis server version 6.0 or higher. Using this feature with older Redis versions will result in errors.
|
||||
|
||||
### Preloading Cache
|
||||
|
||||
When client-side caching is enabled and `client-cache-preload` is set, JuiceFS will preload the file-object attributes and entries under the root directory after mounting. This lazy preloading happens in the background and helps to:
|
||||
|
||||
1. Warm up the cache for common operations
|
||||
2. Reduce latency for initial file system operations
|
||||
3. Provide better performance from the moment the file system is mounted
|
||||
|
||||
The preloading process intelligently prioritizes the most important inodes by:
|
||||
|
||||
1. Starting with the root directory
|
||||
2. Loading the most frequently accessed top-level directories and files
|
||||
3. Recursively exploring important subdirectories
|
||||
|
||||
The preloading process runs in a background goroutine with fail-safe mechanisms and won't block or affect normal file system operations.
|
||||
|
||||
## Modes
|
||||
|
||||
JuiceFS uses BCAST mode for simplicity and reliability:
|
||||
|
||||
- **BCAST mode**: All keys accessed by the client are tracked and notifications are sent for any changes.
|
||||
|
||||
BCAST mode provides the simplest implementation while ensuring cache coherence across all clients.
|
||||
|
||||
## Requirements
|
||||
|
||||
- Redis server version 6.0 or higher
|
||||
- JuiceFS with CSC support enabled
|
||||
|
||||
## Performance Considerations
|
||||
|
||||
1. The default 12800 cache size should be sufficient for most workloads
|
||||
2. For very large filesystems with millions of files, you may benefit from increasing the cache size
|
||||
3. The cache is most effective for metadata-heavy workloads with many repeated operations
|
||||
4. For very write-heavy workloads, consider disabling CSC as invalidation traffic may offset benefits
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
If you experience crashes or instability with CSC enabled:
|
||||
|
||||
1. Update to the latest JuiceFS version which contains important fixes for CSC
|
||||
2. Try reducing the cache size with `client-cache-size`
|
||||
3. Check Redis server logs for any memory or client tracking issues
|
||||
4. Make sure your Redis server version is 6.0 or higher
|
||||
5. If problems persist, disable CSC by removing the `client-cache` parameter
|
||||
|
||||
JuiceFS includes robust error handling for various Redis CSC-specific responses to ensure stable operation even when Redis sends unexpected response formats due to client tracking.
|
||||
|
||||
## References
|
||||
|
||||
- [Redis Client-Side Caching Documentation](https://redis.io/docs/latest/develop/reference/client-side-caching)
|
||||
5
go.mod
5
go.mod
@@ -41,6 +41,7 @@ require (
|
||||
github.com/hanwen/go-fuse/v2 v2.1.1-0.20210611132105-24a1dfe6b4f8
|
||||
github.com/hashicorp/consul/api v1.29.2
|
||||
github.com/hashicorp/go-hclog v1.6.3
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7
|
||||
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.21.12+incompatible
|
||||
github.com/hungys/go-lz4 v0.0.0-20170805124057-19ff7f07f099
|
||||
github.com/jackc/pgx/v5 v5.7.3
|
||||
@@ -68,7 +69,7 @@ require (
|
||||
github.com/prometheus/prometheus v0.54.1
|
||||
github.com/qingstor/qingstor-sdk-go/v4 v4.4.0
|
||||
github.com/qiniu/go-sdk/v7 v7.25.2
|
||||
github.com/redis/go-redis/v9 v9.7.3
|
||||
github.com/redis/go-redis/v9 v9.16.0
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
github.com/smartystreets/goconvey v1.7.2
|
||||
github.com/spf13/cast v1.7.1
|
||||
@@ -354,3 +355,5 @@ replace github.com/mattn/go-colorable v0.1.9 => github.com/juicedata/go-colorabl
|
||||
replace github.com/mattn/go-colorable v0.0.9 => github.com/juicedata/go-colorable v0.0.0-20250208072043-a97a0c2023db
|
||||
|
||||
replace github.com/cloudsoda/go-smb2 => github.com/juicedata/go-smb2 v0.0.0-20250917090526-d2d0abfb0e05
|
||||
|
||||
replace github.com/hashicorp/golang-lru/v2 v2.0.7 => github.com/juicedata/golang-lru/v2 v2.0.8-0.20251126062551-1b321869f904
|
||||
|
||||
6
go.sum
6
go.sum
@@ -487,6 +487,8 @@ github.com/juicedata/godaemon v0.0.0-20210629045518-3da5144a127d h1:kpQMvNZJKGY3
|
||||
github.com/juicedata/godaemon v0.0.0-20210629045518-3da5144a127d/go.mod h1:dlxKkLh3qAIPtgr2U/RVzsZJDuXA1ffg+Njikfmhvgw=
|
||||
github.com/juicedata/gogfapi v0.0.0-20241204082332-ecd102647f80 h1:EPg/f3lhbAOjE2M0WpVi47Fk62mEmmPejRuGVdOFQww=
|
||||
github.com/juicedata/gogfapi v0.0.0-20241204082332-ecd102647f80/go.mod h1:Ho5G4KgrgbMKW0buAJdOmYoJcOImkzznJQaLiATrsx4=
|
||||
github.com/juicedata/golang-lru/v2 v2.0.8-0.20251126062551-1b321869f904 h1:oNtkL1jwrNMMcBlHNW1fhdl4quK7p1EdR7o1Rja5xpM=
|
||||
github.com/juicedata/golang-lru/v2 v2.0.8-0.20251126062551-1b321869f904/go.mod h1:qnbgnNzfydwuHjSCApF4bdul+tZ8T3y1MkZG/OFczLA=
|
||||
github.com/juicedata/huaweicloud-sdk-go-obs v3.22.12-0.20230228031208-386e87b5c091+incompatible h1:2/ttSmYoX+QMegpNyAJR0Y6aHcVk57F7RJit5xN2T/s=
|
||||
github.com/juicedata/huaweicloud-sdk-go-obs v3.22.12-0.20230228031208-386e87b5c091+incompatible/go.mod h1:Ukwa8ffRQLV6QRwpqGioPjn2Wnf7TBDA4DbennDOqHE=
|
||||
github.com/juicedata/minio v0.0.0-20251120043259-079fa6a601db h1:yGKlGEz3nOD2IovjI+V4O+eY1TPgOp/T6gOxMl9/xKI=
|
||||
@@ -701,8 +703,8 @@ github.com/qiniu/go-sdk/v7 v7.25.2/go.mod h1:dmKtJ2ahhPWFVi9o1D5GemmWoh/ctuB9peq
|
||||
github.com/qiniu/x v1.10.5/go.mod h1:03Ni9tj+N2h2aKnAz+6N0Xfl8FwMEDRC2PAlxekASDs=
|
||||
github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93 h1:UVArwN/wkKjMVhh2EQGC0tEc1+FqiLlvYXY5mQ2f8Wg=
|
||||
github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93/go.mod h1:Nfe4efndBz4TibWycNE+lqyJZiMX4ycx+QKV8Ta0f/o=
|
||||
github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM=
|
||||
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
|
||||
github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4=
|
||||
github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
|
||||
|
||||
@@ -50,6 +50,7 @@ import (
|
||||
"github.com/juicedata/juicefs/pkg/utils"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/redis/go-redis/v9/maintnotifications"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
@@ -92,6 +93,7 @@ type redisMeta struct {
|
||||
prefix string
|
||||
shaLookup string // The SHA returned by Redis for the loaded `scriptLookup`
|
||||
shaResolve string // The SHA returned by Redis for the loaded `scriptResolve`
|
||||
cache *redisCache
|
||||
}
|
||||
|
||||
var _ Meta = (*redisMeta)(nil)
|
||||
@@ -122,6 +124,14 @@ func newRedisMeta(driver, addr string, conf *Config) (Meta, error) {
|
||||
keyFile := query.pop("tls-key-file")
|
||||
caCertFile := query.pop("tls-ca-cert-file")
|
||||
tlsServerName := query.pop("tls-server-name")
|
||||
|
||||
// Client-side caching options
|
||||
clientCacheStr := query.pop("client-cache")
|
||||
clientCache := clientCacheStr != "false" && clientCacheStr != ""
|
||||
clientCacheSize := query.getInt("client-cache-size", "client_cache_size", 12800)
|
||||
// Default TTL to prevent reading stale cache for a long time when the connection fails.
|
||||
clientCacheExpiry := query.duration("client-cache-expire", "client_cache_expire", time.Minute)
|
||||
clientCachePreload := query.getInt("client-cache-preload", "client_cache_preload", 0) // may cause conflict
|
||||
u.RawQuery = values.Encode()
|
||||
|
||||
hosts := u.Host
|
||||
@@ -173,8 +183,10 @@ func newRedisMeta(driver, addr string, conf *Config) (Meta, error) {
|
||||
opt.MaxRetryBackoff = maxRetryBackoff
|
||||
opt.ReadTimeout = readTimeout
|
||||
opt.WriteTimeout = writeTimeout
|
||||
var rdb redis.UniversalClient
|
||||
opt.MaintNotificationsConfig = &maintnotifications.Config{Mode: maintnotifications.ModeDisabled}
|
||||
var prefix string
|
||||
var rdb redis.UniversalClient
|
||||
|
||||
if strings.Contains(hosts, ",") && strings.Index(hosts, ",") < strings.Index(hosts, ":") {
|
||||
var fopt redis.FailoverOptions
|
||||
ps := strings.Split(hosts, ",")
|
||||
@@ -269,15 +281,37 @@ func newRedisMeta(driver, addr string, conf *Config) (Meta, error) {
|
||||
rdb: rdb,
|
||||
prefix: prefix,
|
||||
}
|
||||
if clientCache {
|
||||
m.cache = newRedisCache(prefix, clientCacheSize, clientCacheExpiry, clientCachePreload)
|
||||
if err = m.cache.init(m.rdb); err != nil {
|
||||
logger.Warnf("Failed to setup client-side caching: %v", err)
|
||||
m.cache = nil
|
||||
}
|
||||
}
|
||||
m.en = m
|
||||
m.checkServerConfig()
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (m *redisMeta) Shutdown() error {
|
||||
if m.cache != nil {
|
||||
m.cache.close()
|
||||
m.cache = nil
|
||||
}
|
||||
return m.rdb.Close()
|
||||
}
|
||||
|
||||
// Override NewSession to initialize client-side cache after session is created
|
||||
func (m *redisMeta) NewSession(record bool) error {
|
||||
// First, create the session normally
|
||||
err := m.baseMeta.NewSession(record)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go m.preloadCache()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *redisMeta) doDeleteSlice(id uint64, size uint32) error {
|
||||
return m.rdb.HDel(Background(), m.sliceRefs(), m.sliceKey(id, size)).Err()
|
||||
}
|
||||
@@ -919,6 +953,20 @@ func (m *redisMeta) doLookup(ctx Context, parent Ino, name string, inode *Ino, a
|
||||
var encodedAttr []byte
|
||||
var err error
|
||||
entryKey := m.entryKey(parent)
|
||||
if m.cache != nil {
|
||||
if entry, ok := m.cache.entryCache.Get(m.cache.entryName(parent, name)); ok {
|
||||
if !entry.isMark() {
|
||||
*inode = entry.ino
|
||||
if attr != nil {
|
||||
*attr = entry.Attr
|
||||
}
|
||||
return 0
|
||||
}
|
||||
m.cache.entryCache.AddIf(m.cache.entryName(parent, name), &entryMark, func(oldEntry *cachedEntry, exists bool) bool {
|
||||
return exists
|
||||
})
|
||||
}
|
||||
}
|
||||
if len(m.shaLookup) > 0 && attr != nil && !m.conf.CaseInsensi && m.prefix == "" {
|
||||
var res interface{}
|
||||
var returnedIno int64
|
||||
@@ -946,6 +994,13 @@ func (m *redisMeta) doLookup(ctx Context, parent Ino, name string, inode *Ino, a
|
||||
if err == nil {
|
||||
m.parseAttr(encodedAttr, attr)
|
||||
m.of.Update(foundIno, attr)
|
||||
if m.cache != nil {
|
||||
ce := &cachedEntry{ino: foundIno}
|
||||
m.parseAttr(encodedAttr, &ce.Attr)
|
||||
_, _ = m.cache.entryCache.AddIf(m.cache.entryName(parent, name), ce, func(oldEntry *cachedEntry, exists bool) bool {
|
||||
return exists && oldEntry.isMark()
|
||||
})
|
||||
}
|
||||
} else if err == redis.Nil { // corrupt entry
|
||||
logger.Warnf("no attribute for inode %d (%d, %s)", foundIno, parent, name)
|
||||
*attr = Attr{Typ: foundType}
|
||||
|
||||
327
pkg/meta/redis_csc.go
Normal file
327
pkg/meta/redis_csc.go
Normal file
@@ -0,0 +1,327 @@
|
||||
//go:build !noredis
|
||||
// +build !noredis
|
||||
|
||||
/*
|
||||
* JuiceFS, Copyright 2020 Juicedata, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/redis/go-redis/v9/push"
|
||||
)
|
||||
|
||||
var entryMark cachedEntry
|
||||
|
||||
type cachedEntry struct {
|
||||
ino Ino
|
||||
Attr
|
||||
}
|
||||
|
||||
func (e *cachedEntry) isMark() bool {
|
||||
return e.ino == 0
|
||||
}
|
||||
|
||||
// redisCache support bcast mode client-side cache
|
||||
// cache attrs and entries only, chunks are already cached in OpenCache
|
||||
type redisCache struct {
|
||||
cli *redis.Client
|
||||
prefix string
|
||||
cap int
|
||||
expiry time.Duration
|
||||
preload int
|
||||
subscription *redis.PubSub
|
||||
|
||||
inodeCache *expirable.LRU[Ino, []byte]
|
||||
entryCache *expirable.LRU[string, *cachedEntry]
|
||||
}
|
||||
|
||||
func newRedisCache(prefix string, cap int, expiry time.Duration, preload int) *redisCache {
|
||||
logger.Infof("Initializing Redis client-side cache with size %d and expiry %+v", cap, expiry)
|
||||
return &redisCache{
|
||||
prefix: prefix,
|
||||
cap: cap,
|
||||
expiry: expiry,
|
||||
preload: preload,
|
||||
inodeCache: expirable.NewLRU[Ino, []byte](cap, nil, expiry),
|
||||
entryCache: expirable.NewLRU[string, *cachedEntry](cap, nil, expiry),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *redisCache) init(cli redis.UniversalClient) error {
|
||||
ctx := context.WithValue(context.Background(), invalidConnKey{}, true)
|
||||
var err error
|
||||
if rc, ok := cli.(*redis.Client); ok {
|
||||
c.cli = rc
|
||||
} else if cc, ok := cli.(*redis.ClusterClient); ok {
|
||||
// For cluster mode, we should get the master node for our key
|
||||
if c.cli, err = cc.MasterForKey(ctx, c.prefix); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
c.cli.Options().OnConnect = c.onInvalidateConnect
|
||||
// under the RESP3 protocol, "__redis__:invalidate" actually has no effect.
|
||||
// we use Pubsub channel to simplify connection management and receiving PUSH messages.
|
||||
c.subscription = c.cli.Subscribe(ctx, "__redis__:invalidate")
|
||||
_ = c.subscription.Channel()
|
||||
// handle PUSH notifications for invalidation in c.HandlePushNotification
|
||||
if err = c.cli.RegisterPushNotificationHandler("invalidate", c, true); err != nil {
|
||||
c.close()
|
||||
return err
|
||||
}
|
||||
// handle client cmd to avoid race conditions
|
||||
c.cli.AddHook(c)
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
keyTypOther = iota
|
||||
keyTypInode
|
||||
keyTypEntry
|
||||
)
|
||||
|
||||
func (c *redisCache) parse(key string) int {
|
||||
if strings.HasPrefix(key, c.prefix+"i") {
|
||||
return keyTypInode
|
||||
}
|
||||
if strings.HasPrefix(key, c.prefix+"d") {
|
||||
return keyTypEntry
|
||||
}
|
||||
return keyTypOther
|
||||
}
|
||||
|
||||
func (c *redisCache) entryName(parent Ino, name string) string {
|
||||
return fmt.Sprintf("%d%d%s", parent, os.PathSeparator, name)
|
||||
}
|
||||
|
||||
func (c *redisCache) HandlePushNotification(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
|
||||
if len(notification) != 2 || notification[0] == nil || notification[1] == nil {
|
||||
return nil
|
||||
}
|
||||
if typ, ok := notification[0].(string); !ok || typ != "invalidate" {
|
||||
return nil
|
||||
}
|
||||
iKeys := notification[1].([]interface{})
|
||||
var key string
|
||||
for _, iKey := range iKeys {
|
||||
key = iKey.(string)
|
||||
typ := c.parse(key)
|
||||
switch typ {
|
||||
case keyTypInode:
|
||||
inodeStr := key[len(c.prefix)+1:]
|
||||
inode, err := strconv.ParseUint(inodeStr, 10, 64)
|
||||
if err == nil {
|
||||
c.inodeCache.Remove(Ino(inode))
|
||||
}
|
||||
case keyTypEntry:
|
||||
parentStr := key[len(c.prefix)+1:]
|
||||
// invalidate all entries related to this directory
|
||||
prefix := fmt.Sprintf("%s%d", parentStr, os.PathSeparator)
|
||||
for _, k := range c.entryCache.Keys() {
|
||||
if strings.HasPrefix(k, prefix) {
|
||||
c.entryCache.Remove(k)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *redisCache) DialHook(next redis.DialHook) redis.DialHook { return nil }
|
||||
|
||||
var inodeMark []byte
|
||||
|
||||
func (c *redisCache) beforeProcess(cmd redis.Cmder, skip bool) bool {
|
||||
name, args := cmd.Name(), cmd.Args()
|
||||
var key string
|
||||
var ok bool
|
||||
if len(args) < 2 {
|
||||
return true
|
||||
}
|
||||
if key, ok = args[1].(string); !ok {
|
||||
return true
|
||||
}
|
||||
typ := c.parse(key)
|
||||
|
||||
if name == "get" && typ == keyTypInode {
|
||||
num, err := strconv.ParseUint(key[len(c.prefix)+1:], 10, 64)
|
||||
if err == nil {
|
||||
inode := Ino(num)
|
||||
if data, ok := c.inodeCache.Get(inode); ok {
|
||||
if !skip && len(data) > 0 {
|
||||
rsp := cmd.(*redis.StringCmd)
|
||||
rsp.SetErr(nil)
|
||||
rsp.SetVal(bytesToString(data))
|
||||
return false
|
||||
}
|
||||
}
|
||||
c.inodeCache.AddIf(inode, inodeMark, func(oldVal []byte, exists bool) bool {
|
||||
return !exists
|
||||
})
|
||||
// request to Redis server
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *redisCache) afterProcess(cmd redis.Cmder) {
|
||||
name, args := cmd.Name(), cmd.Args()
|
||||
var key string
|
||||
var ok bool
|
||||
if len(args) < 2 {
|
||||
return
|
||||
}
|
||||
if key, ok = args[1].(string); !ok {
|
||||
return
|
||||
}
|
||||
typ := c.parse(key)
|
||||
|
||||
switch name {
|
||||
case "get":
|
||||
if typ == keyTypInode {
|
||||
if data, err := cmd.(*redis.StringCmd).Bytes(); err == nil {
|
||||
num, err := strconv.ParseUint(key[len(c.prefix)+1:], 10, 64)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, _ = c.inodeCache.AddIf(Ino(num), data, func(oldVal []byte, exists bool) bool {
|
||||
return exists && len(oldVal) == 0
|
||||
})
|
||||
}
|
||||
}
|
||||
case "set":
|
||||
if typ == keyTypInode {
|
||||
if cmd.(*redis.StatusCmd).Err() == nil {
|
||||
if num, err := strconv.ParseUint(key[len(c.prefix)+1:], 10, 64); err == nil {
|
||||
_ = c.inodeCache.Remove(Ino(num))
|
||||
}
|
||||
}
|
||||
}
|
||||
case "hdel":
|
||||
if typ == keyTypEntry {
|
||||
if err := cmd.(*redis.IntCmd).Err(); err == nil {
|
||||
for i := 2; i < len(args); i++ {
|
||||
_ = c.entryCache.Remove(fmt.Sprintf("%s%d%s", key[len(c.prefix)+1:], os.PathSeparator, args[i]))
|
||||
}
|
||||
}
|
||||
}
|
||||
case "hset":
|
||||
if typ == keyTypEntry {
|
||||
if err := cmd.(*redis.IntCmd).Err(); err == nil {
|
||||
for i := 2; i < len(args); i += 2 {
|
||||
_ = c.entryCache.Remove(fmt.Sprintf("%s%d%s", key[len(c.prefix)+1:], os.PathSeparator, args[i]))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *redisCache) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
|
||||
return func(ctx context.Context, cmd redis.Cmder) error {
|
||||
if !c.beforeProcess(cmd, false) {
|
||||
return nil
|
||||
}
|
||||
err := next(ctx, cmd)
|
||||
c.afterProcess(cmd)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (c *redisCache) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
|
||||
return func(ctx context.Context, cmds []redis.Cmder) error {
|
||||
for _, cmd := range cmds {
|
||||
_ = c.beforeProcess(cmd, true)
|
||||
}
|
||||
err := next(ctx, cmds)
|
||||
for _, cmd := range cmds {
|
||||
c.afterProcess(cmd)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (c *redisCache) close() {
|
||||
if c.subscription != nil {
|
||||
if err := c.subscription.Close(); err != nil {
|
||||
logger.Warnf("failed closing Redis cache subscription: %v", err)
|
||||
}
|
||||
c.subscription = nil
|
||||
}
|
||||
if c.cli != nil {
|
||||
c.cli.Options().OnConnect = nil
|
||||
}
|
||||
c.cli = nil
|
||||
}
|
||||
|
||||
type invalidConnKey struct{}
|
||||
|
||||
func (c *redisCache) onInvalidateConnect(ctx context.Context, cn *redis.Conn) error {
|
||||
if ctx.Value(invalidConnKey{}) == nil {
|
||||
return nil
|
||||
}
|
||||
// clear all caches on reconnect
|
||||
c.inodeCache.Purge()
|
||||
c.entryCache.Purge()
|
||||
// use the pubsub connection to handle tracking and invalidate
|
||||
_ = cn.Do(ctx, "CLIENT", "TRACKING", "OFF").Err()
|
||||
if err := cn.Do(ctx, "CLIENT", "TRACKING", "ON", "BCAST", "PREFIX", c.prefix+"i", "PREFIX", c.prefix+"d").Err(); err != nil {
|
||||
logger.Warnf("Failed to enable Redis client-side caching on new connection: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *redisMeta) preloadCache() {
|
||||
if m.cache == nil {
|
||||
return
|
||||
}
|
||||
if m.cache.preload <= 0 {
|
||||
return
|
||||
}
|
||||
start := time.Now()
|
||||
ctx := Background()
|
||||
attr := &Attr{}
|
||||
if eno := m.doGetAttr(ctx, m.root, attr); eno != 0 {
|
||||
logger.Warnf("failed to get root inode %d attribute: %d", m.root, eno)
|
||||
return
|
||||
}
|
||||
|
||||
var entries []*Entry
|
||||
if eno := m.doReaddir(ctx, m.root, 1, &entries, m.cache.preload); eno != 0 {
|
||||
logger.Warnf("failed to read root %d directory: %d", m.root, eno)
|
||||
return
|
||||
}
|
||||
for _, entry := range entries {
|
||||
m.cache.entryCache.Add(m.cache.entryName(m.root, string(entry.Name)), &cachedEntry{
|
||||
ino: entry.Inode,
|
||||
Attr: *entry.Attr,
|
||||
})
|
||||
}
|
||||
logger.Infof("preload %d inodes in %v", m.cache.inodeCache.Len(), time.Since(start))
|
||||
}
|
||||
|
||||
func bytesToString(b []byte) string {
|
||||
return *(*string)(unsafe.Pointer(&b))
|
||||
}
|
||||
106
pkg/meta/redis_csc_test.go
Normal file
106
pkg/meta/redis_csc_test.go
Normal file
@@ -0,0 +1,106 @@
|
||||
//go:build !noredis
|
||||
|
||||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func mockRedisCSCMeta(t *testing.T) *redisMeta {
|
||||
m, err := newRedisMeta("redis", "127.0.0.1:6379/10?client-cache=true", testConfig())
|
||||
require.NoError(t, err, "failed to create redis meta")
|
||||
require.Equal(t, "redis", m.Name(), "meta name should be redis")
|
||||
return m.(*redisMeta)
|
||||
}
|
||||
|
||||
func TestRedisCache(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
m := mockRedisCSCMeta(t)
|
||||
_ = m.rdb.FlushAll(ctx)
|
||||
defer m.Shutdown()
|
||||
defer m.cache.close()
|
||||
|
||||
var err error
|
||||
t.Run("invalidation handling", func(t *testing.T) {
|
||||
cache := m.cache
|
||||
ino := Ino(100)
|
||||
attr := &Attr{Typ: TypeFile, Mode: 0644}
|
||||
cache.inodeCache.Add(ino, attr.Marshal())
|
||||
if _, ok := cache.inodeCache.Get(ino); !ok {
|
||||
t.Fatal("inode should be in cache")
|
||||
}
|
||||
|
||||
err = m.rdb.Set(ctx, m.inodeKey(ino), m.marshal(&Attr{Mode: 0755}), 0).Err()
|
||||
require.NoError(t, err, "failed to set key %d", ino)
|
||||
dumIno := Ino(101)
|
||||
err = m.rdb.Set(ctx, m.inodeKey(dumIno), m.marshal(&Attr{Mode: 0755}), 0).Err()
|
||||
require.NoError(t, err, "failed to set key %d", dumIno)
|
||||
time.Sleep(3 * time.Second)
|
||||
if _, ok := cache.inodeCache.Get(Ino(100)); ok {
|
||||
t.Fatal("inode should be invalidated and removed from cache")
|
||||
}
|
||||
|
||||
cache.entryCache.Add(cache.entryName(101, "file"), &cachedEntry{})
|
||||
m.rdb.HSet(ctx, m.entryKey(100), "file", "content").Err()
|
||||
})
|
||||
t.Run("cache expiration", func(t *testing.T) {
|
||||
shortExpiry := 50 * time.Millisecond
|
||||
cache := newRedisCache("jfs", 1000, shortExpiry, 0)
|
||||
attr := &Attr{Typ: TypeFile, Mode: 0644}
|
||||
cache.inodeCache.Add(Ino(102), attr.Marshal())
|
||||
time.Sleep(3 * shortExpiry)
|
||||
if _, ok := cache.inodeCache.Get(Ino(102)); ok {
|
||||
t.Fatal("inode should be expired")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("inode hook", func(t *testing.T) {
|
||||
cache := m.cache
|
||||
ino := Ino(103)
|
||||
attr := &Attr{Typ: TypeFile, Length: 10}
|
||||
cache.inodeCache.Add(ino, attr.Marshal())
|
||||
|
||||
data, err := m.rdb.Get(ctx, m.inodeKey(ino)).Bytes()
|
||||
require.NoError(t, err, "failed to get inode")
|
||||
attr2 := &Attr{}
|
||||
attr2.Unmarshal(data)
|
||||
attr2.Full = false
|
||||
require.Equal(t, *attr, *attr2)
|
||||
|
||||
attr3 := &Attr{Typ: TypeFile, Length: 20}
|
||||
err = m.rdb.Set(ctx, m.inodeKey(ino), attr3.Marshal(), 0).Err()
|
||||
require.NoError(t, err)
|
||||
_, ok := cache.inodeCache.Get(ino)
|
||||
require.False(t, ok)
|
||||
})
|
||||
|
||||
t.Run("entry hook", func(t *testing.T) {
|
||||
cache := m.cache
|
||||
ino := Ino(104)
|
||||
name1, name2 := cache.entryName(ino, "f1"), cache.entryName(ino, "f2")
|
||||
cache.entryCache.Add(name1, &cachedEntry{})
|
||||
cache.entryCache.Add(name2, &cachedEntry{})
|
||||
|
||||
err := m.rdb.HSet(ctx, m.entryKey(ino), "f1", "c1", "f2", "c2").Err()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, ok := cache.entryCache.Get(name1)
|
||||
require.False(t, ok)
|
||||
_, ok = cache.entryCache.Get(name2)
|
||||
require.False(t, ok)
|
||||
|
||||
cache.entryCache.Add(name1, &cachedEntry{})
|
||||
cache.entryCache.Add(name2, &cachedEntry{})
|
||||
err = m.rdb.HDel(ctx, m.entryKey(ino), "f1", "f2").Err()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, ok = cache.entryCache.Get(name1)
|
||||
require.False(t, ok)
|
||||
_, ok = cache.entryCache.Get(name2)
|
||||
require.False(t, ok)
|
||||
})
|
||||
}
|
||||
@@ -106,6 +106,25 @@ func (qm *queryMap) duration(key, originalKey string, d time.Duration) time.Dura
|
||||
}
|
||||
}
|
||||
|
||||
func (qm *queryMap) getInt(key, originalKey string, defaultValue int) int {
|
||||
val := qm.Get(key)
|
||||
if val == "" {
|
||||
oVal := qm.Get(originalKey)
|
||||
if oVal == "" {
|
||||
return defaultValue
|
||||
}
|
||||
val = oVal
|
||||
}
|
||||
|
||||
qm.Del(key)
|
||||
if i, err := strconv.ParseInt(val, 10, 32); err == nil {
|
||||
return int(i)
|
||||
} else {
|
||||
logger.Warnf("Parse int %s for key %s: %s", val, key, err)
|
||||
return defaultValue
|
||||
}
|
||||
}
|
||||
|
||||
func (qm *queryMap) pop(key string) string {
|
||||
defer qm.Del(key)
|
||||
return qm.Get(key)
|
||||
|
||||
Reference in New Issue
Block a user