本文整理汇总了Java中org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream类的典型用法代码示例。如果您正苦于以下问题:Java DFSDataInputStream类的具体用法?Java DFSDataInputStream怎么用?Java DFSDataInputStream使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
DFSDataInputStream类属于org.apache.hadoop.hdfs.DFSClient包,在下文中一共展示了DFSDataInputStream类的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: testRead
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
public void testRead() throws Exception{
for(int i = 0; i < TEST_FILE_NUM; ++i) {
String file = "/tmp" + i +".txt";
DFSTestUtil.createFile(fs, new Path(file), FILE_LEN, (short)5, 1L);
DFSDataInputStream in = (DFSDataInputStream)fs.open(new Path(file));
int numOfRead = 0;
while(in.read() > 0){
numOfRead ++;
}
assertEquals(FILE_LEN * (i+1),
metrics.readSize.getCurrentIntervalValue());
assertEquals(numOfRead * (i+1),
metrics.readOps.getCurrentIntervalValue());
}
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:17,代码来源:TestDFSClientMetrics.java
示例2: runPreadTest
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
public void runPreadTest(Configuration conf) throws Exception {
DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem(conf);
String fileName = "/test";
Path p = new Path(fileName);
for (int pri = 0; pri < 8; pri++) {
createFile(p, pri);
ioprioClass = ioprioData = 0;
DFSDataInputStream in = (DFSDataInputStream) fs.open(p);
byte[] buffer = new byte[BLOCK_SIZE * 2];
ReadOptions options = new ReadOptions();
options.setIoprio(NativeIO.IOPRIO_CLASS_BE, pri);
in.read(BLOCK_SIZE / 2, buffer, 0, BLOCK_SIZE / 2, options);
if (NativeIO.isAvailable()) {
assertTrue(NativeIO.isIoprioPossible());
assertEquals(NativeIO.IOPRIO_CLASS_BE, ioprioClass);
assertEquals(pri, ioprioData);
}
}
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:23,代码来源:TestIoprio.java
示例3: collectSrcBlocksChecksum
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
public void collectSrcBlocksChecksum(ChecksumStore ckmStore)
throws IOException {
if (ckmStore == null) {
return;
}
LOG.info("Store the checksums of source blocks into checksumStore");
for (int i = 0; i < streams.length; i++) {
if (streams[i] != null &&
streams[i] instanceof DFSDataInputStream &&
!(streams[i] instanceof RaidUtils.ZeroInputStream)) {
DFSDataInputStream stream = (DFSDataInputStream)this.streams[i];
Long newVal = checksums[i].getValue();
ckmStore.putIfAbsentChecksum(stream.getCurrentBlock(), newVal);
}
}
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:17,代码来源:ParallelStreamReader.java
示例4: checkFile
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
static void checkFile(Path p, int expectedsize, final Configuration conf
) throws IOException, InterruptedException {
//open the file with another user account
final String username = UserGroupInformation.getCurrentUser().getShortUserName()
+ "_" + ++userCount;
UserGroupInformation ugi = UserGroupInformation.createUserForTesting(username,
new String[] {"supergroup"});
final FileSystem fs = DFSTestUtil.getFileSystemAs(ugi, conf);
final DFSDataInputStream in = (DFSDataInputStream)fs.open(p);
//Check visible length
Assert.assertTrue(in.getVisibleLength() >= expectedsize);
//Able to read?
for(int i = 0; i < expectedsize; i++) {
Assert.assertEquals((byte)i, (byte)in.read());
}
in.close();
}
开发者ID:cumulusyebl,项目名称:cumulus,代码行数:24,代码来源:TestReadWhileWriting.java
示例5: readTillEnd
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
public static int readTillEnd(InputStream in, byte[] buf, boolean eofOK,
long endOffset, int toRead)
throws IOException {
int numRead = 0;
while (numRead < toRead) {
int readLen = toRead - numRead;
if (in instanceof DFSDataInputStream) {
int available = (int)(endOffset - ((DFSDataInputStream)in).getPos());
if (available < readLen) {
readLen = available;
}
}
int nread = readLen > 0? in.read(buf, numRead, readLen): 0;
if (nread < 0) {
if (eofOK) {
// EOF hit, fill with zeros
Arrays.fill(buf, numRead, toRead, (byte)0);
break;
} else {
// EOF hit, throw.
throw new IOException("Premature EOF");
}
} else if (nread == 0) {
// reach endOffset, fill with zero;
Arrays.fill(buf, numRead, toRead, (byte)0);
break;
} else {
numRead += nread;
}
}
// return 0 if we read a ZeroInputStream
if (in instanceof ZeroInputStream) {
return 0;
}
return numRead;
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:38,代码来源:RaidUtils.java
示例6: ParallelStreamReader
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
/**
* Reads data from multiple streams in parallel and puts the data in a queue.
*
* @param streams
* The input streams to read from.
* @param bufSize
* The amount of data to read from each stream in each go.
* @param numThreads
* Number of threads to use for parallelism.
* @param boundedBuffer
* The queue to place the results in.
*/
public ParallelStreamReader(Progressable reporter, InputStream[] streams,
int bufSize, int numThreads, int boundedBufferCapacity,
long maxBytesPerStream) throws IOException {
this.reporter = reporter;
this.streams = new InputStream[streams.length];
this.endOffsets = new long[streams.length];
for (int i = 0; i < streams.length; i++) {
this.streams[i] = streams[i];
if (this.streams[i] instanceof DFSDataInputStream) {
DFSDataInputStream stream = (DFSDataInputStream) this.streams[i];
// in directory raiding, the block size for each input stream
// might be different, so we need to determine the endOffset of
// each stream by their own block size.
List<LocatedBlock> blocks = stream.getAllBlocks();
if (blocks.size() == 0) {
this.endOffsets[i] = Long.MAX_VALUE;
} else {
this.endOffsets[i] = stream.getPos() + blocks.get(0).getBlockSize();
}
} else {
this.endOffsets[i] = Long.MAX_VALUE;
}
streams[i] = null; // Take over ownership of streams.
}
this.bufSize = bufSize;
this.boundedBuffer =
new ArrayBlockingQueue<ReadResult>(boundedBufferCapacity);
if (numThreads > streams.length) {
this.numThreads = streams.length;
} else {
this.numThreads = numThreads;
}
this.remainingBytesPerStream = maxBytesPerStream;
this.slots = new Semaphore(this.numThreads);
this.readPool = Executors.newFixedThreadPool(this.numThreads);
this.mainThread = new MainThread();
}
开发者ID:hopshadoop,项目名称:hops,代码行数:50,代码来源:ParallelStreamReader.java
示例7: readTillEnd
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
public static int readTillEnd(InputStream in, byte[] buf, boolean eofOK,
long endOffset, int toRead) throws IOException {
int numRead = 0;
while (numRead < toRead) {
int readLen = toRead - numRead;
if (in instanceof DFSDataInputStream) {
int available = (int) (endOffset - ((DFSDataInputStream) in).getPos());
if (available < readLen) {
readLen = available;
}
}
int nread = readLen > 0 ? in.read(buf, numRead, readLen) : 0;
if (nread < 0) {
if (eofOK) {
// EOF hit, fill with zeros
Arrays.fill(buf, numRead, toRead, (byte) 0);
break;
} else {
// EOF hit, throw.
throw new IOException("Premature EOF");
}
} else if (nread == 0) {
// reach endOffset, fill with zero;
Arrays.fill(buf, numRead, toRead, (byte) 0);
break;
} else {
numRead += nread;
}
}
// return 0 if we read a ZeroInputStream
if (in instanceof ZeroInputStream) {
return 0;
}
return numRead;
}
开发者ID:hopshadoop,项目名称:hops,代码行数:37,代码来源:RaidUtils.java
示例8: checkFileContentDirect
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
/**
* Verifies that reading a file with the direct read(ByteBuffer) api gives the
* expected set of bytes.
*/
static void checkFileContentDirect(FileSystem fs, Path name, byte[] expected,
int readOffset) throws IOException {
DFSDataInputStream stm = (DFSDataInputStream) fs.open(name);
ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
IOUtils.skipFully(stm, readOffset);
actual.limit(3);
//Read a small number of bytes first.
int nread = stm.read(actual);
actual.limit(nread + 2);
nread += stm.read(actual);
// Read across chunk boundary
actual.limit(Math.min(actual.capacity(), nread + 517));
nread += stm.read(actual);
checkData(arrayFromByteBuffer(actual), readOffset, expected, nread,
"A few bytes");
//Now read rest of it
actual.limit(actual.capacity());
while (actual.hasRemaining()) {
int nbytes = stm.read(actual);
if (nbytes < 0) {
throw new EOFException("End of file reached before reading fully.");
}
nread += nbytes;
}
checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
stm.close();
}
开发者ID:hopshadoop,项目名称:hops,代码行数:38,代码来源:TestShortCircuitLocalRead.java
示例9: open
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
@SuppressWarnings("deprecation")
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
statistics.incrementReadOps(1);
return new DFSClient.DFSDataInputStream(
dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
}
开发者ID:cumulusyebl,项目名称:cumulus,代码行数:8,代码来源:DistributedFileSystem.java
示例10: getFirstBlock
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
public static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
DFSDataInputStream in =
(DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
in.readByte();
return in.getCurrentBlock();
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:7,代码来源:DFSTestUtil.java
示例11: testUnfavoredNodes
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
public void testUnfavoredNodes() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean("dfs.client.block.location.renewal.enabled", false);
MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
try {
FileSystem fs = cluster.getFileSystem();
DistributedFileSystem dfs = DFSUtil.convertToDFS(fs);
TestCase.assertNotNull(dfs);
Path path = new Path("/testUnfavoredNodes");
FSDataOutputStream stm = fs
.create(path, true, 4096, (short) 2, (long) 2048);
stm.write(new byte[4096]);
stm.close();
FSDataInputStream is = fs.open(path);
DFSDataInputStream dis = (DFSDataInputStream) is;
TestCase.assertNotNull(dis);
is.read(new byte[1024]);
DatanodeInfo currentDn1 = dis.getCurrentDatanode();
dis.setUnfavoredNodes(Arrays.asList(new DatanodeInfo[] { currentDn1 }));
is.read(new byte[512]);
DatanodeInfo currentDn2 = dis.getCurrentDatanode();
TestCase.assertTrue(!currentDn2.equals(currentDn1));
dis.setUnfavoredNodes(Arrays.asList(new DatanodeInfo[] { currentDn1, currentDn2 }));
is.read(new byte[512]);
TestCase.assertEquals(currentDn1, dis.getCurrentDatanode());
is.read(new byte[1024]);
TestCase.assertEquals(dis.getAllBlocks().get(1).getLocations()[0],
dis.getCurrentDatanode());
}
finally {
if (cluster != null) {cluster.shutdown();}
}
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:42,代码来源:TestDistributedFileSystem.java
示例12: ParallelStreamReader
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
/**
* Reads data from multiple streams in parallel and puts the data in a queue.
* @param streams The input streams to read from.
* @param bufSize The amount of data to read from each stream in each go.
* @param numThreads Number of threads to use for parallelism.
* @param boundedBuffer The queue to place the results in.
*/
public ParallelStreamReader(
Progressable reporter,
InputStream[] streams,
int bufSize,
int numThreads,
int boundedBufferCapacity,
long maxBytesPerStream,
boolean computeChecksum,
OutputStream[] outs) throws IOException {
this.reporter = reporter;
this.computeChecksum = computeChecksum;
this.streams = new InputStream[streams.length];
this.endOffsets = new long[streams.length];
if (computeChecksum) {
this.checksums = new CRC32[streams.length];
}
this.outs = outs;
for (int i = 0; i < streams.length; i++) {
this.streams[i] = streams[i];
if (this.streams[i] instanceof DFSDataInputStream) {
DFSDataInputStream stream = (DFSDataInputStream)this.streams[i];
// in directory raiding, the block size for each input stream
// might be different, so we need to determine the endOffset of
// each stream by their own block size.
List<LocatedBlock> blocks = stream.getAllBlocks();
if (blocks.size() == 0) {
this.endOffsets[i] = Long.MAX_VALUE;
if (computeChecksum) {
this.checksums[i] = null;
}
} else {
long blockSize = blocks.get(0).getBlockSize();
this.endOffsets[i] = stream.getPos() + blockSize;
if (computeChecksum) {
this.checksums[i] = new CRC32();
}
}
} else {
this.endOffsets[i] = Long.MAX_VALUE;
if (computeChecksum) {
this.checksums[i] = null;
}
}
streams[i] = null; // Take over ownership of streams.
}
this.bufSize = bufSize;
this.boundedBuffer =
new ArrayBlockingQueue<ReadResult>(boundedBufferCapacity);
if (numThreads > streams.length) {
this.numThreads = streams.length;
} else {
this.numThreads = numThreads;
}
this.remainingBytesPerStream = maxBytesPerStream;
this.slots = new Semaphore(this.numThreads);
ThreadFactory ParallelStreamReaderFactory = new ThreadFactoryBuilder()
.setNameFormat("ParallelStreamReader-read-pool-%d")
.build();
this.readPool = Executors.newFixedThreadPool(this.numThreads, ParallelStreamReaderFactory);
this.mainThread = new MainThread();
mainThread.setName("ParallelStreamReader-main");
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:71,代码来源:ParallelStreamReader.java
示例13: getAllBlocks
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
public static List<LocatedBlock> getAllBlocks(FSDataInputStream in)
throws IOException {
return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
}
开发者ID:Seagate,项目名称:hadoop-on-lustre,代码行数:5,代码来源:DFSTestUtil.java
示例14: reportChecksumFailure
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
/**
* We need to find the blocks that didn't match. Likely only one
* is corrupt but we will report both to the namenode. In the future,
* we can consider figuring out exactly which block is corrupt.
*/
public boolean reportChecksumFailure(Path f,
FSDataInputStream in, long inPos,
FSDataInputStream sums, long sumsPos) {
if(!(in instanceof DFSDataInputStream && sums instanceof DFSDataInputStream))
throw new IllegalArgumentException("Input streams must be types " +
"of DFSDataInputStream");
LocatedBlock lblocks[] = new LocatedBlock[2];
// Find block in data stream.
DFSClient.DFSDataInputStream dfsIn = (DFSClient.DFSDataInputStream) in;
Block dataBlock = dfsIn.getCurrentBlock();
if (dataBlock == null) {
LOG.error("Error: Current block in data stream is null! ");
return false;
}
DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()};
lblocks[0] = new LocatedBlock(dataBlock, dataNode);
LOG.info("Found checksum error in data stream at block="
+ dataBlock + " on datanode="
+ dataNode[0].getName());
// Find block in checksum stream
DFSClient.DFSDataInputStream dfsSums = (DFSClient.DFSDataInputStream) sums;
Block sumsBlock = dfsSums.getCurrentBlock();
if (sumsBlock == null) {
LOG.error("Error: Current block in checksum stream is null! ");
return false;
}
DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()};
lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
LOG.info("Found checksum error in checksum stream at block="
+ sumsBlock + " on datanode="
+ sumsNode[0].getName());
// Ask client to delete blocks.
dfs.reportChecksumFailure(f.toString(), lblocks);
return true;
}
开发者ID:cumulusyebl,项目名称:cumulus,代码行数:47,代码来源:DistributedFileSystem.java
示例15: ParallelStreamReader
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; //导入依赖的package包/类
/**
* Reads data from multiple streams in parallel and puts the data in a queue.
* @param streams The input streams to read from.
* @param bufSize The amount of data to read from each stream in each go.
* @param numThreads Number of threads to use for parallelism.
* @param boundedBuffer The queue to place the results in.
*/
public ParallelStreamReader(
Progressable reporter,
InputStream[] streams,
int bufSize,
int numThreads,
int boundedBufferCapacity,
long maxBytesPerStream) throws IOException {
this.reporter = reporter;
this.streams = new InputStream[streams.length];
this.endOffsets = new long[streams.length];
for (int i = 0; i < streams.length; i++) {
this.streams[i] = streams[i];
if (this.streams[i] instanceof DFSDataInputStream) {
DFSDataInputStream stream = (DFSDataInputStream)this.streams[i];
// in directory raiding, the block size for each input stream
// might be different, so we need to determine the endOffset of
// each stream by their own block size.
List<LocatedBlock> blocks = stream.getAllBlocks();
if (blocks.size() == 0) {
this.endOffsets[i] = Long.MAX_VALUE;
} else {
this.endOffsets[i] = stream.getPos() + blocks.get(0).getBlockSize();
}
} else {
this.endOffsets[i] = Long.MAX_VALUE;
}
streams[i] = null; // Take over ownership of streams.
}
this.bufSize = bufSize;
this.boundedBuffer =
new ArrayBlockingQueue<ReadResult>(boundedBufferCapacity);
if (numThreads > streams.length) {
this.numThreads = streams.length;
} else {
this.numThreads = numThreads;
}
this.remainingBytesPerStream = maxBytesPerStream;
this.slots = new Semaphore(this.numThreads);
this.readPool = Executors.newFixedThreadPool(this.numThreads);
this.mainThread = new MainThread();
}
开发者ID:iVCE,项目名称:RDFS,代码行数:50,代码来源:ParallelStreamReader.java
注:本文中的org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论