本文整理汇总了Java中kafka.server.KafkaServerStartable类的典型用法代码示例。如果您正苦于以下问题：Java KafkaServerStartable类的具体用法？Java KafkaServerStartable怎么用？Java KafkaServerStartable使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
KafkaServerStartable类属于kafka.server包，在下文中一共展示了KafkaServerStartable类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: KafkaSourceEmbeddedKafka
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
public KafkaSourceEmbeddedKafka(Properties properties) {
zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort);
dir = new File(System.getProperty("java.io.tmpdir"), "kafka_log-" + UUID.randomUUID());
try {
FileUtils.deleteDirectory(dir);
} catch (IOException e) {
e.printStackTrace();
}
Properties props = new Properties();
props.put("zookeeper.connect",zookeeper.getConnectString());
props.put("broker.id","1");
props.put("host.name", "localhost");
props.put("port", String.valueOf(serverPort));
props.put("log.dir", dir.getAbsolutePath());
if (properties != null) {
props.putAll(properties);
}
KafkaConfig config = new KafkaConfig(props);
kafkaServer = new KafkaServerStartable(config);
kafkaServer.startup();
initProducer();
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:23,代码来源:KafkaSourceEmbeddedKafka.java
示例2: startKafkaServers
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts {@code numberOfKafkaBrokers} Kafka brokers and adds each one to {@code brokers}.
 *
 * <p>The broker at zero-based index {@code i} gets port {@code BASE_KAFKA_PORT + i}, broker id
 * {@code i + 1} and a log directory named {@code kafka-local-i} obtained from
 * {@code folder.newFolder}.
 *
 * @throws IOException declared by this method; presumably raised when a log folder cannot be
 *     created (confirm against {@code folder.newFolder})
 */
