As Val said in the comments, you're likely sending more data at a time than your cluster can handle. It looks like you might be trying to send all your documents in one bulk request, which for a lot of documents or large documents may not work.
With _bulk, you need to send your data to the cluster in several bulk requests and find the optimum number of documents that you can send in each bulk request, in addition to the number of bulk requests that you can send concurrently to your cluster.
There are no hard and fast rules here for the optimum size because it can vary depending on the complexity of your documents, how they are analyzed, the cluster hardware, cluster settings, index settings, etc.
The best thing to do is start with a reasonable number, say 500 documents (or some number that makes sense in your context) in one request, and then go from there. Calculating the total size in bytes of each bulk request is also a good approach to take. If performance and throughput are insufficient, increase the number of documents, the request byte size, or the number of concurrent requests until you start seeing es_rejected_execution_exception. That error means the cluster is rejecting requests because it cannot keep up, so back off to just below that point.
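Sizing requests by both document count and bytes can be sketched as below. This is only an illustration, not part of NEST: BulkBatcher, Batch and the sizeOf callback are hypothetical names, and how you measure a document's size (for example, the length of its serialized JSON) is an assumption you would need to adapt.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class BulkBatcher
{
    // Hypothetical helper (not part of NEST): lazily groups items into batches
    // holding at most maxCount items and, where possible, at most maxBytes bytes.
    public static IEnumerable<List<T>> Batch<T>(
        IEnumerable<T> source, int maxCount, long maxBytes, Func<T, long> sizeOf)
    {
        var batch = new List<T>();
        long batchBytes = 0;

        foreach (var item in source)
        {
            long itemBytes = sizeOf(item);

            // Close the current batch if adding this item would exceed a limit.
            if (batch.Count > 0 &&
                (batch.Count >= maxCount || batchBytes + itemBytes > maxBytes))
            {
                yield return batch;
                batch = new List<T>();
                batchBytes = 0;
            }

            // An item larger than maxBytes still gets a batch of its own.
            batch.Add(item);
            batchBytes += itemBytes;
        }

        // Emit whatever is left over.
        if (batch.Count > 0)
        {
            yield return batch;
        }
    }
}
```

For example, BulkBatcher.Batch(docs, 500, 5 * 1024 * 1024, d => Encoding.UTF8.GetByteCount(Serialize(d))) would cap each batch at 500 documents or roughly 5 MB, where Serialize stands in for whatever JSON serializer you use. Both limits are starting points to tune, not recommendations.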
NEST 5.x ships with a handy helper, BulkAll, that makes bulk requests much easier, using an IObservable<T> and the Observer design pattern:
// Namespaces used: System, System.Collections.Generic, System.Linq, System.Threading, Nest
void Main()
{
    var client = new ElasticClient();

    // call .Cancel() on this to cancel the operation
    var cancellationTokenSource = new CancellationTokenSource();

    // set up the bulk all observable
    var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
        // number of concurrent requests
        .MaxDegreeOfParallelism(8)
        // in case of 429 response, how long we should wait before retrying
        .BackOffTime(TimeSpan.FromSeconds(5))
        // in case of 429 response, how many times to retry before failing
        .BackOffRetries(2)
        // number of documents to send in each request
        .Size(500)
        .Index("index-name")
        .RefreshOnCompleted(),
        cancellationTokenSource.Token
    );

    var waitHandle = new ManualResetEvent(false);
    Exception ex = null;

    // what to do on each call, when an exception is thrown, and
    // when the bulk all completes
    var bulkAllObserver = new BulkAllObserver(
        onNext: bulkAllResponse =>
        {
            // do something after each bulk request
        },
        onError: exception =>
        {
            // capture the exception and release the waiting thread
            ex = exception;
            waitHandle.Set();
        },
        onCompleted: () =>
        {
            // do something when all bulk operations complete
            waitHandle.Set();
        });

    bulkAllObservable.Subscribe(bulkAllObserver);

    // block until onError or onCompleted sets the handle
    waitHandle.WaitOne();

    if (ex != null)
    {
        // rethrow while preserving the original stack trace
        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex).Throw();
    }
}

// Ideally, the documents should be a lazily enumerated collection
public static IEnumerable<Document> GetDocuments()
{
    return Enumerable.Range(1, 10000).Select(x =>
        new Document
        {
            Id = x,
            Name = $"Document {x}"
        }
    );
}

public class Document
{
    public int Id { get; set; }
    public string Name { get; set; }
}
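
The comment above GetDocuments() suggests a lazily enumerated source, so that documents are produced as they are consumed rather than all built in memory first. A minimal sketch of one way to do that with an iterator follows. LazyDocumentSource and ReadLazily are hypothetical names, and the input format (one document name per line) is an assumption for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class LazyDocumentSource
{
    // Hypothetical helper: yields one item per non-empty line, reading from the
    // reader only as the caller enumerates. The create callback receives the
    // 1-based line number and the line text.
    public static IEnumerable<T> ReadLazily<T>(TextReader reader, Func<int, string, T> create)
    {
        int lineNumber = 0;
        string line;

        while ((line = reader.ReadLine()) != null)
        {
            lineNumber++;

            // Skip blank lines; line numbers still count them.
            if (line.Length == 0)
            {
                continue;
            }

            yield return create(lineNumber, line);
        }
    }
}
```

With the Document class above, this could be passed in place of GetDocuments(), for example LazyDocumentSource.ReadLazily(File.OpenText("documents.txt"), (id, name) => new Document { Id = id, Name = name }), where documents.txt is a placeholder file name. The caller remains responsible for disposing the reader once enumeration has finished.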