HCFS 1.0.2

This commit is contained in:
Chris Lu
2018-12-14 09:16:21 -08:00
parent 42b8f1145a
commit 21315f709d
6 changed files with 605 additions and 79 deletions

View File

@@ -5,17 +5,27 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -259,4 +269,333 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
return path.makeQualified(uri, workingDirectory);
}
/**
* Concat existing files together.
* @param trg the path to the target destination.
* @param psrcs the paths to the sources to use for the concatenation.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
*/
@Override
public void concat(final Path trg, final Path [] psrcs) throws IOException {
throw new UnsupportedOperationException("Not implemented by the " +
getClass().getSimpleName() + " FileSystem implementation");
}
/**
* Truncate the file in the indicated path to the indicated size.
* <ul>
* <li>Fails if path is a directory.</li>
* <li>Fails if path does not exist.</li>
* <li>Fails if path is not closed.</li>
* <li>Fails if new size is greater than current size.</li>
* </ul>
* @param f The path to the file to be truncated
* @param newLength The size the file is to be truncated to
*
* @return <code>true</code> if the file has been truncated to the desired
* <code>newLength</code> and is immediately available to be reused for
* write operations such as <code>append</code>, or
* <code>false</code> if a background process of adjusting the length of
* the last block has been started, and clients should wait for it to
* complete before proceeding with further file updates.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
*/
@Override
public boolean truncate(Path f, long newLength) throws IOException {
throw new UnsupportedOperationException("Not implemented by the " +
getClass().getSimpleName() + " FileSystem implementation");
}
/**
* See {@link FileContext#createSymlink(Path, Path, boolean)}.
*/
@Override
public void createSymlink(final Path target, final Path link,
final boolean createParent) throws AccessControlException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, UnsupportedFileSystemException,
IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
"Filesystem does not support symlinks!");
}
/**
* See {@link AbstractFileSystem#supportsSymlinks()}.
*/
@Override
public boolean supportsSymlinks() {
return false;
}
/**
* Create a snapshot.
* @param path The directory where snapshots will be taken.
* @param snapshotName The name of the snapshot
* @return the snapshot path.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
*/
@Override
public Path createSnapshot(Path path, String snapshotName)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support createSnapshot");
}
/**
* Rename a snapshot.
* @param path The directory path where the snapshot was taken
* @param snapshotOldName Old name of the snapshot
* @param snapshotNewName New name of the snapshot
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void renameSnapshot(Path path, String snapshotOldName,
String snapshotNewName) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support renameSnapshot");
}
/**
* Delete a snapshot of a directory.
* @param path The directory that the to-be-deleted snapshot belongs to
* @param snapshotName The name of the snapshot
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void deleteSnapshot(Path path, String snapshotName)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support deleteSnapshot");
}
/**
* Modifies ACL entries of files and directories. This method can add new ACL
* entries or modify the permissions on existing ACL entries. All existing
* ACL entries that are not specified in this call are retained without
* changes. (Modifications are merged into the current ACL.)
*
* @param path Path to modify
* @param aclSpec List&lt;AclEntry&gt; describing modifications
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support modifyAclEntries");
}
/**
* Removes ACL entries from files and directories. Other ACL entries are
* retained.
*
* @param path Path to modify
* @param aclSpec List describing entries to remove
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void removeAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support removeAclEntries");
}
/**
* Removes all default ACL entries from files and directories.
*
* @param path Path to modify
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void removeDefaultAcl(Path path)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support removeDefaultAcl");
}
/**
* Removes all but the base ACL entries of files and directories. The entries
* for user, group, and others are retained for compatibility with permission
* bits.
*
* @param path Path to modify
* @throws IOException if an ACL could not be removed
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void removeAcl(Path path)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support removeAcl");
}
/**
* Fully replaces ACL of files and directories, discarding all existing
* entries.
*
* @param path Path to modify
* @param aclSpec List describing modifications, which must include entries
* for user, group, and others for compatibility with permission bits.
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setAcl");
}
/**
* Gets the ACL of a file or directory.
*
* @param path Path to get
* @return AclStatus describing the ACL of the file or directory
* @throws IOException if an ACL could not be read
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public AclStatus getAclStatus(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getAclStatus");
}
/**
* Set an xattr of a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to modify
* @param name xattr name.
* @param value xattr value.
* @param flag xattr set flag
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setXAttr");
}
/**
* Get an xattr name and value for a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attribute
* @param name xattr name.
* @return byte[] xattr value.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public byte[] getXAttr(Path path, String name) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getXAttr");
}
/**
* Get all of the xattr name/value pairs for a file or directory.
* Only those xattrs which the logged-in user has permissions to view
* are returned.
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
* @return Map describing the XAttrs of the file or directory
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getXAttrs");
}
/**
* Get all of the xattrs name/value pairs for a file or directory.
* Only those xattrs which the logged-in user has permissions to view
* are returned.
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
* @param names XAttr names.
* @return Map describing the XAttrs of the file or directory
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getXAttrs");
}
/**
* Get all of the xattr names for a file or directory.
* Only those xattr names which the logged-in user has permissions to view
* are returned.
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
* @return List{@literal <String>} of the XAttr names of the file or directory
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public List<String> listXAttrs(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support listXAttrs");
}
/**
* Remove an xattr of a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to remove extended attribute
* @param name xattr name
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
@Override
public void removeXAttr(Path path, String name) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support removeXAttr");
}
}

View File

@@ -7,6 +7,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
@@ -23,10 +24,12 @@ public class SeaweedFileSystemStore {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
private FilerGrpcClient filerGrpcClient;
private FilerClient filerClient;
public SeaweedFileSystemStore(String host, int port) {
int grpcPort = 10000 + port;
filerGrpcClient = new FilerGrpcClient(host, grpcPort);
filerClient = new FilerClient(filerGrpcClient);
}
public static String getParentDirectory(Path path) {
@@ -49,23 +52,12 @@ public class SeaweedFileSystemStore {
permission,
umask);
long now = System.currentTimeMillis() / 1000L;
FilerProto.CreateEntryRequest.Builder request = FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(getParentDirectory(path))
.setEntry(FilerProto.Entry.newBuilder()
.setName(path.getName())
.setIsDirectory(true)
.setAttributes(FilerProto.FuseAttributes.newBuilder()
.setMtime(now)
.setCrtime(now)
.setFileMode(permissionToMode(permission, true))
.setUserName(currentUser.getUserName())
.addAllGroupName(Arrays.asList(currentUser.getGroupNames())))
);
FilerProto.CreateEntryResponse response = filerGrpcClient.getBlockingStub().createEntry(request.build());
return true;
return filerClient.mkdirs(
path.toUri().getPath(),
permissionToMode(permission, true),
currentUser.getUserName(),
currentUser.getGroupNames()
);
}
public FileStatus[] listEntries(final Path path) {
@@ -73,7 +65,7 @@ public class SeaweedFileSystemStore {
List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
List<FilerProto.Entry> entries = lookupEntries(path);
List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
for (FilerProto.Entry entry : entries) {
@@ -84,16 +76,6 @@ public class SeaweedFileSystemStore {
return fileStatuses.toArray(new FileStatus[0]);
}
private List<FilerProto.Entry> lookupEntries(Path path) {
LOG.debug("listEntries path: {}", path);
return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path.toUri().getPath())
.setLimit(100000)
.build()).getEntriesList();
}
public FileStatus getFileStatus(final Path path) {
FilerProto.Entry entry = lookupEntry(path);
@@ -106,32 +88,24 @@ public class SeaweedFileSystemStore {
return fileStatus;
}
public boolean deleteEntries(final Path path, boolean isDirectroy, boolean recursive) {
public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) {
LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}",
path,
String.valueOf(isDirectroy),
String.valueOf(isDirectory),
String.valueOf(recursive));
if (path.isRoot()) {
return true;
}
if (recursive && isDirectroy) {
List<FilerProto.Entry> entries = lookupEntries(path);
if (recursive && isDirectory) {
List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
for (FilerProto.Entry entry : entries) {
deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), recursive);
deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), true);
}
}
FilerProto.DeleteEntryResponse response =
filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
.setDirectory(getParentDirectory(path))
.setName(path.getName())
.setIsDirectory(isDirectroy)
.setIsDeleteData(true)
.build());
return true;
return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive);
}
private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
@@ -151,18 +125,8 @@ public class SeaweedFileSystemStore {
private FilerProto.Entry lookupEntry(Path path) {
String directory = getParentDirectory(path);
return filerClient.lookupEntry(getParentDirectory(path), path.getName());
try {
FilerProto.LookupDirectoryEntryResponse response =
filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder()
.setDirectory(directory)
.setName(path.getName())
.build());
return response.getEntry();
} catch (io.grpc.StatusRuntimeException e) {
return null;
}
}
public void rename(Path source, Path destination) {
@@ -186,9 +150,16 @@ public class SeaweedFileSystemStore {
LOG.debug("moveEntry: {}/{} => {}", oldParent, entry.getName(), destination);
FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName());
boolean isDirectoryCreated = filerClient.createEntry(getParentDirectory(destination), newEntry.build());
if (!isDirectoryCreated) {
return false;
}
if (entry.getIsDirectory()) {
Path entryPath = new Path(oldParent, entry.getName());
List<FilerProto.Entry> entries = lookupEntries(entryPath);
List<FilerProto.Entry> entries = filerClient.listEntries(entryPath.toUri().getPath());
for (FilerProto.Entry ent : entries) {
boolean isSucess = moveEntry(entryPath, ent, new Path(destination, ent.getName()));
if (!isSucess) {
@@ -197,20 +168,9 @@ public class SeaweedFileSystemStore {
}
}
FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName());
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(getParentDirectory(destination))
.setEntry(newEntry)
.build());
return filerClient.deleteEntry(
oldParent.toUri().getPath(), entry.getName(), false, false);
filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
.setDirectory(oldParent.toUri().getPath())
.setName(entry.getName())
.setIsDirectory(entry.getIsDirectory())
.setIsDeleteData(false)
.build());
return true;
}
public OutputStream createFile(final Path path,
@@ -253,6 +213,7 @@ public class SeaweedFileSystemStore {
.setCrtime(now)
.setMtime(now)
.setUserName(userGroupInformation.getUserName())
.clearGroupName()
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
);
}
@@ -306,10 +267,7 @@ public class SeaweedFileSystemStore {
LOG.debug("setOwner path:{} entry:{}", path, entryBuilder);
filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
.setDirectory(getParentDirectory(path))
.setEntry(entryBuilder)
.build());
filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
}
@@ -332,10 +290,7 @@ public class SeaweedFileSystemStore {
LOG.debug("setPermission path:{} entry:{}", path, entryBuilder);
filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
.setDirectory(getParentDirectory(path))
.setEntry(entryBuilder)
.build());
filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
}
}