private void startKafkaServers() throws IOException {
    for (int index = 0; index < numberOfKafkaBrokers; index++) {
        final File brokerLogDir = folder.newFolder("kafka-local-" + index);
        final String zkAddress = String.format("%s:%s", LOCALHOST, ZOOKEEPER_PORT);

        final Properties brokerSettings = new Properties();
        brokerSettings.setProperty("broker.id", String.valueOf(index + 1));
        brokerSettings.setProperty("host.name", LOCALHOST);
        brokerSettings.setProperty("port", Integer.toString(BASE_KAFKA_PORT + index));
        brokerSettings.setProperty("zookeeper.connect", zkAddress);
        brokerSettings.setProperty("log.dir", brokerLogDir.getAbsolutePath());
        // Flush after every single message.
        brokerSettings.setProperty("log.flush.interval.messages", String.valueOf(1));

        brokers.add(startBroker(brokerSettings));
    }
}
开发者ID:epam,项目名称:Lagerta,代码行数:18,代码来源:EmbeddedKafkaRule.java
示例3: startKafkaServers
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts {@code numberOfKafkaBrokers} Kafka brokers and adds each one to {@code brokers}.
 *
 * <p>The broker at zero-based index {@code i} listens on
 * {@code PLAINTEXT://LOCALHOST:(kafkaPort + i)}, has broker id {@code i + 1} and uses a log
 * directory named {@code kafka-local-<kafkaPort>-<i>} obtained from {@code folder.mkSubDir}.
 * Property keys are taken from the {@code KafkaConfig.*Prop()} accessors.
 *
 * @throws IOException declared by this method; presumably raised while creating the log
 *     directory or starting a broker (confirm against the helpers)
 */
private void startKafkaServers() throws IOException {
    for (int index = 0; index < numberOfKafkaBrokers; index++) {
        final int brokerPort = kafkaPort + index;
        final File brokerLogDir =
            folder.mkSubDir(String.format("kafka-local-%s-%s", kafkaPort, index));
        final String zkAddress = String.format("%s:%s", LOCALHOST, zookeeperPort);
        final String listener = String.format("PLAINTEXT://%s:%s", LOCALHOST, brokerPort);

        final Properties brokerSettings = new Properties();
        // Identity and connectivity.
        brokerSettings.setProperty(KafkaConfig.BrokerIdProp(), String.valueOf(index + 1));
        brokerSettings.setProperty(KafkaConfig.ListenersProp(), listener);
        brokerSettings.setProperty(KafkaConfig.ZkConnectProp(), zkAddress);
        // Log storage and flushing.
        brokerSettings.setProperty(KafkaConfig.LogDirProp(), brokerLogDir.getAbsolutePath());
        brokerSettings.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
        brokerSettings.setProperty(
            KafkaConfig.LogFlushIntervalMsProp(), String.valueOf(Long.MAX_VALUE));
        // Cluster behaviour.
        brokerSettings.setProperty(
            KafkaConfig.ControlledShutdownEnableProp(), String.valueOf(false));
        brokerSettings.setProperty(KafkaConfig.DeleteTopicEnableProp(), String.valueOf(true));
        brokerSettings.setProperty(
            KafkaConfig.NumPartitionsProp(), String.valueOf(numberOfKafkaBrokers));

        brokers.add(startBroker(brokerSettings));
    }
}
开发者ID:epam,项目名称:Lagerta,代码行数:21,代码来源:EmbeddedKafka.java
示例4: startKafkaServers
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts {@code numberOfKafkaBrokers} Kafka brokers and adds each one to {@code brokers}.
 *
 * <p>The broker at zero-based index {@code i} gets port {@code kafkaPort + i}, broker id
 * {@code i + 1} and a log directory named {@code kafka-local-<kafkaPort>-<i>} obtained from
 * {@code folder.newFolder}.
 *
 * @throws IOException declared by this method; presumably raised when a log folder cannot be
 *     created (confirm against {@code folder.newFolder})
 */
private void startKafkaServers() throws IOException {
    for (int index = 0; index < numberOfKafkaBrokers; index++) {
        final File brokerLogDir =
            folder.newFolder(String.format("kafka-local-%s-%s", kafkaPort, index));
        final String zkAddress = String.format("%s:%s", LOCALHOST, zookeeperPort);

        final Properties brokerSettings = new Properties();
        // Identity and connectivity.
        brokerSettings.setProperty("broker.id", String.valueOf(index + 1));
        brokerSettings.setProperty("host.name", LOCALHOST);
        brokerSettings.setProperty("port", Integer.toString(kafkaPort + index));
        brokerSettings.setProperty("zookeeper.connect", zkAddress);
        // Log storage, flushing and retention.
        brokerSettings.setProperty("log.dir", brokerLogDir.getAbsolutePath());
        brokerSettings.setProperty("log.flush.interval.messages", String.valueOf(1));
        brokerSettings.setProperty("log.retention.ms", String.valueOf(Long.MAX_VALUE));
        // Cluster behaviour.
        brokerSettings.setProperty("controlled.shutdown.enable", String.valueOf(false));
        brokerSettings.setProperty("delete.topic.enable", String.valueOf(true));
        brokerSettings.setProperty("num.partitions", String.valueOf(numberOfKafkaBrokers));

        brokers.add(startBroker(brokerSettings));
    }
}
开发者ID:epam,项目名称:Lagerta,代码行数:22,代码来源:EmbeddedKafkaRule.java
示例5: start
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Launches the Kafka broker, storing its logs in a newly created temporary directory.
 *
 * @throws IOException if the temporary log directory cannot be created
 */
public synchronized void start() throws IOException {
    log.info("Starting Kafka broker on port {}", port);

    logsDir = Files.createTempDirectory(LocalKafkaBroker.class.getSimpleName());
    logsDir.toFile().deleteOnExit();

    final int maxBytes = 1 << 26;
    KafkaConfig brokerConfig = new KafkaConfig(
        ConfigUtils.keyValueToProperties(
            "broker.id", TEST_BROKER_ID,
            "log.dirs", logsDir.toAbsolutePath(),
            "listeners", "PLAINTEXT://:" + port,
            "zookeeper.connect", "localhost:" + zkPort,
            // The settings above apply to Kafka 0.8; the ones below are for 0.9+
            "message.max.bytes", maxBytes,
            "replica.fetch.max.bytes", maxBytes),
        false);

    kafkaServer = new KafkaServerStartable(brokerConfig);
    kafkaServer.startup();
}
开发者ID:oncewang,项目名称:oryx2,代码行数:23,代码来源:LocalKafkaBroker.java
示例6: startServer
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts a Kafka broker and returns it once {@code startup()} has been called.
 *
 * <p>If {@code zkStr} contains a {@code '/'}, the part from the first slash onwards is created
 * as a persistent ZooKeeper node (including parents) on the server named by the part before
 * it. The broker logs go to a randomly named directory under {@code /tmp}. The supplied
 * {@code configuration} is handed to the {@code configure*} helpers (presumably to store the
 * port, ZooKeeper string, broker id and log directory in it) before it is turned into a
 * {@code KafkaConfig}.
 *
 * @param port port passed to {@code configureKafkaPort}
 * @param brokerId id passed to {@code configureBrokerId}
 * @param zkStr ZooKeeper connection string, optionally followed by a {@code /path} suffix
 * @param configuration broker configuration to complete and use
 * @return the started broker
 */
public static KafkaServerStartable startServer(final int port, final int brokerId,
        final String zkStr, final Properties configuration) {
    // Create the ZK nodes for Kafka, if needed
    int indexOfFirstSlash = zkStr.indexOf('/');
    if (indexOfFirstSlash != -1) {
        String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
        String zkNodePath = zkStr.substring(indexOfFirstSlash);
        ZkClient client = new ZkClient(bareZkUrl);
        try {
            client.createPersistent(zkNodePath, true);
        } finally {
            // Close even when node creation fails so the ZooKeeper connection is not leaked.
            client.close();
        }
    }

    File logDir = new File("/tmp/kafka-" + Double.toHexString(Math.random()));
    logDir.mkdirs();

    configureKafkaPort(configuration, port);
    configureZkConnectionString(configuration, zkStr);
    configureBrokerId(configuration, brokerId);
    configureKafkaLogDirectory(configuration, logDir);

    KafkaConfig config = new KafkaConfig(configuration);
    KafkaServerStartable serverStartable = new KafkaServerStartable(config);
    serverStartable.startup();
    return serverStartable;
}
开发者ID:uber,项目名称:uReplicator,代码行数:27,代码来源:KafkaStarterUtils.java
示例7: startKafka
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts a Kafka broker through {@code startInNewThread} under the name {@code "Kafka"} and
 * calls {@code join()} on the returned value before logging that Kafka has started.
 *
 * <p>The broker is configured from the {@code server.properties} class-path resource, and a
 * JVM shutdown hook is registered that shuts the broker down.
 *
 * @throws InterruptedException declared by this method; presumably raised by {@code join()}
 */
private void startKafka() throws InterruptedException {
    startInNewThread(() -> {
        try {
            String[] kafkaArgs = {
                TestServerManager.class.getClassLoader().getResource("server.properties").getPath()
            };
            logger.debug("Starting Kafka server using config:" + kafkaArgs[0]);

            final KafkaServerStartable broker =
                KafkaServerStartable.fromProps(Kafka.getPropsFromArgs(kafkaArgs));
            // Make sure the broker is stopped when the JVM exits.
            Runtime.getRuntime().addShutdownHook(new Thread(broker::shutdown));
            broker.startup();
        } catch (RuntimeException ex) {
            logger.error("Failed to start kafka", ex);
            throw ex;
        }
    }, "Kafka").join();
    logger.debug("Kafka started.");
}
开发者ID:badalgeek,项目名称:EventStreamAnalytics,代码行数:22,代码来源:TestServerManager.java
示例8: startup
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts one Kafka broker per entry in {@code ports}.
 *
 * <p>Every broker starts from a copy of {@code baseProperties}, is bound to 127.0.0.1 on its
 * port, and gets broker id {@code index + 1} plus its own temporary log directory. The started
 * broker and its log directory are appended to {@code brokers} and {@code logDirs}.
 */
public void startup() {
    for (int index = 0; index < ports.size(); index++) {
        final Integer port = ports.get(index);
        final String portText = Integer.toString(port);
        final File brokerLogDir = TestUtils.constructTempDir("kafka-local");

        final Properties brokerSettings = new Properties();
        // Base settings go in first so the explicit values below take precedence.
        brokerSettings.putAll(baseProperties);
        brokerSettings.setProperty("broker.id", String.valueOf(index + 1));
        brokerSettings.setProperty("zookeeper.connect", zkConnection);
        brokerSettings.setProperty("host.name", "127.0.0.1");
        brokerSettings.setProperty("advertised.host.name", "127.0.0.1");
        brokerSettings.setProperty("port", portText);
        brokerSettings.setProperty("advertised.port", portText);
        brokerSettings.setProperty("log.dirs", brokerLogDir.getAbsolutePath());
        brokerSettings.setProperty("log.flush.interval.messages", String.valueOf(1));
        brokerSettings.setProperty("offsets.topic.num.partitions", "1");
        brokerSettings.setProperty("offsets.topic.replication.factor", "1");

        brokers.add(startBroker(brokerSettings));
        logDirs.add(brokerLogDir);
    }
}
开发者ID:apache,项目名称:beam,代码行数:25,代码来源:EmbeddedKafkaCluster.java
示例9: startServer
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts a Kafka broker and returns it once {@code startup()} has been called.
 *
 * <p>If {@code zkStr} contains a {@code '/'}, the part from the first slash onwards is created
 * as a persistent ZooKeeper node (including parents) on the server named by the part before
 * it. The broker logs go to a randomly named directory under {@code /tmp}. The supplied
 * {@code configuration} is handed to the {@code configure*} helpers (presumably to store the
 * port, ZooKeeper string, broker id and log directory in it) before it is turned into a
 * {@code KafkaConfig}.
 *
 * @param port port passed to {@code configureKafkaPort}
 * @param brokerId id passed to {@code configureBrokerId}
 * @param zkStr ZooKeeper connection string, optionally followed by a {@code /path} suffix
 * @param configuration broker configuration to complete and use
 * @return the started broker
 */
public static KafkaServerStartable startServer(final int port, final int brokerId, final String zkStr,
        final Properties configuration) {
    // Create the ZK nodes for Kafka, if needed
    int indexOfFirstSlash = zkStr.indexOf('/');
    if (indexOfFirstSlash != -1) {
        String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
        String zkNodePath = zkStr.substring(indexOfFirstSlash);
        ZkClient client = new ZkClient(bareZkUrl);
        try {
            client.createPersistent(zkNodePath, true);
        } finally {
            // Close even when node creation fails so the ZooKeeper connection is not leaked.
            client.close();
        }
    }

    File logDir = new File("/tmp/kafka-" + Double.toHexString(Math.random()));
    logDir.mkdirs();

    configureKafkaPort(configuration, port);
    configureZkConnectionString(configuration, zkStr);
    configureBrokerId(configuration, brokerId);
    configureKafkaLogDirectory(configuration, logDir);

    KafkaConfig config = new KafkaConfig(configuration);
    KafkaServerStartable serverStartable = new KafkaServerStartable(config);
    serverStartable.startup();
    return serverStartable;
}
开发者ID:uber,项目名称:chaperone,代码行数:27,代码来源:KafkaStarterUtils.java
示例10: startKafkaServer
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts the broker identified by {@code clusterid}/{@code brokerid} and stores it in
 * {@code broker[clusterid][brokerid]}.
 *
 * <p>The Kafka base directory is removed first. The number of partitions is 2 when
 * {@code hasMultiPartition} is set and 1 otherwise.
 *
 * @param clusterid index of the cluster; selects the ZooKeeper port, broker port and log dir
 * @param brokerid index of the broker within the cluster; also used as {@code broker.id}
 * @param defaultPartitions not used by this method
 */
public void startKafkaServer(int clusterid, int brokerid, int defaultPartitions) {
    // before start, clean the kafka dir if it exists
    FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));

    final File brokerLogDir = new File(baseDir, kafkadir[clusterid][brokerid]);
    final String partitionCount = hasMultiPartition ? "2" : "1";

    final Properties brokerSettings = new Properties();
    brokerSettings.setProperty("broker.id", "" + brokerid);
    brokerSettings.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid][brokerid]);
    brokerSettings.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
    brokerSettings.setProperty("log.dirs", brokerLogDir.toString());
    brokerSettings.setProperty("default.replication.factor", "1");
    // 50000 keeps most test data in memory before it is flushed to disk, which speeds tests up
    brokerSettings.setProperty("log.flush.interval.messages", "50000");
    brokerSettings.setProperty("num.partitions", partitionCount);

    broker[clusterid][brokerid] = new KafkaServerStartable(new KafkaConfig(brokerSettings));
    broker[clusterid][brokerid].startup();
}
开发者ID:apache,项目名称:apex-malhar,代码行数:24,代码来源:KafkaOperatorTestBase.java
示例11: startServer
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts a Kafka broker and returns it once {@code startup()} has been called.
 *
 * <p>If {@code zkStr} contains a {@code '/'}, the part from the first slash onwards is created
 * as a persistent ZooKeeper node (including parents) on the server named by the part before
 * it. The broker logs go to a randomly named directory under {@code /tmp}. The entries
 * {@code port}, {@code zookeeper.connect}, {@code broker.id} and {@code log.dirs} are written
 * into the supplied {@code configuration}, which is therefore modified.
 *
 * @param port port the broker should listen on
 * @param brokerId value for {@code broker.id}
 * @param zkStr ZooKeeper connection string, optionally followed by a {@code /path} suffix
 * @param configuration broker configuration to complete and use
 * @return the started broker
 */
