本文整理汇总了Java中org.apache.flink.core.fs.FileStatus类的典型用法代码示例。如果您正苦于以下问题：Java FileStatus类的具体用法？Java FileStatus怎么用？Java FileStatus使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
FileStatus类属于org.apache.flink.core.fs包,在下文中一共展示了FileStatus类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: createInputSplits
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Creates one input split for every entry directly under {@code inputPath} whose file name
 * ends with {@code "edges"}.
 *
 * <p>The split number is the index of the entry in the directory listing, so split numbers
 * are not necessarily contiguous when some entries are skipped.
 *
 * @param minNumSplits not used by this implementation; one split is created per matching file
 * @return the splits for all matching files, possibly an empty array
 * @throws IOException if the input path cannot be listed
 */
@Override
public TweetFileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    FileSystem fileSystem = getFileSystem();
    FileStatus[] statuses = fileSystem.listStatus(new Path(inputPath));
    if (statuses == null) {
        // listStatus may signal a missing path by returning null rather than throwing;
        // fail with a descriptive error instead of a NullPointerException below.
        throw new IOException("Input path does not exist or cannot be listed: " + inputPath);
    }
    logger.info("Found {} files", statuses.length);

    List<TweetFileInputSplit> splits = new ArrayList<>();
    for (int i = 0; i < statuses.length; i++) {
        FileStatus status = statuses[i];
        String fileName = status.getPath().getName();
        if (fileName.endsWith("edges")) {
            splits.add(new TweetFileInputSplit(i, status.getPath()));
        }
    }

    logger.info("Result number of splits: {}", splits.size());
    return splits.toArray(new TweetFileInputSplit[splits.size()]);
}
开发者ID:mushketyk,项目名称:flink-examples,代码行数:19,代码来源:StanfordTweetsDataSetInputFormat.java
示例2: getFiles
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Collects the files that are involved in the splits.
 *
 * <p>If {@code filePath} is a directory, every non-directory entry directly inside it is
 * returned; otherwise the status of {@code filePath} itself is the only element.
 *
 * @return the list of file statuses
 * @throws IOException if the file system cannot be accessed
 */
protected List<FileStatus> getFiles() throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    final FileSystem fileSystem = this.filePath.getFileSystem();
    final FileStatus root = fileSystem.getFileStatus(this.filePath);

    // A plain file is its own (single) result.
    if (!root.isDir()) {
        result.add(root);
        return result;
    }

    // Directory input: keep only the direct children that are not directories.
    final FileStatus[] children = fileSystem.listStatus(this.filePath);
    for (int idx = 0; idx < children.length; idx++) {
        final FileStatus child = children[idx];
        if (child.isDir()) {
            continue;
        }
        result.add(child);
    }
    return result;
}
开发者ID:axbaretto,项目名称:flink,代码行数:22,代码来源:BinaryInputFormat.java
示例3: listNewFiles
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Lists the entries under {@code path} that are not rejected by {@code isFiltered} and
 * records their modification times in {@code modificationTimes}.
 *
 * @param fileSystem the file system used to list {@code path}
 * @return the string form of each accepted entry's path; empty if the listing is {@code null}
 * @throws IOException if listing the path fails
 */
private List<String> listNewFiles(FileSystem fileSystem) throws IOException {
    final List<String> accepted = new ArrayList<String>();

    final FileStatus[] listing = fileSystem.listStatus(new Path(path));
    if (listing == null) {
        LOG.warn("Path does not exist: {}", path);
        return accepted;
    }

    for (FileStatus entry : listing) {
        final Path entryPath = entry.getPath();
        final String entryName = entryPath.getName();
        final long modifiedAt = entry.getModificationTime();

        if (isFiltered(entryName, modifiedAt)) {
            continue;
        }
        accepted.add(entryPath.toString());
        // Keyed by file name (not full path), as the filter is.
        modificationTimes.put(entryName, modifiedAt);
    }
    return accepted;
}
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:FileMonitoringFunction.java
示例4: getFileStats
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Computes basic statistics (latest modification time and total length) for the given path,
 * reusing the cached statistics when no enumerated file is newer than them.
 *
 * @param cachedStats previously computed statistics, or {@code null} if there are none
 * @param filePath the file or directory to inspect
 * @param fs the file system used to look up {@code filePath}
 * @param files list to which all enumerated files are appended
 * @return {@code cachedStats} if still valid, otherwise freshly created statistics
 * @throws IOException if the file system cannot be accessed
 */
protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path filePath, FileSystem fs,
        ArrayList<FileStatus> files) throws IOException {

    final FileStatus root = fs.getFileStatus(filePath);

    // Enumerate all files and sum up their lengths.
    long totalLength;
    if (root.isDir()) {
        totalLength = addFilesInDir(root.getPath(), files, false);
    } else {
        files.add(root);
        testForUnsplittable(root);
        totalLength = root.getLen();
    }

    // Find the most recent modification time stamp among the enumerated files.
    long latestModTime = 0;
    for (FileStatus candidate : files) {
        final long modTime = candidate.getModificationTime();
        if (modTime > latestModTime) {
            latestModTime = modTime;
        }
    }

    // Cached statistics stay valid as long as nothing was modified after they were taken.
    final boolean cacheStillValid =
            cachedStats != null && latestModTime <= cachedStats.getLastModificationTime();
    if (cacheStillValid) {
        return cachedStats;
    }

    // Sanity check: a non-positive length is reported as unknown.
    final long reportedLength = totalLength <= 0 ? BaseStatistics.SIZE_UNKNOWN : totalLength;
    return new FileBaseStatistics(latestModTime, reportedLength, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
}
开发者ID:axbaretto,项目名称:flink,代码行数:34,代码来源:FileInputFormat.java
示例5: addFilesInDir
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Enumerates all accepted files in the given directory, descending recursively into accepted
 * sub-directories if {@code enumerateNestedFiles} is true.
 *
 * @param path the directory to enumerate
 * @param files list to which every accepted file is appended
 * @param logExcludedFiles whether excluded entries are reported at debug level
 * @return the total length of accepted files
 * @throws IOException if the directory cannot be listed
 */
private long addFilesInDir(Path path, List<FileStatus> files, boolean logExcludedFiles)
        throws IOException {
    final FileSystem fs = path.getFileSystem();

    long length = 0;

    for (FileStatus entry : fs.listStatus(path)) {
        if (entry.isDir()) {
            if (acceptFile(entry) && enumerateNestedFiles) {
                length += addFilesInDir(entry.getPath(), files, logExcludedFiles);
            } else if (logExcludedFiles && LOG.isDebugEnabled()) {
                LOG.debug("Directory "+entry.getPath().toString()+" did not pass the file-filter and is excluded.");
            }
        } else if (acceptFile(entry)) {
            files.add(entry);
            length += entry.getLen();
            testForUnsplittable(entry);
        } else if (logExcludedFiles && LOG.isDebugEnabled()) {
            // This branch handles regular files, so the message must not call them directories.
            LOG.debug("File "+entry.getPath().toString()+" did not pass the file-filter and is excluded.");
        }
    }
    return length;
}
开发者ID:axbaretto,项目名称:flink,代码行数:35,代码来源:FileInputFormat.java
示例6: createStatistics
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Fills in the statistics. The last modification time and the total input size are taken
 * from the pre-filled statistics; the record count is read from the block info stored at
 * the end of each file.
 *
 * @param files
 *        The files that are associated with this block input format.
 * @param stats
 *        The pre-filled statistics.
 * @return the sequential statistics, or {@code null} if {@code files} is empty
 * @throws IOException if a file cannot be opened or read
 */
protected SequentialStatistics createStatistics(List<FileStatus> files, FileBaseStatistics stats)
        throws IOException {
    if (files.isEmpty()) {
        return null;
    }

    BlockInfo blockInfo = new BlockInfo();
    long recordCount = 0;

    for (FileStatus current : files) {
        // Only files large enough to hold a block info are valid; others are skipped.
        if (current.getLen() >= blockInfo.getInfoSize()) {
            FileSystem fileSystem = current.getPath().getFileSystem();
            try (FSDataInputStream in = fileSystem.open(current.getPath(), blockInfo.getInfoSize())) {
                // The block info is located in the last bytes of the file.
                in.seek(current.getLen() - blockInfo.getInfoSize());
                blockInfo.read(new DataInputViewStreamWrapper(in));
                recordCount += blockInfo.getAccumulatedRecordCount();
            }
        }
    }

    final float avgWidth;
    if (recordCount == 0) {
        avgWidth = 0;
    } else {
        avgWidth = (float) stats.getTotalInputSize() / recordCount;
    }

    return new SequentialStatistics(
            stats.getLastModificationTime(), stats.getTotalInputSize(), avgWidth, recordCount);
}
开发者ID:axbaretto,项目名称:flink,代码行数:36,代码来源:BinaryInputFormat.java
示例7: listStatus
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Lists the status of the given local path.
 *
 * @param f the path to list
 * @return {@code null} if the path does not exist or its children cannot be listed; a
 *         single-element array if the path is a regular file; otherwise one status per child
 * @throws IOException if the status of a child cannot be obtained
 */
@Override
public FileStatus[] listStatus(final Path f) throws IOException {
    final File target = pathToFile(f);

    if (!target.exists()) {
        return null;
    }
    if (target.isFile()) {
        return new FileStatus[] { new LocalFileStatus(target, this) };
    }

    final String[] childNames = target.list();
    if (childNames == null) {
        return null;
    }

    final FileStatus[] statuses = new FileStatus[childNames.length];
    int slot = 0;
    for (String childName : childNames) {
        statuses[slot++] = getFileStatus(new Path(f, childName));
    }
    return statuses;
}
开发者ID:axbaretto,项目名称:flink,代码行数:25,代码来源:LocalFileSystem.java
示例8: findTestFiles
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Finds the entries in the base test Python directory whose path ends with {@code ".py"}.
 *
 * @return the string form of each matching path
 * @throws Exception if the directory cannot be listed
 */
private static List<String> findTestFiles() throws Exception {
    final List<String> pythonFiles = new ArrayList<>();
    final FileSystem localFs = FileSystem.getLocalFileSystem();

    for (FileStatus entry : localFs.listStatus(getBaseTestPythonDir())) {
        final String location = entry.getPath().toString();
        if (!location.endsWith(".py")) {
            continue;
        }
        pythonFiles.add(location);
    }
    return pythonFiles;
}
开发者ID:axbaretto,项目名称:flink,代码行数:13,代码来源:PythonPlanBinderTest.java
示例9: getFileBlockLocations
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Returns the block locations for a range of the given file by delegating to the wrapped
 * Hadoop file system and wrapping each result in a {@link HadoopBlockLocation}.
 *
 * @param file the file status; must be a {@link HadoopFileStatus}
 * @param start the start offset of the requested range
 * @param len the length of the requested range
 * @return the wrapped block locations
 * @throws IOException if {@code file} is not a {@link HadoopFileStatus} or the lookup fails
 */
@Override
public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
        throws IOException {
    if (!(file instanceof HadoopFileStatus)) {
        // The message names the type that is actually checked.
        throw new IOException("file is not an instance of HadoopFileStatus");
    }

    final HadoopFileStatus f = (HadoopFileStatus) file;

    final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
        start, len);

    // Wrap up HDFS specific block location objects
    final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
    for (int i = 0; i < distBlkLocations.length; i++) {
        distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
    }

    return distBlkLocations;
}
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:HadoopFileSystem.java
示例10: getFileBlockLocations
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Returns the block locations for a range of the given file by delegating to the wrapped
 * Hadoop file system and wrapping each result in a {@link HadoopBlockLocation}.
 *
 * @param file the file status; must be a {@link HadoopFileStatus}
 * @param start the start offset of the requested range
 * @param len the length of the requested range
 * @return the wrapped block locations
 * @throws IOException if {@code file} is not a {@link HadoopFileStatus} or the lookup fails
 */
