本文整理汇总了Java中org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata类的典型用法代码示例。如果您正苦于以下问题:Java HdfsBlocksMetadata类的具体用法?Java HdfsBlocksMetadata怎么用?Java HdfsBlocksMetadata使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
HdfsBlocksMetadata类属于org.apache.hadoop.hdfs.protocol包,在下文中一共展示了HdfsBlocksMetadata类的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: call
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override
public HdfsBlocksMetadata call() throws Exception {
HdfsBlocksMetadata metadata = null;
// Create the RPC proxy and make the RPC
ClientDatanodeProtocol cdp = null;
TraceScope scope =
Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
timeout, connectToDnViaHostname);
metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
} catch (IOException e) {
// Bubble this up to the caller, handle with the Future
throw e;
} finally {
scope.close();
if (cdp != null) {
RPC.stopProxy(cdp);
}
}
return metadata;
}
开发者ID:naver,项目名称:hadoop,代码行数:23,代码来源:BlockStorageLocationUtil.java
示例2: getHdfsBlocksMetadata
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(
String bpId, long[] blockIds,
List<Token<BlockTokenIdentifier>> tokens) throws IOException,
UnsupportedOperationException {
if (!getHdfsBlockLocationsEnabled) {
throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata "
+ " is not enabled in datanode config");
}
if (blockIds.length != tokens.size()) {
throw new IOException("Differing number of blocks and tokens");
}
// Check access for each block
for (int i = 0; i < blockIds.length; i++) {
checkBlockToken(new ExtendedBlock(bpId, blockIds[i]),
tokens.get(i), BlockTokenSecretManager.AccessMode.READ);
}
DataNodeFaultInjector.get().getHdfsBlocksMetadata();
return data.getHdfsBlocksMetadata(bpId, blockIds);
}
开发者ID:naver,项目名称:hadoop,代码行数:23,代码来源:DataNode.java
示例3: call
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override
public HdfsBlocksMetadata call() throws Exception {
HdfsBlocksMetadata metadata = null;
// Create the RPC proxy and make the RPC
ClientDatanodeProtocol cdp = null;
try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
timeout, connectToDnViaHostname);
metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
} catch (IOException e) {
// Bubble this up to the caller, handle with the Future
throw e;
} finally {
if (cdp != null) {
RPC.stopProxy(cdp);
}
}
return metadata;
}
开发者ID:Nextzero,项目名称:hadoop-2.6.0-cdh5.4.3,代码行数:20,代码来源:BlockStorageLocationUtil.java
示例4: call
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override
public HdfsBlocksMetadata call() throws Exception {
HdfsBlocksMetadata metadata = null;
// Create the RPC proxy and make the RPC
ClientDatanodeProtocol cdp = null;
try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
timeout, connectToDnViaHostname);
metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens);
} catch (IOException e) {
// Bubble this up to the caller, handle with the Future
throw e;
} finally {
if (cdp != null) {
RPC.stopProxy(cdp);
}
}
return metadata;
}
开发者ID:ict-carch,项目名称:hadoop-plus,代码行数:20,代码来源:BlockStorageLocationUtil.java
示例5: getHdfsBlocksMetadata
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
List<Token<BlockTokenIdentifier>> tokens) throws IOException,
UnsupportedOperationException {
if (!getHdfsBlockLocationsEnabled) {
throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata "
+ " is not enabled in datanode config");
}
if (blocks.size() != tokens.size()) {
throw new IOException("Differing number of blocks and tokens");
}
// Check access for each block
for (int i = 0; i < blocks.size(); i++) {
checkBlockToken(blocks.get(i), tokens.get(i),
BlockTokenSecretManager.AccessMode.READ);
}
return data.getHdfsBlocksMetadata(blocks);
}
开发者ID:ict-carch,项目名称:hadoop-plus,代码行数:19,代码来源:DataNode.java
示例6: call
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override
public HdfsBlocksMetadata call() throws Exception {
HdfsBlocksMetadata metadata = null;
// Create the RPC proxy and make the RPC
ClientDatanodeProtocol cdp = null;
try {
cdp = DFSUtil
.createClientDatanodeProtocolProxy(datanode, configuration, timeout,
connectToDnViaHostname);
metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens);
} catch (IOException e) {
// Bubble this up to the caller, handle with the Future
throw e;
} finally {
if (cdp != null) {
RPC.stopProxy(cdp);
}
}
return metadata;
}
开发者ID:hopshadoop,项目名称:hops,代码行数:21,代码来源:BlockStorageLocationUtil.java
示例7: getHdfsBlocksMetadata
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
List<Token<BlockTokenIdentifier>> tokens)
throws IOException, UnsupportedOperationException {
if (!getHdfsBlockLocationsEnabled) {
throw new UnsupportedOperationException(
"Datanode#getHdfsBlocksMetadata " +
" is not enabled in datanode config");
}
if (blocks.size() != tokens.size()) {
throw new IOException("Differing number of blocks and tokens");
}
// Check access for each block
for (int i = 0; i < blocks.size(); i++) {
checkBlockToken(blocks.get(i), tokens.get(i),
BlockTokenSecretManager.AccessMode.READ);
}
return data.getHdfsBlocksMetadata(blocks);
}
开发者ID:hopshadoop,项目名称:hops,代码行数:20,代码来源:DataNode.java
示例8: getHdfsBlocksMetadata
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
long[] blockIds,
List<Token<BlockTokenIdentifier>> tokens) throws IOException {
List<TokenProto> tokensProtos =
new ArrayList<TokenProto>(tokens.size());
for (Token<BlockTokenIdentifier> t : tokens) {
tokensProtos.add(PBHelper.convert(t));
}
// Build the request
GetHdfsBlockLocationsRequestProto request =
GetHdfsBlockLocationsRequestProto.newBuilder()
.setBlockPoolId(blockPoolId)
.addAllBlockIds(Longs.asList(blockIds))
.addAllTokens(tokensProtos)
.build();
// Send the RPC
GetHdfsBlockLocationsResponseProto response;
try {
response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
// List of volumes in the response
List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
for (ByteString bs : volumeIdsByteStrings) {
volumeIds.add(bs.toByteArray());
}
// Array of indexes into the list of volumes, one per block
List<Integer> volumeIndexes = response.getVolumeIndexesList();
// Parsed HdfsVolumeId values, one per block
return new HdfsBlocksMetadata(blockPoolId, blockIds,
volumeIds, volumeIndexes);
}
开发者ID:naver,项目名称:hadoop,代码行数:36,代码来源:ClientDatanodeProtocolTranslatorPB.java
示例9: getHdfsBlocksMetadata
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
throws IOException {
// List of VolumeIds, one per volume on the datanode
List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
// List of indexes into the list of VolumeIds, pointing at the VolumeId of
// the volume that the block is on
List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blocks.size());
// Initialize the list of VolumeIds simply by enumerating the volumes
for (int i = 0; i < volumes.volumes.size(); i++) {
blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
}
// Determine the index of the VolumeId of each block's volume, by comparing
// the block's volume against the enumerated volumes
for (int i = 0; i < blocks.size(); i++) {
ExtendedBlock block = blocks.get(i);
FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume();
boolean isValid = false;
int volumeIndex = 0;
for (FsVolumeImpl volume : volumes.volumes) {
// This comparison of references should be safe
if (blockVolume == volume) {
isValid = true;
break;
}
volumeIndex++;
}
// Indicates that the block is not present, or not found in a data dir
if (!isValid) {
volumeIndex = Integer.MAX_VALUE;
}
blocksVolumeIndexes.add(volumeIndex);
}
return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}),
blocksVolumeIds, blocksVolumeIndexes);
}
开发者ID:ict-carch,项目名称:hadoop-plus,代码行数:37,代码来源:FsDatasetImpl.java
示例10: getHdfsBlocksMetadata
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
List<Token<BlockTokenIdentifier>> tokens) throws IOException {
// Convert to proto objects
List<ExtendedBlockProto> blocksProtos =
new ArrayList<ExtendedBlockProto>(blocks.size());
List<TokenProto> tokensProtos =
new ArrayList<TokenProto>(tokens.size());
for (ExtendedBlock b : blocks) {
blocksProtos.add(PBHelper.convert(b));
}
for (Token<BlockTokenIdentifier> t : tokens) {
tokensProtos.add(PBHelper.convert(t));
}
// Build the request
GetHdfsBlockLocationsRequestProto request =
GetHdfsBlockLocationsRequestProto.newBuilder()
.addAllBlocks(blocksProtos)
.addAllTokens(tokensProtos)
.build();
// Send the RPC
GetHdfsBlockLocationsResponseProto response;
try {
response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
// List of volumes in the response
List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
for (ByteString bs : volumeIdsByteStrings) {
volumeIds.add(bs.toByteArray());
}
// Array of indexes into the list of volumes, one per block
List<Integer> volumeIndexes = response.getVolumeIndexesList();
// Parsed HdfsVolumeId values, one per block
return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}),
volumeIds, volumeIndexes);
}
开发者ID:ict-carch,项目名称:hadoop-plus,代码行数:40,代码来源:ClientDatanodeProtocolTranslatorPB.java
示例11: getHdfsBlocksMetadata
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
long[] blockIds) throws IOException {
// List of VolumeIds, one per volume on the datanode
List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
// List of indexes into the list of VolumeIds, pointing at the VolumeId of
// the volume that the block is on
List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
// Initialize the list of VolumeIds simply by enumerating the volumes
for (int i = 0; i < volumes.volumes.size(); i++) {
blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
}
// Determine the index of the VolumeId of each block's volume, by comparing
// the block's volume against the enumerated volumes
for (int i = 0; i < blockIds.length; i++) {
long blockId = blockIds[i];
boolean isValid = false;
ReplicaInfo info = volumeMap.get(poolId, blockId);
int volumeIndex = 0;
if (info != null) {
FsVolumeSpi blockVolume = info.getVolume();
for (FsVolumeImpl volume : volumes.volumes) {
// This comparison of references should be safe
if (blockVolume == volume) {
isValid = true;
break;
}
volumeIndex++;
}
}
// Indicates that the block is not present, or not found in a data dir
if (!isValid) {
volumeIndex = Integer.MAX_VALUE;
}
blocksVolumeIndexes.add(volumeIndex);
}
return new HdfsBlocksMetadata(poolId, blockIds,
blocksVolumeIds, blocksVolumeIndexes);
}
开发者ID:yncxcw,项目名称:FlexMap,代码行数:41,代码来源:FsDatasetImpl.java
示例12: getHdfsBlocksMetadata
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
throws IOException {
// List of VolumeIds, one per volume on the datanode
List<byte[]> blocksVolumeIds =
new ArrayList<>(volumes.volumes.size());
// List of indexes into the list of VolumeIds, pointing at the VolumeId of
// the volume that the block is on
List<Integer> blocksVolumeIndexes = new ArrayList<>(blocks.size());
// Initialize the list of VolumeIds simply by enumerating the volumes
for (int i = 0; i < volumes.volumes.size(); i++) {
blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
}
// Determine the index of the VolumeId of each block's volume, by comparing
// the block's volume against the enumerated volumes
for (ExtendedBlock block : blocks) {
FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume();
boolean isValid = false;
int volumeIndex = 0;
for (FsVolumeImpl volume : volumes.volumes) {
// This comparison of references should be safe
if (blockVolume == volume) {
isValid = true;
break;
}
volumeIndex++;
}
// Indicates that the block is not present, or not found in a data dir
if (!isValid) {
volumeIndex = Integer.MAX_VALUE;
}
blocksVolumeIndexes.add(volumeIndex);
}
return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[]{}),
blocksVolumeIds, blocksVolumeIndexes);
}
开发者ID:hopshadoop,项目名称:hops,代码行数:37,代码来源:FsDatasetImpl.java
示例13: getHdfsBlocksMetadata
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
List<Token<BlockTokenIdentifier>> tokens) throws IOException {
// Convert to proto objects
List<ExtendedBlockProto> blocksProtos =
new ArrayList<>(blocks.size());
List<TokenProto> tokensProtos = new ArrayList<>(tokens.size());
for (ExtendedBlock b : blocks) {
blocksProtos.add(PBHelper.convert(b));
}
for (Token<BlockTokenIdentifier> t : tokens) {
tokensProtos.add(PBHelper.convert(t));
}
// Build the request
GetHdfsBlockLocationsRequestProto request =
GetHdfsBlockLocationsRequestProto.newBuilder()
.addAllBlocks(blocksProtos).addAllTokens(tokensProtos).build();
// Send the RPC
GetHdfsBlockLocationsResponseProto response;
try {
response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
// List of volumes in the response
List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
List<byte[]> volumeIds = new ArrayList<>(volumeIdsByteStrings.size());
for (ByteString bs : volumeIdsByteStrings) {
volumeIds.add(bs.toByteArray());
}
// Array of indexes into the list of volumes, one per block
List<Integer> volumeIndexes = response.getVolumeIndexesList();
// Parsed HdfsVolumeId values, one per block
return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[]{}),
volumeIds, volumeIndexes);
}
开发者ID:hopshadoop,项目名称:hops,代码行数:37,代码来源:ClientDatanodeProtocolTranslatorPB.java
示例14: getBlockStorageLocations
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
/**
* Get block location information about a list of {@link HdfsBlockLocation}.
* Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
* get {@link BlockStorageLocation}s for blocks returned by
* {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}
* .
*
* This is done by making a round of RPCs to the associated datanodes, asking
* the volume of each block replica. The returned array of
* {@link BlockStorageLocation} expose this information as a
* {@link VolumeId}.
*
* @param blockLocations
* target blocks on which to query volume location information
* @return volumeBlockLocations original block array augmented with additional
* volume location information for each replica.
*/
public BlockStorageLocation[] getBlockStorageLocations(
List<BlockLocation> blockLocations) throws IOException,
UnsupportedOperationException, InvalidBlockTokenException {
if (!getConf().getHdfsBlocksMetadataEnabled) {
throw new UnsupportedOperationException("Datanode-side support for " +
"getVolumeBlockLocations() must also be enabled in the client " +
"configuration.");
}
// Downcast blockLocations and fetch out required LocatedBlock(s)
List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
for (BlockLocation loc : blockLocations) {
if (!(loc instanceof HdfsBlockLocation)) {
throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
"expected to be passed HdfsBlockLocations");
}
HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
blocks.add(hdfsLoc.getLocatedBlock());
}
// Re-group the LocatedBlocks to be grouped by datanodes, with the values
// a list of the LocatedBlocks on the datanode.
Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
for (LocatedBlock b : blocks) {
for (DatanodeInfo info : b.getLocations()) {
if (!datanodeBlocks.containsKey(info)) {
datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
}
List<LocatedBlock> l = datanodeBlocks.get(info);
l.add(b);
}
}
// Make RPCs to the datanodes to get volume locations for its replicas
TraceScope scope =
Trace.startSpan("getBlockStorageLocations", traceSampler);
Map<DatanodeInfo, HdfsBlocksMetadata> metadatas;
try {
metadatas = BlockStorageLocationUtil.
queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
getConf().getFileBlockStorageLocationsNumThreads,
getConf().getFileBlockStorageLocationsTimeoutMs,
getConf().connectToDnViaHostname);
if (LOG.isTraceEnabled()) {
LOG.trace("metadata returned: "
+ Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
}
} finally {
scope.close();
}
// Regroup the returned VolumeId metadata to again be grouped by
// LocatedBlock rather than by datanode
Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
.associateVolumeIdsWithBlocks(blocks, metadatas);
// Combine original BlockLocations with new VolumeId information
BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
.convertToVolumeBlockLocations(blocks, blockVolumeIds);
return volumeBlockLocations;
}
开发者ID:naver,项目名称:hadoop,代码行数:80,代码来源:DFSClient.java
示例15: getHdfsBlocksMetadata
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
long[] blockIds) throws IOException {
List<FsVolumeImpl> curVolumes = getVolumes();
// List of VolumeIds, one per volume on the datanode
List<byte[]> blocksVolumeIds = new ArrayList<>(curVolumes.size());
// List of indexes into the list of VolumeIds, pointing at the VolumeId of
// the volume that the block is on
List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
// Initialize the list of VolumeIds simply by enumerating the volumes
for (int i = 0; i < curVolumes.size(); i++) {
blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
}
// Determine the index of the VolumeId of each block's volume, by comparing
// the block's volume against the enumerated volumes
for (int i = 0; i < blockIds.length; i++) {
long blockId = blockIds[i];
boolean isValid = false;
ReplicaInfo info = volumeMap.get(poolId, blockId);
int volumeIndex = 0;
if (info != null) {
FsVolumeSpi blockVolume = info.getVolume();
for (FsVolumeImpl volume : curVolumes) {
// This comparison of references should be safe
if (blockVolume == volume) {
isValid = true;
break;
}
volumeIndex++;
}
}
// Indicates that the block is not present, or not found in a data dir
if (!isValid) {
volumeIndex = Integer.MAX_VALUE;
}
blocksVolumeIndexes.add(volumeIndex);
}
return new HdfsBlocksMetadata(poolId, blockIds,
blocksVolumeIds, blocksVolumeIndexes);
}
开发者ID:naver,项目名称:hadoop,代码行数:42,代码来源:FsDatasetImpl.java
示例16: getHdfsBlocksMetadata
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds)
throws IOException {
throw new UnsupportedOperationException();
}
开发者ID:naver,项目名称:hadoop,代码行数:6,代码来源:SimulatedFSDataset.java
示例17: getHdfsBlocksMetadata
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds) throws IOException {
return new HdfsBlocksMetadata(null, null, null, null);
}
开发者ID:naver,项目名称:hadoop,代码行数:5,代码来源:ExternalDatasetImpl.java
示例18: getBlockStorageLocations
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; //导入依赖的package包/类
/**
* Get block location information about a list of {@link HdfsBlockLocation}.
* Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
* get {@link BlockStorageLocation}s for blocks returned by
* {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}
* .
*
* This is done by making a round of RPCs to the associated datanodes, asking
* the volume of each block replica. The returned array of
* {@link BlockStorageLocation} expose this information as a
* {@link VolumeId}.
*
* @param blockLocations
* target blocks on which to query volume location information
* @return volumeBlockLocations original block array augmented with additional
* volume location information for each replica.
*/
public BlockStorageLocation[] getBlockStorageLocations(
List<BlockLocation> blockLocations) throws IOException,
UnsupportedOperationException, InvalidBlockTokenException {
if (!getConf().getHdfsBlocksMetadataEnabled) {
throw new UnsupportedOperationException("Datanode-side support for " +
"getVolumeBlockLocations() must also be enabled in the client " +
"configuration.");
}
// Downcast blockLocations and fetch out required LocatedBlock(s)
List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
for (BlockLocation loc : blockLocations) {
if (!(loc instanceof HdfsBlockLocation)) {
throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
"expected to be passed HdfsBlockLocations");
}
HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
blocks.add(hdfsLoc.getLocatedBlock());
}
// Re-group the LocatedBlocks to be grouped by datanodes, with the values
// a list of the LocatedBlocks on the datanode.
Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
for (LocatedBlock b : blocks) {
for (DatanodeInfo info : b.getLocations()) {
if (!datanodeBlocks.containsKey(info)) {
datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
}
List<LocatedBlock> l = datanodeBlocks.get(info);
l.add(b);
}
}
// Make RPCs to the datanodes to get volume locations for its replicas
Map<DatanodeInfo, HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil
.queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
getConf().getFileBlockStorageLocationsNumThreads,
getConf().getFileBlockStorageLocationsTimeoutMs,
getConf().connectToDnViaHostname);
if (LOG.isTraceEnabled()) {
LOG.trace("metadata returned: "
+ Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
}
// Regroup the returned VolumeId metadata to again be grouped by
// LocatedBlock rather than by datanode
Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
.associateVolumeIdsWithBlocks(blocks, metadatas);
// Combine original BlockLocations with new VolumeId information
BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
.convertToVolumeBlockLocations(blocks, blockVolumeIds);
return volumeBlockLocations;
}
开发者ID:Nextzero,项目名称:hadoop-2.6.0-cdh5.4.3,代码行数:74,代码来源:DFSClient.java
注:本文中的org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论