public static KafkaServerStartable startServer(final int port, final int brokerId, final String zkStr,
        final Properties configuration) {
    // Create the ZK nodes for Kafka, if needed
    int indexOfFirstSlash = zkStr.indexOf('/');
    if (indexOfFirstSlash != -1) {
        String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
        String zkNodePath = zkStr.substring(indexOfFirstSlash);
        ZkClient client = new ZkClient(bareZkUrl);
        try {
            client.createPersistent(zkNodePath, true);
        } finally {
            // Close even when node creation fails so the ZooKeeper connection is not leaked.
            client.close();
        }
    }

    File logDir = new File("/tmp/kafka-" + Double.toHexString(Math.random()));
    logDir.mkdirs();

    configuration.put("port", Integer.toString(port));
    configuration.put("zookeeper.connect", zkStr);
    configuration.put("broker.id", Integer.toString(brokerId));
    configuration.put("log.dirs", logDir.getAbsolutePath());

    KafkaConfig config = new KafkaConfig(configuration);
    KafkaServerStartable serverStartable = new KafkaServerStartable(config);
    serverStartable.startup();
    return serverStartable;
}
开发者ID:Hanmourang,项目名称:Pinot,代码行数:27,代码来源:KafkaStarterUtils.java
示例12: TestKafkaServer
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts an embedded Kafka broker whose {@code log.dir} is relocated into a fresh temporary
 * directory.
 *
 * <p>The {@code log.dir} value found in {@code properties} is resolved against the new
 * temporary directory and written back, so the passed-in {@code properties} object is
 * modified. A startup failure is reported on the console but not rethrown.
 *
 * @param properties broker settings; must contain {@code log.dir}
 * @throws NullPointerException if {@code properties} has no {@code log.dir} entry
 */
