
Using RestHighLevelClient with Spring Boot 2 and Elasticsearch 7

Date: 2023-05-04

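Spring's integration with Elasticsearch (ES) is not especially friendly: compatibility between older and newer ES versions is a recurring problem, and the API changes frequently. I therefore chose the native client provided on the official ES site, RestHighLevelClient. I also did not want to deal with ES configuration classes, Spring integration settings, or jar conflicts myself, so I use spring-boot-starter-data-elasticsearch.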

1. Add the dependency jar

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

2. Configure application.properties

spring.elasticsearch.rest.uris=http://127.0.0.1:9200,http://127.0.0.1:9201,http://127.0.0.1:9202
spring.elasticsearch.rest.connection-timeout=5s
spring.elasticsearch.rest.read-timeout=30s
logging.level.org.springframework.data.convert.CustomConversions=error

With spring-boot-starter-data-elasticsearch on the classpath, the ES client is auto-configured by two classes: ElasticsearchRestClientAutoConfiguration and ElasticsearchRestClientProperties.

ElasticsearchRestClientAutoConfiguration:

@ConditionalOnClass({RestHighLevelClient.class})
@ConditionalOnMissingBean({RestClient.class})
@EnableConfigurationProperties({ElasticsearchRestClientProperties.class})
public class ElasticsearchRestClientAutoConfiguration {

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingBean({RestHighLevelClient.class})
    static class RestHighLevelClientConfiguration {

        RestHighLevelClientConfiguration() {
        }

        @Bean
        RestHighLevelClient elasticsearchRestHighLevelClient(RestClientBuilder restClientBuilder) {
            return new RestHighLevelClient(restClientBuilder);
        }
    }

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingBean({RestClientBuilder.class})
    static class RestClientBuilderConfiguration {

        RestClientBuilderConfiguration() {
        }

        @Bean
        RestClientBuilderCustomizer defaultRestClientBuilderCustomizer(ElasticsearchRestClientProperties properties) {
            return new ElasticsearchRestClientAutoConfiguration.DefaultRestClientBuilderCustomizer(properties);
        }

        @Bean
        RestClientBuilder elasticsearchRestClientBuilder(ElasticsearchRestClientProperties properties,
                ObjectProvider<RestClientBuilderCustomizer> builderCustomizers) {
            HttpHost[] hosts = properties.getUris().stream()
                    .map(this::createHttpHost)
                    .toArray(HttpHost[]::new);
            RestClientBuilder builder = RestClient.builder(hosts);
            builder.setHttpClientConfigCallback((httpClientBuilder) -> {
                builderCustomizers.orderedStream()
                        .forEach((customizer) -> customizer.customize(httpClientBuilder));
                return httpClientBuilder;
            });
            builder.setRequestConfigCallback((requestConfigBuilder) -> {
                builderCustomizers.orderedStream()
                        .forEach((customizer) -> customizer.customize(requestConfigBuilder));
                return requestConfigBuilder;
            });
            builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
            return builder;
        }

        private HttpHost createHttpHost(String uri) {
            try {
                return this.createHttpHost(URI.create(uri));
            } catch (IllegalArgumentException ex) {
                return HttpHost.create(uri);
            }
        }

        private HttpHost createHttpHost(URI uri) {
            if (!StringUtils.hasLength(uri.getUserInfo())) {
                return HttpHost.create(uri.toString());
            } else {
                try {
                    return HttpHost.create(new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(),
                            uri.getPath(), uri.getQuery(), uri.getFragment()).toString());
                } catch (URISyntaxException ex) {
                    throw new IllegalStateException(ex);
                }
            }
        }
    }
}
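The second createHttpHost overload above removes any user-info part (such as user:pass@) from a URI before turning it into an HttpHost. The JDK-only sketch below reproduces just that step with java.net.URI, so it can be tried without ES on the classpath. The class name UserInfoStripSketch and the method stripUserInfo are made up for illustration, and a plain null/empty check stands in for Spring's StringUtils.hasLength.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not part of Spring Boot: it mirrors the user-info
// stripping performed by createHttpHost(URI) in the auto-configuration.
final class UserInfoStripSketch {

    static String stripUserInfo(String uri) {
        URI parsed = URI.create(uri);
        String userInfo = parsed.getUserInfo();
        if (userInfo == null || userInfo.isEmpty()) {
            // Nothing to strip: the URI is used unchanged.
            return parsed.toString();
        }
        try {
            // Rebuild the URI with a null user-info component.
            return new URI(parsed.getScheme(), null, parsed.getHost(), parsed.getPort(),
                    parsed.getPath(), parsed.getQuery(), parsed.getFragment()).toString();
        } catch (URISyntaxException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(stripUserInfo("http://elastic:secret@127.0.0.1:9200"));
        System.out.println(stripUserInfo("http://127.0.0.1:9201"));
    }
}
```

A URI without credentials passes through untouched, which matches the first branch of the original method.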

ElasticsearchRestClientProperties: 

@ConfigurationProperties(prefix = "spring.elasticsearch.rest")
public class ElasticsearchRestClientProperties {

    private List<String> uris = new ArrayList<>(Collections.singletonList("http://localhost:9200"));
    private String username;
    private String password;
    private Duration connectionTimeout = Duration.ofSeconds(1L);
    private Duration readTimeout = Duration.ofSeconds(30L);

    public ElasticsearchRestClientProperties() {
    }

    public List<String> getUris() {
        return this.uris;
    }

    public void setUris(List<String> uris) {
        this.uris = uris;
    }

    public String getUsername() {
        return this.username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return this.password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public Duration getConnectionTimeout() {
        return this.connectionTimeout;
    }

    public void setConnectionTimeout(Duration connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public Duration getReadTimeout() {
        return this.readTimeout;
    }

    public void setReadTimeout(Duration readTimeout) {
        this.readTimeout = readTimeout;
    }
}
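The connection-timeout and read-timeout properties bind to the two Duration fields above, so a value such as 5s replaces the 1-second default. As a rough illustration of that mapping, here is a JDK-only sketch that parses a small subset of such values (ms, s, m, h). It is a simplified stand-in written for this article, not Spring Boot's actual conversion code, and the class name SimpleDurationSketch is hypothetical.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified parser: only illustrates how "5s" or "30s"
// in application.properties corresponds to a java.time.Duration.
final class SimpleDurationSketch {

    // A non-negative integer followed by one of the supported unit suffixes.
    private static final Pattern SIMPLE = Pattern.compile("^(\\d+)(ms|s|m|h)$");

    static Duration parse(String value) {
        Matcher matcher = SIMPLE.matcher(value.trim().toLowerCase());
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + value);
        }
        long amount = Long.parseLong(matcher.group(1));
        switch (matcher.group(2)) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "m":
                return Duration.ofMinutes(amount);
            default:
                // Only "h" remains, because the pattern admits nothing else.
                return Duration.ofHours(amount);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("5s"));   // connection-timeout from the example config
        System.out.println(parse("30s"));  // read-timeout from the example config
    }
}
```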

3. Usage

Persistence layer for basic ES operations

@Repository
@Slf4j
public class EsRepository {

    @Resource
    private RestHighLevelClient highLevelClient;

    // One shared generator. Creating a new Snowflake for every insert restarts
    // its sequence each time, so two inserts in the same millisecond could get the same ID.
    private final Snowflake snowflake = new Snowflake();

    /** Returns true if the index exists; returns false on an I/O error. */
    public boolean existIndex(String index) {
        try {
            return highLevelClient.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("ES persistence layer error! index={}", index, e);
        }
        return false;
    }

    /** Creates the index; columnMap (field name to field definition) becomes the mapping's "properties". */
    public boolean createIndex(String index, Map<String, Object> columnMap) {
        if (existIndex(index)) {
            return false;
        }
        CreateIndexRequest request = new CreateIndexRequest(index);
        if (columnMap != null && columnMap.size() > 0) {
            Map<String, Object> source = new HashMap<>();
            source.put("properties", columnMap);
            request.mapping(source);
        }
        try {
            highLevelClient.indices().create(request, RequestOptions.DEFAULT);
            return true;
        } catch (IOException e) {
            log.error("ES persistence layer error! index={}, columnMap={}", index, columnMap, e);
        }
        return false;
    }

    /** Deletes the index if it exists and reports whether ES acknowledged the deletion. */
    public boolean deleteIndex(String index) {
        try {
            if (existIndex(index)) {
                AcknowledgedResponse response = highLevelClient.indices()
                        .delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);
                return response.isAcknowledged();
            }
        } catch (Exception e) {
            log.error("ES persistence layer error! index={}", index, e);
        }
        return false;
    }

    /** Indexes a JSON document under a generated Snowflake ID. */
    public boolean insert(String index, String jsonString) {
        IndexRequest indexRequest = new IndexRequest(index);
        indexRequest.id(snowflake.nextIdStr());
        indexRequest.source(jsonString, XContentType.JSON);
        try {
            log.info("indexRequest={}", indexRequest);
            IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            log.info("indexResponse={}", indexResponse);
            return true;
        } catch (IOException e) {
            log.error("ES persistence layer error! index={}, jsonString={}", index, jsonString, e);
        }
        return false;
    }

    /**
     * Partially updates a document. dataMap must contain an "id" entry;
     * note that the entry is removed from the caller's map.
     */
    public boolean update(String index, Map<String, Object> dataMap) {
        UpdateRequest updateRequest = new UpdateRequest(index, dataMap.remove("id").toString());
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        updateRequest.doc(dataMap);
        try {
            highLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("ES persistence layer error! index={}, dataMap={}", index, dataMap, e);
            return false;
        }
        return true;
    }

    /** Deletes a document by ID. */
    public boolean delete(String index, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        try {
            highLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("ES persistence layer error! index={}, id={}", index, id, e);
            return false;
        }
        return true;
    }
}
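createIndex places columnMap under the properties key of the mapping, so each entry is expected to map a field name to that field's definition. A minimal sketch of building such a map with plain JDK collections follows. The field names (name, timestamp, remark) and the class MappingSketch are assumptions made for illustration, while keyword, long, and text are standard ES field types.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that shows the shape of the columnMap argument.
final class MappingSketch {

    // Builds a single field definition such as {"type": "keyword"}.
    static Map<String, Object> field(String type) {
        return Collections.singletonMap("type", type);
    }

    // Field names here are illustrative only.
    static Map<String, Object> exampleColumnMap() {
        Map<String, Object> columnMap = new LinkedHashMap<>();
        columnMap.put("name", field("keyword"));    // exact-match field, suitable for term queries
        columnMap.put("timestamp", field("long"));  // epoch milliseconds, suitable for range queries
        columnMap.put("remark", field("text"));     // analyzed full-text field
        return columnMap;
    }

    public static void main(String[] args) {
        System.out.println(exampleColumnMap());
    }
}
```

Under these assumptions, passing MappingSketch.exampleColumnMap() as the second argument of createIndex would produce a mapping with three typed fields.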

Persistence layer for ES queries

@Repository
@Slf4j
public class EsSearchRepository {

    @Resource
    private RestHighLevelClient highLevelClient;

    public EsQueryRespPO<Map<String, Object>> searchPage(EsQueryReqPO queryPO) {
        // Apply default paging parameters
        if (queryPO.getPageNum() == null) {
            queryPO.setPageNum(1);
        }
        if (queryPO.getPageSize() == null) {
            queryPO.setPageSize(10);
        }
        // Set the target indices
        SearchRequest searchRequest = new SearchRequest(queryPO.getIndex());
        // Build the search source
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        searchRequest.source(sourceBuilder)
                .indicesOptions(IndicesOptions.fromOptions(true, true, true, false));
        // Query condition
        sourceBuilder.query(queryPO.getQuery());
        // Sort field
        if (StringUtils.isNotBlank(queryPO.getSortField()) && queryPO.getSort() != null) {
            FieldSortBuilder order = new FieldSortBuilder(queryPO.getSortField()).order(queryPO.getSort());
            sourceBuilder.sort(order);
        }
        // Offset of the first hit, 0 by default
        sourceBuilder.from((queryPO.getPageNum() - 1) * queryPO.getPageSize());
        // Page size, 10 by default
        sourceBuilder.size(queryPO.getPageSize());
        // Query result
        SearchResponse searchResponse = null;
        try {
            log.info("ES search request: searchRequest={}", searchRequest);
            log.info("ES search request source: sourceBuilder={}", searchRequest.source());
            // Execute the search and get the response
            searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            log.info("ES search response: searchResponse={}", searchResponse);
        } catch (IOException e) {
            log.error("ES search failed with an I/O error! searchRequest={}", searchRequest, e);
            return EsQueryRespPO.error("ES search failed with an I/O error!");
        }
        if (RestStatus.OK.equals(searchResponse.status())) {
            // Parse the hits
            SearchHit[] hits = searchResponse.getHits().getHits();
            // Extract each hit's _source
            List<Map<String, Object>> sourceList = Arrays.stream(hits)
                    .map(SearchHit::getSourceAsMap)
                    .collect(Collectors.toList());
            long totalHits = searchResponse.getHits().getTotalHits().value;
            return EsQueryRespPO.success(sourceList, queryPO.getPageNum(), queryPO.getPageSize(), totalHits);
        } else {
            log.error("ES search returned an unexpected status! searchResponse.status={}, searchRequest={}",
                    searchResponse.status(), searchRequest);
            return EsQueryRespPO.error("ES search returned an unexpected status!");
        }
    }

    public EsQueryRespPO<AggregationBucketPO> searchAggregation(EsQueryReqPO queryPO) {
        // Set the target indices
        SearchRequest searchRequest = new SearchRequest(queryPO.getIndex());
        // Build the search source
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        searchRequest.source(sourceBuilder)
                .indicesOptions(IndicesOptions.fromOptions(true, true, true, false));
        // Query condition
        sourceBuilder.query(queryPO.getQuery());
        // Sort field
        if (StringUtils.isNotBlank(queryPO.getSortField()) && queryPO.getSort() != null) {
            FieldSortBuilder order = new FieldSortBuilder(queryPO.getSortField()).order(queryPO.getSort());
            sourceBuilder.sort(order);
        }
        // Page size 0: return aggregation results only
        sourceBuilder.size(0);
        // Several aggregations can be added as long as their names differ
        // Grouping (terms) aggregation
        sourceBuilder.aggregation(queryPO.getTermsAggregation());
        // Counting (cardinality) aggregation
        sourceBuilder.aggregation(queryPO.getCardinalityAggregation());
        // Query result
        SearchResponse searchResponse = null;
        try {
            log.info("ES search request: searchRequest={}", searchRequest);
            log.info("ES search request source: sourceBuilder={}", searchRequest.source());
            // Execute the search and get the response
            searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            log.info("ES search response: searchResponse={}", searchResponse);
        } catch (IOException e) {
            log.error("ES search failed with an I/O error! searchRequest={}", searchRequest, e);
            return EsQueryRespPO.error("ES search failed with an I/O error!");
        }
        if (RestStatus.OK.equals(searchResponse.status())) {
            // Parse the aggregations
            Aggregations aggregations = searchResponse.getAggregations();
            long docTotal = searchResponse.getHits().getTotalHits().value;
            // Walk through the terms buckets
            Terms terms = aggregations.get(queryPO.getTermsAggregation().getName());
            List<AggregationBucketPO> bucketList = terms.getBuckets().stream().map(bucket -> {
                String key = bucket.getKeyAsString();
                long docCount = bucket.getDocCount();
                return new AggregationBucketPO(key, docCount, docTotal);
            }).collect(Collectors.toList());
            // Total number of distinct values
            Cardinality cardinality = aggregations.get(queryPO.getCardinalityAggregation().getName());
            long totalHits = cardinality.getValue();
            return EsQueryRespPO.success(bucketList, queryPO.getPageNum(), queryPO.getPageSize(), totalHits);
        } else {
            log.error("ES search returned an unexpected status! searchResponse.status={}, searchRequest={}",
                    searchResponse.status(), searchRequest);
            return EsQueryRespPO.error("ES search returned an unexpected status!");
        }
    }
}
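Both the from value in searchPage and the page metadata returned to the caller come down to simple arithmetic on pageNum and pageSize. The sketch below isolates it. PageMathSketch and its methods are hypothetical names, and totalPages is an extra convenience that the repository above does not compute. Keep in mind that, by default, ES rejects searches where from + size exceeds 10,000 (the index.max_result_window setting), so very deep pages need another approach such as search_after.

```java
// Hypothetical helper: the paging arithmetic used by searchPage, with the
// same defaults (page 1, 10 items per page) applied to null inputs.
final class PageMathSketch {

    static final int DEFAULT_PAGE_NUM = 1;
    static final int DEFAULT_PAGE_SIZE = 10;

    // Offset of the first hit on the requested page (pages are 1-based).
    static int from(Integer pageNum, Integer pageSize) {
        int num = pageNum == null ? DEFAULT_PAGE_NUM : pageNum;
        int size = pageSize == null ? DEFAULT_PAGE_SIZE : pageSize;
        if (num < 1 || size < 1) {
            throw new IllegalArgumentException("pageNum and pageSize must be at least 1");
        }
        return (num - 1) * size;
    }

    // Number of pages needed for totalHits results, rounding up.
    static long totalPages(long totalHits, int pageSize) {
        if (pageSize < 1) {
            throw new IllegalArgumentException("pageSize must be at least 1");
        }
        return (totalHits + pageSize - 1) / pageSize;
    }

    public static void main(String[] args) {
        System.out.println(from(3, 10));
        System.out.println(totalPages(25, 10));
    }
}
```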

The EsQueryReqPO, EsQueryRespPO, and AggregationBucketPO classes used above are defined as follows:

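import lombok.Data;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.elasticsearch.search.sort.SortOrder;

@Data
public class EsQueryReqPO {
    // Target indices
    String[] index;
    // Query condition
    QueryBuilder query;
    // Sort field and direction
    String sortField;
    SortOrder sort;
    // Paging parameters (1-based page number)
    private Integer pageNum;
    private Integer pageSize;
    // Aggregations used by searchAggregation
    private TermsAggregationBuilder termsAggregation;
    private CardinalityAggregationBuilder cardinalityAggregation;
}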

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class EsQueryRespPO<T> {
    private Boolean success;
    private String message;
    private Integer pageNum;
    private Integer pageSize;
    private Long totalSize;
    private List<T> sourceList;

    public static <T> EsQueryRespPO<T> success(List<T> sourceList, Integer pageNum, Integer pageSize,
            Long totalSize) {
        EsQueryRespPO<T> esQueryRespPO = new EsQueryRespPO<>();
        esQueryRespPO.setSuccess(true);
        esQueryRespPO.setSourceList(sourceList);
        esQueryRespPO.setPageNum(pageNum);
        esQueryRespPO.setPageSize(pageSize);
        esQueryRespPO.setTotalSize(totalSize);
        return esQueryRespPO;
    }

    public static <T> EsQueryRespPO<T> error() {
        EsQueryRespPO<T> esQueryRespPO = new EsQueryRespPO<>();
        esQueryRespPO.setSuccess(false);
        esQueryRespPO.setMessage("ES query error");
        return esQueryRespPO;
    }

    public static <T> EsQueryRespPO<T> error(String message) {
        EsQueryRespPO<T> esQueryRespPO = new EsQueryRespPO<>();
        esQueryRespPO.setSuccess(false);
        esQueryRespPO.setMessage(message);
        return esQueryRespPO;
    }
}

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class AggregationBucketPO {
    private String key;
    private Long docCount;
    private Long docTotal;
}

Miscellaneous

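If you do not rely on spring-boot-starter-data-elasticsearch to inject the ES components automatically, you have to register the ES client yourself. The configuration class looks like this: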

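import java.util.List;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EsClientConfig {

    // Comma-separated node addresses, e.g. http://127.0.0.1:9200,http://127.0.0.1:9201
    @Value("${spring.elasticsearch.rest.uris}")
    private List<String> uris;

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        // Convert each configured URI into an HttpHost
        HttpHost[] httpHosts = uris.stream().map(HttpHost::create).toArray(HttpHost[]::new);
        RestClientBuilder clientBuilder = RestClient.builder(httpHosts);
        return new RestHighLevelClient(clientBuilder);
    }
}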

Snowflake comes from the hutool library. Add this dependency:

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.7.14</version>
</dependency>

Test case for the aggregation query:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = StartApplication.class)
public class EsTest {

    @Resource
    private EsSearchRepository esSearchRepository;

    @Test
    public void testSearchAggregation() {
        // Build the query object
        EsQueryReqPO queryPO = new EsQueryReqPO();
        queryPO.setIndex(new String[]{"yzh1", "yzh2"});
        queryPO.setPageNum(1);
        queryPO.setPageSize(10);
        // Timestamp range: the last second
        QueryBuilder queryBuilder1 = QueryBuilders.rangeQuery("timestamp")
                .from(System.currentTimeMillis() - 1000)
                .to(System.currentTimeMillis());
        // Login identifier
        QueryBuilder queryBuilder2 = QueryBuilders.termQuery("name", "yang");
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(queryBuilder1).must(queryBuilder2);
        queryPO.setQuery(queryBuilder);
        // Group by userName: terms bucket aggregation named terms_by_userName
        // on the field payload.userName.keyword
        TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders
                .terms("terms_by_userName").field("payload.userName.keyword");
        // Fetch enough buckets to cover every page up to the requested one
        termsAggregationBuilder.size(queryPO.getPageSize() * queryPO.getPageNum());
        // Keep only the buckets that belong to the requested page
        termsAggregationBuilder.subAggregation(new BucketSortPipelineAggregationBuilder("bucket_field", null)
                .from((queryPO.getPageNum() - 1) * queryPO.getPageSize())
                .size(queryPO.getPageSize()));
        queryPO.setTermsAggregation(termsAggregationBuilder);
        // Count distinct userName values: cardinality aggregation named count_userName
        // on the field payload.userName.keyword
        CardinalityAggregationBuilder cardinalityAggregationBuilder = AggregationBuilders
                .cardinality("count_userName").field("payload.userName.keyword");
        queryPO.setCardinalityAggregation(cardinalityAggregationBuilder);
        // Run the query
        EsQueryRespPO<AggregationBucketPO> esQueryRespPO = esSearchRepository.searchAggregation(queryPO);
    }
}
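The test pages through terms buckets in two steps: the terms aggregation requests the top pageNum * pageSize buckets, and the bucket_sort sub-aggregation then keeps only the slice that starts at (pageNum - 1) * pageSize. The cardinality aggregation supplies the number of distinct values, which is returned as the total. The in-memory sketch below imitates that arithmetic on a plain list of user names, with no ES involved. The class BucketPagingSketch and the sample data are invented for illustration, and real terms and cardinality results from ES can be approximate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical in-memory imitation of "terms + bucket_sort" paging.
final class BucketPagingSketch {

    static List<Map.Entry<String, Long>> page(List<String> userNames, int pageNum, int pageSize) {
        // Group and count, like a terms aggregation on the user name.
        Map<String, Long> counts = userNames.stream()
                .collect(Collectors.groupingBy(name -> name, TreeMap::new, Collectors.counting()));
        // Order by document count, largest first; the stable sort keeps
        // the TreeMap's ascending key order for equal counts.
        List<Map.Entry<String, Long>> buckets = new ArrayList<>(counts.entrySet());
        buckets.sort(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()));
        // terms size: only the top pageNum * pageSize buckets are fetched.
        int top = Math.min(pageNum * pageSize, buckets.size());
        // bucket_sort from/size: keep the slice for the requested page.
        int from = Math.min((pageNum - 1) * pageSize, top);
        return new ArrayList<>(buckets.subList(from, top));
    }

    // Distinct value count, the role played by the cardinality aggregation.
    static long distinctCount(List<String> userNames) {
        return userNames.stream().distinct().count();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("yang", "yang", "yang", "li", "li", "wang", "zhao");
        System.out.println(page(names, 1, 2));
        System.out.println(page(names, 2, 2));
        System.out.println(distinctCount(names));
    }
}
```

One consequence of this design is that the terms aggregation grows with the page number, so later pages cost more than earlier ones.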
