本文整理汇总了Java中org.apache.cassandra.utils.ConcurrentBiMap类的典型用法代码示例。如果您正苦于以下问题:Java ConcurrentBiMap类的具体用法?Java ConcurrentBiMap怎么用?Java ConcurrentBiMap使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
ConcurrentBiMap类属于org.apache.cassandra.utils包,在下文中一共展示了ConcurrentBiMap类的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: processDocument
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Converts the given document and places it in the current batch.
 * <p>
 * If the current batch has already reached {@code batchSize}, it is detached and sent before
 * this method returns. Any previously scheduled partial-batch send is cancelled and a new one
 * is scheduled {@code sendPartialBatchAfterMs} milliseconds from now.
 *
 * @param document the document to queue for sending
 * @return always an empty array
 */
public Document[] processDocument(Document document) {
  final T converted = convertDoc(document);
  ConcurrentBiMap<Document, T> fullBatch = null;
  final int position;
  synchronized (batchLock) {
    // Detach the batch first if it is already at capacity, so the new document
    // lands in a fresh batch.
    if (this.batch.size() >= batchSize) {
      fullBatch = takeBatch();
    }
    // Recorded before the put, so this is the zero-based slot of the new document.
    position = this.batch.size();
    this.batch.put(document, converted);
  }

  // Sending happens outside batchLock.
  if (fullBatch != null) {
    sendBatch(fullBatch);
  }

  // Restart the timer that flushes a partially filled batch.
  if (scheduledSend != null) {
    scheduledSend.cancel(false);
  }
  scheduledSend = sender.schedule(() -> sendBatch(takeBatch()), sendPartialBatchAfterMs, TimeUnit.MILLISECONDS);

  log.info(Status.BATCHED.getMarker(), "{} queued in postition {} for sending to solr. " +
      "Will be sent within {} milliseconds.", document.getId(), position, sendPartialBatchAfterMs);
  return new Document[0];
}
开发者ID:nsoft,项目名称:jesterj,代码行数:25,代码来源:BatchProcessor.java
示例2: sendBatch
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Sends a detached batch, falling back to per-document handling when the batch operation fails.
 * <p>
 * The same map instance can be picked up both by the timer and by a full-batch trigger. All
 * sending is therefore serialized on {@code sendLock}, and the batch is cleared in the
 * {@code finally} block, so a second caller holding the same map finds it empty and returns
 * without re-sending. New batches can continue to fill while a send is in progress.
 *
 * @param oldBatch the detached batch to send; it is emptied before this method returns
 */