public TestKafkaServer(Properties properties) {
    String path = properties.getProperty("log.dir");
    if (path == null) {
        // Fail with a clear message before creating a temp dir that would be left behind.
        throw new NullPointerException("property 'log.dir' must be set");
    }
    locate = Files.createTempDir();
    File dir = new File(locate, path).getAbsoluteFile();
    properties.setProperty("log.dir", dir.getAbsolutePath());

    kafkaConfig = new KafkaConfig(properties);
    System.out.println("num.partitions =" + kafkaConfig.numPartitions());
    System.out.println("background.threads ="
        + kafkaConfig.backgroundThreads());

    kafkaServer = new KafkaServerStartable(kafkaConfig);
    try {
        kafkaServer.startup();
        // Only announce success when startup really completed.
        System.out.println("embedded kafka is up");
    } catch (Exception e) {
        System.err.println("embedded kafka failed to start");
        e.printStackTrace();
    }
}
开发者ID:pulsarIO,项目名称:jetstream,代码行数:19,代码来源:TestKafkaServer.java
示例13: setupKafkaBroker
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts a test ZooKeeper server on port 2181 and a Kafka broker on localhost:9092, then
 * creates the {@code sensordata} topic with one partition and replication factor one.
 *
 * <p>Any exception is logged and not rethrown.
 */
