本文整理汇总了Java中org.apache.kafka.common.metrics.MetricsReporter类的典型用法代码示例。如果您正苦于以下问题:Java MetricsReporter类的具体用法?Java MetricsReporter怎么用?Java MetricsReporter使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
MetricsReporter类属于org.apache.kafka.common.metrics包,在下文中一共展示了MetricsReporter类的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: close
import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
/**
 * Closes the underlying Kafka client and then every metrics reporter held by this instance.
 *
 * <p>The reporters are closed inside a {@code finally} clause, so the loop is entered even
 * when closing the client throws.
 *
 * @throws IOException declared by the signature; presumably raised while closing the client
 */
public void close() throws IOException {
    try {
        kafkaClient.close();
    } finally {
        // Release the reporters regardless of how the client shutdown went.
        for (final MetricsReporter reporter : this.reporters) {
            reporter.close();
        }
    }
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:10,代码来源:StreamsKafkaClient.java
示例2: MockProcessorContext
import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
/**
 * Creates a mock processor context backed by a fresh {@code Metrics} registry.
 *
 * <p>The registry is built with a default {@code MetricConfig}, a single {@code JmxReporter},
 * a {@code MockTime} clock and the boolean flag {@code true}.
 *
 * @param stateDir          directory stored as this context's state directory
 * @param keySerde          key serde stored on this context
 * @param valSerde          value serde stored on this context
 * @param collectorSupplier supplier stored as the record collector supplier
 * @param cache             thread cache stored on this context
 */
public MockProcessorContext(final File stateDir,
                            final Serde<?> keySerde,
                            final Serde<?> valSerde,
                            final RecordCollector.Supplier collectorSupplier,
                            final ThreadCache cache) {
    // Plain pass-through assignments first.
    this.stateDir = stateDir;
    this.keySerde = keySerde;
    this.valSerde = valSerde;
    this.cache = cache;
    recordCollectorSupplier = collectorSupplier;

    // One JMX reporter, wrapped in a single-element list typed as MetricsReporter.
    metrics = new Metrics(
        new MetricConfig(),
        Collections.<MetricsReporter>singletonList(new JmxReporter()),
        new MockTime(),
        true);
    streamsMetrics = new MockStreamsMetrics(metrics);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:MockProcessorContext.java
示例3: testUnused
import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
/**
 * Checks that a reporter-specific setting is reported as unused until the reporter instances
 * are created, and that nothing is reported as unused afterwards.
 */
@Test
public void testUnused() {
    final String reporterClassName = "org.apache.kafka.common.config.AbstractConfigTest$ConfiguredFakeMetricsReporter";

    final Properties settings = new Properties();
    settings.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, reporterClassName);
    settings.put(FakeMetricsReporterConfig.EXTRA_CONFIG, "my_value");
    final TestConfig testConfig = new TestConfig(settings);

    // Before instantiation the extra setting has not been read by anyone.
    final boolean extraUnusedBefore = testConfig.unused().contains(FakeMetricsReporterConfig.EXTRA_CONFIG);
    assertTrue("metric.extra_config should be marked unused before getConfiguredInstances is called",
        extraUnusedBefore);

    testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);

    // After instantiation no setting may remain unused.
    final boolean nothingUnusedAfter = testConfig.unused().isEmpty();
    assertTrue("All defined configurations should be marked as used", nothingUnusedAfter);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:15,代码来源:AbstractConfigTest.java
示例4: testValidInputs
import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
/**
 * Asserts that the given reporter-classes value can be turned into reporter instances
 * without a {@code ConfigException}.
 *
 * @param configValue value to store under {@code TestConfig.METRIC_REPORTER_CLASSES_CONFIG}
 */
private void testValidInputs(String configValue) {
    final Properties settings = new Properties();
    settings.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
    final TestConfig testConfig = new TestConfig(settings);

    try {
        testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    } catch (ConfigException unexpected) {
        // A config exception means the supposedly valid value was rejected.
        fail("No exceptions are expected here, valid props are :" + settings);
    }
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:11,代码来源:AbstractConfigTest.java
示例5: testInvalidInputs
import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
/**
 * Asserts that creating reporter instances from the given reporter-classes value fails
 * with a {@code KafkaException}.
 *
 * @param configValue value to store under {@code TestConfig.METRIC_REPORTER_CLASSES_CONFIG}
 */
private void testInvalidInputs(String configValue) {
    final Properties settings = new Properties();
    settings.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
    final TestConfig testConfig = new TestConfig(settings);

    try {
        testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
        // Reaching this line means the invalid value was accepted.
        fail("Expected a config exception due to invalid props :" + settings);
    } catch (KafkaException expected) {
        // Expected outcome: the invalid value was rejected.
    }
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:12,代码来源:AbstractConfigTest.java
示例6: KafkaMonitor
import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
/**
 * Instantiates every configured App/Service by reflection and registers the
 * "offline-runnable-count" metric.
 *
 * <p>Each runnable class is resolved once with {@code Class.forName}; the resulting
 * {@code Class} object is reused both for the App/Service type check and for the
 * constructor lookup, instead of resolving the same name a second time.
 *
 * @param testProps maps each runnable's name to its properties; every properties map must
 *                  contain {@code CLASS_NAME_CONFIG}
 * @throws IllegalArgumentException if a properties map has no {@code CLASS_NAME_CONFIG} entry,
 *                                  or the named class is neither an App nor a Service
 * @throws Exception                if the class cannot be loaded or its
 *                                  {@code (Map, String)} constructor cannot be invoked
 */
public KafkaMonitor(Map<String, Map> testProps) throws Exception {
    _apps = new ConcurrentHashMap<>();
    _services = new ConcurrentHashMap<>();

    for (Map.Entry<String, Map> entry : testProps.entrySet()) {
        String name = entry.getKey();
        Map props = entry.getValue();
        if (!props.containsKey(CLASS_NAME_CONFIG))
            throw new IllegalArgumentException(name + " is not configured with " + CLASS_NAME_CONFIG);
        String className = (String) props.get(CLASS_NAME_CONFIG);

        // Resolve the class once and reuse it for the type check and the instantiation.
        Class<?> cls = Class.forName(className);
        if (App.class.isAssignableFrom(cls)) {
            App test = (App) cls.getConstructor(Map.class, String.class).newInstance(props, name);
            _apps.put(name, test);
        } else if (Service.class.isAssignableFrom(cls)) {
            Service service = (Service) cls.getConstructor(Map.class, String.class).newInstance(props, name);
            _services.put(name, service);
        } else {
            throw new IllegalArgumentException(className + " should implement either " + App.class.getSimpleName() + " or " + Service.class.getSimpleName());
        }
    }

    _executor = Executors.newSingleThreadScheduledExecutor();
    _offlineRunnables = new ConcurrentHashMap<>();

    // Expose the number of offline runnables through a JMX-backed metrics registry.
    List<MetricsReporter> reporters = new ArrayList<>();
    reporters.add(new JmxReporter(JMX_PREFIX));
    Metrics metrics = new Metrics(new MetricConfig(), reporters, new SystemTime());
    metrics.addMetric(metrics.metricName("offline-runnable-count", METRIC_GROUP_NAME, "The number of Service/App that are not fully running"),
        new Measurable() {
            @Override
            public double measure(MetricConfig config, long now) {
                return _offlineRunnables.size();
            }
        }
    );
}
开发者ID:linkedin,项目名称:kafka-monitor,代码行数:37,代码来源:KafkaMonitor.java
示例7: WorkerGroupMember
import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
/**
 * Builds a Connect group member: sets up the metrics registry (configured reporters plus a
 * JMX reporter), bootstraps the metadata from the configured bootstrap servers, creates the
 * network client stack and finally the {@code WorkerCoordinator}.
 *
 * <p>If any step throws, {@code stop(true)} is called and the failure is rethrown wrapped in a
 * {@code KafkaException}.
 *
 * @param config        source of all client settings read below
 * @param restUrl       passed through to the {@code WorkerCoordinator}
 * @param configStorage passed through to the {@code WorkerCoordinator}
 * @param listener      passed through to the {@code WorkerCoordinator}
 * @param time          clock handed to the metrics registry, the network clients and the coordinator
 * @throws KafkaException wrapping any {@code Throwable} raised during construction
 */
public WorkerGroupMember(DistributedConfig config,
String restUrl,
ConfigBackingStore configStorage,
WorkerRebalanceListener listener,
Time time) {
try {
this.time = time;
String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
// Fall back to a generated "connect-<n>" id when the configured client id is empty.
clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
// The metric config carries a single "client-id" tag holding the resolved client id.
Map<String, String> metricsTags = new LinkedHashMap<>();
metricsTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricsTags);
// Reporters named in the config, plus a JMX reporter that is always added.
List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), true);
// Seed the metadata with a bootstrap cluster built from the configured server addresses.
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
// The same prefix is handed to both the Selector and the WorkerCoordinator.
String metricGrpPrefix = "connect";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
this.metadata,
clientId,
100, // a fixed large enough value will suffice
config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
time,
true,
new ApiVersions());
// Wrap the raw network client in a ConsumerNetworkClient, which the coordinator uses.
this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
this.coordinator = new WorkerCoordinator(this.client,
config.getString(DistributedConfig.GROUP_ID_CONFIG),
config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG),
config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
metrics,
metricGrpPrefix,
this.time,
retryBackoffMs,
restUrl,
configStorage,
listener);
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Connect group member created");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
stop(true);
// now propagate the exception
// NOTE(review): the message below says "kafka consumer" although this constructs a Connect group member.
throw new KafkaException("Failed to construct kafka consumer", t);
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:63,代码来源:WorkerGroupMember.java
示例8: StreamsKafkaClient
import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
/**
 * Creates the client: bootstraps the metadata from the configured bootstrap servers, builds a
 * metrics registry (configured reporters plus a JMX reporter) and constructs the
 * {@code NetworkClient} used by this instance.
 *
 * <p>The "client-id" metrics tag carries the configured client id value. The configured value
 * is resolved once and used for both the metrics tag and the {@code NetworkClient}.
 *
 * @param streamsConfig source of all client settings read below; also stored on this instance
 */
