由于Spring与ES的集成并不是特别友好,存在ES高低版本不兼容、API更新频繁等问题,所以我选择使用官方提供的原生Client(RestHighLevelClient);但又不想关注ES的配置类、与Spring的集成配置以及jar包冲突等问题,因此引入spring-boot-starter-data-elasticsearch,由它自动装配RestHighLevelClient。
一、引入依赖jar
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
二、application.properties配置
spring.elasticsearch.rest.uris=http://127.0.0.1:9200,http://127.0.0.1:9201,http://127.0.0.1:9202
spring.elasticsearch.rest.connection-timeout=5s
spring.elasticsearch.rest.read-timeout=30s
logging.level.org.springframework.data.convert.CustomConversions=error
spring-boot-starter-data-elasticsearch中自动装配es的配置类:ElasticsearchRestClientAutoConfiguration、ElasticsearchRestClientProperties。
ElasticsearchRestClientAutoConfiguration:
/**
 * Auto-configuration that registers a {@link RestClientBuilder} and a
 * {@link RestHighLevelClient} built from {@link ElasticsearchRestClientProperties}.
 *
 * <p>Each bean is only contributed when the application has not declared one of
 * the same type itself (see the {@code @ConditionalOnMissingBean} guards).
 *
 * <p>NOTE: {@code DefaultRestClientBuilderCustomizer} is referenced below but its
 * definition is not part of this excerpt.
 */
@ConditionalOnClass({RestHighLevelClient.class})
@ConditionalOnMissingBean({RestClient.class})
@EnableConfigurationProperties({ElasticsearchRestClientProperties.class})
public class ElasticsearchRestClientAutoConfiguration {

    /** Contributes the high-level client on top of whichever {@link RestClientBuilder} is available. */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingBean({RestHighLevelClient.class})
    static class RestHighLevelClientConfiguration {

        RestHighLevelClientConfiguration() {
        }

        /**
         * @param restClientBuilder the builder describing hosts and callbacks
         * @return a high-level client wrapping the given builder
         */
        @Bean
        RestHighLevelClient elasticsearchRestHighLevelClient(RestClientBuilder restClientBuilder) {
            return new RestHighLevelClient(restClientBuilder);
        }
    }

    /** Contributes the {@link RestClientBuilder} and the default customizer applied to it. */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingBean({RestClientBuilder.class})
    static class RestClientBuilderConfiguration {

        RestClientBuilderConfiguration() {
        }

        /**
         * @param properties the bound {@code spring.elasticsearch.rest.*} properties
         * @return the customizer that applies those properties to the builder
         */
        @Bean
        RestClientBuilderCustomizer defaultRestClientBuilderCustomizer(ElasticsearchRestClientProperties properties) {
            return new ElasticsearchRestClientAutoConfiguration.DefaultRestClientBuilderCustomizer(properties);
        }

        /**
         * Builds the low-level client builder from the configured URIs and lets every
         * registered customizer adjust the HTTP client, the request config and the
         * builder itself, in that order of registration on the builder.
         *
         * @param properties         source of the node URIs
         * @param builderCustomizers all {@link RestClientBuilderCustomizer} beans, applied in order
         * @return the fully customized builder
         */
        @Bean
        RestClientBuilder elasticsearchRestClientBuilder(ElasticsearchRestClientProperties properties,
                ObjectProvider<RestClientBuilderCustomizer> builderCustomizers) {
            HttpHost[] hosts = properties.getUris().stream()
                    .map(this::createHttpHost)
                    .toArray(HttpHost[]::new);
            RestClientBuilder builder = RestClient.builder(hosts);
            builder.setHttpClientConfigCallback((httpClientBuilder) -> {
                builderCustomizers.orderedStream()
                        .forEach((customizer) -> customizer.customize(httpClientBuilder));
                return httpClientBuilder;
            });
            builder.setRequestConfigCallback((requestConfigBuilder) -> {
                builderCustomizers.orderedStream()
                        .forEach((customizer) -> customizer.customize(requestConfigBuilder));
                return requestConfigBuilder;
            });
            builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
            return builder;
        }

        /**
         * Converts a configured URI string to an {@link HttpHost}.
         * Falls back to {@link HttpHost#create(String)} when the value is not a valid URI.
         */
        private HttpHost createHttpHost(String uri) {
            try {
                return this.createHttpHost(URI.create(uri));
            } catch (IllegalArgumentException ex) {
                return HttpHost.create(uri);
            }
        }