@Override
public BlockLocation[] getFileBlockLocations(final FileStatus file,
        final long start, final long len) throws IOException {
    if (!(file instanceof HadoopFileStatus)) {
        // The message names the type that is actually checked.
        throw new IOException(
                "file is not an instance of HadoopFileStatus");
    }

    final HadoopFileStatus f = (HadoopFileStatus) file;

    final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs
            .getFileBlockLocations(f.getInternalFileStatus(), start, len);

    // Wrap up HDFS specific block location objects
    final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
    for (int i = 0; i < distBlkLocations.length; i++) {
        distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
    }

    return distBlkLocations;
}
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:MapRFileSystem.java
示例11: copy
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Copies a file or, recursively, a directory from {@code sourcePath} to {@code targetPath}.
 * Nothing is done if the target already exists. An I/O error while copying a single file is
 * logged and not rethrown.
 *
 * @param sourcePath the file or directory to copy
 * @param targetPath the destination path
 * @param executable the executable flag applied to every copied file
 * @throws IOException if accessing either file system fails outside the single-file copy
 */
public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
    // TODO rewrite this to make it participate in the closable registry and the lifecycle of a task.
    // we unwrap the file system to get raw streams without safety net
    FileSystem sourceFs = FileSystem.getUnguardedFileSystem(sourcePath.toUri());
    FileSystem targetFs = FileSystem.getUnguardedFileSystem(targetPath.toUri());

    if (targetFs.exists(targetPath)) {
        return;
    }

    if (!sourceFs.getFileStatus(sourcePath).isDir()) {
        // Single file: stream the bytes across and apply the executable flag.
        try (FSDataOutputStream out = targetFs.create(targetPath, FileSystem.WriteMode.NO_OVERWRITE);
                FSDataInputStream in = sourceFs.open(sourcePath)) {
            IOUtils.copyBytes(in, out);
            //noinspection ResultOfMethodCallIgnored
            new File(targetPath.toString()).setExecutable(executable);
        } catch (IOException ioe) {
            LOG.error("could not copy file to local file cache.", ioe);
        }
        return;
    }

    // Directory: create the target and copy each child below it.
    targetFs.mkdirs(targetPath);
    for (FileStatus child : sourceFs.listStatus(sourcePath)) {
        String childLocation = child.getPath().toString();
        // Strip a trailing slash from directory paths so the last segment can be extracted.
        if (child.isDir() && childLocation.endsWith("/")) {
            childLocation = childLocation.substring(0, childLocation.length() - 1);
        }
        final String lastSegment = childLocation.substring(childLocation.lastIndexOf("/"));
        final String childTarget = targetPath.toString() + lastSegment;
        copy(child.getPath(), new Path(childTarget), executable);
    }
}
开发者ID:axbaretto,项目名称:flink,代码行数:31,代码来源:FileCache.java
示例12: monitorDirAndForwardSplits
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Lists the eligible files under {@code path}, turns them into splits grouped by
 * modification time, emits every split to the source context, and advances
 * {@code globalModificationTime} after each group.
 *
 * <p>Asserts that the calling thread holds {@code checkpointLock}.
 *
 * @param fs the file system used to list eligible files
 * @param context the source context the splits are emitted to
 * @throws IOException if listing files or creating splits fails
 */
