
Java KafkaServerStartable Class Code Examples


This article collects typical usage examples of the Java class kafka.server.KafkaServerStartable. If you are wondering how KafkaServerStartable is used in practice, or are looking for working examples, the selected snippets below may help.



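The KafkaServerStartable class belongs to the kafka.server package. The 20 code examples below are sorted by popularity by default. You can upvote the examples you like or find useful; your ratings help the system recommend better Java code examples.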

Example 1: KafkaSourceEmbeddedKafka

import kafka.server.KafkaServerStartable; // import the required package/class
public KafkaSourceEmbeddedKafka(Properties properties) {
  zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort);
  dir = new File(System.getProperty("java.io.tmpdir"), "kafka_log-" + UUID.randomUUID());
  try {
    FileUtils.deleteDirectory(dir);
  } catch (IOException e) {
    e.printStackTrace();
  }
  Properties props = new Properties();
  props.put("zookeeper.connect",zookeeper.getConnectString());
  props.put("broker.id","1");
  props.put("host.name", "localhost");
  props.put("port", String.valueOf(serverPort));
  props.put("log.dir", dir.getAbsolutePath());
  if (properties != null) {
    props.putAll(properties);
  }
  KafkaConfig config = new KafkaConfig(props);
  kafkaServer = new KafkaServerStartable(config);
  kafkaServer.startup();
  initProducer();
}
 
Developer: moueimei, Project: flume-release-1.7.0, Lines: 23, Source: KafkaSourceEmbeddedKafka.java
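The constructor above sets its defaults first and then applies the caller's Properties with putAll, so any key the caller supplies replaces the default. A minimal standalone sketch of that ordering, using only java.util.Properties; the class BrokerProps and the method withOverrides are hypothetical names, not part of Kafka or Flume:

```java
import java.util.Properties;

/** Sketch: build embedded-broker settings, letting caller-supplied values win. */
final class BrokerProps {
    private BrokerProps() {}

    // Hypothetical helper; the keys mirror the ones used in Example 1.
    static Properties withOverrides(String zkConnect, int port, String logDir, Properties overrides) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("broker.id", "1");
        props.setProperty("host.name", "localhost");
        props.setProperty("port", String.valueOf(port));
        props.setProperty("log.dir", logDir);
        if (overrides != null) {
            props.putAll(overrides); // applied last, so overrides replace defaults
        }
        return props;
    }
}
```

Passing null for overrides keeps every default, which matches the null check in the original constructor.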


Example 2: startKafkaServers

import kafka.server.KafkaServerStartable; // import the required package/class
private void startKafkaServers() throws IOException {
    for (int i = 0; i < numberOfKafkaBrokers; i++) {
        int port = BASE_KAFKA_PORT + i;
        File logDir = folder.newFolder("kafka-local-" + i);

        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", String.format("%s:%s", LOCALHOST, ZOOKEEPER_PORT));
        properties.setProperty("broker.id", String.valueOf(i + 1));
        properties.setProperty("host.name", LOCALHOST);
        properties.setProperty("port", Integer.toString(port));
        properties.setProperty("log.dir", logDir.getAbsolutePath());
        properties.setProperty("log.flush.interval.messages", String.valueOf(1));

        KafkaServerStartable broker = startBroker(properties);
        brokers.add(broker);
    }
}
 
Developer: epam, Project: Lagerta, Lines: 18, Source: EmbeddedKafkaRule.java


Example 3: startKafkaServers

import kafka.server.KafkaServerStartable; // import the required package/class
private void startKafkaServers() throws IOException {
    for (int i = 0; i < numberOfKafkaBrokers; i++) {
        int port = kafkaPort + i;
        File logDir = folder.mkSubDir(String.format("kafka-local-%s-%s", kafkaPort, i));

        Properties properties = new Properties();
        properties.setProperty(KafkaConfig.ZkConnectProp(), String.format("%s:%s", LOCALHOST, zookeeperPort));
        properties.setProperty(KafkaConfig.BrokerIdProp(), String.valueOf(i + 1));
        properties.setProperty(KafkaConfig.ListenersProp(), String.format("PLAINTEXT://%s:%s", LOCALHOST, port));
        properties.setProperty(KafkaConfig.LogDirProp(), logDir.getAbsolutePath());
        properties.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
        properties.setProperty(KafkaConfig.LogFlushIntervalMsProp(), String.valueOf(Long.MAX_VALUE));
        properties.setProperty(KafkaConfig.ControlledShutdownEnableProp(), String.valueOf(false));
        properties.setProperty(KafkaConfig.DeleteTopicEnableProp(), String.valueOf(true));
        properties.setProperty(KafkaConfig.NumPartitionsProp(), String.valueOf(numberOfKafkaBrokers));

        KafkaServerStartable broker = startBroker(properties);
        brokers.add(broker);
    }
}
 
Developer: epam, Project: Lagerta, Lines: 21, Source: EmbeddedKafka.java


Example 4: startKafkaServers

import kafka.server.KafkaServerStartable; // import the required package/class
private void startKafkaServers() throws IOException {
    for (int i = 0; i < numberOfKafkaBrokers; i++) {
        int port = kafkaPort + i;
        File logDir = folder.newFolder(String.format("kafka-local-%s-%s", kafkaPort, i));

        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", String.format("%s:%s", LOCALHOST, zookeeperPort));
        properties.setProperty("broker.id", String.valueOf(i + 1));
        properties.setProperty("host.name", LOCALHOST);
        properties.setProperty("port", Integer.toString(port));
        properties.setProperty("log.dir", logDir.getAbsolutePath());
        properties.setProperty("log.flush.interval.messages", String.valueOf(1));
        properties.setProperty("log.retention.ms", String.valueOf(Long.MAX_VALUE));
        properties.setProperty("controlled.shutdown.enable", String.valueOf(false));
        properties.setProperty("delete.topic.enable", String.valueOf(true));
        properties.setProperty("num.partitions", String.valueOf(numberOfKafkaBrokers));

        KafkaServerStartable broker = startBroker(properties);
        brokers.add(broker);
    }
}
 
Developer: epam, Project: Lagerta, Lines: 22, Source: EmbeddedKafkaRule.java
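Examples 2 to 4 share one pattern: broker i gets id i + 1 and port base + i, so ids and ports never collide within the test cluster. The sketch below isolates that arithmetic using only java.util.Properties; MultiBrokerProps and forBrokers are illustrative names, and the log directory is left for the caller to set:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Sketch: one Properties object per broker, with distinct ids and ports. */
final class MultiBrokerProps {
    private MultiBrokerProps() {}

    // Hypothetical helper; keys follow the string-keyed style of Examples 2 and 4.
    static List<Properties> forBrokers(int count, int basePort, String zkConnect) {
        List<Properties> all = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Properties p = new Properties();
            p.setProperty("zookeeper.connect", zkConnect);
            p.setProperty("broker.id", String.valueOf(i + 1)); // ids start at 1
            p.setProperty("host.name", "localhost");
            p.setProperty("port", Integer.toString(basePort + i)); // consecutive ports
            all.add(p);
        }
        return all;
    }
}
```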


Example 5: start

import kafka.server.KafkaServerStartable; // import the required package/class
/**
 * Starts the Kafka broker.
 *
 * @throws IOException if an error occurs during initialization
 */
public synchronized void start() throws IOException {
  log.info("Starting Kafka broker on port {}", port);

  logsDir = Files.createTempDirectory(LocalKafkaBroker.class.getSimpleName());
  logsDir.toFile().deleteOnExit();

  kafkaServer = new KafkaServerStartable(new KafkaConfig(ConfigUtils.keyValueToProperties(
      "broker.id", TEST_BROKER_ID,
      "log.dirs", logsDir.toAbsolutePath(),
      "listeners", "PLAINTEXT://:" + port,
      "zookeeper.connect", "localhost:" + zkPort,
      // Above are for Kafka 0.8; following are for 0.9+
      "message.max.bytes", 1 << 26,
      "replica.fetch.max.bytes", 1 << 26
  ), false));
  kafkaServer.startup();
}
 
Developer: oncewang, Project: oryx2, Lines: 23, Source: LocalKafkaBroker.java
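One caveat about logsDir.toFile().deleteOnExit(): File.deleteOnExit removes a directory only if it is empty at JVM exit, and a broker writes files into its log directory. A possible cleanup helper, assuming it is acceptable to delete the whole tree; TempDirs and deleteRecursively are hypothetical names and not part of oryx2:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Sketch: remove a temporary log directory together with its contents. */
final class TempDirs {
    private TempDirs() {}

    static void deleteRecursively(Path root) {
        if (!Files.exists(root)) {
            return; // nothing to clean up
        }
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A helper like this would typically be called after the broker has been shut down, so that no files are still being written.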


Example 6: startServer

import kafka.server.KafkaServerStartable; // import the required package/class
public static KafkaServerStartable startServer(final int port, final int brokerId,
    final String zkStr, final Properties configuration) {
  // Create the ZK nodes for Kafka, if needed
  int indexOfFirstSlash = zkStr.indexOf('/');
  if (indexOfFirstSlash != -1) {
    String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
    String zkNodePath = zkStr.substring(indexOfFirstSlash);
    ZkClient client = new ZkClient(bareZkUrl);
    client.createPersistent(zkNodePath, true);
    client.close();
  }

  File logDir = new File("/tmp/kafka-" + Double.toHexString(Math.random()));
  logDir.mkdirs();

  configureKafkaPort(configuration, port);
  configureZkConnectionString(configuration, zkStr);
  configureBrokerId(configuration, brokerId);
  configureKafkaLogDirectory(configuration, logDir);
  KafkaConfig config = new KafkaConfig(configuration);

  KafkaServerStartable serverStartable = new KafkaServerStartable(config);
  serverStartable.startup();

  return serverStartable;
}
 
Developer: uber, Project: uReplicator, Lines: 27, Source: KafkaStarterUtils.java
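The first block of startServer splits the connect string at the first '/': everything before it is the ZooKeeper host list, and everything from the slash onward is the chroot path that must exist before the broker starts. A small sketch of just that split, with no ZooKeeper dependency; the ZkConnect class is a hypothetical name:

```java
/** Sketch: separate a ZooKeeper connect string into host list and chroot path. */
final class ZkConnect {
    final String hosts;
    final String chroot; // null when the connect string has no chroot

    private ZkConnect(String hosts, String chroot) {
        this.hosts = hosts;
        this.chroot = chroot;
    }

    static ZkConnect parse(String zkStr) {
        int slash = zkStr.indexOf('/');
        if (slash == -1) {
            return new ZkConnect(zkStr, null);
        }
        // Same substring arithmetic as bareZkUrl / zkNodePath in the example above.
        return new ZkConnect(zkStr.substring(0, slash), zkStr.substring(slash));
    }
}
```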


Example 7: startKafka

import kafka.server.KafkaServerStartable; // import the required package/class
private void startKafka() throws InterruptedException {
    startInNewThread(() -> {
        try {
            String kafkaConfig = TestServerManager.class.getClassLoader().getResource("server.properties").getPath();
            logger.debug("Starting Kafka server using config:" + kafkaConfig);
            String[] kafkaArgs = {kafkaConfig};
            Properties serverProps = Kafka.getPropsFromArgs(kafkaArgs);
            KafkaServerStartable kafkaServerStartable = KafkaServerStartable.fromProps(serverProps);
            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    kafkaServerStartable.shutdown();
                }
            });
            kafkaServerStartable.startup();
        } catch (RuntimeException ex) {
            logger.error("Failed to start kafka", ex);
            throw ex;
        }
    }, "Kafka").join();
    logger.debug("Kafka started.");
}
 
Developer: badalgeek, Project: EventStreamAnalytics, Lines: 22, Source: TestServerManager.java


Example 8: startup

import kafka.server.KafkaServerStartable; // import the required package/class
public void startup() {
  for (int i = 0; i < ports.size(); i++) {
    Integer port = ports.get(i);
    File logDir = TestUtils.constructTempDir("kafka-local");

    Properties properties = new Properties();
    properties.putAll(baseProperties);
    properties.setProperty("zookeeper.connect", zkConnection);
    properties.setProperty("broker.id", String.valueOf(i + 1));
    properties.setProperty("advertised.host.name", "127.0.0.1");
    properties.setProperty("host.name", "127.0.0.1");
    properties.setProperty("advertised.port", Integer.toString(port));
    properties.setProperty("port", Integer.toString(port));
    properties.setProperty("log.dirs", logDir.getAbsolutePath());
    properties.setProperty("offsets.topic.num.partitions", "1");
    properties.setProperty("offsets.topic.replication.factor", "1");
    properties.setProperty("log.flush.interval.messages", String.valueOf(1));

    KafkaServerStartable broker = startBroker(properties);

    brokers.add(broker);
    logDirs.add(logDir);
  }
}
 
Developer: apache, Project: beam, Lines: 25, Source: EmbeddedKafkaCluster.java


Example 9: startServer

import kafka.server.KafkaServerStartable; // import the required package/class
public static KafkaServerStartable startServer(final int port, final int brokerId, final String zkStr,
    final Properties configuration) {
  // Create the ZK nodes for Kafka, if needed
  int indexOfFirstSlash = zkStr.indexOf('/');
  if (indexOfFirstSlash != -1) {
    String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
    String zkNodePath = zkStr.substring(indexOfFirstSlash);
    ZkClient client = new ZkClient(bareZkUrl);
    client.createPersistent(zkNodePath, true);
    client.close();
  }

  File logDir = new File("/tmp/kafka-" + Double.toHexString(Math.random()));
  logDir.mkdirs();

  configureKafkaPort(configuration, port);
  configureZkConnectionString(configuration, zkStr);
  configureBrokerId(configuration, brokerId);
  configureKafkaLogDirectory(configuration, logDir);
  KafkaConfig config = new KafkaConfig(configuration);

  KafkaServerStartable serverStartable = new KafkaServerStartable(config);
  serverStartable.startup();

  return serverStartable;
}
 
Developer: uber, Project: chaperone, Lines: 27, Source: KafkaStarterUtils.java


Example 10: startKafkaServer

import kafka.server.KafkaServerStartable; // import the required package/class
public void startKafkaServer(int clusterid, int brokerid, int defaultPartitions)
{
  // before start, clean the kafka dir if it exists
  FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));

  Properties props = new Properties();
  props.setProperty("broker.id", "" + brokerid);
  props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid][brokerid]).toString());
  props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
  props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid][brokerid]);
  props.setProperty("default.replication.factor", "1");
  // set this to 50000 to boost the performance so most test data are in memory before flush to disk
  props.setProperty("log.flush.interval.messages", "50000");
  if (hasMultiPartition) {
    props.setProperty("num.partitions", "2");
  } else {
    props.setProperty("num.partitions", "1");
  }

  broker[clusterid][brokerid] = new KafkaServerStartable(new KafkaConfig(props));
  broker[clusterid][brokerid].startup();

}
 
Developer: apache, Project: apex-malhar, Lines: 24, Source: KafkaOperatorTestBase.java


Example 11: startServer

import kafka.server.KafkaServerStartable; // import the required package/class
public static KafkaServerStartable startServer(final int port, final int brokerId, final String zkStr,
    final Properties configuration) {
  // Create the ZK nodes for Kafka, if needed
  int indexOfFirstSlash = zkStr.indexOf('/');
  if (indexOfFirstSlash != -1) {
    String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
    String zkNodePath = zkStr.substring(indexOfFirstSlash);
    ZkClient client = new ZkClient(bareZkUrl);
    client.createPersistent(zkNodePath, true);
    client.close();
  }

  File logDir = new File("/tmp/kafka-" + Double.toHexString(Math.random()));
  logDir.mkdirs();

  configuration.put("port", Integer.toString(port));
  configuration.put("zookeeper.connect", zkStr);
  configuration.put("broker.id", Integer.toString(brokerId));
  configuration.put("log.dirs", logDir.getAbsolutePath());
  KafkaConfig config = new KafkaConfig(configuration);

  KafkaServerStartable serverStartable = new KafkaServerStartable(config);
  serverStartable.startup();

  return serverStartable;
}
 
Developer: Hanmourang, Project: Pinot, Lines: 27, Source: KafkaStarterUtils.java


Example 12: TestKafkaServer

import kafka.server.KafkaServerStartable; // import the required package/class
public TestKafkaServer(Properties properties) {
	String path = properties.getProperty("log.dir");
	locate = Files.createTempDir();
	File dir = new File(locate, path).getAbsoluteFile();
	properties.setProperty("log.dir", dir.getAbsolutePath());

	kafkaConfig = new KafkaConfig(properties);
	System.out.println("num.partitions =" + kafkaConfig.numPartitions());
	System.out.println("background.threads ="
			+ kafkaConfig.backgroundThreads());
	kafkaServer = new KafkaServerStartable(kafkaConfig);
	try {
		kafkaServer.startup();
	} catch (Exception e) {
		e.printStackTrace();
	}
	System.out.println("embedded kafka is up");
}
 
Developer: pulsarIO, Project: jetstream, Lines: 19, Source: TestKafkaServer.java


Example 13: setupKafkaBroker

import kafka.server.KafkaServerStartable; // import the required package/class
private void setupKafkaBroker() {
    try {
        // mock zookeeper
        zkTestServer = new TestingServer(2181);
        // mock kafka
        Properties props = new Properties();
        props.put("broker.id", "0");
        props.put("host.name", "localhost");
        props.put("port", "9092");
        props.put("log.dir", "/tmp/tmp_kafka_dir");
        props.put("zookeeper.connect", zkTestServer.getConnectString());
        props.put("replica.socket.timeout.ms", "1500");
        KafkaConfig config = new KafkaConfig(props);
        kafkaServer = new KafkaServerStartable(config);
        kafkaServer.startup();

        // create "sensordata" topic
        ZkClient zkClient = new ZkClient(zkTestServer.getConnectString(), 10000, 10000, ZKStringSerializer$.MODULE$);
        AdminUtils.createTopic(zkClient, "sensordata", 1, 1, new Properties());
        zkClient.close();
    } catch (Exception e) {
        log.error("Error running local Kafka broker / Zookeeper", e);
    }
}
 
Developer: wso2, Project: product-cep, Lines: 25, Source: KafkaTestCase.java
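Example 13 hard-codes ports 2181 and 9092, which fails if a local ZooKeeper or Kafka instance already occupies them. One common alternative, sketched here as an assumption rather than what the wso2 test does, is to ask the operating system for a free port by binding to port 0. FreePort is a hypothetical helper; note that another process could claim the port between this call and the broker binding it:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

/** Sketch: obtain a port number that was free at the time of the call. */
final class FreePort {
    private FreePort() {}

    static int find() {
        // Port 0 asks the OS to pick any available port; the socket is
        // closed again immediately so the broker can bind to that port.
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The returned value could then replace the literal in props.put("port", ...), converted with String.valueOf.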


Example 14: EmbeddedKafka

import kafka.server.KafkaServerStartable; // import the required package/class
EmbeddedKafka(EmbeddedZookeeper zookeeper, int kafkaPort) throws IOException {
  this.zookeeper = checkNotNull(zookeeper, "zookeeper is null");

  this.port = kafkaPort;
  this.kafkaDataDir = Files.createTempDir();

  Properties properties = new Properties();
  properties.setProperty("broker.id", "0");
  properties.setProperty("host.name", "localhost");
  properties.setProperty("num.partitions", "2");
  properties.setProperty("log.flush.interval.messages", "10000");
  properties.setProperty("log.flush.interval.ms", "1000");
  properties.setProperty("log.retention.minutes", "60");
  properties.setProperty("log.segment.bytes", "1048576");
  properties.setProperty("auto.create.topics.enable", "false");
  properties.setProperty("zookeeper.connection.timeout.ms", "1000000");
  properties.setProperty("port", Integer.toString(port));
  properties.setProperty("log.dirs", kafkaDataDir.getAbsolutePath());
  properties.setProperty("zookeeper.connect", zookeeper.getConnectString());

  KafkaConfig config = new KafkaConfig(properties);
  this.kafka = new KafkaServerStartable(config);
}
 
Developer: apache, Project: tajo, Lines: 24, Source: EmbeddedKafka.java


Example 15: LocalKafkaServer

import kafka.server.KafkaServerStartable; // import the required package/class
public LocalKafkaServer() throws IOException {

    while (new File(logDir).exists()) {
        FileUtils.deleteDirectory(new File(logDir));
    }

    Properties props = new Properties();
    props.put("broker.id", nodeId);
    props.put("port", port);
    props.put("log.dir", logDir);
    props.put("zookeeper.connect", zkConnect);
    props.put("host.name", "127.0.0.1");
    KafkaConfig conf = new KafkaConfig(props);

    zkUtils = ZkUtils.apply(props.getProperty("zookeeper.connect"),
            30000,
            30000,
            JaasUtils.isZkSecurityEnabled());

    server = new KafkaServerStartable(conf);
    server.startup();
}
 
Developer: blackberry, Project: Krackle, Lines: 24, Source: LocalKafkaServer.java


Example 16: startKafkaServer

import kafka.server.KafkaServerStartable; // import the required package/class
private static KafkaServerStartable startKafkaServer(int kafkaPort, int zkPort) {
    File logDir;
    try {
        logDir = java.nio.file.Files.createTempDirectory("kafka-logs").toFile();
    } catch (IOException e) {
        throw new RuntimeException("Unable to create Kafka temp dirs!", e);
    }

    LOGGER.info("Kafka log dir: " + logDir.getAbsolutePath());
    logDir.deleteOnExit();

    Properties kafkaBrokerConfig = new Properties();
    kafkaBrokerConfig.setProperty("zookeeper.connect", "127.0.0.1:" + zkPort);
    kafkaBrokerConfig.setProperty("broker.id", "0");
    kafkaBrokerConfig.setProperty("host.name", "127.0.0.1");
    kafkaBrokerConfig.setProperty("port", Integer.toString(kafkaPort));
    kafkaBrokerConfig.setProperty("log.dir", logDir.getAbsolutePath());
    kafkaBrokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));

    KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig(kafkaBrokerConfig));
    broker.startup();
    return broker;
}
 
Developer: DDTH, Project: ddth-kafka, Lines: 24, Source: QndEmbeddedServer.java


Example 17: setup

import kafka.server.KafkaServerStartable; // import the required package/class
@Override
public void setup()
		throws Exception {
	LOGGER.info("Starting up Kafka Server...");

	FileUtils.deleteDirectory(KafkaTestUtils.DEFAULT_LOG_DIR);

	final boolean success = KafkaTestUtils.DEFAULT_LOG_DIR.mkdir();
	if (!success) {
		LOGGER.warn("Unable to create Kafka log dir [" + KafkaTestUtils.DEFAULT_LOG_DIR.getAbsolutePath() + "]");
	}
	final KafkaConfig config = KafkaTestUtils.getKafkaBrokerConfig();
	kafkaServer = new KafkaServerStartable(
			config);

	kafkaServer.startup();
	Thread.sleep(3000);
}
 
Developer: locationtech, Project: geowave, Lines: 19, Source: KafkaTestEnvironment.java
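The fixed Thread.sleep(3000) after startup() is a guess at how long the broker needs. A hedged alternative is to poll the broker's port until it accepts a TCP connection or a timeout expires. PortWait and awaitOpen are hypothetical names; an open port only shows that the listener is bound, not that the broker has finished initializing:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Sketch: wait until a TCP port accepts connections, up to a timeout. */
final class PortWait {
    private PortWait() {}

    static boolean awaitOpen(String host, int port, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (deadline - System.nanoTime() > 0) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 200);
                return true; // something is listening
            } catch (IOException notYetListening) {
                try {
                    Thread.sleep(50); // brief pause before the next attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false; // timed out
    }
}
```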


Example 18: KafkaLocal

import kafka.server.KafkaServerStartable; // import the required package/class
public KafkaLocal(Properties kafkaProperties) throws IOException, InterruptedException {
    KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);

    // start local zookeeper
    System.out.println("starting local zookeeper...");
    zookeeper = new ZookeeperLocal();
    try {
        zookeeper.startzkServer(2181);
    } catch (Exception e) {
        throw new RuntimeException("Error starting local zookeeper server", e);
    }

    System.out.println("done");

    // start local kafka broker
    kafka = new KafkaServerStartable(kafkaConfig);
    System.out.println("starting local kafka broker...");
    kafka.startup();
    System.out.println("done");
}
 
Developer: Produban, Project: openbus, Lines: 24, Source: KafkaLocal.java


Example 19: start

import kafka.server.KafkaServerStartable; // import the required package/class
public void start(Properties props) throws Exception {
    Integer port = getZkPort(props);

    zk = new TestingServer(port, tempDir); // this starts the zk server
    System.out.println("Started ZooKeeper: ");
    System.out.println("--> Temp Directory: " + zk.getTempDirectory());

    props.put("log.dirs", tempDir.getAbsolutePath());
    KafkaConfig kafkaConfig = new KafkaConfig(props);
    kafka = new KafkaServerStartable(kafkaConfig);
    kafka.startup();
    System.out.println("Started KAFKA: ");
}
 
Developer: telstra, Project: open-kilda, Lines: 14, Source: TestUtils.java


Example 20: start

import kafka.server.KafkaServerStartable; // import the required package/class
public void start(Properties properties) throws Exception {
    Integer port = getZkPort(properties);
    zk = new TestingServer(port);
    zk.start();

    KafkaConfig kafkaConfig = new KafkaConfig(properties);
    kafka = new KafkaServerStartable(kafkaConfig);
    kafka.startup();
}
 
Developer: telstra, Project: open-kilda, Lines: 10, Source: Original.java



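Note: The kafka.server.KafkaServerStartable examples in this article were compiled from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright remains with the original authors. Refer to each project's License for distribution and use. Do not reproduce without permission.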