public StreamsKafkaClient(final Config streamsConfig) {
    this.streamsConfig = streamsConfig;
    final Time time = new SystemTime();

    // Resolve the configured client id once; it feeds the metrics tag and the NetworkClient.
    final String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);

    final Map<String, String> metricTags = new LinkedHashMap<>();
    // Tag with the configured value, not with the "client.id" config key name.
    metricTags.put("client-id", clientId);

    final Metadata metadata = new Metadata(streamsConfig.getLong(
        StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
        streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG),
        false
    );
    // Seed the metadata with a bootstrap cluster built from the configured server addresses.
    final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
    metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());

    final MetricConfig metricConfig = new MetricConfig().samples(streamsConfig.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
        .timeWindow(streamsConfig.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
        .tags(metricTags);
    // Reporters named in the config, plus a JMX reporter that is always added.
    reporters = streamsConfig.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
        MetricsReporter.class);
    // TODO: This should come from the KafkaStream
    reporters.add(new JmxReporter("kafka.admin"));
    final Metrics metrics = new Metrics(metricConfig, reporters, time);

    final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig);
    final Selector selector = new Selector(
        streamsConfig.getLong(StreamsConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
        metrics,
        time,
        "kafka-client",
        channelBuilder);

    kafkaClient = new NetworkClient(
        selector,
        metadata,
        clientId,
        MAX_INFLIGHT_REQUESTS, // a fixed large enough value will suffice
        streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG),
        streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
        streamsConfig.getInt(StreamsConfig.SEND_BUFFER_CONFIG),
        streamsConfig.getInt(StreamsConfig.RECEIVE_BUFFER_CONFIG),
        streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG),
        time,
        true,
        new ApiVersions());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:49,代码来源:StreamsKafkaClient.java