private void monitorDirAndForwardSplits(FileSystem fs,
        SourceContext<TimestampedFileInputSplit> context) throws IOException {
    assert (Thread.holdsLock(checkpointLock));

    final Map<Path, FileStatus> candidates = listEligibleFiles(fs, new Path(path));
    final Map<Long, List<TimestampedFileInputSplit>> grouped = getInputSplitsSortedByModTime(candidates);

    for (Map.Entry<Long, List<TimestampedFileInputSplit>> group : grouped.entrySet()) {
        final long groupModTime = group.getKey();
        final List<TimestampedFileInputSplit> groupSplits = group.getValue();

        for (TimestampedFileInputSplit current : groupSplits) {
            LOG.info("Forwarding split: " + current);
            context.collect(current);
        }

        // update the global modification time once the whole group has been forwarded
        if (groupModTime > globalModificationTime) {
            globalModificationTime = groupModTime;
        }
    }
}
开发者ID:axbaretto,项目名称:flink,代码行数:18,代码来源:ContinuousFileMonitoringFunction.java
示例13: getInputSplitsSortedByModTime
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Creates the input splits to be forwarded to the downstream tasks of the
 * {@link ContinuousFileReaderOperator}. Splits are grouped in a map sorted <b>by
 * modification time</b>, and only splits belonging to files in {@code eligibleFiles}
 * are included.
 *
 * @param eligibleFiles The files to process.
 * @return the splits keyed by the modification time of their file; empty if there are no
 *         eligible files
 * @throws IOException if creating the input splits fails
 */
