Java Function Class Code Examples


This article collects typical usage examples of the Java class org.apache.spark.api.java.function.Function. If you are wondering what the Function class is for, how to use it, or where to find examples of it in real code, the curated class examples below should help.



The Function class belongs to the org.apache.spark.api.java.function package. Twenty code examples of the Function class are presented below, ordered by popularity by default.
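
Before the collected examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of the basic pattern they all share: a Function<T, R> passed to JavaRDD.map, written either as an anonymous class or as an equivalent Java 8 lambda. The application name and the sample data are placeholders.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class FunctionQuickStart {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("FunctionQuickStart").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> words = sc.parallelize(Arrays.asList("spark", "java", "function"));

        // Anonymous-class form: Function<T, R> maps one input element to one output element.
        JavaRDD<Integer> lengths = words.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) {
                return s.length();
            }
        });

        // Equivalent Java 8 lambda form.
        JavaRDD<Integer> lengthsLambda = words.map(s -> s.length());

        System.out.println(lengths.collect());        // [5, 4, 8]
        System.out.println(lengthsLambda.collect());  // [5, 4, 8]

        sc.stop();
    }
}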

Example 1: createNGramDataFrame

import org.apache.spark.api.java.function.Function; // import the required package/class
/**
 * Creates an n-gram data frame from text lines.
 * @param lines
 * @return an n-gram data frame.
 */
DataFrame createNGramDataFrame(JavaRDD<String> lines) {
	JavaRDD<Row> rows = lines.map(new Function<String, Row>(){
		private static final long serialVersionUID = -4332903997027358601L;
		
		@Override
		public Row call(String line) throws Exception {
			return RowFactory.create(Arrays.asList(line.split("\\s+")));
		}
	});
	StructType schema = new StructType(new StructField[] {
			new StructField("words",
					DataTypes.createArrayType(DataTypes.StringType), false,
					Metadata.empty()) });
	DataFrame wordDF = new SQLContext(jsc).createDataFrame(rows, schema);
	// build a bigram language model
	NGram transformer = new NGram().setInputCol("words")
			.setOutputCol("ngrams").setN(2);
	DataFrame ngramDF = transformer.transform(wordDF);
	ngramDF.show(10, false);
	return ngramDF;
}
 
Developer: phuonglh, Project: vn.vitk, Lines: 27, Source: NGramBuilder.java
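
A brief, hypothetical invocation of the method above (assuming it is called from inside the same NGramBuilder class, whose jsc field is an initialized JavaSparkContext; the sample sentences are placeholders):

JavaRDD<String> lines = jsc.parallelize(Arrays.asList(
        "the quick brown fox",
        "jumps over the lazy dog"));
DataFrame bigrams = createNGramDataFrame(lines); // prints the first 10 rows and returns the bigram data frame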


Example 2: parse

import org.apache.spark.api.java.function.Function; // import the required package/class
/**
 * Parses a list of PoS-tagged sentences, one per line, and writes the result to an output
 * file in a specified output format.
 * @param jsc
 * @param sentences
 * @param outputFileName
 * @param outputFormat
 */
public void parse(JavaSparkContext jsc, List<String> sentences, String outputFileName, OutputFormat outputFormat) {
	JavaRDD<String> input = jsc.parallelize(sentences);
	JavaRDD<Sentence> sents = input.map(new TaggedLineToSentenceFunction());
	JavaRDD<DependencyGraph> graphs = sents.map(new ParsingFunction());
	JavaRDD<Row> rows = graphs.map(new Function<DependencyGraph, Row>() {
		private static final long serialVersionUID = -812004521983071103L;
		public Row call(DependencyGraph graph) {
			return RowFactory.create(graph.getSentence().toString(), graph.dependencies());
		}
	});
	StructType schema = new StructType(new StructField[]{
		new StructField("sentence", DataTypes.StringType, false, Metadata.empty()),	
		new StructField("dependency", DataTypes.StringType, false, Metadata.empty())
	});
	SQLContext sqlContext = new SQLContext(jsc);
	DataFrame df = sqlContext.createDataFrame(rows, schema);
	
	if (outputFormat == OutputFormat.TEXT)  
		df.select("dependency").write().text(outputFileName);
	else 
		df.repartition(1).write().json(outputFileName);
}
 
Developer: phuonglh, Project: vn.vitk, Lines: 31, Source: DependencyParser.java


Example 3: main

import org.apache.spark.api.java.function.Function; // import the required package/class
public static void main(String[] args) throws Exception {

    if (args.length != 1) {
      System.err.println("Usage: JavaSleep <seconds>");
      System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("JavaSleep");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    Integer parallel = sparkConf.getInt("spark.default.parallelism", ctx.defaultParallelism());
    Integer seconds = Integer.parseInt(args[0]);

    Integer[] init_val = new Integer[parallel];
    Arrays.fill(init_val, seconds);

    JavaRDD<Integer> workload = ctx.parallelize(Arrays.asList(init_val), parallel).map(new Function<Integer, Integer>() {
      @Override
      public Integer call(Integer s) throws InterruptedException {
	    Thread.sleep(s * 1000);
        return 0;
      }
    });

    List<Integer> output = workload.collect();
    ctx.stop();
  }
 
Developer: thrill, Project: fst-bench, Lines: 27, Source: JavaSleep.java


Example 4: main

import org.apache.spark.api.java.function.Function; // import the required package/class
public static void main(String[] args) throws IOException {
	Flags.setFromCommandLineArgs(THE_OPTIONS, args);

	// Initialize the Spark configuration.
	SparkConf conf = new SparkConf().setAppName("A SECTONG Application: Apache Log Analysis with Spark");
	JavaSparkContext sc = new JavaSparkContext(conf);
	JavaStreamingContext jssc = new JavaStreamingContext(sc, Flags.getInstance().getSlideInterval());
	SQLContext sqlContext = new SQLContext(sc);

	// Initialize parameters
	HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(Flags.getInstance().getKafka_topic().split(",")));
	HashMap<String, String> kafkaParams = new HashMap<String, String>();
	kafkaParams.put("metadata.broker.list", Flags.getInstance().getKafka_broker());

	// Read data from the Kafka stream
	JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,
			StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

	JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
		private static final long serialVersionUID = 5266880065425088203L;

		public String call(Tuple2<String, String> tuple2) {
			return tuple2._2();
		}
	});

	JavaDStream<ApacheAccessLog> accessLogsDStream = lines.flatMap(line -> {
		List<ApacheAccessLog> list = new ArrayList<>();
		try {
			// Parse each line
			list.add(ApacheAccessLog.parseFromLogLine(line));
			return list;
		} catch (RuntimeException e) {
			return list;
		}
	}).cache();

	accessLogsDStream.foreachRDD(rdd -> {

		// rdd to DataFrame
		DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);
		// Write to a Parquet file
		df.write().partitionBy("ipAddress", "method", "responseCode").mode(SaveMode.Append).parquet(Flags.getInstance().getParquetFile());

		return null;
	});

	// Start the streaming server
	jssc.start(); // start the computation
	jssc.awaitTermination(); // wait for termination
}
 
Developer: sectong, Project: SparkToParquet, Lines: 52, Source: AppMain.java


Example 5: predictForOutput_LogisticRegressionModel

import org.apache.spark.api.java.function.Function; // import the required package/class
public static JavaRDD<Tuple2<Object, Object>> predictForOutput_LogisticRegressionModel(LogisticRegressionModel model, JavaRDD<LabeledPoint> data){
    JavaRDD<Tuple2<Object, Object>> FeaturesAndPrediction = data.map(
      new Function<LabeledPoint, Tuple2<Object, Object>>() {
        private static final long serialVersionUID = 1L;
        public Tuple2<Object, Object> call(LabeledPoint p) {
          Double prediction = model.predict(p.features());
          return new Tuple2<Object, Object>(p.features(), prediction);
        }
      }
    );
    return FeaturesAndPrediction;    
}
 
Developer: Chih-Ling-Hsu, Project: Spark-Machine-Learning-Modules, Lines: 13, Source: PredictUnit.java


Example 6: main

import org.apache.spark.api.java.function.Function; // import the required package/class
public static void main(String[] args) throws Exception {

    if (args.length < 2) {
      System.err.println("Usage: JavaTeraSort <HDFS_INPUT> <HDFS_OUTPUT>");
      System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("JavaTeraSort");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    JavaRDD<String> lines = ctx.textFile(args[0], 1);
    Integer parallel = sparkConf.getInt("spark.default.parallelism", ctx.defaultParallelism());
    Integer reducer  = Integer.parseInt(IOCommon.getProperty("hibench.default.shuffle.parallelism").get());
    JavaPairRDD<String, String> words = lines.mapToPair(new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String s) throws Exception {
            return new Tuple2<String, String>(s.substring(0, 10), s.substring(10));
        }
    });


    JavaPairRDD<String, String> sorted = words.sortByKey(true, reducer);

    JavaRDD<String> result = sorted.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> e) throws Exception {
            return e._1() + e._2();
        }
    });

    result.saveAsTextFile(args[1]);

    ctx.stop();
  }
 
Developer: thrill, Project: fst-bench, Lines: 34, Source: JavaTeraSort.java


Example 7: performJavaStream

import org.apache.spark.api.java.function.Function; // import the required package/class
static List<StreamVectors> performJavaStream(String appName, List<StreamVectors> input, int noIters) {
    JavaRDD<StreamVectors> streamVectorsJavaRDD = ExampleUtils.getSparkContext(appName).parallelize(input);
    for (int i = 0; i < noIters; i++) {
        streamVectorsJavaRDD = streamVectorsJavaRDD.map(new Function<StreamVectors, StreamVectors>() {
            @Override
            public StreamVectors call(StreamVectors streamVectors) throws Exception {
                streamVectors.setStartRun(System.nanoTime());
                for (int idx = 0; idx < streamVectors.A.length; idx++) {
                    streamVectors.C[idx] = streamVectors.A[idx];
                }
                for (int idx = 0; idx < streamVectors.A.length; idx++) {
                    streamVectors.B[idx] = streamVectors.scaling_constant * streamVectors.C[idx];
                }
                for (int idx = 0; idx < streamVectors.A.length; idx++) {
                    streamVectors.C[idx] = streamVectors.A[idx] + streamVectors.B[idx];
                }
                for (int idx = 0; idx < streamVectors.A.length; idx++) {
                    streamVectors.A[idx] = streamVectors.B[idx] + streamVectors.scaling_constant * streamVectors.C[idx];
                }
                streamVectors.setEndRun(System.nanoTime());
                return streamVectors;
            }
        });
    }
    return streamVectorsJavaRDD.collect();
}
 
Developer: tudorv91, Project: SparkJNI, Lines: 27, Source: StreamUtils.java


Example 8: performJavaStream

import org.apache.spark.api.java.function.Function; // import the required package/class
private List<StreamVectors> performJavaStream(String appName, List<StreamVectors> input) {
    return ExampleUtils.getSparkContext(appName).parallelize(input).map(new Function<StreamVectors, StreamVectors>() {
        @Override
        public StreamVectors call(StreamVectors streamVectors) throws Exception {
            streamVectors.setStartRun(System.nanoTime());
            for(int idx = 0; idx < streamVectors.A.length; idx++){
                streamVectors.C[idx] = streamVectors.A[idx];
            }
            for(int idx = 0; idx < streamVectors.A.length; idx++){
                streamVectors.B[idx] = streamVectors.scaling_constant * streamVectors.C[idx];
            }
            for(int idx = 0; idx < streamVectors.A.length; idx++){
                streamVectors.C[idx] = streamVectors.A[idx] + streamVectors.B[idx];
            }
            for(int idx = 0; idx < streamVectors.A.length; idx++){
                streamVectors.A[idx] = streamVectors.B[idx] + streamVectors.scaling_constant * streamVectors.C[idx];
            }
            streamVectors.setEndRun(System.nanoTime());
            return streamVectors;
        }
    }).collect();
}
 
Developer: tudorv91, Project: SparkJNI, Lines: 23, Source: StreamMain.java


Example 9: call

import org.apache.spark.api.java.function.Function; // import the required package/class
@Override
public void call(JavaPairRDD<PublisherGeoKey, AggregationLog> logsRDD) throws Exception {

    if (logsRDD != null) {
        LOG.info(" Data to process in RDD:" + logsRDD.count());

        JavaRDD<AggregationResult> aggResRDD = logsRDD.map(new Function<Tuple2<PublisherGeoKey, AggregationLog>, AggregationResult>() {
            @Override
            public AggregationResult call(
                    Tuple2<PublisherGeoKey, AggregationLog> arg0)
                    throws Exception {
                PublisherGeoKey p = arg0._1;
                AggregationLog a = arg0._2;
                return new AggregationResult(new Timestamp(a.getTimestamp()),
                        p.getPublisher(), p.getGeo(), a.getImps(),
                        (int) a.getUniquesHll().estimatedSize(),
                        a.getSumBids() / a.getImps());
            }
        });
        LOG.info(" Call Data Process Partition");
        aggResRDD.foreachPartition(new SaveLogAggPartition());
    } else
        LOG.error("Data to process:" + 0);
}
 
Developer: splicemachine, Project: splice-community-sample-code, Lines: 25, Source: SaveLogAggRDD.java


Example 10: main

import org.apache.spark.api.java.function.Function; // import the required package/class
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Big Apple").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    class GetLength implements Function<String, Integer> {
        public Integer call(String s) {
            return s.length();
        }
    }

    class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }

    JavaRDD<String> lines = sc.textFile("src/main/resources/compressed.gz");
    JavaRDD<Integer> lineLengths = lines.map(new GetLength());
    // Printing an RDD
    lineLengths.foreach(x-> System.out.println(x));

    int totalLength = lineLengths.reduce(new Sum());

    System.out.println(totalLength);
}
 
Developer: knoldus, Project: Sparkathon, Lines: 26, Source: PassingFunctions.java


Example 11: compile

import org.apache.spark.api.java.function.Function; // import the required package/class
/**
 * Create an appropriate {@link Function}-based predicate for deploying the given {@link PredicateDescriptor}
 * on Apache Spark.
 *
 * @param predicateDescriptor describes the function
 * @param operator            that executes the {@link Function}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
 * @param operatorContext     contains optimization information for the {@code operator}
 * @param inputs              that feed the {@code operator}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
 */
public <Type> Function<Type, Boolean> compile(
        PredicateDescriptor<Type> predicateDescriptor,
        SparkExecutionOperator operator,
        OptimizationContext.OperatorContext operatorContext,
        ChannelInstance[] inputs) {
    final Predicate<Type> javaImplementation = predicateDescriptor.getJavaImplementation();
    if (javaImplementation instanceof PredicateDescriptor.ExtendedSerializablePredicate) {
        return new ExtendedPredicateAdapater<>(
                (PredicateDescriptor.ExtendedSerializablePredicate<Type>) javaImplementation,
                new SparkExecutionContext(operator, inputs, operatorContext.getOptimizationContext().getIterationNumber())
        );
    } else {
        return new PredicateAdapter<>(javaImplementation);
    }
}
 
Developer: daqcri, Project: rheem, Lines: 25, Source: FunctionCompiler.java


Example 12: toTaggedSentence

import org.apache.spark.api.java.function.Function; // import the required package/class
private JavaRDD<String> toTaggedSentence(DataFrame output) {
	return output.javaRDD().map(new Function<Row, String>() {
		private static final long serialVersionUID = 4208643510231783579L;
		@Override
		public String call(Row row) throws Exception {
			String[] tokens = row.getString(0).trim().split("\\s+");
			String[] tags = row.getString(1).trim().split("\\s+");
			if (tokens.length != tags.length) {
				System.err.println("Incompatible lengths!");
				return null;
			}
			StringBuilder sb = new StringBuilder(64);
			for (int j = 0; j < tokens.length; j++) {
				sb.append(tokens[j]);
				sb.append('/');
				sb.append(tags[j]);
				sb.append(' ');
			}
			return sb.toString().trim();
		}
	});
}
 
Developer: phuonglh, Project: vn.vitk, Lines: 23, Source: Tagger.java


Example 13: numCharacters

import org.apache.spark.api.java.function.Function; // import the required package/class
/**
 * Counts the number of non-space characters in this data set. This utility method 
 * is used to check the tokenization result.
 * @param lines
 * @return number of characters
 */
int numCharacters(JavaRDD<String> lines) {
	JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
		private static final long serialVersionUID = -2189399343462982586L;
		@Override
		public Integer call(String line) throws Exception {
			line = line.replaceAll("[\\s_]+", "");
			return line.length();
		}
	});
	return lengths.reduce(new Function2<Integer, Integer, Integer>() {
		private static final long serialVersionUID = -8438072946884289401L;

		@Override
		public Integer call(Integer e0, Integer e1) throws Exception {
			return e0 + e1;
		}
	});
}
 
Developer: phuonglh, Project: vn.vitk, Lines: 25, Source: Tokenizer.java


Example 14: toPairFlatMapFunction

import org.apache.spark.api.java.function.Function; // import the required package/class
/** {@link KV} to pair flatmap function. */
public static <K, V> PairFlatMapFunction<Iterator<KV<K, V>>, K, V> toPairFlatMapFunction() {
  return new PairFlatMapFunction<Iterator<KV<K, V>>, K, V>() {
    @Override
    public Iterator<Tuple2<K, V>> call(final Iterator<KV<K, V>> itr) {
      final Iterator<Tuple2<K, V>> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<KV<K, V>, Tuple2<K, V>>() {

                @Override
                public Tuple2<K, V> apply(KV<K, V> kv) {
                  return new Tuple2<>(kv.getKey(), kv.getValue());
                }
              });
      return outputItr;
    }
  };
}
 
Developer: apache, Project: beam, Lines: 20, Source: TranslationUtils.java


Example 15: fromPairFlatMapFunction

import org.apache.spark.api.java.function.Function; // import the required package/class
/** A pair to {@link KV} flatmap function. */
static <K, V> FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>> fromPairFlatMapFunction() {
  return new FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>>() {
    @Override
    public Iterator<KV<K, V>> call(Iterator<Tuple2<K, V>> itr) {
      final Iterator<KV<K, V>> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<Tuple2<K, V>, KV<K, V>>() {
                @Override
                public KV<K, V> apply(Tuple2<K, V> t2) {
                  return KV.of(t2._1(), t2._2());
                }
              });
      return outputItr;
    }
  };
}
 
Developer: apache, Project: beam, Lines: 19, Source: TranslationUtils.java


Example 16: pairFunctionToPairFlatMapFunction

import org.apache.spark.api.java.function.Function; // import the required package/class
/**
 * A utility method that adapts {@link PairFunction} to a {@link PairFlatMapFunction} with an
 * {@link Iterator} input. This is particularly useful because it makes it possible to reuse
 * functions written for mapToPair in flatMapToPair operations.
 *
 * @param pairFunction the {@link PairFunction} to adapt.
 * @param <T> the input type.
 * @param <K> the output key type.
 * @param <V> the output value type.
 * @return a {@link PairFlatMapFunction} that accepts an {@link Iterator} as an input and applies
 *     the {@link PairFunction} on every element.
 */
public static <T, K, V> PairFlatMapFunction<Iterator<T>, K, V> pairFunctionToPairFlatMapFunction(
    final PairFunction<T, K, V> pairFunction) {
  return new PairFlatMapFunction<Iterator<T>, K, V>() {

    @Override
    public Iterator<Tuple2<K, V>> call(Iterator<T> itr) throws Exception {
      final Iterator<Tuple2<K, V>> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<T, Tuple2<K, V>>() {

                @Override
                public Tuple2<K, V> apply(T t) {
                  try {
                    return pairFunction.call(t);
                  } catch (Exception e) {
                    throw new RuntimeException(e);
                  }
                }
              });
      return outputItr;
    }
  };
}
 
Developer: apache, Project: beam, Lines: 37, Source: TranslationUtils.java
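
A short usage sketch (assumed, not part of the Beam source above): an existing PairFunction written for mapToPair can be reused over whole partitions once adapted. The words RDD is a placeholder JavaRDD<String>.

PairFunction<String, String, Integer> toCount = new PairFunction<String, String, Integer>() {
  @Override
  public Tuple2<String, Integer> call(String word) {
    return new Tuple2<>(word, 1); // the usual (word, 1) pairing
  }
};

// Adapt it once and apply it per partition instead of per element.
JavaPairRDD<String, Integer> pairs =
    words.mapPartitionsToPair(TranslationUtils.pairFunctionToPairFlatMapFunction(toCount));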


Example 17: functionToFlatMapFunction

import org.apache.spark.api.java.function.Function; // import the required package/class
/**
 * A utility method that adapts {@link Function} to a {@link FlatMapFunction} with an {@link
 * Iterator} input. This is particularly useful because it makes it possible to reuse functions
 * written for map in flatMap operations.
 *
 * @param func the {@link Function} to adapt.
 * @param <InputT> the input type.
 * @param <OutputT> the output type.
 * @return a {@link FlatMapFunction} that accepts an {@link Iterator} as an input and applies the
 *     {@link Function} on every element.
 */
public static <InputT, OutputT>
    FlatMapFunction<Iterator<InputT>, OutputT> functionToFlatMapFunction(
        final Function<InputT, OutputT> func) {
  return new FlatMapFunction<Iterator<InputT>, OutputT>() {

    @Override
    public Iterator<OutputT> call(Iterator<InputT> itr) throws Exception {
      final Iterator<OutputT> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<InputT, OutputT>() {

                @Override
                public OutputT apply(InputT t) {
                  try {
                    return func.call(t);
                  } catch (Exception e) {
                    throw new RuntimeException(e);
                  }
                }
              });
      return outputItr;
    }
  };
}
 
Developer: apache, Project: beam, Lines: 37, Source: TranslationUtils.java
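
Analogously, a plain Function written for map can be applied per partition after adaptation; a brief sketch under the same assumptions (the lines RDD is a placeholder JavaRDD<String>):

Function<String, Integer> length = new Function<String, Integer>() {
  @Override
  public Integer call(String s) {
    return s.length();
  }
};

// The adapted FlatMapFunction receives the whole partition iterator.
JavaRDD<Integer> lengths =
    lines.mapPartitions(TranslationUtils.functionToFlatMapFunction(length));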


Example 18: fromByteFunctionIterable

import org.apache.spark.api.java.function.Function; // import the required package/class
/**
 * A function wrapper for converting a byte array pair to a key-value pair, where
 * values are {@link Iterable}.
 *
 * @param keyCoder Coder to deserialize keys.
 * @param valueCoder Coder to deserialize values.
 * @param <K>   The type of the key being deserialized.
 * @param <V>   The type of the value being deserialized.
 * @return A function that accepts a pair of byte arrays and returns a key-value pair.
 */
public static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>
    fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) {
  return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() {
    @Override
    public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) {
      return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
        Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() {
          @Override
          public V apply(byte[] bytes) {
            return fromByteArray(bytes, valueCoder);
          }
        }));
    }
  };
}
 
Developer: apache, Project: beam, Lines: 26, Source: CoderHelpers.java


Example 19: call

import org.apache.spark.api.java.function.Function; // import the required package/class
@Override
public void call(JavaRDD<String> rdd) throws Exception {

	JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
		private static final long serialVersionUID = 5167089361335095997L;

		@Override
		public Row call(String msg) {
			Row row = RowFactory.create(msg);
			return row;
		}
	});
	// Create Schema
	StructType schema = DataTypes.createStructType(
			new StructField[] { DataTypes.createStructField("Message", DataTypes.StringType, true) });

	// Get Spark 2.0 session
	SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
	Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema);
	msgDataFrame.show();
}
 
Developer: jgperrin, Project: net.jgp.labs.spark, Lines: 22, Source: RowProcessor.java


Example 20: testEsRDDZRead

import org.apache.spark.api.java.function.Function; // import the required package/class
public void testEsRDDZRead() throws Exception {
        String target = "spark-test/java-basic-read";

        RestUtils.touch("spark-test");
        RestUtils.postData(target, "{\"message\" : \"Hello World\",\"message_date\" : \"2014-05-25\"}".getBytes());
        RestUtils.postData(target, "{\"message\" : \"Goodbye World\",\"message_date\" : \"2014-05-25\"}".getBytes());
        RestUtils.refresh("spark-test");

//        JavaRDD<scala.collection.Map<String, Object>> esRDD = JavaEsSpark.esRDD(sc, target);
//        JavaRDD messages = esRDD.filter(new Function<scala.collection.Map<String, Object>, Boolean>() {
//            public Boolean call(scala.collection.Map<String, Object> map) {
//                for (Entry<String, Object> entry: JavaConversions.asJavaMap(map).entrySet()) {
//                    if (entry.getValue().toString().contains("message")) {
//                        return Boolean.TRUE;
//                    }
//                }
//                return Boolean.FALSE;
//            }
//        });

        JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(sc, target).values();
        System.out.println(esRDD.collect());
        JavaRDD<Map<String, Object>> messages = esRDD.filter(new Function<Map<String, Object>, Boolean>() {
            @Override
            public Boolean call(Map<String, Object> map) throws Exception {
                return map.containsKey("message");
            }
        });

        // jdk8
        //esRDD.filter(m -> m.stream().filter(v -> v.contains("message")));

        assertThat((int) messages.count(), is(2));
        System.out.println(messages.take(10));
        System.out.println(messages);
    }
 
Developer: xushjie1987, Project: es-hadoop-v2.2.0, Lines: 37, Source: AbstractJavaEsSparkTest.java



Note: The org.apache.spark.api.java.function.Function class examples in this article were compiled from source code and documentation platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects contributed by their authors, and the copyright of the source code remains with the original authors. For redistribution and use, please refer to the license of the corresponding project; do not reproduce without permission.