示例9: checkInstances
import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
/**
 * Asserts the runtime classes of the instances configured under "class.prop" and "list.prop".
 *
 * @param expectedClassPropClass  expected class of the single instance configured under "class.prop"
 * @param expectedListPropClasses expected classes, by position, of the instances configured
 *                                under "list.prop"
 */
void checkInstances(Class<?> expectedClassPropClass, Class<?>... expectedListPropClasses) {
    final Object classPropInstance = getConfiguredInstance("class.prop", MetricsReporter.class);
    assertEquals(expectedClassPropClass, classPropInstance.getClass());

    final List<?> listPropInstances = getConfiguredInstances("list.prop", MetricsReporter.class);
    // Walk the created instances in order and compare each with the expectation at the same position.
    int position = 0;
    for (final Object instance : listPropInstances) {
        assertEquals(expectedListPropClasses[position], instance.getClass());
        position++;
    }
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:7,代码来源:AbstractConfigTest.java
示例10: ConsumeService
import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
/**
 * Builds a consume service from the given properties: reads the service settings, assembles
 * the consumer properties (defaults first, then service-level settings, then user overrides),
 * reflectively instantiates the consumer, creates the daemon consumer thread and sets up
 * JMX-reported metrics.
 *
 * <p>The thread is only created here; it is not started in this constructor.
 *
 * @param props service configuration; may carry consumer overrides under
 *              {@code ConsumeServiceConfig.CONSUMER_PROPS_CONFIG}
 * @param name  name of this service; used in the thread name, the error log message and the
 *              "name" metrics tag
 * @throws ConfigException if the overrides contain a property listed in {@code NONOVERRIDABLE_PROPERTIES}
 * @throws Exception       if the consumer class cannot be loaded or instantiated
 */
public ConsumeService(Map<String, Object> props, String name) throws Exception {
_name = name;
// User-supplied consumer overrides, or an empty map when none are given.
Map consumerPropsOverride = props.containsKey(ConsumeServiceConfig.CONSUMER_PROPS_CONFIG)
? (Map) props.get(ConsumeServiceConfig.CONSUMER_PROPS_CONFIG) : new HashMap<>();
ConsumeServiceConfig config = new ConsumeServiceConfig(props);
String topic = config.getString(ConsumeServiceConfig.TOPIC_CONFIG);
String zkConnect = config.getString(ConsumeServiceConfig.ZOOKEEPER_CONNECT_CONFIG);
String brokerList = config.getString(ConsumeServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
String consumerClassName = config.getString(ConsumeServiceConfig.CONSUMER_CLASS_CONFIG);
_latencySlaMs = config.getInt(ConsumeServiceConfig.LATENCY_SLA_MS_CONFIG);
_latencyPercentileMaxMs = config.getInt(ConsumeServiceConfig.LATENCY_PERCENTILE_MAX_MS_CONFIG);
_latencyPercentileGranularityMs = config.getInt(ConsumeServiceConfig.LATENCY_PERCENTILE_GRANULARITY_MS_CONFIG);
_running = new AtomicBoolean(false);
// Reject overrides for properties that this service does not allow to be overridden.
for (String property: NONOVERRIDABLE_PROPERTIES) {
if (consumerPropsOverride.containsKey(property)) {
throw new ConfigException("Override must not contain " + property + " config.");
}
}
Properties consumerProps = new Properties();
// Assign default config. This has the lowest priority.
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "kmf-consumer");
// The default group id gets a random integer suffix.
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kmf-consumer-group-" + new Random().nextInt());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Accept either the canonical or the simple name of the built-in consumers and normalize it
// to the canonical name; any other value is used as given.
if (consumerClassName.equals(NewConsumer.class.getCanonicalName()) || consumerClassName.equals(NewConsumer.class.getSimpleName())) {
consumerClassName = NewConsumer.class.getCanonicalName();
} else if (consumerClassName.equals(OldConsumer.class.getCanonicalName()) || consumerClassName.equals(OldConsumer.class.getSimpleName())) {
consumerClassName = OldConsumer.class.getCanonicalName();
// The name/value of these configs are changed in the new consumer.
consumerProps.put("auto.commit.enable", "false");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
}
// Assign config specified for ConsumeService.
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
consumerProps.put("zookeeper.connect", zkConnect);
// Assign config specified for consumer. This has the highest priority.
consumerProps.putAll(consumerPropsOverride);
// The consumer class must expose a (String, Properties) constructor.
_consumer = (KMBaseConsumer) Class.forName(consumerClassName).getConstructor(String.class, Properties.class).newInstance(topic, consumerProps);
// Daemon thread that runs consume() and logs any exception it throws.
_thread = new Thread(new Runnable() {
@Override
public void run() {
try {
consume();
} catch (Exception e) {
LOG.error(_name + "/ConsumeService failed", e);
}
}
}, _name + " consume-service");
_thread.setDaemon(true);
// Metrics registry: 60 samples with a 1000 ms window each, reported through JMX.
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(JMX_PREFIX));
Metrics metrics = new Metrics(metricConfig, reporters, new SystemTime());
Map<String, String> tags = new HashMap<>();
tags.put("name", _name);
_sensors = new ConsumeMetrics(metrics, tags);
}
开发者ID:linkedin,项目名称:kafka-monitor,代码行数:69,代码来源:ConsumeService.java
示例11: ProduceService
import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
/**
 * Builds a produce service from the given properties: reads the service settings, validates
 * the producer overrides, resolves the producer class name, calls {@code initializeProducer()},
 * creates the executors and sets up JMX-reported metrics.
 *
 * @param props service configuration; may carry producer overrides under
 *              {@code ProduceServiceConfig.PRODUCER_PROPS_CONFIG}
 * @param name  name of this service; also used as the "name" metrics tag
 * @throws ConfigException if the overrides contain a property listed in {@code NONOVERRIDABLE_PROPERTIES}
 * @throws Exception       declared by the signature; presumably raised by producer initialization
 */