private Map<Long, List<TimestampedFileInputSplit>> getInputSplitsSortedByModTime(
        Map<Path, FileStatus> eligibleFiles) throws IOException {

    final Map<Long, List<TimestampedFileInputSplit>> grouped = new TreeMap<>();

    // Splits are only created when there is at least one eligible file.
    if (!eligibleFiles.isEmpty()) {
        for (FileInputSplit candidate : format.createInputSplits(readerParallelism)) {
            final FileStatus status = eligibleFiles.get(candidate.getPath());
            if (status == null) {
                // Split belongs to a file that is not eligible.
                continue;
            }

            final Long modTime = status.getModificationTime();
            List<TimestampedFileInputSplit> bucket = grouped.get(modTime);
            if (bucket == null) {
                bucket = new ArrayList<>();
                grouped.put(modTime, bucket);
            }

            final TimestampedFileInputSplit stamped = new TimestampedFileInputSplit(
                    modTime, candidate.getSplitNumber(), candidate.getPath(),
                    candidate.getStart(), candidate.getLength(), candidate.getHostnames());
            bucket.add(stamped);
        }
    }
    return grouped;
}
开发者ID:axbaretto,项目名称:flink,代码行数:32,代码来源:ContinuousFileMonitoringFunction.java
示例14: getFiles
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Collects the files that are involved in the splits.
 *
 * <p>For a directory, the non-directory entries directly inside it are returned; for
 * anything else, the status of {@code filePath} itself is returned as the only element.
 *
 * @return the list of file statuses
 * @throws IOException if the file system cannot be accessed
 */
protected List<FileStatus> getFiles() throws IOException {
    List<FileStatus> collected = new ArrayList<FileStatus>();
    final FileSystem fileSystem = this.filePath.getFileSystem();
    final FileStatus inputStatus = fileSystem.getFileStatus(this.filePath);

    if (!inputStatus.isDir()) {
        collected.add(inputStatus);
    } else {
        // input is directory. list all contained files, skipping nested directories
        for (FileStatus entry : fileSystem.listStatus(this.filePath)) {
            if (!entry.isDir()) {
                collected.add(entry);
            }
        }
    }
    return collected;
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:22,代码来源:BinaryInputFormat.java
示例15: openInput
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration}
 * initializes the format. The format is opened on a single split that covers the whole file.
 *
 * @param <T>
 *        the type of the records produced by the InputFormat
 * @param <F>
 *        the class of the InputFormat
 * @param inputFormatClass
 *        the class of the InputFormat
 * @param path
 *        the path of the file
 * @param configuration
 *        optional configuration of the InputFormat; an empty configuration is used if {@code null}
 * @return the created {@link InputFormat}
 * @throws IOException
 *         if an I/O error occurred while accessing the file or initializing the InputFormat.
 */
