本文整理汇总了Java中org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas类的典型用法代码示例。如果您正苦于以下问题:Java NumberReplicas类的具体用法?Java NumberReplicas怎么用?Java NumberReplicas使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
NumberReplicas类属于org.apache.hadoop.hdfs.server.namenode.FSNamesystem包,在下文中一共展示了NumberReplicas类的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: waitForExcessReplicasToChange
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
private void waitForExcessReplicasToChange(
FSNamesystem namesystem,
Block block,
int newReplicas) throws Exception
{
NumberReplicas num;
long startChecking = System.currentTimeMillis();
do {
LOG.info("Waiting for a replica to become excess");
namesystem.readLock();
try {
num = namesystem.countNodes(block);
} finally {
namesystem.readUnlock();
}
Thread.sleep(100);
if (System.currentTimeMillis() - startChecking > 30000) {
fail("Timed out waiting for excess replicas to change");
}
} while (num.excessReplicas() != newReplicas);
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:23,代码来源:TestUnderReplicatedBlocks.java
示例2: waitForExcessReplicasToBeDeleted
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
private void waitForExcessReplicasToBeDeleted(FSNamesystem namesystem,
Block block, DataNode dn) throws Exception {
NumberReplicas num;
long startChecking = System.currentTimeMillis();
do {
LOG.info("Waiting for the excess replica to be deleted");
dn.scheduleNSBlockReceivedAndDeleted(0);
namesystem.readLock();
try {
num = namesystem.countNodes(block);
} finally {
namesystem.readUnlock();
}
Thread.sleep(100);
if (System.currentTimeMillis() - startChecking > 30000) {
fail("Timed out waiting for excess replicas to be deleted");
}
} while (num.excessReplicas() != 0);
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:21,代码来源:TestUnderReplicatedBlocks.java
示例3: waitForExcessReplicasToChange
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
private void waitForExcessReplicasToChange(
FSNamesystem namesystem,
Block block,
int newReplicas) throws Exception
{
NumberReplicas num;
long startChecking = System.currentTimeMillis();
do {
namesystem.readLock();
try {
num = namesystem.countNodes(block);
LOG.info("We have " + num.excessReplicas() + " excess replica");
} finally {
namesystem.readUnlock();
}
Thread.sleep(100);
if (System.currentTimeMillis() - startChecking > 30000) {
namesystem.metaSave("TestNodeCount.meta");
LOG.warn("Dumping meta into log directory");
fail("Timed out waiting for excess replicas to change");
}
} while (num.excessReplicas() != newReplicas);
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:25,代码来源:TestNodeCount.java
示例4: processPendingReplications
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
/**
* If there were any replication requests that timed out, reap them
* and put them back into the neededReplication queue
*/
void processPendingReplications() {
Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
if (timedOutItems != null) {
namesystem.writeLock();
try {
for (int i = 0; i < timedOutItems.length; i++) {
NumberReplicas num = countNodes(timedOutItems[i]);
if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]),
num.liveReplicas())) {
neededReplications.add(timedOutItems[i],
num.liveReplicas(),
num.decommissionedReplicas(),
getReplication(timedOutItems[i]));
}
}
} finally {
namesystem.writeUnlock();
}
/* If we know the target datanodes where the replication timedout,
* we could invoke decBlocksScheduled() on it. Its ok for now.
*/
}
}
开发者ID:cumulusyebl,项目名称:cumulus,代码行数:28,代码来源:BlockManager.java
示例5: countNodes
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
/**
* Return the number of nodes that are live and decommissioned.
*/
NumberReplicas countNodes(Block b) {
int count = 0;
int live = 0;
int corrupt = 0;
int excess = 0;
Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
while (nodeIter.hasNext()) {
DatanodeDescriptor node = nodeIter.next();
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++;
} else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
count++;
} else {
Collection<Block> blocksExcess =
excessReplicateMap.get(node.getStorageID());
if (blocksExcess != null && blocksExcess.contains(b)) {
excess++;
} else {
live++;
}
}
}
return new NumberReplicas(live, count, corrupt, excess);
}
开发者ID:cumulusyebl,项目名称:cumulus,代码行数:29,代码来源:BlockManager.java
示例6: logBlockReplicationInfo
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
NumberReplicas num) {
int curReplicas = num.liveReplicas();
int curExpectedReplicas = getReplication(block);
INode fileINode = blocksMap.getINode(block);
Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(block);
StringBuilder nodeList = new StringBuilder();
while (nodeIter.hasNext()) {
DatanodeDescriptor node = nodeIter.next();
nodeList.append(node.name);
nodeList.append(" ");
}
FSNamesystem.LOG.info("Block: " + block + ", Expected Replicas: "
+ curExpectedReplicas + ", live replicas: " + curReplicas
+ ", corrupt replicas: " + num.corruptReplicas()
+ ", decommissioned replicas: " + num.decommissionedReplicas()
+ ", excess replicas: " + num.excessReplicas()
+ ", Is Open File: " + fileINode.isUnderConstruction()
+ ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+ srcNode.name + ", Is current datanode decommissioning: "
+ srcNode.isDecommissionInProgress());
}
开发者ID:cumulusyebl,项目名称:cumulus,代码行数:23,代码来源:BlockManager.java
示例7: updateNeededReplications
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
void updateNeededReplications(Block block, int curReplicasDelta,
int expectedReplicasDelta) {
namesystem.writeLock();
try {
NumberReplicas repl = countNodes(block);
int curExpectedReplicas = getReplication(block);
if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
neededReplications.update(block, repl.liveReplicas(), repl
.decommissionedReplicas(), curExpectedReplicas, curReplicasDelta,
expectedReplicasDelta);
} else {
int oldReplicas = repl.liveReplicas()-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
neededReplications.remove(block, oldReplicas, repl.decommissionedReplicas(),
oldExpectedReplicas);
}
} finally {
namesystem.writeUnlock();
}
}
开发者ID:cumulusyebl,项目名称:cumulus,代码行数:21,代码来源:BlockManager.java
示例8: waitForExcessReplicasToChange
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
private void waitForExcessReplicasToChange(
FSNamesystem namesystem,
Block block,
int newReplicas) throws Exception
{
NumberReplicas num;
long startChecking = System.currentTimeMillis();
do {
namesystem.readLock();
try {
num = namesystem.countNodes(block);
} finally {
namesystem.readUnlock();
}
Thread.sleep(100);
if (System.currentTimeMillis() - startChecking > 30000) {
fail("Timed out waiting for excess replicas to change");
}
} while (num.excessReplicas() != newReplicas);
}
开发者ID:iVCE,项目名称:RDFS,代码行数:22,代码来源:TestUnderReplicatedBlocks.java
示例9: processMisReplicatedBlocks
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
/**
* For each block in the name-node verify whether it belongs to any file,
* over or under replicated. Place it into the respective queue.
*/
void processMisReplicatedBlocks() {
long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
namesystem.writeLock();
try {
neededReplications.clear();
for (BlockInfo block : blocksMap.getBlocks()) {
INodeFile fileINode = block.getINode();
if (fileINode == null) {
// block does not belong to any file
nrInvalid++;
addToInvalidates(block);
continue;
}
// calculate current replication
short expectedReplication = fileINode.getReplication();
NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas();
// add to under-replicated queue if need to be
if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
if (neededReplications.add(block, numCurrentReplica, num
.decommissionedReplicas(), expectedReplication)) {
nrUnderReplicated++;
}
}
if (numCurrentReplica > expectedReplication) {
// over-replicated block
nrOverReplicated++;
processOverReplicatedBlock(block, expectedReplication, null, null);
}
}
} finally {
namesystem.writeUnlock();
}
FSNamesystem.LOG.info("Total number of blocks = " + blocksMap.size());
FSNamesystem.LOG.info("Number of invalid blocks = " + nrInvalid);
FSNamesystem.LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
FSNamesystem.LOG.info("Number of over-replicated blocks = " + nrOverReplicated);
}
开发者ID:cumulusyebl,项目名称:cumulus,代码行数:44,代码来源:BlockManager.java
示例10: checkReplication
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
void checkReplication(Block block, int numExpectedReplicas) {
// filter out containingNodes that are marked for decommission.
NumberReplicas number = countNodes(block);
if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) {
neededReplications.add(block,
number.liveReplicas(),
number.decommissionedReplicas,
numExpectedReplicas);
}
}
开发者ID:cumulusyebl,项目名称:cumulus,代码行数:11,代码来源:BlockManager.java
示例11: testUnderReplicationWithDecommissionDataNode
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
public void testUnderReplicationWithDecommissionDataNode() throws Exception {
final Configuration conf = new Configuration();
final short REPLICATION_FACTOR = (short)1;
File f = new File(HOST_FILE_PATH);
if (f.exists()) {
f.delete();
}
f.createNewFile();
conf.set("dfs.hosts.exclude", HOST_FILE_PATH);
LOG.info("Start the cluster");
final MiniDFSCluster cluster =
new MiniDFSCluster(conf, REPLICATION_FACTOR, true, null);
try {
final FSNamesystem namesystem = cluster.getNameNode().namesystem;
final FileSystem fs = cluster.getFileSystem();
DatanodeDescriptor[] datanodes = (DatanodeDescriptor[])
namesystem.heartbeats.toArray(
new DatanodeDescriptor[REPLICATION_FACTOR]);
assertEquals(1, datanodes.length);
// populate the cluster with a one block file
final Path FILE_PATH = new Path("/testfile2");
DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
Block block = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
// shutdown the datanode
DataNodeProperties dnprop = shutdownDataNode(cluster, datanodes[0]);
assertEquals(1, namesystem.getMissingBlocksCount()); // one missing block
assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks());
// Make the only datanode to be decommissioned
LOG.info("Decommission the datanode " + dnprop);
addToExcludeFile(namesystem.getConf(), datanodes);
namesystem.refreshNodes(namesystem.getConf());
// bring up the datanode
cluster.restartDataNode(dnprop);
// Wait for block report
LOG.info("wait for its block report to come in");
NumberReplicas num;
long startTime = System.currentTimeMillis();
do {
namesystem.readLock();
try {
num = namesystem.countNodes(block);
} finally {
namesystem.readUnlock();
}
Thread.sleep(1000);
LOG.info("live: " + num.liveReplicas()
+ "Decom: " + num.decommissionedReplicas());
} while (num.decommissionedReplicas() != 1 &&
System.currentTimeMillis() - startTime < 30000);
assertEquals("Decommissioning Replicas doesn't reach 1",
1, num.decommissionedReplicas());
assertEquals(1, namesystem.getNonCorruptUnderReplicatedBlocks());
assertEquals(0, namesystem.getMissingBlocksCount());
} finally {
cluster.shutdown();
}
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:63,代码来源:TestUnderReplicatedBlocks.java
示例12: testRaidMissingBlocksByTakingDownDataNode
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
/**
* Take down a datanode to generate raid missing blocks, and then bring it back
* will restore the missing blocks.
*/
@Test
public void testRaidMissingBlocksByTakingDownDataNode() throws IOException, InterruptedException {
MiniDFSCluster cluster = null;
Configuration conf = new Configuration();
try {
cluster = new MiniDFSCluster(conf, 1, true, null);
final FSNamesystem namesystem = cluster.getNameNode().namesystem;
final DistributedFileSystem dfs = DFSUtil.convertToDFS(cluster.getFileSystem());
String filePath = "/test/file1";
RaidCodec rsCodec = RaidCodec.getCodec("rs");
RaidDFSUtil.constructFakeRaidFile(dfs, filePath, rsCodec);
DatanodeDescriptor[] datanodes = (DatanodeDescriptor[])
namesystem.heartbeats.toArray(
new DatanodeDescriptor[1]);
assertEquals(1, datanodes.length);
// shutdown the datanode
DataNodeProperties dnprop = shutdownDataNode(cluster, datanodes[0]);
assertEquals(rsCodec.numStripeBlocks, namesystem.getRaidMissingBlocksCount());
assertEquals(0, namesystem.getMissingBlocksCount()); // zero non-raid missing block
assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks());
// bring up the datanode
cluster.restartDataNode(dnprop);
// Wait for block report
LOG.info("wait for its block report to come in");
NumberReplicas num;
FileStatus stat = dfs.getFileStatus(new Path(filePath));
LocatedBlocks blocks = dfs.getClient().
getLocatedBlocks(filePath, 0, stat.getLen());
long startTime = System.currentTimeMillis();
do {
Thread.sleep(1000);
int totalCount = 0;
namesystem.readLock();
try {
for (LocatedBlock block : blocks.getLocatedBlocks()) {
num = namesystem.countNodes(block.getBlock());
totalCount += num.liveReplicas();
}
if (totalCount == rsCodec.numDataBlocks) {
break;
} else {
LOG.info("wait for block report, received total replicas: " + totalCount);
}
} finally {
namesystem.readUnlock();
}
} while (System.currentTimeMillis() - startTime < 30000);
assertEquals(0, namesystem.getRaidMissingBlocksCount());
assertEquals(0, namesystem.getMissingBlocksCount()); // zero non-raid missing block
assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:66,代码来源:TestRaidMissingBlocksQueue.java
示例13: testInvalidateMultipleReplicas
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
public void testInvalidateMultipleReplicas() throws Exception {
Configuration conf = new Configuration();
MiniDFSCluster cluster =
new MiniDFSCluster(conf, 5, true, null);
final int FILE_LEN = 123;
final String pathStr = "/testInvalidateMultipleReplicas";
try {
FileSystem fs = cluster.getFileSystem();
Path path = new Path(pathStr);
cluster.waitActive();
// create a small file on 3 nodes
DFSTestUtil.createFile(fs, path, 123, (short)3, 0);
DFSTestUtil.waitReplication(fs, path, (short)3);
NameNode nn = cluster.getNameNode();
LocatedBlocks located = nn.getBlockLocations(pathStr, 0, FILE_LEN);
// Get the original block locations
List<LocatedBlock> blocks = located.getLocatedBlocks();
LocatedBlock firstBlock = blocks.get(0);
DatanodeInfo[] locations = firstBlock.getLocations();
assertEquals("Should have 3 good blocks", 3, locations.length);
nn.getNamesystem().stallReplicationWork();
DatanodeInfo[] badLocations = new DatanodeInfo[2];
badLocations[0] = locations[0];
badLocations[1] = locations[1];
// Report some blocks corrupt
LocatedBlock badLBlock = new LocatedBlock(
firstBlock.getBlock(), badLocations);
nn.reportBadBlocks(new LocatedBlock[] {badLBlock});
nn.getNamesystem().restartReplicationWork();
DFSTestUtil.waitReplication(fs, path, (short)3);
NumberReplicas num = nn.getNamesystem().countNodes(
firstBlock.getBlock());
assertEquals(0, num.corruptReplicas());
} finally {
cluster.shutdown();
}
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:46,代码来源:TestNodeCount.java
示例14: metaSave
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
void metaSave(PrintWriter out) {
//
// Dump contents of neededReplication
//
synchronized (neededReplications) {
out.println("Metasave: Blocks waiting for replication: " +
neededReplications.size());
for (Block block : neededReplications) {
List<DatanodeDescriptor> containingNodes =
new ArrayList<DatanodeDescriptor>();
NumberReplicas numReplicas = new NumberReplicas();
// source node returned is not used
chooseSourceDatanode(block, containingNodes, numReplicas);
int usableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissionedReplicas();
if (block instanceof BlockInfo) {
String fileName = ((BlockInfo)block).getINode().getFullPathName();
out.print(fileName + ": ");
}
// l: == live:, d: == decommissioned c: == corrupt e: == excess
out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
" (replicas:" +
" l: " + numReplicas.liveReplicas() +
" d: " + numReplicas.decommissionedReplicas() +
" c: " + numReplicas.corruptReplicas() +
" e: " + numReplicas.excessReplicas() + ") ");
Collection<DatanodeDescriptor> corruptNodes =
corruptReplicas.getNodes(block);
for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
jt.hasNext();) {
DatanodeDescriptor node = jt.next();
String state = "";
if (corruptNodes != null && corruptNodes.contains(node)) {
state = "(corrupt)";
} else if (node.isDecommissioned() ||
node.isDecommissionInProgress()) {
state = "(decommissioned)";
}
out.print(" " + node + state + " : ");
}
out.println("");
}
}
//
// Dump blocks from pendingReplication
//
pendingReplications.metaSave(out);
//
// Dump blocks that are waiting to be deleted
//
dumpRecentInvalidateSets(out);
}
开发者ID:cumulusyebl,项目名称:cumulus,代码行数:58,代码来源:BlockManager.java
示例15: chooseSourceDatanode
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
/**
* Parse the data-nodes the block belongs to and choose one,
* which will be the replication source.
*
* We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
* since the former do not have write traffic and hence are less busy.
* We do not use already decommissioned nodes as a source.
* Otherwise we choose a random node among those that did not reach their
* replication limit.
*
* In addition form a list of all nodes containing the block
* and calculate its replication numbers.
*/
private DatanodeDescriptor chooseSourceDatanode(
Block block,
List<DatanodeDescriptor> containingNodes,
NumberReplicas numReplicas) {
containingNodes.clear();
DatanodeDescriptor srcNode = null;
int live = 0;
int decommissioned = 0;
int corrupt = 0;
int excess = 0;
Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
while(it.hasNext()) {
DatanodeDescriptor node = it.next();
Collection<Block> excessBlocks =
excessReplicateMap.get(node.getStorageID());
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
corrupt++;
else if (node.isDecommissionInProgress() || node.isDecommissioned())
decommissioned++;
else if (excessBlocks != null && excessBlocks.contains(block)) {
excess++;
} else {
live++;
}
containingNodes.add(node);
// Check if this replica is corrupt
// If so, do not select the node as src node
if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
continue;
if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
continue; // already reached replication limit
// the block must not be scheduled for removal on srcNode
if(excessBlocks != null && excessBlocks.contains(block))
continue;
// never use already decommissioned nodes
if(node.isDecommissioned())
continue;
// we prefer nodes that are in DECOMMISSION_INPROGRESS state
if(node.isDecommissionInProgress() || srcNode == null) {
srcNode = node;
continue;
}
if(srcNode.isDecommissionInProgress())
continue;
// switch to a different node randomly
// this to prevent from deterministically selecting the same node even
// if the node failed to replicate the block on previous iterations
if(r.nextBoolean())
srcNode = node;
}
if(numReplicas != null)
numReplicas.initialize(live, decommissioned, corrupt, excess);
return srcNode;
}
开发者ID:cumulusyebl,项目名称:cumulus,代码行数:69,代码来源:BlockManager.java
示例16: isReplicationInProgress
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
/**
* Return true if there are any blocks on this node that have not
* yet reached their replication factor. Otherwise returns false.
*/
boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
boolean status = false;
int underReplicatedBlocks = 0;
int decommissionOnlyReplicas = 0;
int underReplicatedInOpenFiles = 0;
final Iterator<? extends Block> it = srcNode.getBlockIterator();
while(it.hasNext()) {
final Block block = it.next();
INode fileINode = blocksMap.getINode(block);
if (fileINode != null) {
NumberReplicas num = countNodes(block);
int curReplicas = num.liveReplicas();
int curExpectedReplicas = getReplication(block);
if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
if (curExpectedReplicas > curReplicas) {
//Log info about one block for this node which needs replication
if (!status) {
status = true;
logBlockReplicationInfo(block, srcNode, num);
}
underReplicatedBlocks++;
if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
decommissionOnlyReplicas++;
}
if (fileINode.isUnderConstruction()) {
underReplicatedInOpenFiles++;
}
}
if (!neededReplications.contains(block) &&
pendingReplications.getNumReplicas(block) == 0) {
//
// These blocks have been reported from the datanode
// after the startDecommission method has been executed. These
// blocks were in flight when the decommissioning was started.
//
neededReplications.add(block,
curReplicas,
num.decommissionedReplicas(),
curExpectedReplicas);
}
}
}
}
srcNode.decommissioningStatus.set(underReplicatedBlocks,
decommissionOnlyReplicas,
underReplicatedInOpenFiles);
return status;
}
开发者ID:cumulusyebl,项目名称:cumulus,代码行数:54,代码来源:BlockManager.java
示例17: testUnderReplicationWithDecommissionDataNode
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; //导入依赖的package包/类
public void testUnderReplicationWithDecommissionDataNode() throws Exception {
final Configuration conf = new Configuration();
final short REPLICATION_FACTOR = (short)1;
File f = new File(HOST_FILE_PATH);
if (f.exists()) {
f.delete();
}
conf.set("dfs.hosts.exclude", HOST_FILE_PATH);
LOG.info("Start the cluster");
final MiniDFSCluster cluster =
new MiniDFSCluster(conf, REPLICATION_FACTOR, true, null);
try {
final FSNamesystem namesystem = cluster.getNameNode().namesystem;
final FileSystem fs = cluster.getFileSystem();
DatanodeDescriptor[] datanodes = (DatanodeDescriptor[])
namesystem.heartbeats.toArray(
new DatanodeDescriptor[REPLICATION_FACTOR]);
assertEquals(1, datanodes.length);
// populate the cluster with a one block file
final Path FILE_PATH = new Path("/testfile2");
DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
Block block = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
// shutdown the datanode
DataNodeProperties dnprop = shutdownDataNode(cluster, datanodes[0]);
assertEquals(1, namesystem.getMissingBlocksCount()); // one missing block
assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks());
// Make the only datanode to be decommissioned
LOG.info("Decommission the datanode " + dnprop);
addToExcludeFile(namesystem.getConf(), datanodes);
namesystem.refreshNodes(namesystem.getConf());
// bring up the datanode
cluster.restartDataNode(dnprop);
// Wait for block report
LOG.info("wait for its block report to come in");
NumberReplicas num;
long startTime = System.currentTimeMillis();
do {
namesystem.readLock();
try {
num = namesystem.countNodes(block);
} finally {
namesystem.readUnlock();
}
Thread.sleep(1000);
LOG.info("live: " + num.liveReplicas()
+ "Decom: " + num.decommissionedReplicas());
} while (num.decommissionedReplicas() != 1 &&
System.currentTimeMillis() - startTime < 30000);
assertEquals("Decommissioning Replicas doesn't reach 1",
1, num.decommissionedReplicas());
assertEquals(1, namesystem.getNonCorruptUnderReplicatedBlocks());
assertEquals(0, namesystem.getMissingBlocksCount());
} finally {
cluster.shutdown();
}
}
开发者ID:iVCE,项目名称:RDFS,代码行数:62,代码来源:TestUnderReplicatedBlocks.java
注:本文中的org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论