private void setupKafkaBroker() {
    try {
        // mock zookeeper
        zkTestServer = new TestingServer(2181);

        // mock kafka
        Properties props = new Properties();
        props.put("broker.id", "0");
        props.put("host.name", "localhost");
        props.put("port", "9092");
        props.put("log.dir", "/tmp/tmp_kafka_dir");
        props.put("zookeeper.connect", zkTestServer.getConnectString());
        props.put("replica.socket.timeout.ms", "1500");
        KafkaConfig config = new KafkaConfig(props);
        kafkaServer = new KafkaServerStartable(config);
        kafkaServer.startup();

        // create "sensordata" topic
        ZkClient zkClient = new ZkClient(zkTestServer.getConnectString(), 10000, 10000, ZKStringSerializer$.MODULE$);
        try {
            AdminUtils.createTopic(zkClient, "sensordata", 1, 1, new Properties());
        } finally {
            // Close even when topic creation fails so the ZooKeeper connection is not leaked.
            zkClient.close();
        }
    } catch (Exception e) {
        log.error("Error running local Kafka broker / Zookeeper", e);
    }
}
开发者ID:wso2,项目名称:product-cep,代码行数:25,代码来源:KafkaTestCase.java
示例14: EmbeddedKafka
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Prepares, but does not start, a Kafka broker that connects to the given ZooKeeper and keeps
 * its data in a new temporary directory.
 *
 * @param zookeeper ZooKeeper instance the broker connects to; must not be {@code null}
 * @param kafkaPort port the broker is configured to listen on
 * @throws IOException declared by this constructor; the source is not evident from its body
 */