public static <T, F extends FileInputFormat<T>> F openInput(
        Class<F> inputFormatClass, String path, Configuration configuration)
    throws IOException
{
    configuration = configuration == null ? new Configuration() : configuration;

    Path normalizedPath = normalizePath(new Path(path));
    final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);

    inputFormat.setFilePath(normalizedPath);
    inputFormat.setOpenTimeout(0);
    inputFormat.configure(configuration);

    final FileSystem fs = FileSystem.get(normalizedPath.toUri());
    FileStatus fileStatus = fs.getFileStatus(normalizedPath);

    BlockLocation[] blocks = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    // A file system may report no block locations (e.g. for an empty file); fall back to an
    // empty host list instead of failing with an ArrayIndexOutOfBoundsException.
    final String[] hosts = (blocks != null && blocks.length > 0) ? blocks[0].getHosts() : new String[0];
    inputFormat.open(new FileInputSplit(0, new Path(path), 0, fileStatus.getLen(), hosts));
    return inputFormat;
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:37,代码来源:FormatUtil.java
示例16: openAllInputs
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Creates {@link InputFormat}s from a given class for the specified file(s). The optional {@link Configuration}
 * initializes the formats.
 *
 * <p>If the path is not a directory, a single format is opened for it; otherwise one format
 * is opened for every entry listed in the directory.
 *
 * @param <T>
 *        the type of the records produced by the InputFormat
 * @param <F>
 *        the class of the InputFormat
 * @param inputFormatClass
 *        the class of the InputFormat
 * @param path
 *        the path of the file or to the directory containing the splits
 * @param configuration
 *        optional configuration of the InputFormat
 * @return the created {@link InputFormat}s for each file in the specified path
 * @throws IOException
 *         if an I/O error occurred while accessing the files or initializing the InputFormat.
 */
