1、Elasticsearch的Bulk API允许批量提交index和delete请求。如:
(1)用法1
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex("index1", "type1", "id1").setSource(source);
bulkRequest.add(client.prepareIndex("index2", "type2", "id2").setSource(source);
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
(2)用法二
BulkProcessor bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... }
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... }
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... }
})
.setBulkActions(10000)
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.build();
bulkProcessor.add(new IndexRequest("index1", "type1", "id1").source(source1));
bulkProcessor.add(new DeleteRequest("index2", "type2", "id2");
(3)说明
1)beforeBulk会在批量提交之前执行,可以从BulkRequest中获取请求信息request.requests()或者请求数量request.numberOfActions()。
2) 第一个afterBulk会在批量成功后执行,可以跟beforeBulk配合计算批量所需时间。
3)第二个afterBulk会在批量失败后执行。
4)在例子中,当请求超过10000个(default=1000)或者总大小超过1GB(default=5MB)时,触发批量提交动作。