This article collects typical usage examples of the Java class backtype.storm.utils.LocalState. If you are wondering how LocalState is used in practice, the selected examples below may help.
The LocalState class belongs to the backtype.storm.utils package. The 20 code examples below are sorted by popularity by default.
Example 1: WorkerHeartbeatRunable
import backtype.storm.utils.LocalState; // import the required package/class
public WorkerHeartbeatRunable(WorkerData workerData) {
    this.workerData = workerData;
    this.conf = workerData.getStormConf();
    this.worker_id = workerData.getWorkerId();
    this.port = workerData.getPort();
    this.topologyId = workerData.getTopologyId();
    this.task_ids = new CopyOnWriteArraySet<Integer>(workerData.getTaskids());
    this.shutdown = workerData.getShutdown();
    // Heartbeat interval in seconds; 10 is passed to parseInt as the default value
    String key = Config.WORKER_HEARTBEAT_FREQUENCY_SECS;
    frequence = JStormUtils.parseInt(conf.get(key), 10);
    // Per-worker cache of LocalState handles, filled lazily
    this.workerStates = new HashMap<String, LocalState>();
}
Developer ID: kkllwww007, Project: jstrom, Lines of code: 17, Source: WorkerHeartbeatRunable.java
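The constructor above reads the heartbeat interval from the configuration and passes 10 as a default. As a rough illustration of that "parse with a fallback" pattern, here is a minimal, self-contained sketch. The class `ConfInts` and its exact behavior are assumptions made for this example; it is not the `JStormUtils.parseInt` implementation, which may treat malformed values differently.

```java
/** Hypothetical helper (not part of JStorm): parse a loosely typed config value as an int. */
class ConfInts {
    /**
     * Returns the value as an int, or defaultValue when the value is
     * missing or cannot be parsed. Falling back on malformed input is an
     * assumption of this sketch.
     */
    static int parseInt(Object value, int defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        try {
            return Integer.parseInt(value.toString().trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }
}
```

With this sketch, a missing key yields the default, while numeric and string values are both accepted.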
Example 2: SyncProcessEvent
import backtype.storm.utils.LocalState; // import the required package/class
/**
 * @param supervisorId
 * @param conf
 * @param localState
 * @param workerThreadPids
 * @param sharedContext
 */
public SyncProcessEvent(String supervisorId, Map conf,
        LocalState localState,
        ConcurrentHashMap<String, String> workerThreadPids,
        IContext sharedContext) {
    this.supervisorId = supervisorId;
    this.conf = conf;
    this.localState = localState;
    this.workerThreadPids = workerThreadPids;
    // right now, sharedContext is null
    this.sharedContext = sharedContext;
    this.sandBoxMaker = new SandBoxMaker(conf);
    if (ConfigExtension.isEnableCgroup(conf)) {
        cgroupManager = new CgroupManager(conf);
    }
}
Developer ID: greeenSY, Project: Tstream, Lines of code: 32, Source: SyncProcessEvent.java
Example 3: SyncProcessEvent
import backtype.storm.utils.LocalState; // import the required package/class
public SyncProcessEvent(String supervisorId, Map conf, LocalState localState,
        ConcurrentHashMap<String, String> workerThreadPids,
        IContext sharedContext, WorkerReportError workerReportError,
        StormClusterState stormClusterState) {
    this.supervisorId = supervisorId;
    this.conf = conf;
    this.localState = localState;
    this.workerThreadPids = workerThreadPids;
    // right now, sharedContext is null
    this.sharedContext = sharedContext;
    this.sandBoxMaker = new SandBoxMaker(conf);
    this.workerIdToStartTimeAndPort = new HashMap<>();
    this.needDownloadTopologies = new AtomicReference<>();
    this.isJstormOnYarn = JStormUtils.parseBoolean(System.getProperty("jstorm-on-yarn"), false) ||
            ConfigExtension.isJStormOnYarn(conf);
    if (ConfigExtension.isEnableCgroup(conf)) {
        cgroupManager = new CgroupManager(conf);
    }
    this.killingWorkers = new HashMap<>();
    this.upgradingTopologyPorts = new HashMap<>();
    this.workerReportError = workerReportError;
    this.stormClusterState = stormClusterState;
}
Developer ID: alibaba, Project: jstorm, Lines of code: 25, Source: SyncProcessEvent.java
Example 4: Heartbeat
import backtype.storm.utils.LocalState; // import the required package/class
@SuppressWarnings({"rawtypes", "unchecked"})
public Heartbeat(Map conf, StormClusterState stormClusterState, String supervisorId,
        LocalState localState) {
    String myHostName = JStormServerUtils.getHostName(conf);
    this.stormClusterState = stormClusterState;
    this.supervisorId = supervisorId;
    this.conf = conf;
    this.myHostName = myHostName;
    this.startTime = TimeUtils.current_time_secs();
    this.frequency = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
    this.hbUpdateTrigger = new AtomicBoolean(true);
    this.localState = localState;
    this.healthStatus = HealthStatus.INFO;
    initSupervisorInfo(conf);
    LOG.info("Successfully inited supervisor heartbeat thread, " + supervisorInfo);
}
Developer ID: alibaba, Project: jstorm, Lines of code: 19, Source: Heartbeat.java
Example 5: readWorkerHeartbeat
import backtype.storm.utils.LocalState; // import the required package/class
/**
 * Get a worker heartbeat by worker id.
 *
 * @param conf
 * @param workerId
 * @return the WorkerHeartbeat, or null if reading it fails with an IOException
 * @throws Exception
 */
public WorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId)
        throws Exception {
    try {
        LocalState ls = StormConfig.worker_state(conf, workerId);
        return (WorkerHeartbeat) ls.get(Common.LS_WORKER_HEARTBEAT);
    } catch (IOException e) {
        LOG.error("Failed to get worker Heartbeat", e);
        return null;
    }
}
Developer ID: zhangjunfang, Project: jstorm-0.9.6.3-, Lines of code: 22, Source: SyncProcessEvent.java
Example 6: SyncSupervisorEvent
import backtype.storm.utils.LocalState; // import the required package/class
/**
 * @param conf
 * @param processEventManager
 * @param syncSupEventManager
 * @param stormClusterState
 * @param supervisorId
 * @param localState
 * @param syncProcesses
 */
public SyncSupervisorEvent(String supervisorId, Map conf,
        EventManager processEventManager, EventManager syncSupEventManager,
        StormClusterState stormClusterState, LocalState localState,
        SyncProcessEvent syncProcesses) {
    this.syncProcesses = syncProcesses;
    this.processEventManager = processEventManager;
    this.syncSupEventManager = syncSupEventManager;
    this.stormClusterState = stormClusterState;
    this.conf = conf;
    this.supervisorId = supervisorId;
    this.localState = localState;
}
Developer ID: zhangjunfang, Project: jstorm-0.9.6.3-, Lines of code: 24, Source: SyncSupervisorEvent.java
Example 7: worker_state
import backtype.storm.utils.LocalState; // import the required package/class
public static LocalState worker_state(Map conf, String id)
        throws IOException {
    // Each worker keeps its heartbeat state under its own directory
    String path = worker_heartbeats_root(conf, id);
    LocalState rtn = new LocalState(path);
    return rtn;
}
Developer ID: zhangjunfang, Project: jstorm-0.9.6.3-, Lines of code: 9, Source: StormConfig.java
Example 8: readWorkerHeartbeat
import backtype.storm.utils.LocalState; // import the required package/class
/**
 * Get a worker heartbeat by worker id.
 *
 * @param conf
 * @param workerId
 * @return the WorkerHeartbeat, or null if it cannot be read
 * @throws Exception
 */
public WorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) throws Exception {
    try {
        LocalState ls = StormConfig.worker_state(conf, workerId);
        return (WorkerHeartbeat) ls.get(Common.LS_WORKER_HEARTBEAT);
    } catch (Exception e) {
        LOG.error("Failed to get worker Heartbeat", e);
        return null;
    }
}
Developer ID: kkllwww007, Project: jstrom, Lines of code: 21, Source: SyncProcessEvent.java
Example 9: SyncSupervisorEvent
import backtype.storm.utils.LocalState; // import the required package/class
/**
 * @param conf
 * @param processEventManager
 * @param syncSupEventManager
 * @param stormClusterState
 * @param supervisorId
 * @param localState
 * @param syncProcesses
 * @param heartbeat
 */
public SyncSupervisorEvent(String supervisorId, Map conf, EventManager processEventManager, EventManager syncSupEventManager,
        StormClusterState stormClusterState, LocalState localState, SyncProcessEvent syncProcesses, Heartbeat heartbeat) {
    this.syncProcesses = syncProcesses;
    this.processEventManager = processEventManager;
    this.syncSupEventManager = syncSupEventManager;
    this.stormClusterState = stormClusterState;
    this.conf = conf;
    this.supervisorId = supervisorId;
    this.localState = localState;
    this.heartbeat = heartbeat;
}
Developer ID: kkllwww007, Project: jstrom, Lines of code: 22, Source: SyncSupervisorEvent.java
Example 10: getWorkerState
import backtype.storm.utils.LocalState; // import the required package/class
private LocalState getWorkerState() throws IOException {
    // Reuse the cached LocalState for this worker; create and cache it on first use
    LocalState state = workerStates.get(worker_id);
    if (state == null) {
        state = StormConfig.worker_state(conf, worker_id);
        workerStates.put(worker_id, state);
    }
    return state;
}
Developer ID: kkllwww007, Project: jstrom, Lines of code: 9, Source: WorkerHeartbeatRunable.java
Example 11: readWorkerHeartbeat
import backtype.storm.utils.LocalState; // import the required package/class
/**
 * get worker heartbeat by workerId
 *
 * @param conf conf
 * @param workerId worker id
 * @return WorkerHeartbeat
 */
public WorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) throws Exception {
    try {
        LocalState ls = StormConfig.worker_state(conf, workerId);
        return (WorkerHeartbeat) ls.get(Common.LS_WORKER_HEARTBEAT);
    } catch (Exception e) {
        LOG.error("Failed to get heartbeat for worker:{}", workerId, e);
        return null;
    }
}
Developer ID: alibaba, Project: jstorm, Lines of code: 17, Source: SyncProcessEvent.java
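The readWorkerHeartbeat variants above log the failure and return null, so every caller has to remember the null check. One possible alternative, shown here only as a sketch, is to surface "no heartbeat" as an empty Optional. The names `HeartbeatSource` and `SafeHeartbeatReader` are hypothetical and do not exist in JStorm; the heartbeat is typed as Object to keep the example self-contained.

```java
import java.io.IOException;
import java.util.Optional;

/** Hypothetical abstraction over "read the heartbeat stored for this worker". */
interface HeartbeatSource {
    Object read(String workerId) throws IOException;
}

/** Sketch: turn both "read failed" and "nothing stored" into Optional.empty(). */
class SafeHeartbeatReader {
    static Optional<Object> read(HeartbeatSource source, String workerId) {
        try {
            // ofNullable covers the case where no heartbeat has been written yet
            return Optional.ofNullable(source.read(workerId));
        } catch (IOException e) {
            // A real caller would log the failure here, as the originals do
            return Optional.empty();
        }
    }
}
```

The trade-off is that a read error and a missing heartbeat become indistinguishable to the caller, which is also true of the null-returning originals.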
Example 12: SyncSupervisorEvent
import backtype.storm.utils.LocalState; // import the required package/class
@SuppressWarnings("unchecked")
public SyncSupervisorEvent(String supervisorId, Map conf, EventManager syncSupEventManager,
        StormClusterState stormClusterState, LocalState localState,
        SyncProcessEvent syncProcesses, Heartbeat heartbeat) {
    this.syncProcesses = syncProcesses;
    this.syncSupEventManager = syncSupEventManager;
    this.stormClusterState = stormClusterState;
    this.conf = conf;
    this.supervisorId = supervisorId;
    this.localState = localState;
    this.heartbeat = heartbeat;
}
Developer ID: alibaba, Project: jstorm, Lines of code: 13, Source: SyncSupervisorEvent.java
Example 13: getWorkerState
import backtype.storm.utils.LocalState; // import the required package/class
private LocalState getWorkerState() throws IOException {
    // Reuse the cached LocalState for this worker; create and cache it on first use
    LocalState state = workerStates.get(workerId);
    if (state == null) {
        state = StormConfig.worker_state(conf, workerId);
        workerStates.put(workerId, state);
    }
    return state;
}
Developer ID: alibaba, Project: jstorm, Lines of code: 9, Source: WorkerHeartbeatRunable.java
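The getWorkerState examples cache one LocalState per worker id with a check-then-put on a HashMap, which is fine when only one thread calls it. If several threads could share the cache, the same idea might be written with `ConcurrentHashMap.computeIfAbsent`. The sketch below uses hypothetical stand-ins (`StateHandle`, `StateHandleCache`) instead of the real LocalState so that it runs on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a per-worker LocalState handle. */
class StateHandle {
    final String workerId;

    StateHandle(String workerId) {
        this.workerId = workerId;
    }
}

/** Sketch: keeps one handle per worker id so the backing store is opened only once. */
class StateHandleCache {
    private final Map<String, StateHandle> handles = new ConcurrentHashMap<>();
    private final AtomicInteger opened = new AtomicInteger();

    StateHandle forWorker(String workerId) {
        // On ConcurrentHashMap the factory runs at most once per absent key
        return handles.computeIfAbsent(workerId, id -> {
            opened.incrementAndGet();
            return new StateHandle(id);
        });
    }

    /** Number of handles created so far; only used to observe the caching. */
    int openedCount() {
        return opened.get();
    }
}
```

Note that a factory which throws a checked exception, as `StormConfig.worker_state` does with IOException, would need to be wrapped (for example in an UncheckedIOException) before it can be used inside the lambda.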
Example 14: doHeartbeat
import backtype.storm.utils.LocalState; // import the required package/class
/**
 * do heartbeat and update LocalState
 */
public void doHeartbeat() throws IOException {
    int curTime = TimeUtils.current_time_secs();
    WorkerHeartbeat hb = new WorkerHeartbeat(curTime, topologyId, taskIds, port);
    LOG.debug("Doing heartbeat:" + workerId + ",port:" + port + ",hb" + hb.toString());
    LocalState state = getWorkerState();
    state.put(Common.LS_WORKER_HEARTBEAT, hb);
}
Developer ID: alibaba, Project: jstorm, Lines of code: 13, Source: WorkerHeartbeatRunable.java
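In these examples LocalState is constructed from a local directory path and then used as a small key-value store: doHeartbeat writes with `put`, and readWorkerHeartbeat reads with `get`. To make that write-then-read flow concrete without a Storm dependency, here is a minimal sketch of a disk-backed store. `FileBackedState` and `HeartbeatRoundTrip` are hypothetical names, and the storage format (Java serialization of a whole map into one file) is an assumption of this sketch, not a description of how LocalState persists data.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for LocalState: every read and write goes to disk. */
class FileBackedState {
    private final Path file;

    FileBackedState(Path dir) throws IOException {
        Files.createDirectories(dir);
        this.file = dir.resolve("state.ser");
    }

    /** Loads the whole map from disk; a missing file means an empty state. */
    @SuppressWarnings("unchecked")
    synchronized Map<String, Serializable> snapshot() throws IOException {
        if (!Files.exists(file)) {
            return new HashMap<>();
        }
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            return (Map<String, Serializable>) in.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException("Unreadable state file: " + file, e);
        }
    }

    synchronized Serializable get(String key) throws IOException {
        return snapshot().get(key);
    }

    synchronized void put(String key, Serializable value) throws IOException {
        Map<String, Serializable> state = snapshot();
        state.put(key, value);
        // Write to a temporary file first, then swap it in
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(tmp))) {
            out.writeObject(state);
        }
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }
}

/** Sketch of the worker/supervisor flow: one handle writes, another reads. */
class HeartbeatRoundTrip {
    static int writeThenRead(int timeSecs) {
        try {
            Path dir = Files.createTempDirectory("local-state-sketch");
            new FileBackedState(dir).put("worker-heartbeat", timeSecs);
            // A second handle on the same directory sees the stored value
            return (Integer) new FileBackedState(dir).get("worker-heartbeat");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the writer and the reader only share a directory, this mirrors how a worker process can publish its heartbeat and a separate supervisor process can pick it up later.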
Example 15: LocalStateShim
import backtype.storm.utils.LocalState; // import the required package/class
public LocalStateShim(String localDir) throws IOException {
    _state = new LocalState(localDir);
}
Developer ID: PacktPublishing, Project: Mastering-Mesos, Lines of code: 4, Source: LocalStateShim.java
Example 16: SyncProcessEvent
import backtype.storm.utils.LocalState; // import the required package/class
/**
 * @param supervisorId
 * @param conf
 * @param localState
 * @param workerThreadPids
 * @param sharedContext
 */
public SyncProcessEvent(String supervisorId, Map conf,
        LocalState localState,
        ConcurrentHashMap<String, String> workerThreadPids,
        IContext sharedContext) {
    this.supervisorId = supervisorId;
    this.conf = conf;
    this.localState = localState;
    this.workerThreadPids = workerThreadPids;
    // right now, sharedContext is null
    this.sharedContext = sharedContext;
    this.sandBoxMaker = new SandBoxMaker(conf);
    this.workerIdToStartTimeAndPort = new HashMap<String, Pair<Integer, Integer>>();
    if (ConfigExtension.isEnableCgroup(conf)) {
        cgroupManager = new CgroupManager(conf);
    }
}
Developer ID: zhangjunfang, Project: jstorm-0.9.6.3-, Lines of code: 34, Source: SyncProcessEvent.java
Example 17: getLocalWorkerStats
import backtype.storm.utils.LocalState; // import the required package/class
/**
 * Get the local state map of approved worker ids.
 *
 * @return Map<workerId, [workerHeartbeat, state]>; [workerHeartbeat, state] is also a
 *         map, key is "workheartbeat" and "state"
 * @param conf
 * @param localState
 * @param assignedTasks
 * @throws Exception
 * @pdOid 11c9bebb-d082-4c51-b323-dd3d5522a649
 */
public Map<String, StateHeartbeat> getLocalWorkerStats(Map conf,
        LocalState localState, Map<Integer, LocalAssignment> assignedTasks)
        throws Exception {
    Map<String, StateHeartbeat> workeridHbstate = new HashMap<String, StateHeartbeat>();
    int now = TimeUtils.current_time_secs();
    /**
     * Get Map<workerId, WorkerHeartbeat> from
     * local_dir/worker/ids/heartbeat
     */
    Map<String, WorkerHeartbeat> idToHeartbeat = readWorkerHeartbeats(conf);
    for (Map.Entry<String, WorkerHeartbeat> entry : idToHeartbeat
            .entrySet()) {
        String workerid = entry.getKey().toString();
        WorkerHeartbeat whb = entry.getValue();
        State state = null;
        if (whb == null) {
            state = State.notStarted;
        } else if (!matchesAssignment(whb, assignedTasks)) {
            // workerId isn't approved or
            // isn't assigned task
            state = State.disallowed;
        } else if ((now - whb.getTimeSecs()) > JStormUtils.parseInt(conf
                .get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
            state = State.timedOut;
        } else {
            if (isWorkerDead(workerid)) {
                state = State.timedOut;
            } else {
                state = State.valid;
            }
        }
        if (state != State.valid) {
            LOG.info("Worker:" + workerid + " state:" + state
                    + " WorkerHeartbeat: " + whb
                    + " at supervisor time-secs " + now);
        } else {
            LOG.debug("Worker:" + workerid + " state:" + state
                    + " WorkerHeartbeat: " + whb
                    + " at supervisor time-secs " + now);
        }
        workeridHbstate.put(workerid, new StateHeartbeat(state, whb));
    }
    return workeridHbstate;
}
Developer ID: zhangjunfang, Project: jstorm-0.9.6.3-, Lines of code: 71, Source: SyncProcessEvent.java
Example 18: getLocalWorkerStats
import backtype.storm.utils.LocalState; // import the required package/class
/**
 * Get the local state map of approved worker ids.
 *
 * @return Map<workerId, [workerHeartbeat, state]>; [workerHeartbeat, state] is also a
 *         map, key is "workheartbeat" and "state"
 * @param conf
 * @param localState
 * @param assignedTasks
 * @throws Exception
 * @pdOid 11c9bebb-d082-4c51-b323-dd3d5522a649
 */
@SuppressWarnings("unchecked")
public Map<String, StateHeartbeat> getLocalWorkerStats(Map conf,
        LocalState localState, Map<Integer, LocalAssignment> assignedTasks)
        throws Exception {
    Map<String, StateHeartbeat> workeridHbstate = new HashMap<String, StateHeartbeat>();
    int now = TimeUtils.current_time_secs();
    /**
     * Get Map<workerId, WorkerHeartbeat> from
     * local_dir/worker/ids/heartbeat
     */
    Map<String, WorkerHeartbeat> idToHeartbeat = readWorkerHeartbeats(conf);
    for (Map.Entry<String, WorkerHeartbeat> entry : idToHeartbeat
            .entrySet()) {
        String workerid = entry.getKey().toString();
        WorkerHeartbeat whb = entry.getValue();
        State state = null;
        if (whb == null) {
            state = State.notStarted;
        } else if (!matchesAssignment(whb, assignedTasks)) {
            // workerId isn't approved or
            // isn't assigned task
            state = State.disallowed;
        } else if ((now - whb.getTimeSecs()) > JStormUtils.parseInt(conf
                .get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
            state = State.timedOut;
        } else {
            if (isWorkerDead(workerid)) {
                state = State.timedOut;
            } else {
                state = State.valid;
            }
        }
        if (state != State.valid) {
            LOG.info("Worker:" + workerid + " state:" + state
                    + " WorkerHeartbeat: " + whb
                    + " at supervisor time-secs " + now);
        } else {
            LOG.debug("Worker:" + workerid + " state:" + state
                    + " WorkerHeartbeat: " + whb
                    + " at supervisor time-secs " + now);
        }
        workeridHbstate.put(workerid, new StateHeartbeat(state, whb));
    }
    return workeridHbstate;
}
Developer ID: songtk, Project: learn_jstorm, Lines of code: 72, Source: SyncProcessEvent.java
Example 19: getLocalWorkerStats
import backtype.storm.utils.LocalState; // import the required package/class
/**
 * Get the local state map of approved worker ids.
 *
 * @return Map<workerId, [workerHeartbeat, state]>; [workerHeartbeat, state] is also a
 *         map, key is "workheartbeat" and "state"
 * @param conf
 * @param localState
 * @param assignedTasks
 * @throws Exception
 * @pdOid 11c9bebb-d082-4c51-b323-dd3d5522a649
 */
@SuppressWarnings("unchecked")
public Map<String, StateHeartbeat> getLocalWorkerStats(Map conf,
        LocalState localState, Map<Integer, LocalAssignment> assignedTasks)
        throws Exception {
    Map<String, StateHeartbeat> workeridHbstate = new HashMap<String, StateHeartbeat>();
    int now = TimeUtils.current_time_secs();
    /**
     * Get Map<workerId, WorkerHeartbeat> from
     * local_dir/worker/ids/heartbeat
     */
    Map<String, WorkerHeartbeat> idToHeartbeat = readWorkerHeartbeats(conf);
    for (Map.Entry<String, WorkerHeartbeat> entry : idToHeartbeat
            .entrySet()) {
        String workerid = entry.getKey().toString();
        WorkerHeartbeat whb = entry.getValue();
        State state = null;
        if (whb == null) {
            state = State.notStarted;
        } else if (!matchesAssignment(whb, assignedTasks)) {
            // workerId isn't approved or
            // isn't assigned task
            state = State.disallowed;
        } else if ((now - whb.getTimeSecs()) > JStormUtils.parseInt(conf
                .get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
            state = State.timedOut;
        } else {
            state = State.valid;
        }
        if (state != State.valid) {
            LOG.info("Worker:" + workerid + " state:" + state
                    + " WorkerHeartbeat: " + whb
                    + " at supervisor time-secs " + now);
        } else {
            LOG.debug("Worker:" + workerid + " state:" + state
                    + " WorkerHeartbeat: " + whb
                    + " at supervisor time-secs " + now);
        }
        workeridHbstate.put(workerid, new StateHeartbeat(state, whb));
    }
    return workeridHbstate;
}
Developer ID: greeenSY, Project: Tstream, Lines of code: 68, Source: SyncProcessEvent.java
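The getLocalWorkerStats variants all apply the same ordered checks to each heartbeat: no heartbeat means not started, a heartbeat that does not match the assignment means disallowed, a stale heartbeat means timed out, and anything else is valid. The sketch below isolates that decision order in a self-contained form. `HeartbeatInfo`, `WorkerStatus`, and `WorkerStatusClassifier` are hypothetical names, and matching on the port alone is a simplifying assumption; the originals delegate to `matchesAssignment`, whose exact criteria are not shown in these excerpts.

```java
import java.util.Set;

/** Hypothetical, simplified heartbeat: only the fields the classification needs. */
class HeartbeatInfo {
    final int timeSecs;
    final int port;

    HeartbeatInfo(int timeSecs, int port) {
        this.timeSecs = timeSecs;
        this.port = port;
    }
}

/** Mirrors the four states used in the examples above. */
enum WorkerStatus { NOT_STARTED, DISALLOWED, TIMED_OUT, VALID }

class WorkerStatusClassifier {
    /**
     * Applies the checks in the same order as the examples:
     * missing heartbeat, then assignment, then timeout.
     */
    static WorkerStatus classify(HeartbeatInfo hb, Set<Integer> assignedPorts,
                                 int nowSecs, int timeoutSecs) {
        if (hb == null) {
            return WorkerStatus.NOT_STARTED;
        }
        // Assumption: a worker is "assigned" when its port is in the assigned set
        if (!assignedPorts.contains(hb.port)) {
            return WorkerStatus.DISALLOWED;
        }
        // Strictly greater than, as in the originals: an age equal to the timeout is still valid
        if (nowSecs - hb.timeSecs > timeoutSecs) {
            return WorkerStatus.TIMED_OUT;
        }
        return WorkerStatus.VALID;
    }
}
```

The order matters: a stale heartbeat from an unassigned worker is reported as disallowed rather than timed out, because the assignment check comes first.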
Example 20: getLocalWorkerStats
import backtype.storm.utils.LocalState; // import the required package/class
/**
 * get localStat of approved workerId's map
 *
 * @return Map[workerId, [worker heartbeat, state]]
 */
public Map<String, StateHeartbeat> getLocalWorkerStats(
        Map conf, LocalState localState, Map<Integer, LocalAssignment> assignedTasks) throws Exception {
    Map<String, StateHeartbeat> workerIdHbState = new HashMap<>();
    int now = TimeUtils.current_time_secs();
    /**
     * Get Map<workerId, WorkerHeartbeat> from local_dir/worker/ids/heartbeat
     */
    Map<String, WorkerHeartbeat> idToHeartbeat = readWorkerHeartbeats(conf);
    for (Map.Entry<String, WorkerHeartbeat> entry : idToHeartbeat.entrySet()) {
        String workerId = entry.getKey();
        WorkerHeartbeat whb = entry.getValue();
        State state;
        if (whb == null) {
            state = State.notStarted;
            Pair<Integer, Integer> timeToPort = this.workerIdToStartTimeAndPort.get(workerId);
            if (timeToPort != null) {
                LocalAssignment localAssignment = assignedTasks.get(timeToPort.getSecond());
                if (localAssignment == null) {
                    LOG.info("Following worker doesn't exist in the assignment, remove port=" + timeToPort.getSecond());
                    state = State.disallowed;
                    // workerId is disallowed, so remove it from workerIdToStartTimeAndPort
                    Integer port = this.workerIdToStartTimeAndPort.get(workerId).getSecond();
                    this.workerIdToStartTimeAndPort.remove(workerId);
                    this.portToWorkerId.remove(port);
                }
            }
        } else if (!matchesAssignment(whb, assignedTasks)) {
            // workerId isn't approved or isn't assigned task
            state = State.disallowed;
        } else if ((now - whb.getTimeSecs()) > JStormUtils.parseInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
            if (!killingWorkers.containsKey(workerId)) {
                String outTimeInfo = " it is likely to be out of memory, the worker heartbeat has timed out ";
                workerReportError.report(whb.getTopologyId(), whb.getPort(),
                        whb.getTaskIds(), outTimeInfo, ErrorConstants.CODE_WORKER_TIMEOUT);
            }
            state = State.timedOut;
        } else {
            if (isWorkerDead(workerId)) {
                if (!killingWorkers.containsKey(workerId)) {
                    String workerDeadInfo = "Worker is dead ";
                    workerReportError.report(whb.getTopologyId(), whb.getPort(),
                            whb.getTaskIds(), workerDeadInfo, ErrorConstants.CODE_WORKER_DEAD);
                }
                state = State.timedOut;
            } else {
                state = State.valid;
            }
        }
        if (state != State.valid) {
            if (!killingWorkers.containsKey(workerId)) {
                LOG.info("Worker:" + workerId + " state:" + state + " WorkerHeartbeat:" + whb +
                        " assignedTasks:" + assignedTasks + " at supervisor time-secs " + now);
            }
        } else {
            LOG.debug("Worker:" + workerId + " state:" + state + " WorkerHeartbeat: " + whb
                    + " at supervisor time-secs " + now);
        }
        workerIdHbState.put(workerId, new StateHeartbeat(state, whb));
    }
    return workerIdHbState;
}
Developer ID: alibaba, Project: jstorm, Lines of code: 72, Source: SyncProcessEvent.java
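Note: The backtype.storm.utils.LocalState examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects contributed by their respective developers, and copyright remains with the original authors. For distribution and use, refer to each project's license. Do not reproduce without permission.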