EmbeddedKafka(EmbeddedZookeeper zookeeper, int kafkaPort) throws IOException {
    this.zookeeper = checkNotNull(zookeeper, "zookeeper is null");
    this.port = kafkaPort;
    this.kafkaDataDir = Files.createTempDir();

    final Properties brokerSettings = new Properties();
    // Identity and connectivity.
    brokerSettings.setProperty("broker.id", "0");
    brokerSettings.setProperty("host.name", "localhost");
    brokerSettings.setProperty("port", Integer.toString(port));
    brokerSettings.setProperty("zookeeper.connect", zookeeper.getConnectString());
    brokerSettings.setProperty("zookeeper.connection.timeout.ms", "1000000");
    // Topics and partitions.
    brokerSettings.setProperty("num.partitions", "2");
    brokerSettings.setProperty("auto.create.topics.enable", "false");
    // Log storage.
    brokerSettings.setProperty("log.dirs", kafkaDataDir.getAbsolutePath());
    brokerSettings.setProperty("log.flush.interval.messages", "10000");
    brokerSettings.setProperty("log.flush.interval.ms", "1000");
    brokerSettings.setProperty("log.retention.minutes", "60");
    brokerSettings.setProperty("log.segment.bytes", "1048576");

    this.kafka = new KafkaServerStartable(new KafkaConfig(brokerSettings));
}
开发者ID:apache,项目名称:tajo,代码行数:24,代码来源:EmbeddedKafka.java
示例15: LocalKafkaServer
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Removes any existing log directory, creates the {@code ZkUtils} handle and starts a local
 * Kafka broker bound to 127.0.0.1.
 *
 * @throws IOException if the existing log directory cannot be deleted
 */
public LocalKafkaServer() throws IOException {
    // Keep deleting until the old log directory is really gone.
    final File staleLogDir = new File(logDir);
    while (staleLogDir.exists()) {
        FileUtils.deleteDirectory(staleLogDir);
    }

    final Properties brokerSettings = new Properties();
    brokerSettings.put("host.name", "127.0.0.1");
    brokerSettings.put("port", port);
    brokerSettings.put("broker.id", nodeId);
    brokerSettings.put("log.dir", logDir);
    brokerSettings.put("zookeeper.connect", zkConnect);
    final KafkaConfig brokerConfig = new KafkaConfig(brokerSettings);

    final String zkAddress = brokerSettings.getProperty("zookeeper.connect");
    zkUtils = ZkUtils.apply(zkAddress, 30000, 30000, JaasUtils.isZkSecurityEnabled());

    server = new KafkaServerStartable(brokerConfig);
    server.startup();
}
开发者ID:blackberry,项目名称:Krackle,代码行数:24,代码来源:LocalKafkaServer.java
示例16: startKafkaServer
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts a Kafka broker on 127.0.0.1 with broker id 0, storing its logs in a new temporary
 * directory.
 *
 * @param kafkaPort port the broker listens on
 * @param zkPort port of the ZooKeeper server on 127.0.0.1 to connect to
 * @return the started broker
 * @throws RuntimeException if the temporary log directory cannot be created
 */