        /**
         * Converts a URI to an {@link HttpHost}; when the URI carries user info, the URI
         * is rebuilt without it before the host is created.
         *
         * @throws IllegalStateException if the URI cannot be rebuilt without user info
         */
        private HttpHost createHttpHost(URI uri) {
            if (!StringUtils.hasLength(uri.getUserInfo())) {
                return HttpHost.create(uri.toString());
            }
            try {
                URI withoutUserInfo = new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(),
                        uri.getPath(), uri.getQuery(), uri.getFragment());
                return HttpHost.create(withoutUserInfo.toString());
            } catch (URISyntaxException ex) {
                throw new IllegalStateException(ex);
            }
        }
    }
}
ElasticsearchRestClientProperties:
/**
 * Configuration properties bound from the {@code spring.elasticsearch.rest} prefix.
 *
 * <p>Defaults: a single node at {@code http://localhost:9200}, a connection timeout
 * of 1 second and a read timeout of 30 seconds; no username or password.
 */
@ConfigurationProperties(prefix = "spring.elasticsearch.rest")
public class ElasticsearchRestClientProperties {

    /** URIs of the Elasticsearch nodes to connect to. */
    private List<String> uris = new ArrayList<>(Collections.singletonList("http://localhost:9200"));

    /** Credentials username; {@code null} when not configured. */
    private String username;

    /** Credentials password; {@code null} when not configured. */
    private String password;

    /** Connection timeout. */
    private Duration connectionTimeout = Duration.ofSeconds(1L);

    /** Read timeout. */
    private Duration readTimeout = Duration.ofSeconds(30L);

    public ElasticsearchRestClientProperties() {
    }

    public List<String> getUris() {
        return this.uris;
    }

    public void setUris(List<String> uris) {
        this.uris = uris;
    }

    public String getUsername() {
        return this.username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return this.password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public Duration getConnectionTimeout() {
        return this.connectionTimeout;
    }

    public void setConnectionTimeout(Duration connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public Duration getReadTimeout() {
        return this.readTimeout;
    }

    public void setReadTimeout(Duration readTimeout) {
        this.readTimeout = readTimeout;
    }
}
三、使用
ES基本操作持久层
@Repository@Slf4jpublic class EsRepository { @Resource private RestHighLevelClient highLevelClient; public boolean existIndex(String index) { try { return highLevelClient.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT); } catch (IOException e) { log.error("es持久层异常!index={}", index, e); } return Boolean.FALSE; } public boolean createIndex(String index, Map columnMap) { if (existIndex(index)) { return Boolean.FALSE; } CreateIndexRequest request = new CreateIndexRequest(index); if (columnMap != null && columnMap.size() > 0) { Map source = new HashMap<>(); source.put("properties", columnMap); request.mapping(source); } try { highLevelClient.indices().create(request, RequestOptions.DEFAULT); return Boolean.TRUE; } catch (IOException e) { log.error("es持久层异常!index={}, columnMap={}", index, columnMap, e); } return Boolean.FALSE; } public boolean deleteIndex(String index) { try { if (existIndex(index)) { AcknowledgedResponse response = highLevelClient.indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT); return response.isAcknowledged(); } } catch (Exception e) { log.error("es持久层异常!index={}", index, e); } return Boolean.FALSE; } public boolean insert(String index, String jsonString) { IndexRequest indexRequest = new IndexRequest(index); indexRequest.id(new Snowflake().nextIdStr()); indexRequest.source(jsonString, XContentType.JSON); try { log.info("indexRequest={}", indexRequest); IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT); log.info("indexResponse={}", indexResponse); return Boolean.TRUE; } catch (IOException e) { log.error("es持久层异常!index={}, jsonString={}", index, jsonString, e); } return Boolean.FALSE; } public boolean update(String index, Map dataMap) { UpdateRequest updateRequest = new UpdateRequest(index, dataMap.remove("id").toString()); updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); updateRequest.doc(dataMap); try { highLevelClient.update(updateRequest, 
RequestOptions.DEFAULT); } catch (IOException e) { log.error("es持久层异常!index={}, dataMap={}", index, dataMap, e); return Boolean.FALSE; } return Boolean.TRUE; } public boolean delete(String index, String id) { DeleteRequest deleteRequest = new DeleteRequest(index, id); try { highLevelClient.delete(deleteRequest, RequestOptions.DEFAULT); } catch (IOException e) { log.error("es持久层异常!index={}, id={}", index, id, e); return Boolean.FALSE; } return Boolean.TRUE; }}
ES查询持久层
@Repository@Slf4jpublic class EsSearchRepository { @Resource private RestHighLevelClient highLevelClient; public EsQueryRespPO
其中,EsQueryReqPO、EsQueryRespPO、AggregationBucketPO等类如下:
/**
 * Request object describing an Elasticsearch query: target indices, the query itself,
 * optional sorting, paging and optional aggregations.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class EsQueryReqPO {

    /** Names of the indices to query. */
    private String[] index;

    /** The query to execute. */
    private QueryBuilder query;

    /** Name of the field to sort by — presumably optional; confirm against the search repository. */
    private String sortField;

    /** Sort direction applied to {@link #sortField}. */
    private SortOrder sort;

    /** Page number; the usage shown in this article treats it as 1-based. */
    private Integer pageNum;

    /** Number of entries per page. */
    private Integer pageSize;

    /** Optional terms (bucket) aggregation. */
    private TermsAggregationBuilder termsAggregation;

    /** Optional cardinality (distinct count) aggregation. */
    private CardinalityAggregationBuilder cardinalityAggregation;
}
@Data@NoArgsConstructor@AllArgsConstructorpublic class EsQueryRespPO { private Boolean success; private String message; private Integer pageNum; private Integer pageSize; private Long totalSize; private List sourceList; public static EsQueryRespPO success(List sourceList, Integer pageNum, Integer pageSize, Long totalSize) { EsQueryRespPO esQueryRespPO = new EsQueryRespPO<>(); esQueryRespPO.setSuccess(true); esQueryRespPO.setSourceList(sourceList); esQueryRespPO.setPageNum(pageNum); esQueryRespPO.setPageSize(pageSize); esQueryRespPO.setTotalSize(totalSize); return esQueryRespPO; } public static EsQueryRespPO error() { EsQueryRespPO esQueryRespPO = new EsQueryRespPO(); esQueryRespPO.setSuccess(false); esQueryRespPO.setMessage("es查询异常"); return esQueryRespPO; } public static EsQueryRespPO error(String message) { EsQueryRespPO esQueryRespPO = new EsQueryRespPO(); esQueryRespPO.setSuccess(false); esQueryRespPO.setMessage(message); return esQueryRespPO; }}
/**
 * One bucket of an aggregation result.
 *
 * <p>Constructors and accessors are generated by Lombok.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class AggregationBucketPO {

    /** Bucket key. */
    private String key;

    /** Document count — presumably the number of documents in this bucket; confirm against the search repository. */
    private Long docCount;

    /** Document total — NOTE(review): exact meaning is not visible in this file; confirm against the search repository. */
    private Long docTotal;
}
其它
如果没有用spring-boot-starter-data-elasticsearch来自动注入es组件,那么需要自己做es client的注入,es配置类如下:
/**
 * Manual registration of the {@link RestHighLevelClient} for projects that do not rely
 * on the Spring Boot Elasticsearch auto-configuration.
 */
