This commit is contained in:
xuyuchao
2025-11-26 17:12:11 +08:00
parent 07a74d53d5
commit 7dd61023d5
2 changed files with 33 additions and 55 deletions

View File

@@ -55,18 +55,18 @@ type Port struct {
// attrCacheEntry stores cached attributes for an inode
type attrCacheEntry struct {
attr Attr // complete attributes
attr Attr // complete attributes
timestamp time.Time // cache timestamp for expiration
}
type asyncSetAttrTask struct {
ctx Context
ino Ino
set uint16
atime int64
mtime int64
atimensec uint32
mtimensec uint32
ctx Context
ino Ino
set uint16
atime int64
mtime int64
atimensec uint32
mtimensec uint32
}
// FuseOptions contains options for fuse mount, keep the same structure with `fuse.MountOptions`
@@ -1255,10 +1255,10 @@ type VFS struct {
modM sync.Mutex
modifiedAt map[Ino]time.Time
registry *prometheus.Registry
attrCache sync.Map // inode -> *attrCacheEntry
asyncSetAttrLimiter chan struct{} // 控制最大并发的信号量
attrCacheTTL time.Duration
registry *prometheus.Registry
attrCache sync.Map
asyncSetAttrLimiter chan struct{}
attrCacheTTL time.Duration
}
func NewVFS(conf *Config, m meta.Meta, store chunk.ChunkStore, registerer prometheus.Registerer, registry *prometheus.Registry) *VFS {
@@ -1266,19 +1266,19 @@ func NewVFS(conf *Config, m meta.Meta, store chunk.ChunkStore, registerer promet
writer := NewDataWriter(conf, m, store, reader)
v := &VFS{
Conf: conf,
Meta: m,
Store: store,
reader: reader,
writer: writer,
cacheFiller: NewCacheFiller(conf, m, store),
handles: make(map[Ino][]*handle),
handleIno: make(map[uint64]Ino),
modifiedAt: make(map[meta.Ino]time.Time),
Conf: conf,
Meta: m,
Store: store,
reader: reader,
writer: writer,
cacheFiller: NewCacheFiller(conf, m, store),
handles: make(map[Ino][]*handle),
handleIno: make(map[uint64]Ino),
modifiedAt: make(map[meta.Ino]time.Time),
nextfh: 1,
registry: registry,
asyncSetAttrLimiter: make(chan struct{}, 100), // 最多 100 个并发
attrCacheTTL: 10 * time.Second,
asyncSetAttrLimiter: make(chan struct{}, 100), // Up to 100 concurrents
attrCacheTTL: 10 * time.Second, // 10s TTL
}
n := getInternalNode(ConfigInode)
@@ -1305,6 +1305,7 @@ func NewVFS(conf *Config, m meta.Meta, store chunk.ChunkStore, registerer promet
_ = os.Rename(statePath, statePath+".bak")
go v.cleanupModified()
go v.cleanupAttrCache()
initVFSMetrics(v, writer, reader, registerer)
return v
}
@@ -1356,56 +1357,42 @@ func (v *VFS) asyncSetAttr(task asyncSetAttrTask) {
}
}
// cleanupAttrCache periodically removes expired cache entries (30 seconds interval)
func (v *VFS) cleanupAttrCache() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
for {
now := time.Now()
deletedCount := 0
// sync.Map Range is safe for concurrent access
v.attrCache.Range(func(key, value interface{}) bool {
entry := value.(*attrCacheEntry)
if now.Sub(entry.timestamp) > v.attrCacheTTL {
v.attrCache.Delete(key)
deletedCount++
}
return true // continue iteration
return true
})
if deletedCount > 0 {
logger.Debugf("Cleaned up %d expired attr cache entries", deletedCount)
}
time.Sleep(60 * time.Second)
}
}
// setAttrCache stores attributes in cache (lock-free)
func (v *VFS) setAttrCache(ino Ino, attr *Attr) {
if attr == nil {
return
}
entry := &attrCacheEntry{
attr: *attr, // value copy
attr: *attr,
timestamp: time.Now(),
}
v.attrCache.Store(ino, entry) // sync.Map.Store() is thread-safe
v.attrCache.Store(ino, entry)
}
// deleteAttrCache removes attributes from cache (lock-free)
func (v *VFS) deleteAttrCache(ino Ino) {
v.attrCache.Delete(ino) // sync.Map.Delete() is thread-safe
v.attrCache.Delete(ino)
}
func (v *VFS) invalidateAttr(ino Ino) {
v.modM.Lock()
v.modifiedAt[ino] = time.Now()
v.modM.Unlock()
// Clear attribute cache if async SetAttr is enabled
if v.Conf.FuseOpts != nil && v.Conf.FuseOpts.EnableWriteback {
v.deleteAttrCache(ino)
}
if v.Conf.FuseOpts != nil && v.Conf.FuseOpts.EnableWriteback {
v.deleteAttrCache(ino)
}
}
func (v *VFS) ModifiedSince(ino Ino, start time.Time) bool {

View File

@@ -145,9 +145,7 @@ func onlyTime(set int) bool {
return (set &^ (meta.SetAttrAtime | meta.SetAttrAtimeNow | meta.SetAttrMtime | meta.SetAttrMtimeNow | meta.SetAttrCtime | meta.SetAttrCtimeNow)) == 0
}
// tryAsyncSetAttr attempts to submit an async SetAttr task (non-blocking with capacity control)
func (v *VFS) tryAsyncSetAttr(ctx Context, ino Ino, set int, atime, mtime int64, atimensec, mtimensec uint32) bool {
// Build task (only time-related fields)
task := asyncSetAttrTask{
ctx: ctx,
ino: ino,
@@ -166,7 +164,6 @@ func (v *VFS) tryAsyncSetAttr(ctx Context, ino Ino, set int, atime, mtime int64,
}
}
// getAttrCache retrieves cached attributes (lock-free read)
func (v *VFS) getAttrCache(ino Ino) (*attrCacheEntry, bool) {
if cached, ok := v.attrCache.Load(ino); ok {
entry := cached.(*attrCacheEntry)
@@ -175,7 +172,6 @@ func (v *VFS) getAttrCache(ino Ino) (*attrCacheEntry, bool) {
return nil, false
}
func (v *VFS) SetAttr(ctx Context, ino Ino, set int, fh uint64, mode, uid, gid uint32, atime, mtime int64, atimensec, mtimensec uint32, size uint64) (entry *meta.Entry, err syscall.Errno) {
str := setattrStr(set, mode, uid, gid, atime, mtime, size)
defer func() {
@@ -191,13 +187,9 @@ func (v *VFS) SetAttr(ctx Context, ino Ino, set int, fh uint64, mode, uid, gid u
return
}
var attr = &Attr{}
// Check if async processing is possible (only time-related, writeback enabled)
if onlyTime(set) && v.Conf.FuseOpts != nil && v.Conf.FuseOpts.EnableWriteback {
// Try to get cached attributes
if cachedEntry, ok := v.getAttrCache(ino); ok {
// Check if cache is still valid (5s TTL)
if time.Since(cachedEntry.timestamp) <= v.attrCacheTTL {
// Prepare attr for permission check
checkAttr := Attr{}
if set&meta.SetAttrAtime != 0 {
checkAttr.Atime = atime
@@ -215,7 +207,6 @@ func (v *VFS) SetAttr(ctx Context, ino Ino, set int, fh uint64, mode, uid, gid u
if set&meta.SetAttrMtimeNow != 0 {
v.writer.UpdateMtime(ino, time.Now())
}
// Async submission successful, return cached attributes immediately
v.UpdateLength(ino, &cachedEntry.attr)
entry = &meta.Entry{Inode: ino, Attr: &cachedEntry.attr}
if onlyTime(set) && v.Conf.FuseOpts != nil && v.Conf.FuseOpts.EnableWriteback {