@SuppressWarnings("unchecked")
public static <T, F extends FileInputFormat<T>> List<F> openAllInputs(
        Class<F> inputFormatClass, String path, Configuration configuration) throws IOException {
    final Path inputPath = new Path(path);
    final FileSystem fileSystem = inputPath.getFileSystem();
    final FileStatus inputStatus = fileSystem.getFileStatus(inputPath);

    // Single file: exactly one format.
    if (!inputStatus.isDir()) {
        return Arrays.asList(openInput(inputFormatClass, path, configuration));
    }

    // Directory: one format per listed entry.
    final FileStatus[] entries = fileSystem.listStatus(inputPath);
    final List<F> opened = new ArrayList<F>();
    for (FileStatus entry : entries) {
        final String entryLocation = entry.getPath().toString();
        opened.add(openInput(inputFormatClass, entryLocation, configuration));
    }
    return opened;
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:33,代码来源:FormatUtil.java
示例17: getFileBlockLocations
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Returns the block locations for a range of the given file by delegating to the wrapped
 * Hadoop file system and wrapping each result in a {@link DistributedBlockLocation}.
 *
 * @param file the file status; must be a {@link DistributedFileStatus}
 * @param start the start offset of the requested range
 * @param len the length of the requested range
 * @return the wrapped block locations
 * @throws IOException if {@code file} is not a {@link DistributedFileStatus} or the lookup fails
 */
@Override
public BlockLocation[] getFileBlockLocations(final FileStatus file,
        final long start, final long len) throws IOException {
    final boolean supported = file instanceof DistributedFileStatus;
    if (!supported) {
        throw new IOException(
                "file is not an instance of DistributedFileStatus");
    }

    final DistributedFileStatus distributedStatus = (DistributedFileStatus) file;
    final org.apache.hadoop.fs.BlockLocation[] hadoopLocations =
            fs.getFileBlockLocations(distributedStatus.getInternalFileStatus(), start, len);

    // Wrap up HDFS specific block location objects
    final DistributedBlockLocation[] wrapped = new DistributedBlockLocation[hadoopLocations.length];
    int slot = 0;
    while (slot < wrapped.length) {
        wrapped[slot] = new DistributedBlockLocation(hadoopLocations[slot]);
        slot++;
    }
    return wrapped;
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:23,代码来源:MapRFileSystem.java
示例18: open
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Opens an input stream for the S3 object that the given path refers to.
 *
 * @param f the path to open
 * @return an input stream reading the object
 * @throws IOException if the path is a directory, does not map to both a bucket and an
 *         object, or its status cannot be obtained
 */
@Override
public FSDataInputStream open(final Path f) throws IOException {
    // Will throw FileNotFoundException if f does not exist
    final FileStatus status = getFileStatus(f);

    // Make sure f is not a directory
    if (status.isDir()) {
        throw new IOException("Cannot open " + f.toUri() + " because it is a directory");
    }

    final S3BucketObjectPair pair = this.directoryStructure.toBucketObjectPair(f);
    final boolean addressable = pair.hasBucket() && pair.hasObject();
    if (!addressable) {
        throw new IOException(f.toUri() + " cannot be opened");
    }

    return new S3DataInputStream(this.s3Client, pair.getBucket(), pair.getObject());
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:18,代码来源:S3FileSystem.java
示例19: getFileBlockLocations
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Returns the block locations for a range of the given file by delegating to the wrapped
 * Hadoop file system and wrapping each result in a {@link DistributedBlockLocation}.
 *
 * @param file the file status; must be a {@link DistributedFileStatus}
 * @param start the start offset of the requested range
 * @param len the length of the requested range
 * @return the wrapped block locations
 * @throws IOException if {@code file} is not a {@link DistributedFileStatus} or the lookup fails
 */
@Override
public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
    throws IOException
{
    if (file instanceof DistributedFileStatus) {
        final DistributedFileStatus internal = (DistributedFileStatus) file;
        final org.apache.hadoop.fs.BlockLocation[] source =
            fs.getFileBlockLocations(internal.getInternalFileStatus(), start, len);

        // Wrap up HDFS specific block location objects
        final DistributedBlockLocation[] result = new DistributedBlockLocation[source.length];
        for (int pos = 0; pos < result.length; pos++) {
            result[pos] = new DistributedBlockLocation(source[pos]);
        }
        return result;
    }

    throw new IOException("file is not an instance of DistributedFileStatus");
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:22,代码来源:DistributedFileSystem.java
示例20: getStatistics
import org.apache.flink.core.fs.FileStatus; //导入依赖的package包/类
/**
 * Gathers base statistics for the wrapped input format's input paths.
 *
 * <p>Statistics are only gathered when the wrapped format is a {@code FileInputFormat}.
 * Any failure while gathering them is logged and results in {@code null}.
 *
 * @param cachedStats previously computed statistics; only used if they are
 *        {@link FileBaseStatistics}
 * @return the statistics, or {@code null} if none are available
 * @throws IOException declared by the overridden method
 */
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
    // only gather base statistics for FileInputFormats
    if (!(mapredInputFormat instanceof FileInputFormat)) {
        return null;
    }

    final FileBaseStatistics cachedFileStats;
    if (cachedStats instanceof FileBaseStatistics) {
        cachedFileStats = (FileBaseStatistics) cachedStats;
    } else {
        cachedFileStats = null;
    }

    try {
        final org.apache.hadoop.fs.Path[] inputPaths = FileInputFormat.getInputPaths(this.jobConf);
        final ArrayList<FileStatus> collectedFiles = new ArrayList<FileStatus>(1);
        return getFileStats(cachedFileStats, inputPaths, collectedFiles);
    } catch (IOException ioex) {
        if (LOG.isWarnEnabled()) {
            LOG.warn("Could not determine statistics due to an io error: "
                    + ioex.getMessage());
        }
    } catch (Throwable t) {
        if (LOG.isErrorEnabled()) {
            LOG.error("Unexpected problem while getting the file statistics: "
                    + t.getMessage(), t);
        }
    }

    // no statistics available
    return null;
}
开发者ID:axbaretto,项目名称:flink,代码行数:30,代码来源:HadoopInputFormatBase.java
注：本文中的org.apache.flink.core.fs.FileStatus类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论