private void sendBatch(ConcurrentBiMap<Document, T> oldBatch) {
  synchronized (sendLock) {
    final boolean nothingToSend = oldBatch.size() == 0;
    if (nothingToSend) {
      return;
    }
    try {
      batchOperation(oldBatch);
    } catch (Exception failure) {
      // The failure may stem from a single bad document rather than the whole batch.
      //noinspection ConstantConditions
      if (!exceptionIndicatesDocumentIssue(failure)) {
        perDocumentFailure(oldBatch, failure);
      } else {
        individualFallbackOperation(oldBatch, failure);
      }
    } finally {
      ThreadContext.remove(JesterJAppender.JJ_INGEST_DOCID);
      ThreadContext.remove(JesterJAppender.JJ_INGEST_SOURCE_SCANNER);
      oldBatch.clear();
    }
  }
}
开发者ID:nsoft,项目名称:jesterj,代码行数:27,代码来源:BatchProcessor.java
示例3: individualFallbackOperation
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Re-submits every request of a failed batch individually and then processes each outcome.
 * <p>
 * Each request is dispatched to the client method matching its concrete type (update, delete or
 * index). The returned futures are collected first and only afterwards handed to
 * {@link #handleRetryResult} one by one.
 *
 * @param oldBatch the batch whose bulk operation failed
 * @param e        the exception reported by the failed batch operation
 * @throws IllegalStateException if the batch contains a request that is not an update, delete
 *                               or index request
 */
@Override
protected void individualFallbackOperation(ConcurrentBiMap<Document, ActionRequest> oldBatch, Exception e) {
  Map<ActionFuture, ActionRequest> futures = new HashMap<>();
  for (ActionRequest request : oldBatch.values()) {
    if (request instanceof UpdateRequest) {
      futures.put(getClient().update((UpdateRequest) request), request);
    } else if (request instanceof DeleteRequest) {
      futures.put(getClient().delete((DeleteRequest) request), request);
    } else if (request instanceof IndexRequest) {
      futures.put(getClient().index((IndexRequest) request), request);
    } else {
      // Trailing space after "found" keeps the class name from running into the message text.
      throw new IllegalStateException("Should only have generated index, update and delete " +
          "actions, but found " + request.getClass());
    }
  }
  for (ActionFuture individualRetry : futures.keySet()) {
    handleRetryResult(e, futures, individualRetry, oldBatch);
  }
}
开发者ID:nsoft,项目名称:jesterj,代码行数:21,代码来源:ElasticSender.java
示例4: batchOperation
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Sends all requests of the batch as a single bulk request.
 * <p>
 * Every request is added to a bulk builder according to its concrete type. If the bulk response
 * reports failures an {@code ESBulkFail} is thrown; otherwise a success message is logged for
 * each document in the batch.
 *
 * @param oldBatch the batch of documents and their corresponding requests
 * @throws IllegalStateException if the batch contains a request that is not an update, delete
 *                               or index request
 * @throws Exception             if the bulk response reports failures, or the request itself fails
 */
@Override
protected void batchOperation(ConcurrentBiMap<Document, ActionRequest> oldBatch) throws Exception {
  BulkRequestBuilder builder = getClient().prepareBulk();
  for (ActionRequest request : oldBatch.values()) {
    if (request instanceof UpdateRequest) {
      builder.add((UpdateRequest) request);
    } else if (request instanceof DeleteRequest) {
      builder.add((DeleteRequest) request);
    } else if (request instanceof IndexRequest) {
      builder.add((IndexRequest) request);
    } else {
      // Trailing space after "found" keeps the class name from running into the message text.
      throw new IllegalStateException("Should only have generated index, update and delete " +
          "actions, but found " + request.getClass());
    }
  }
  BulkResponse bulkResponse = builder.get();
  if (bulkResponse.hasFailures()) {
    throw new ESBulkFail();
  } else {
    for (Document doc : oldBatch.keySet()) {
      log.info("Successfully sent {} to elastic", doc.getId());
    }
  }
}
开发者ID:nsoft,项目名称:jesterj,代码行数:25,代码来源:ElasticSender.java
示例5: individualFallbackOperation
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Sends each document of a failed batch to Solr on its own, so that one bad document does not
 * prevent the others from being indexed.
 * <p>
 * Entries whose Solr document is a {@code Delete} are removed by id; all others are added.
 * A failure for one document is logged and does not stop processing of the remaining ones.
 *
 * @param oldBatch the batch whose bulk operation failed
 * @param e        the exception reported by the failed batch operation (not used here)
 */
@Override
protected void individualFallbackOperation(ConcurrentBiMap<Document, SolrInputDocument> oldBatch, Exception e) {
  // TODO: send in bisected batches to avoid massive traffic down due to one doc when batches are large
  // Iterate entries so key and value are read together, rather than a key followed by a
  // second lookup that could return null if the map changed in between.
  for (Map.Entry<Document, SolrInputDocument> entry : oldBatch.entrySet()) {
    Document document = entry.getKey();
    SolrInputDocument doc = entry.getValue();
    putIdInThreadContext(document);
    try {
      if (doc instanceof Delete) {
        // The document is already in hand; no reverse lookup through inverse() is needed.
        getSolrClient().deleteById(document.getId());
        log().info(Status.INDEXED.getMarker(), "{} deleted from solr successfully", document.getId());
      } else {
        getSolrClient().add(doc);
        log().info(Status.INDEXED.getMarker(), "{} sent to solr successfully", document.getId());
      }
    } catch (IOException | SolrServerException e1) {
      log().info(Status.ERROR.getMarker(), "{} could not be sent to solr because of {}", document.getId(), e1.getMessage());
      log().error("Error sending to with solr!", e1);
    }
  }
}
开发者ID:nsoft,项目名称:jesterj,代码行数:21,代码来源:SendToSolrCloudProcessor.java
示例6: testPerBatchOperation
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Verifies that, with no extra params configured, a batch operation deletes the DELETE document
 * by id and adds the remaining documents directly through the Solr client.
 */
@Test
public void testPerBatchOperation() throws IOException, SolrServerException {
  ConcurrentBiMap<Document, SolrInputDocument> threeDocs = expect3Docs();
  expect(proc.getParams()).andReturn(null).anyTimes();
  expect(proc.getSolrClient()).andReturn(solrClientMock).anyTimes();
  Capture<List<SolrInputDocument>> addedCapture = newCapture();
  Capture<List<String>> deletedCapture = newCapture();
  expect(solrClientMock.add(capture(addedCapture))).andReturn(updateResponseMock);
  expect(solrClientMock.deleteById(capture(deletedCapture))).andReturn(deleteResponseMock);
  expectLogging();
  replay();

  proc.batchOperation(threeDocs);

  // Only the second document is a delete, and its id is "42".
  List<String> deletedIds = deletedCapture.getValue();
  assertEquals("42", deletedIds.get(0));

  // The first and third documents are added; the deleted one is not.
  List<SolrInputDocument> addedDocs = addedCapture.getValue();
  assertTrue(addedDocs.contains(inputDocMock));
  assertFalse(addedDocs.contains(inputDocMock2));
  assertTrue(addedDocs.contains(inputDocMock3));
}
开发者ID:nsoft,项目名称:jesterj,代码行数:18,代码来源:SendToSolrCloudProcessorTest.java
示例7: takeBatch
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Detaches the current batch and installs a fresh, empty one in its place.
 * The swap is performed while holding {@code batchLock}.
 *
 * @return the batch that was current before the call
 */
private ConcurrentBiMap<Document, T> takeBatch() {
  synchronized (batchLock) {
    final ConcurrentBiMap<Document, T> detached = this.batch;
    this.batch = new ConcurrentBiMap<>();
    final String description = detached.toString();
    final int count = detached.size();
    log.info("took batch {} with size {}", description, count);
    return detached;
  }
}
开发者ID:nsoft,项目名称:jesterj,代码行数:9,代码来源:BatchProcessor.java
示例8: perDocumentFailure
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Marks every document of the batch as failed. Used when the failure is not attributable to an
 * individual document (for example a network problem), so all documents must be errored out.
 *
 * @param oldBatch the batch whose documents are to be reported as failed
 * @param e        the exception that caused the failure
 */
protected void perDocumentFailure(ConcurrentBiMap<Document, ?> oldBatch, Exception e) {
  oldBatch.keySet().forEach(failedDoc -> {
    putIdInThreadContext(failedDoc);
    perDocFailLogging(e, failedDoc);
  });
}
开发者ID:nsoft,项目名称:jesterj,代码行数:8,代码来源:BatchProcessor.java
示例9: handleRetryResult
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Waits for the outcome of one individually retried request and checks its response.
 * <p>
 * The request belonging to the future is looked up in {@code futures}, and the document
 * belonging to that request via the inverse view of {@code oldBatch}. Any exception raised
 * while obtaining or checking the response is logged and not propagated.
 *
 * @param e               the exception from the original failed batch operation (not logged here)
 * @param futures         map from each retry future to the request it was created for
 * @param individualRetry the future to resolve
 * @param oldBatch        the batch the request came from
 */
void handleRetryResult(Exception e, Map<ActionFuture, ActionRequest> futures, ActionFuture individualRetry, ConcurrentBiMap<Document, ActionRequest> oldBatch) {
  ActionRequest request = futures.get(individualRetry);
  Document document = oldBatch.inverse().get(request);
  String id = document.getId();
  putIdInThreadContext(document);
  try {
    ActionWriteResponse resp = (ActionWriteResponse) individualRetry.actionGet();
    checkResponse(document, resp);
  } catch (Exception ex) {
    log.info(Status.ERROR.getMarker(), "{} could not be sent to elastic because of {}", id, ex.getMessage());
    // Log the exception caught for THIS document; logging the batch-level exception here
    // would hide the stack trace of the failure actually being reported.
    log.error("Error sending to elastic!", ex);
  }
}
开发者ID:nsoft,项目名称:jesterj,代码行数:14,代码来源:ElasticSender.java
示例10: batchOperation
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Sends the whole batch to Solr: first all deletes (by id), then all adds, then logs one
 * success message per document.
 * <p>
 * When {@link #getParams()} returns {@code null} the adds go straight through the client's
 * {@code add}; otherwise they are wrapped in an {@code UpdateRequest} carrying every configured
 * parameter.
 *
 * @param oldBatch the batch of documents and their Solr representations
 * @throws SolrServerException if thrown by the Solr client
 * @throws IOException         if thrown by the Solr client
 */
@Override
protected void batchOperation(ConcurrentBiMap<Document, SolrInputDocument> oldBatch) throws SolrServerException, IOException {
  // Deletes are identified by the operation on the source document and sent by id.
  List<String> idsToDelete = oldBatch.keySet().stream()
      .filter(candidate -> candidate.getOperation() == Document.Operation.DELETE)
      .map(Document::getId)
      .collect(Collectors.toList());
  if (!idsToDelete.isEmpty()) {
    getSolrClient().deleteById(idsToDelete);
  }

  // Everything that is not a delete is sent as an add.
  List<SolrInputDocument> docsToAdd = oldBatch.keySet().stream()
      .filter(candidate -> candidate.getOperation() != Document.Operation.DELETE)
      .map(oldBatch::get)
      .collect(Collectors.toList());
  if (!docsToAdd.isEmpty()) {
    Map<String, String> params = getParams();
    if (params != null) {
      UpdateRequest req = new UpdateRequest();
      req.add(docsToAdd);
      // Copy every configured global parameter onto the request.
      params.forEach(req::setParam);
      getSolrClient().request(req);
    } else {
      getSolrClient().add(docsToAdd);
    }
  }

  for (Document document : oldBatch.keySet()) {
    putIdInThreadContext(document);
    boolean wasDelete = document.getOperation() == Document.Operation.DELETE;
    String message = wasDelete
        ? "{} deleted from solr successfully"
        : "{} sent to solr successfully";
    log().info(Status.INDEXED.getMarker(), message, document.getId());
  }
}
开发者ID:nsoft,项目名称:jesterj,代码行数:37,代码来源:SendToSolrCloudProcessor.java
示例11: testPerBatchOperationWithChain
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Verifies that, when params are configured, adds are sent as an {@code UpdateRequest} carrying
 * the configured {@code update.chain} parameter, while the DELETE document is still removed by id.
 */
@Test
public void testPerBatchOperationWithChain() throws IOException, SolrServerException {
  ConcurrentBiMap<Document, SolrInputDocument> threeDocs = expect3Docs();
  Map<String,String> chainParams = new HashMap<>();
  chainParams.put("update.chain", "myCustomChain");
  expect(proc.getParams()).andReturn(chainParams).anyTimes();
  expect(proc.getSolrClient()).andReturn(solrClientMock).anyTimes();
  Capture<UpdateRequest> requestCapture = newCapture();
  Capture<List<String>> deletedCapture = newCapture();
  //noinspection ConstantConditions
  expect(solrClientMock.request(capture(requestCapture), eq(null))).andReturn(namedListMock);
  expect(solrClientMock.deleteById(capture(deletedCapture))).andReturn(deleteResponseMock);
  expectLogging();
  replay();

  proc.batchOperation(threeDocs);

  // Only the second document is a delete, and its id is "42".
  List<String> deletedIds = deletedCapture.getValue();
  assertEquals("42", deletedIds.get(0));

  // The first and third documents travel in the update request; the deleted one does not.
  UpdateRequest sentRequest = requestCapture.getValue();
  List<SolrInputDocument> sentDocs = sentRequest.getDocuments();
  assertTrue(sentDocs.contains(inputDocMock));
  assertFalse(sentDocs.contains(inputDocMock2));
  assertTrue(sentDocs.contains(inputDocMock3));

  // The configured chain must have been copied onto the request.
  ModifiableSolrParams sentParams = sentRequest.getParams();
  assertEquals("myCustomChain", sentParams.get("update.chain"));
}
开发者ID:nsoft,项目名称:jesterj,代码行数:29,代码来源:SendToSolrCloudProcessorTest.java
示例12: expect3Docs
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Builds a three-entry batch from the test's document mocks and records the expectations for
 * their operation and id.
 * <p>
 * The documents are set up as: "41" with operation NEW, "42" with operation DELETE and "43"
 * with operation UPDATE.
 *
 * @return the batch mapping each document mock to its Solr input document mock
 */
private ConcurrentBiMap<Document, SolrInputDocument> expect3Docs() {
ConcurrentBiMap<Document, SolrInputDocument> biMap = new ConcurrentBiMap<>();
biMap.put(docMock, inputDocMock);
biMap.put(docMock2, inputDocMock2);
biMap.put(docMock3, inputDocMock3);
// One add-type document, one delete and one update; each may be queried any number of times.
expect(docMock.getOperation()).andReturn(Document.Operation.NEW).anyTimes();
expect(docMock2.getOperation()).andReturn(Document.Operation.DELETE).anyTimes();
expect(docMock3.getOperation()).andReturn(Document.Operation.UPDATE).anyTimes();
// Distinct ids so assertions can tell the documents apart.
expect(docMock.getId()).andReturn("41").anyTimes();
expect(docMock2.getId()).andReturn("42").anyTimes();
expect(docMock3.getId()).andReturn("43").anyTimes();
return biMap;
}
开发者ID:nsoft,项目名称:jesterj,代码行数:14,代码来源:SendToSolrCloudProcessorTest.java
示例13: individualFallbackOperation
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Handles a failed batch operation by trying each document individually, since the failure
 * might be caused by just one problematic document.
 * <p>
 * Invoked by {@code sendBatch} when {@code exceptionIndicatesDocumentIssue(e)} is true for the
 * exception thrown by the batch operation.
 *
 * @param oldBatch The batch for which to handle failures. This will have been detached from this object and will
 *                 become eligible for garbage collection after this method returns, so do not add objects to it.
 * @param e        the exception reported with the failure
 */
protected abstract void individualFallbackOperation(ConcurrentBiMap<Document, T> oldBatch, Exception e);
开发者ID:nsoft,项目名称:jesterj,代码行数:9,代码来源:BatchProcessor.java
示例14: batchOperation
import org.apache.cassandra.utils.ConcurrentBiMap; //导入依赖的package包/类
/**
 * Performs the operation for an entire batch of documents.
 * <p>
 * Invoked by {@code sendBatch}; any exception thrown here is caught there and leads either to
 * {@code individualFallbackOperation} or to {@code perDocumentFailure}.
 *
 * @param documentTConcurrentBiMap the batch to operate on, mapping each document to its converted form
 * @throws Exception if the batch could not be processed
 */
protected abstract void batchOperation(ConcurrentBiMap<Document, T> documentTConcurrentBiMap) throws Exception;
开发者ID:nsoft,项目名称:jesterj,代码行数:2,代码来源:BatchProcessor.java
注:本文中的org.apache.cassandra.utils.ConcurrentBiMap类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论