Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


bulkinsert - Elasticsearch bulk insert with NEST returns es_rejected_execution_exception

I am trying to do a bulk insert using the .NET API for Elasticsearch, and this is the error I get while performing the operation:

Error   {Type: es_rejected_execution_exception Reason: "rejected execution of org.elasticsearch.transport.TransportService$6@604b47a4 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51f4f734[Running, pool size = 4, active threads = 4, queued tasks = 50, completed tasks = 164]]" CausedBy: ""}   Nest.BulkError

Is this due to low disk space on my system, or is the bulk insert function itself not working? My NEST version is 5.0 and my Elasticsearch version is also 5.0.

The bulk insert logic:

public void bulkInsert(List<BaseData> recordList, List<String> listOfIndexName) {
    BulkDescriptor descriptor = new BulkDescriptor();            
    foreach (var j in Enumerable.Range(0, recordList.Count)) {
        descriptor.Index<BaseData>(op => op.Document(recordList[j])
                                           .Index(listOfIndexName[j]));
    }
    var result = clientConnection.Bulk(descriptor);
}

1 Answer


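As Val said in the comments, you're likely sending more data at a time than your cluster can handle. The error message points the same way: all 4 threads in the bulk thread pool are active and its queue of 50 is full, so further bulk operations are rejected. That indicates an overloaded bulk queue rather than a lack of disk space. It looks like you might be trying to send all your documents in one bulk request, which may not work for a large number of documents or for large documents.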

With _bulk, you need to send your data to the cluster in several bulk requests and find the optimum number of documents that you can send in each bulk request, in addition to the number of bulk requests that you can send concurrently to your cluster.

There are no hard and fast rules here for the optimum size because it can vary depending on the complexity of your documents, how they are analyzed, the cluster hardware, cluster settings, index settings, etc.

The best thing to do is start with a reasonable number, say 500 documents (or some number that makes sense in your context) in one request, and then go from there. Calculating the total size in bytes of each bulk request is also a good approach to take. If performance and throughput are insufficient, increase the number of documents, the request byte size, or the number of concurrent requests until you start seeing es_rejected_execution_exception, then back off to the last values that worked.
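To illustrate the "several bulk requests" idea independently of any client library, here is a minimal sketch of splitting a sequence into fixed-size batches. The class name BulkBatching and the method Batch are hypothetical names chosen for this example; they are not part of NEST, and the batch size of 500 is only the starting point suggested above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not part of NEST.
public static class BulkBatching
{
    // Splits a sequence into consecutive batches of at most batchSize items.
    // Arguments are validated eagerly; the batches themselves are produced lazily.
    public static IEnumerable<List<T>> Batch<T>(IEnumerable<T> source, int batchSize)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        return BatchIterator(source, batchSize);
    }

    private static IEnumerable<List<T>> BatchIterator<T>(IEnumerable<T> source, int batchSize)
    {
        var batch = new List<T>(batchSize);
        foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                batch = new List<T>(batchSize);
            }
        }

        // Emit the final, possibly smaller, batch.
        if (batch.Count > 0)
        {
            yield return batch;
        }
    }
}
```

Each batch returned by BulkBatching.Batch(recordList, 500) could then be added to its own BulkDescriptor and sent with a separate Bulk call, instead of sending every record in one request. Because the batches are produced lazily, only one batch needs to be held in memory at a time.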

NEST 5.x ships with a handy helper that makes bulk requests much easier, using an IObservable<T> and the Observer design pattern:

void Main()
{
    var client = new ElasticClient();

    // can cancel the operation by calling .Cancel() on this
    var cancellationTokenSource = new CancellationTokenSource();

    // set up the bulk all observable
    var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
        // number of concurrent requests
        .MaxDegreeOfParallelism(8)
        // in case of 429 response, how long we should wait before retrying
        .BackOffTime(TimeSpan.FromSeconds(5))
        // in case of 429 response, how many times to retry before failing
        .BackOffRetries(2)
        // number of documents to send in each request
        .Size(500)
        .Index("index-name")
        .RefreshOnCompleted(),
        cancellationTokenSource.Token
    );

    var waitHandle = new ManualResetEvent(false);
    Exception ex = null;

    // what to do on each call, when an exception is thrown, and 
    // when the bulk all completes
    var bulkAllObserver = new BulkAllObserver(
        onNext: bulkAllResponse =>
        {
            // do something after each bulk request
        },
        onError: exception =>
        {
            // do something with exception thrown
            ex = exception;
            waitHandle.Set();
        },
        onCompleted: () =>
        {
            // do something when all bulk operations complete
            waitHandle.Set();
        });

    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait for handle to be set.
    waitHandle.WaitOne();

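    if (ex != null)
    {
        // rethrow on the calling thread while preserving the original stack trace
        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex).Throw();
    }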
}

// Ideally, documents are returned as a lazily enumerated collection
public static IEnumerable<Document> GetDocuments()
{
    return Enumerable.Range(1, 10000).Select(x =>
        new Document
        {
            Id = x,
            Name = $"Document {x}" 
        }
    );
}

public class Document
{
    public int Id { get; set; }
    public string Name { get; set; }
}
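The BackOffTime and BackOffRetries settings above describe a wait-and-retry policy for rejected requests. As a rough sketch of what such a policy amounts to, and not a description of how NEST implements it internally, the following uses a hypothetical sendBatch delegate that returns true when a batch is accepted and false when it is rejected (for example with a 429 response). The names BulkRetry and SendWithBackOff are assumptions made for this example.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration only; not NEST's implementation.
public static class BulkRetry
{
    // Calls sendBatch once, then retries up to backOffRetries more times,
    // sleeping for backOffTime between attempts.
    // Returns true as soon as sendBatch reports success, false if every attempt is rejected.
    public static bool SendWithBackOff(Func<bool> sendBatch, int backOffRetries, TimeSpan backOffTime)
    {
        if (sendBatch == null) throw new ArgumentNullException(nameof(sendBatch));
        if (backOffRetries < 0) throw new ArgumentOutOfRangeException(nameof(backOffRetries));

        for (var attempt = 0; attempt <= backOffRetries; attempt++)
        {
            if (sendBatch())
            {
                return true;
            }

            // Wait before the next attempt, but not after the last one.
            if (attempt < backOffRetries)
            {
                Thread.Sleep(backOffTime);
            }
        }

        return false;
    }
}
```

With backOffRetries set to 2, a batch is attempted at most three times in this sketch. Waiting between attempts gives the bulk queue on the cluster time to drain before the same documents are sent again.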