public ProduceService(Map<String, Object> props, String name) throws Exception {
_name = name;
ProduceServiceConfig config = new ProduceServiceConfig(props);
_zkConnect = config.getString(ProduceServiceConfig.ZOOKEEPER_CONNECT_CONFIG);
_brokerList = config.getString(ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
String producerClass = config.getString(ProduceServiceConfig.PRODUCER_CLASS_CONFIG);
_partitioner = config.getConfiguredInstance(ProduceServiceConfig.PARTITIONER_CLASS_CONFIG, KMPartitioner.class);
_threadsNum = config.getInt(ProduceServiceConfig.PRODUCE_THREAD_NUM_CONFIG);
_topic = config.getString(ProduceServiceConfig.TOPIC_CONFIG);
_producerId = config.getString(ProduceServiceConfig.PRODUCER_ID_CONFIG);
_produceDelayMs = config.getInt(ProduceServiceConfig.PRODUCE_RECORD_DELAY_MS_CONFIG);
_recordSize = config.getInt(ProduceServiceConfig.PRODUCE_RECORD_SIZE_BYTE_CONFIG);
_sync = config.getBoolean(ProduceServiceConfig.PRODUCE_SYNC_CONFIG);
_treatZeroThroughputAsUnavailable = config.getBoolean(ProduceServiceConfig.PRODUCER_TREAT_ZERO_THROUGHPUT_AS_UNAVAILABLE_CONFIG);
_partitionNum = new AtomicInteger(0);
_running = new AtomicBoolean(false);
_nextIndexPerPartition = new ConcurrentHashMap<>();
// User-supplied producer overrides, or an empty map when none are given.
_producerPropsOverride = props.containsKey(ProduceServiceConfig.PRODUCER_PROPS_CONFIG)
? (Map) props.get(ProduceServiceConfig.PRODUCER_PROPS_CONFIG) : new HashMap<>();
// Reject overrides for properties that this service does not allow to be overridden.
for (String property: NONOVERRIDABLE_PROPERTIES) {
if (_producerPropsOverride.containsKey(property)) {
throw new ConfigException("Override must not contain " + property + " config.");
}
}
// Accept either the canonical or the simple name of NewProducer and normalize it to the
// canonical name; any other value is used as given.
if (producerClass.equals(NewProducer.class.getCanonicalName()) || producerClass.equals(NewProducer.class.getSimpleName())) {
_producerClassName = NewProducer.class.getCanonicalName();
} else {
_producerClassName = producerClass;
}
// NOTE(review): initializeProducer() is defined elsewhere; presumably it reads the fields assigned above — confirm before reordering.
initializeProducer();
_produceExecutor = Executors.newScheduledThreadPool(_threadsNum, new ProduceServiceThreadFactory());
_handleNewPartitionsExecutor = Executors.newSingleThreadScheduledExecutor(new HandleNewPartitionsThreadFactory());
// Metrics registry: 60 samples with a 1000 ms window each, reported through JMX.
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(JMX_PREFIX));
Metrics metrics = new Metrics(metricConfig, reporters, new SystemTime());
Map<String, String> tags = new HashMap<>();
tags.put("name", _name);
_sensors = new ProduceMetrics(metrics, tags);
}
开发者ID:linkedin,项目名称:kafka-monitor,代码行数:47,代码来源:ProduceService.java
示例12: initialize
import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
/**
 * Creates the shared {@code Metrics} registry and stores it in {@code metrics}.
 *
 * <p>The registry uses 100 samples with a 1000 ms time window, a {@code SystemTime} clock and
 * a single {@code JmxReporter} created with the prefix "io.confluent.ksql.metrics".
 */
public static void initialize() {
    // A mutable list holding the one JMX reporter.
    final List<MetricsReporter> reporterList = new ArrayList<>();
    reporterList.add(new JmxReporter("io.confluent.ksql.metrics"));

    final MetricConfig sampling = new MetricConfig()
        .samples(100)
        .timeWindow(1000, TimeUnit.MILLISECONDS);

    metrics = new Metrics(sampling, reporterList, new SystemTime());
}
开发者ID:confluentinc,项目名称:ksql,代码行数:7,代码来源:MetricCollectors.java
注:本文中的org.apache.kafka.common.metrics.MetricsReporter类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论