private static KafkaServerStartable startKafkaServer(int kafkaPort, int zkPort) {
    final File kafkaLogDir;
    try {
        kafkaLogDir = java.nio.file.Files.createTempDirectory("kafka-logs").toFile();
    } catch (IOException e) {
        throw new RuntimeException("Unable to create Kafka temp dirs!", e);
    }
    LOGGER.info("Kafka log dir: " + kafkaLogDir.getAbsolutePath());
    kafkaLogDir.deleteOnExit();

    final Properties brokerSettings = new Properties();
    brokerSettings.setProperty("broker.id", "0");
    brokerSettings.setProperty("host.name", "127.0.0.1");
    brokerSettings.setProperty("port", Integer.toString(kafkaPort));
    brokerSettings.setProperty("zookeeper.connect", "127.0.0.1:" + zkPort);
    brokerSettings.setProperty("log.dir", kafkaLogDir.getAbsolutePath());
    // Flush after every single message.
    brokerSettings.setProperty("log.flush.interval.messages", String.valueOf(1));

    final KafkaServerStartable startedBroker =
        new KafkaServerStartable(new KafkaConfig(brokerSettings));
    startedBroker.startup();
    return startedBroker;
}
开发者ID:DDTH,项目名称:ddth-kafka,代码行数:24,代码来源:QndEmbeddedServer.java
示例17: setup
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Recreates the default Kafka log directory, starts the broker and then waits three seconds.
 *
 * <p>A failure to create the log directory is only logged as a warning; startup continues.
 *
 * @throws Exception declared by this method; the visible body can raise I/O errors while
 *     deleting the log directory and {@code InterruptedException} while sleeping
 */
@Override
public void setup() throws Exception {
    LOGGER.info("Starting up Kafka Server...");

    FileUtils.deleteDirectory(KafkaTestUtils.DEFAULT_LOG_DIR);
    if (!KafkaTestUtils.DEFAULT_LOG_DIR.mkdir()) {
        LOGGER.warn("Unable to create Kafka log dir [" + KafkaTestUtils.DEFAULT_LOG_DIR.getAbsolutePath() + "]");
    }

    final KafkaConfig brokerConfig = KafkaTestUtils.getKafkaBrokerConfig();
    kafkaServer = new KafkaServerStartable(brokerConfig);
    kafkaServer.startup();

    // Fixed pause after startup before returning to the caller.
    Thread.sleep(3000);
}
开发者ID:locationtech,项目名称:geowave,代码行数:19,代码来源:KafkaTestEnvironment.java
示例18: KafkaLocal
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
public KafkaLocal(Properties kafkaProperties) throws IOException, InterruptedException{
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
//start local zookeeper
System.out.println("starting local zookeeper...");
zookeeper = new ZookeeperLocal();
try {
zookeeper.startzkServer(2181);
}
catch (Exception e) {
throw new RuntimeException("Error starting local zookeeper server", e);
}
System.out.println("done");
//start local kafka broker
kafka = new KafkaServerStartable(kafkaConfig);
System.out.println("starting local kafka broker...");
kafka.startup();
System.out.println("done");
}
开发者ID:Produban,项目名称:openbus,代码行数:24,代码来源:KafkaLocal.java
示例19: start
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts a test ZooKeeper server and then a Kafka broker that stores its logs in
 * {@code tempDir}.
 *
 * <p>The {@code log.dirs} entry of {@code props} is overwritten with the absolute path of
 * {@code tempDir}, so the passed-in object is modified.
 *
 * @param props broker settings; also passed to {@code getZkPort} to obtain the ZooKeeper port
 * @throws Exception if ZooKeeper or the broker cannot be started
 */
public void start(Properties props) throws Exception {
    final Integer zkPort = getZkPort(props);
    // Constructing the TestingServer also starts the ZooKeeper server.
    zk = new TestingServer(zkPort, tempDir);
    System.out.println("Started ZooKeeper: ");
    System.out.println("--> Temp Directory: " + zk.getTempDirectory());

    props.put("log.dirs", tempDir.getAbsolutePath());
    kafka = new KafkaServerStartable(new KafkaConfig(props));
    kafka.startup();
    System.out.println("Started KAFKA: ");
}
开发者ID:telstra,项目名称:open-kilda,代码行数:14,代码来源:TestUtils.java
示例20: start
import kafka.server.KafkaServerStartable; //导入依赖的package包/类
/**
 * Starts a test ZooKeeper server on the port derived from {@code properties} and then a Kafka
 * broker configured from the same {@code properties}.
 *
 * @param properties broker settings; also passed to {@code getZkPort} to obtain the ZooKeeper
 *     port
 * @throws Exception if ZooKeeper or the broker cannot be started
 */
public void start(Properties properties) throws Exception {
    final Integer zkPort = getZkPort(properties);
    zk = new TestingServer(zkPort);
    // NOTE(review): confirm whether the constructor already starts the server, which would
    // make this explicit start() redundant.
    zk.start();

    kafka = new KafkaServerStartable(new KafkaConfig(properties));
    kafka.startup();
}
开发者ID:telstra,项目名称:open-kilda,代码行数:10,代码来源:Original.java
注：本文中的kafka.server.KafkaServerStartable类示例整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论