本文整理汇总了 Java 中 backtype.storm.scheduler.INimbus 类的典型用法代码示例。如果您正苦于以下问题:Java INimbus 类的具体用法?Java INimbus 怎么用?Java INimbus 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
INimbus 类属于 backtype.storm.scheduler 包,在下文中一共展示了 INimbus 类的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Java 代码示例。
示例1: createNimbusData
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Creates the {@link NimbusData} for this server together with the two expiring
 * caches (uploaders and downloaders) it is constructed with.
 *
 * @param conf    nimbus configuration; {@code Config.NIMBUS_FILE_COPY_EXPIRATION_SECS} is read from it
 * @param inimbus INimbus instance handed on to the NimbusData constructor
 * @return the newly constructed NimbusData
 * @throws Exception if any of the underlying calls fails
 */
private NimbusData createNimbusData(Map conf, INimbus inimbus)
        throws Exception {
    // One callback is shared by both caches: it closes the handle stored under the expired key.
    TimeCacheMap.ExpiredCallback<Object, Object> closeOnExpire =
            new TimeCacheMap.ExpiredCallback<Object, Object>() {
                @Override
                public void expire(Object key, Object val) {
                    try {
                        LOG.info("Close file " + key);
                        // instanceof is false for null, so a null value simply falls through.
                        if (val instanceof Channel) {
                            ((Channel) val).close();
                        } else if (val instanceof BufferFileInputStream) {
                            ((BufferFileInputStream) val).close();
                        }
                    } catch (IOException e) {
                        LOG.error(e.getMessage(), e);
                    }
                }
            };

    // 30 is the second argument to parseInt — presumably the fallback value; confirm in JStormUtils.
    int expirationSecs = JStormUtils.parseInt(
            conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30);

    TimeCacheMap<Object, Object> uploaders =
            new TimeCacheMap<Object, Object>(expirationSecs, closeOnExpire);
    TimeCacheMap<Object, Object> downloaders =
            new TimeCacheMap<Object, Object>(expirationSecs, closeOnExpire);

    return new NimbusData(conf, downloaders, uploaders, inimbus);
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:39,代码来源:NimbusServer.java
示例2: main
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Process entry point: reads the storm configuration, calls the JVM-monitor hook,
 * then hands control to {@code NimbusServer.launchServer}.
 *
 * @param args command-line arguments (not read by this method)
 * @throws Exception if any of the startup calls fails
 */
public static void main(String[] args) throws Exception {
    /*
     * Read the configuration files and startup parameters.
     * Per the original author's note: defaults.yaml inside jstorm-server-X.X.X.X.jar is
     * read first; if the storm.conf.file parameter is not set, storm.yaml is read by default.
     */
    @SuppressWarnings("rawtypes")
    Map stormConf = Utils.readStormConfig();

    // Original author's note: this looks like it may be a feature for later?
    JStormServerUtils.startTaobaoJvmMonitor();

    NimbusServer nimbusServer = new NimbusServer();

    // TODO: look into the INimbus code later.
    INimbus defaultNimbus = new DefaultInimbus();

    // Start the service.
    nimbusServer.launchServer(stormConf, defaultNimbus);
}
开发者ID:songtk,项目名称:learn_jstorm,代码行数:30,代码来源:NimbusServer.java
示例3: createNimbusData
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Builds the {@link NimbusData} instance.
 *
 * <ol>
 *   <li>Defines the callback invoked when a TimeCacheMap entry expires; it closes the
 *       cached channel or stream.</li>
 *   <li>Creates two caches, one for uploads and one for downloads, that hold those streams.</li>
 *   <li>Creates the new NimbusData object from them.</li>
 * </ol>
 *
 * @param conf    nimbus configuration; {@code Config.NIMBUS_FILE_COPY_EXPIRATION_SECS} is read from it
 * @param inimbus INimbus instance passed through to NimbusData
 * @return the newly created NimbusData
 * @throws Exception if any of the underlying calls fails
 */
@SuppressWarnings("rawtypes")
private NimbusData createNimbusData(Map conf, INimbus inimbus) throws Exception {
    TimeCacheMap.ExpiredCallback<Object, Object> onExpire =
            new TimeCacheMap.ExpiredCallback<Object, Object>() {
                @Override
                public void expire(Object key, Object val) {
                    try {
                        LOG.info("Close file " + String.valueOf(key));
                        // A null value matches neither branch, so nothing is closed for it.
                        if (val instanceof Channel) {
                            Channel expiredChannel = (Channel) val;
                            expiredChannel.close();
                        } else if (val instanceof BufferFileInputStream) {
                            BufferFileInputStream expiredStream = (BufferFileInputStream) val;
                            expiredStream.close();
                        }
                    } catch (IOException closeFailure) {
                        LOG.error(closeFailure.getMessage(), closeFailure);
                    }
                }
            };

    Object configuredSecs = conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS);
    int cacheTtlSecs = JStormUtils.parseInt(configuredSecs, 30);

    // Uploaders cache is created before the downloaders cache, as in the original ordering.
    TimeCacheMap<Object, Object> uploaders = new TimeCacheMap<Object, Object>(cacheTtlSecs, onExpire);
    TimeCacheMap<Object, Object> downloaders = new TimeCacheMap<Object, Object>(cacheTtlSecs, onExpire);

    return new NimbusData(conf, downloaders, uploaders, inimbus);
}
开发者ID:songtk,项目名称:learn_jstorm,代码行数:43,代码来源:NimbusServer.java
示例4: launchServer
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Runs the nimbus startup sequence. Any {@link Throwable} raised along the way is logged
 * rather than rethrown, and {@code cleanup()} is always called before the method returns.
 *
 * @param conf    nimbus configuration
 * @param inimbus INimbus instance that is prepared and then used to build the NimbusData
 */