@Configuration
public class EsClientConfig {

    /** Node addresses injected from the {@code spring.elasticsearch.rest.uris} property. */
    @Value("${spring.elasticsearch.rest.uris}")
    private List<String> uris;

    /**
     * Builds the high-level client from the configured node addresses.
     *
     * @return a client connected to every address in {@code spring.elasticsearch.rest.uris}
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        HttpHost[] hosts = uris.stream()
                .map(HttpHost::create)
                .toArray(HttpHost[]::new);
        RestClientBuilder clientBuilder = RestClient.builder(hosts);
        return new RestHighLevelClient(clientBuilder);
    }
}
Snowflake是hutool包里的,导包:
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.7.14</version>
</dependency>
聚合查询的测试用例:
@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTest(classes = StartApplication.class)public class EsTest { @Resource private EsSearchRepository esSearchRepository; @Test public void testSearchAggregation() { // 查询对象的封装 EsQueryReqPO queryPO = new EsQueryReqPO(); queryPO.setIndex(new String[]{"yzh1", "yzh2"}); queryPO.setPageNum(1); queryPO.setPageSize(10); // 时间戳范围 QueryBuilder queryBuilder1 = QueryBuilders.rangeQuery("timestamp") .from(System.currentTimeMillis() - 1000) .to(System.currentTimeMillis()); // 登录标识 QueryBuilder queryBuilder2 = QueryBuilders.termQuery("name", "yang"); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(queryBuilder1).must(queryBuilder2); queryPO.setQuery(queryBuilder); // 根据userName分组。创建terms桶聚合,聚合名字=terms_by_userName, 字段=payload.userName.keyword TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders .terms("terms_by_userName").field("payload.userName.keyword"); termsAggregationBuilder.size(queryPO.getPageSize() * queryPO.getPageNum()); termsAggregationBuilder.subAggregation(new BucketSortPipelineAggregationBuilder("bucket_field", null) .from((queryPO.getPageNum() - 1) * queryPO.getPageSize()).size(queryPO.getPageSize())); queryPO.setTermsAggregation(termsAggregationBuilder); // 根据userName聚合count统计、cardinality名=count_userName, 字段=payload.userName.keyword CardinalityAggregationBuilder cardinalityAggregationBuilder = AggregationBuilders .cardinality("count_userName").field("payload.userName.keyword"); queryPO.setCardinalityAggregation(cardinalityAggregationBuilder); // 执行查询 EsQueryRespPO esQueryRespPO = esSearchRepository.searchAggregation(queryPO); }}