Support subdir option in hadoop sdk (#6097)

This commit is contained in:
kyungwan-nam
2025-05-19 17:33:43 +09:00
committed by GitHub
parent b62fe7253d
commit 020ef65738
6 changed files with 82 additions and 3 deletions

View File

@@ -356,11 +356,12 @@ type javaConf struct {
PushLabels string `json:"pushLabels"`
PushGraphite string `json:"pushGraphite"`
Caller int `json:"caller"`
Subdir string `json:"subdir"`
SuperFS bool `json:"superFs,omitempty"`
}
func getOrCreate(name, user, group, superuser, supergroup string, superFs bool, f func() *fs.FileSystem) int64 {
func getOrCreate(name, user, group, superuser, supergroup string, conf javaConf, f func() *fs.FileSystem) int64 {
fslock.Lock()
defer fslock.Unlock()
ws := activefs[name]
@@ -381,7 +382,7 @@ func getOrCreate(name, user, group, superuser, supergroup string, superFs bool,
}
logger.Infof("JuiceFileSystem created for user:%s group:%s", user, group)
}
w := &wrapper{jfs, nil, m, user, superuser, supergroup, superFs}
w := &wrapper{jfs, nil, m, user, superuser, supergroup, conf.SuperFS}
var gs []string
if userGroupCache[name] != nil {
gs = userGroupCache[name][user]
@@ -396,6 +397,18 @@ func getOrCreate(name, user, group, superuser, supergroup string, superFs bool,
} else {
w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(user), w.lookupGids(group))
}
// Check if the subdir is valid
if conf.Subdir != "" {
fi, err := jfs.Stat(w.ctx, conf.Subdir)
if err != 0 {
logger.Errorf("subdir %s is not valid: %v", conf.Subdir, err)
return 0
}
if !fi.IsDir() {
logger.Errorf("subdir %s is not a directory", conf.Subdir)
return 0
}
}
activefs[name] = append(ws, w)
nextFsHandle = nextFsHandle + 1
handlers[nextFsHandle] = w
@@ -467,7 +480,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) int64
if err != nil {
logger.Fatalf("invalid json: %s", C.GoString(jsonConf))
}
return getOrCreate(name, C.GoString(user), C.GoString(group), C.GoString(superuser), C.GoString(supergroup), jConf.SuperFS, func() *fs.FileSystem {
return getOrCreate(name, C.GoString(user), C.GoString(group), C.GoString(superuser), C.GoString(supergroup), jConf, func() *fs.FileSystem {
if jConf.Debug || os.Getenv("JUICEFS_DEBUG") != "" {
utils.SetLogLevel(logrus.DebugLevel)
go func() {
@@ -634,6 +647,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) int64
DirEntryTimeout: utils.Duration(jConf.DirEntryTimeout),
AccessLog: jConf.AccessLog,
FastResolve: jConf.FastResolve,
Subdir: jConf.Subdir,
BackupMeta: utils.Duration(jConf.BackupMeta),
BackupSkipTrash: jConf.BackupSkipTrash,
}

View File

@@ -405,6 +405,16 @@ public class JuiceFileSystemImpl extends FileSystem {
for (String key : bkeys) {
obj.put(key, Boolean.valueOf(getConf(conf, key, "false")));
}
String subdir = getConf(conf, "subdir", "");
if (subdir.equals("/")) {
subdir = "";
} else if (!subdir.startsWith("/")) {
subdir = "/" + subdir;
}
subdir = subdir.replaceAll("/+$", "");
if (!subdir.isEmpty()) {
LOG.debug("subdir {} is enabled", subdir);
}
obj.put("bucket", getConf(conf, "bucket", ""));
obj.put("storageClass", getConf(conf, "storage-class", ""));
obj.put("readOnly", Boolean.valueOf(getConf(conf, "read-only", "false")));
@@ -448,6 +458,7 @@ public class JuiceFileSystemImpl extends FileSystem {
obj.put("freeSpace", getConf(conf, "free-space", "0.1"));
obj.put("accessLog", getConf(conf, "access-log", ""));
obj.put("superFs", asSuperFs);
obj.put("subdir", subdir);
String jsonConf = obj.toString(2);
handle = lib.jfs_init(name, jsonConf, user, groupStr, superuser, supergroup);
if (handle <= 0) {

View File

@@ -1201,4 +1201,53 @@ public class JuiceFileSystemTest extends TestCase {
user1Fs.delete(d1, true);
user1Fs.delete(d2, true);
}
public void testSubdir() throws IOException, InterruptedException {
Configuration newConf = new Configuration(cfg);
newConf.set("fs.defaultFS", "jfs://test/");
newConf.set("juicefs.name", "test");
newConf.set("juicefs.test.meta", newConf.get("juicefs.dev.meta"));
// Test creating a new filesystem with an invalid subdir
newConf.set("juicefs.subdir", "nonexistent");
try {
FileSystem.newInstance(newConf);
fail("Creating filesystem should fail because the subdir must be a valid directory");
} catch (IOException ignored) {
}
// Test creating a new filesystem with a valid subdir
Path subdir = new Path("/test_subdir");
fs.delete(subdir, true);
fs.mkdirs(subdir);
fs.setPermission(subdir, new FsPermission((short) 0777));
newConf.set("juicefs.subdir", "/test_subdir");
FileSystem newFS = FileSystem.newInstance(newConf);
// Test file operations within the subdir
assertTrue(newFS.mkdirs(new Path("/test_subdir/dir")));
newFS.create(new Path("/test_subdir/dir/f")).close();
assertTrue(newFS.exists(new Path("/test_subdir/dir/f")));
// Test file operations not within the subdir
Path nonexistent = new Path("/nonexistent");
try {
newFS.exists(nonexistent);
fail("exists should not work because the path is not under the subdir");
} catch (AccessControlException e) {
assertTrue(e.getMessage().contains("Permission denied"));
}
try {
newFS.mkdirs(nonexistent);
fail("mkdirs should not work because the path is not under the subdir");
} catch (AccessControlException e) {
assertTrue(e.getMessage().contains("Permission denied"));
}
try {
newFS.create(nonexistent);
fail("create should not work because the path is not under the subdir");
} catch (AccessControlException e) {
assertTrue(e.getMessage().contains("Permission denied"));
}
newFS.close();
}
}