@SuppressWarnings("rawtypes")
private void launchServer(final Map conf, INimbus inimbus) {
    LOG.info("Begin to start nimbus with conf " + conf);

    try {
        // 1. check whether mode is distributed or not
        StormConfig.validate_distributed_mode(conf);

        createPid(conf);
        initShutdownHook();

        inimbus.prepare(conf, StormConfig.masterInimbus(conf));
        data = createNimbusData(conf, inimbus);
        initFollowerThread(conf);

        hs = new Httpserver(ConfigExtension.getNimbusDeamonHttpserverPort(conf), conf);
        hs.start();

        initContainerHBThread(conf);

        // Keep sleeping (argument 5000) until data reports that this instance is the leader.
        while (!data.isLeader()) {
            Utils.sleep(5000);
        }

        init(conf);
    } catch (Throwable failure) {
        LOG.error("Fail to run nimbus ", failure);
    } finally {
        cleanup();
    }

    LOG.info("Quit nimbus");
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:37,代码来源:NimbusServer.java
示例5: createNimbusData
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Creates the {@link NimbusData} for the given configuration and INimbus instance.
 *
 * @param conf    nimbus configuration passed to the NimbusData constructor
 * @param inimbus INimbus instance passed to the NimbusData constructor
 * @return the newly constructed NimbusData
 * @throws Exception if the NimbusData constructor fails
 */
@SuppressWarnings("rawtypes")
private NimbusData createNimbusData(Map conf, INimbus inimbus) throws Exception {
    NimbusData nimbusData = new NimbusData(conf, inimbus);
    return nimbusData;
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:8,代码来源:NimbusServer.java
示例6: createNimbusData
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Constructs the {@link NimbusData} along with the upload and download caches it needs.
 * Both caches share one expiry callback that closes the cached value.
 *
 * @param conf    nimbus configuration; {@code Config.NIMBUS_FILE_COPY_EXPIRATION_SECS} is read from it
 * @param inimbus INimbus instance forwarded to NimbusData
 * @return the newly constructed NimbusData
 * @throws Exception if any of the underlying calls fails
 */
@SuppressWarnings("rawtypes")
private NimbusData createNimbusData(Map conf, INimbus inimbus)
        throws Exception {
    TimeCacheMap.ExpiredCallback<Object, Object> expiryHandler =
            new TimeCacheMap.ExpiredCallback<Object, Object>() {
                @Override
                public void expire(Object key, Object val) {
                    try {
                        LOG.info("Close file " + String.valueOf(key));
                        if (val == null) {
                            // Nothing cached under this key, so there is nothing to close.
                            return;
                        }
                        if (val instanceof Channel) {
                            ((Channel) val).close();
                            return;
                        }
                        if (val instanceof BufferFileInputStream) {
                            ((BufferFileInputStream) val).close();
                        }
                    } catch (IOException ioe) {
                        LOG.error(ioe.getMessage(), ioe);
                    }
                }
            };

    int ttlSecs = JStormUtils.parseInt(
            conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30);

    TimeCacheMap<Object, Object> uploaders =
            new TimeCacheMap<Object, Object>(ttlSecs, expiryHandler);
    TimeCacheMap<Object, Object> downloaders =
            new TimeCacheMap<Object, Object>(ttlSecs, expiryHandler);

    NimbusData nimbusData = new NimbusData(conf, downloaders, uploaders, inimbus);
    return nimbusData;
}
开发者ID:greeenSY,项目名称:Tstream,代码行数:40,代码来源:NimbusServer.java
示例7: main
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Process entry point: installs a default uncaught-exception handler, reads the storm
 * configuration and launches the nimbus server with a {@code DefaultInimbus}.
 *
 * @param args command-line arguments (not read by this method)
 * @throws Exception if any of the startup calls fails
 */
public static void main(String[] args) throws Exception {
    Thread.setDefaultUncaughtExceptionHandler(new DefaultUncaughtExceptionHandler());

    // read configuration files
    @SuppressWarnings("rawtypes")
    Map stormConf = Utils.readStormConfig();

    JStormServerUtils.startTaobaoJvmMonitor();

    NimbusServer server = new NimbusServer();
    server.launchServer(stormConf, new DefaultInimbus());
}
开发者ID:alibaba,项目名称:jstorm,代码行数:11,代码来源:NimbusServer.java
示例8: launchServer
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Runs the nimbus startup sequence, ending with creation of the ServiceHandler and
 * {@code initThrift(conf)}. Any {@link Throwable} is logged rather than rethrown, and
 * {@code cleanup()} is always called before the method returns.
 *
 * @param conf    nimbus configuration
 * @param inimbus INimbus instance that is prepared and then used to build the NimbusData
 */
@SuppressWarnings("rawtypes")
private void launchServer(final Map conf, INimbus inimbus) {
    LOG.info("Begin to start nimbus with conf " + conf);

    try {
        // 1. check whether mode is distributed or not
        StormConfig.validate_distributed_mode(conf);

        createPid(conf);
        initShutdownHook();

        inimbus.prepare(conf, StormConfig.masterInimbus(conf));
        data = createNimbusData(conf, inimbus);
        initFollowerThread(conf);

        int httpPort = ConfigExtension.getNimbusDeamonHttpserverPort(conf);
        hs = new Httpserver(httpPort, conf);
        hs.start();

        initContainerHBThread(conf);
        serviceHandler = new ServiceHandler(data);
        initThrift(conf);
    } catch (Throwable failure) {
        // NOTE(review): the message says "Halting" but nothing in this block halts the JVM — confirm intent.
        boolean outOfMemory = failure instanceof OutOfMemoryError;
        if (outOfMemory) {
            LOG.error("Halting due to out of memory error...");
        }
        LOG.error("Fail to run nimbus ", failure);
    } finally {
        cleanup();
    }

    LOG.info("Quit nimbus");
}
开发者ID:alibaba,项目名称:jstorm,代码行数:38,代码来源:NimbusServer.java
示例9: launcherLocalServer
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Starts nimbus in local mode and returns the ServiceHandler created for it.
 *
 * @param conf    nimbus configuration; validated with {@code StormConfig.validate_local_mode}
 * @param inimbus INimbus instance that is prepared and then used to build the NimbusData
 * @return the ServiceHandler built around the new NimbusData
 * @throws Exception if validation, preparation or initialization fails
 */
public ServiceHandler launcherLocalServer(final Map conf, INimbus inimbus) throws Exception {
    LOG.info("Begin to start nimbus on local model");

    StormConfig.validate_local_mode(conf);

    inimbus.prepare(conf, StormConfig.masterInimbus(conf));
    data = createNimbusData(conf, inimbus);

    init(conf);

    // The handler is stored in the serviceHandler field and also handed back to the caller.
    final ServiceHandler handler = new ServiceHandler(data);
    serviceHandler = handler;
    return handler;
}
开发者ID:alibaba,项目名称:jstorm,代码行数:10,代码来源:NimbusServer.java
示例10: getInimubs
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Returns the INimbus instance held in the {@code inimubs} field.
 *
 * @return the stored INimbus
 */
public INimbus getInimubs() {
    return this.inimubs;
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:4,代码来源:NimbusData.java
示例11: launchServer
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Runs the nimbus startup sequence. Any {@link Throwable} raised along the way is logged
 * rather than rethrown, and {@code cleanup()} is always called before the method returns.
 *
 * @param conf    nimbus configuration
 * @param inimbus INimbus instance that is prepared and then used to build the NimbusData
 */
@SuppressWarnings("rawtypes")
private void launchServer(final Map conf, INimbus inimbus) {
    LOG.info("Begin to start nimbus with conf " + conf);

    try {
        // 1. check whether mode is distributed or not
        /*
         * Decide whether the startup mode is distributed or local.
         * Per the original author's note this is based on the config item storm.cluster.mode:
         *   local       : local mode
         *   distributed : distributed mode
         */
        StormConfig.validate_distributed_mode(conf);

        createPid(conf);
        initShutdownHook();

        // Original author's note: not implemented yet?
        inimbus.prepare(conf, StormConfig.masterInimbus(conf));

        data = createNimbusData(conf, inimbus);
        initFollowerThread(conf);

        // Start the HTTP server. The original note says port 7621; the value actually used
        // is whatever ConfigExtension.getNimbusDeamonHttpserverPort(conf) returns.
        int httpPort = ConfigExtension.getNimbusDeamonHttpserverPort(conf);
        hs = new Httpserver(httpPort, conf);
        hs.start();

        initContainerHBThread(conf);

        // Keep sleeping (argument 5000) until data reports that this instance is the leader.
        while (!data.isLeader()) {
            Utils.sleep(5000);
        }

        initUploadMetricThread(data);
        init(conf);
    } catch (Throwable failure) {
        LOG.error("Fail to run nimbus ", failure);
    } finally {
        cleanup();
    }

    LOG.info("Quit nimbus");
}
开发者ID:songtk,项目名称:learn_jstorm,代码行数:51,代码来源:NimbusServer.java
示例12: getInimubs
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Accessor for the INimbus instance stored in the {@code inimubs} field.
 *
 * @return the stored INimbus
 */
public INimbus getInimubs() {
    return this.inimubs;
}
开发者ID:songtk,项目名称:learn_jstorm,代码行数:4,代码来源:NimbusData.java
示例13: createNimbusData
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Factory method for the server's {@link NimbusData}.
 *
 * @param conf    nimbus configuration passed to the NimbusData constructor
 * @param inimbus INimbus instance passed to the NimbusData constructor
 * @return the newly constructed NimbusData
 * @throws Exception if the NimbusData constructor fails
 */
@SuppressWarnings("rawtypes")
private NimbusData createNimbusData(Map conf, INimbus inimbus) throws Exception {
    final NimbusData created = new NimbusData(conf, inimbus);
    return created;
}
开发者ID:alibaba,项目名称:jstorm,代码行数:7,代码来源:NimbusServer.java
示例14: NimbusData
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Initializes the nimbus-wide state held by this object from the given configuration.
 *
 * <p>The statements run in a fixed order and several collaborators receive {@code this}
 * while construction is still in progress, so the order should not be changed casually.
 *
 * @param conf    nimbus configuration; stored and read for the cluster name, the
 *                config-update handler class and the optional topology-action notifier plugin
 * @param inimbus INimbus instance stored in the {@code inimubs} field
 * @throws Exception if any of the initialization calls fails
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public NimbusData(final Map conf, INimbus inimbus) throws Exception {
this.conf = conf;
// Helper initializers defined elsewhere in this class (bodies not visible here).
createFileHandler();
mkBlobCacheMap();
this.nimbusHostPortInfo = NimbusInfo.fromConf(conf);
// The blob store is obtained using the host/port info computed on the previous line.
this.blobStore = BlobStoreUtils.getNimbusBlobStore(conf, nimbusHostPortInfo);
this.isLaunchedCleaner = false;
this.isLaunchedMonitor = false;
this.submittedCount = new AtomicInteger(0);
this.stormClusterState = Cluster.mk_storm_cluster_state(conf);
createCache();
this.taskHeartbeatsCache = new ConcurrentHashMap<>();
this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM);
// `this` is handed to StatusTransition before the remaining fields below are assigned.
this.statusTransition = new StatusTransition(this);
this.startTime = TimeUtils.current_time_secs();
this.inimubs = inimbus;
localMode = StormConfig.local_mode(conf);
this.metricCache = new JStormMetricCache(conf, this.stormClusterState);
this.clusterName = ConfigExtension.getClusterName(conf);
pendingSubmitTopologies = new TimeCacheMap<>(JStormUtils.MIN_10);
topologyTaskTimeout = new ConcurrentHashMap<>();
tasksHeartbeat = new ConcurrentHashMap<>();
// init nimbus metric reporter
this.metricsReporter = new JStormMetricsReporter(this);
// metrics thread will be started in NimbusServer
this.metricRunnable = ClusterMetricsRunnable.mkInstance(this);
// The config-update handler is instantiated from the class name found in the configuration.
String configUpdateHandlerClass = ConfigExtension.getNimbusConfigUpdateHandlerClass(conf);
this.configUpdateHandler = (ConfigUpdateHandler) Utils.newInstance(configUpdateHandlerClass);
// The topology-action notifier is optional: nimbusNotify stays null when the key is absent.
if (conf.containsKey(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN)) {
String string = (String) conf.get(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN);
nimbusNotify = (ITopologyActionNotifierPlugin) Utils.newInstance(string);
} else {
nimbusNotify = null;
}
}
开发者ID:alibaba,项目名称:jstorm,代码行数:54,代码来源:NimbusData.java
示例15: main
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Process entry point: reads the storm configuration, calls the JVM-monitor hook and
 * launches a new NimbusServer with a {@code DefaultInimbus}.
 *
 * @param args command-line arguments (not read by this method)
 * @throws Exception if any of the startup calls fails
 */
public static void main(String[] args) throws Exception {
    // read configuration files
    final Map stormConf = Utils.readStormConfig();

    JStormServerUtils.startTaobaoJvmMonitor();

    // The server is created first, then the INimbus, then launchServer is invoked.
    new NimbusServer().launchServer(stormConf, new DefaultInimbus());
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:14,代码来源:NimbusServer.java
示例16: launchServer
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Runs the nimbus startup sequence. Any {@link Throwable} raised along the way is logged
 * rather than rethrown, and {@code cleanup()} is always called before the method returns.
 *
 * @param conf    nimbus configuration
 * @param inimbus INimbus instance that is prepared and then used to build the NimbusData
 */
private void launchServer(final Map conf, INimbus inimbus) {
    LOG.info("Begin to start nimbus with conf " + conf);

    try {
        // 1. check whether mode is distributed or not
        StormConfig.validate_distributed_mode(conf);

        createPid(conf);
        initShutdownHook();

        inimbus.prepare(conf, StormConfig.masterInimbus(conf));
        data = createNimbusData(conf, inimbus);
        initFollowerThread(conf);

        final int httpServerPort = ConfigExtension.getNimbusDeamonHttpserverPort(conf);
        hs = new Httpserver(httpServerPort, conf);
        hs.start();

        initContainerHBThread(conf);

        // Loop until data reports leadership, sleeping (argument 5000) between checks.
        for (;;) {
            if (data.isLeader()) {
                break;
            }
            Utils.sleep(5000);
        }

        initUploadMetricThread(data);
        init(conf);
    } catch (Throwable failure) {
        LOG.error("Fail to run nimbus ", failure);
    } finally {
        cleanup();
    }

    LOG.info("Quit nimbus");
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:39,代码来源:NimbusServer.java
示例17: launcherLocalServer
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Starts nimbus in local mode and returns the value of the {@code serviceHandler} field.
 *
 * @param conf    nimbus configuration; validated with {@code StormConfig.validate_local_mode}
 * @param inimbus INimbus instance that is prepared and then used to build the NimbusData
 * @return the current {@code serviceHandler} field
 * @throws Exception if validation, preparation or initialization fails
 */
public ServiceHandler launcherLocalServer(final Map conf, INimbus inimbus) throws Exception {
    LOG.info("Begin to start nimbus on local model");

    StormConfig.validate_local_mode(conf);

    inimbus.prepare(conf, StormConfig.masterInimbus(conf));
    data = createNimbusData(conf, inimbus);

    // NOTE(review): serviceHandler is not assigned in this method; presumably init(conf) sets it — confirm.
    init(conf);

    return serviceHandler;
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:15,代码来源:NimbusServer.java
示例18: NimbusData
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Initializes the nimbus-wide state held by this object.
 *
 * @param conf        nimbus configuration; stored, used to create the cluster state and
 *                    to determine {@code localMode}
 * @param downloaders cache stored in the {@code downloaders} field
 * @param uploaders   cache stored in the {@code uploaders} field
 * @param inimbus     INimbus instance stored in the {@code inimubs} field
 * @throws Exception if any of the initialization calls fails
 */
public NimbusData(Map conf, TimeCacheMap<Object, Object> downloaders,
        TimeCacheMap<Object, Object> uploaders, INimbus inimbus)
        throws Exception {
    this.conf = conf;
    this.downloaders = downloaders;
    this.uploaders = uploaders;

    this.submittedCount = new AtomicInteger(0);
    this.stormClusterState = Cluster.mk_storm_cluster_state(conf);
    this.taskHeartbeatsCache = new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>();
    this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM);

    // `this` is handed to StatusTransition before the remaining fields below are assigned.
    this.statusTransition = new StatusTransition(this);
    this.startTime = TimeUtils.current_time_secs();
    this.inimubs = inimbus;

    this.groupToTopology = new HashMap<String, Map<String, Map<ThriftResourceType, Integer>>>();
    this.groupToResource = new ConcurrentHashMap<String, Map<ThriftResourceType, Integer>>();
    this.groupToUsedResource = new ConcurrentHashMap<String, Map<ThriftResourceType, Integer>>();

    // The original allocated a ReentrantLock here and discarded it immediately; that dead
    // statement had no effect and has been removed.
    localMode = StormConfig.local_mode(conf);
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:32,代码来源:NimbusData.java
示例19: launcherLocalServer
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Brings nimbus up in local mode and hands back the {@code serviceHandler} field.
 *
 * @param conf    nimbus configuration; validated with {@code StormConfig.validate_local_mode}
 * @param inimbus INimbus instance that is prepared and then used to build the NimbusData
 * @return the current {@code serviceHandler} field
 * @throws Exception if validation, preparation or initialization fails
 */
public ServiceHandler launcherLocalServer(final Map conf, INimbus inimbus)
        throws Exception {
    LOG.info("Begin to start nimbus on local model");

    // Fails fast if the configuration is not a local-mode configuration.
    StormConfig.validate_local_mode(conf);

    inimbus.prepare(conf, StormConfig.masterInimbus(conf));
    data = createNimbusData(conf, inimbus);

    // NOTE(review): serviceHandler is not assigned in this method; presumably init(conf) sets it — confirm.
    init(conf);

    return serviceHandler;
}
开发者ID:songtk,项目名称:learn_jstorm,代码行数:14,代码来源:NimbusServer.java
示例20: NimbusData
import backtype.storm.scheduler.INimbus; //导入依赖的package包/类
/**
 * Initializes the nimbus-wide state held by this object.
 *
 * @param conf        nimbus configuration; stored, used to create the cluster state and
 *                    to determine {@code localMode}
 * @param downloaders cache stored in the {@code downloaders} field
 * @param uploaders   cache stored in the {@code uploaders} field
 * @param inimbus     INimbus instance stored in the {@code inimubs} field
 * @throws Exception if any of the initialization calls fails
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public NimbusData(Map conf, TimeCacheMap<Object, Object> downloaders, TimeCacheMap<Object, Object> uploaders,
        INimbus inimbus) throws Exception {
    this.conf = conf;
    this.downloaders = downloaders;
    this.uploaders = uploaders;

    this.submittedCount = new AtomicInteger(0);
    this.stormClusterState = Cluster.mk_storm_cluster_state(conf);
    this.taskHeartbeatsCache = new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>();
    this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM);

    // `this` is handed to StatusTransition before the remaining fields below are assigned.
    this.statusTransition = new StatusTransition(this);
    this.startTime = TimeUtils.current_time_secs();
    this.inimubs = inimbus;

    this.groupToTopology = new HashMap<String, Map<String, Map<ThriftResourceType, Integer>>>();
    this.groupToResource = new ConcurrentHashMap<String, Map<ThriftResourceType, Integer>>();
    this.groupToUsedResource = new ConcurrentHashMap<String, Map<ThriftResourceType, Integer>>();

    // The original allocated a ReentrantLock here and discarded it immediately; that dead
    // statement had no effect and has been removed.
    localMode = StormConfig.local_mode(conf);
}
开发者ID:songtk,项目名称:learn_jstorm,代码行数:32,代码来源:NimbusData.java
注:本文中的 backtype.storm.scheduler.INimbus 类示例整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论