欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

.NET Core 使用 Elasticsearch

时间:2023-05-14
Elasticsearch 7.x

/// <summary>
/// Wrapper around the NEST high-level and low-level Elasticsearch clients that
/// offers index management, document insert/delete and simple match-query search.
/// </summary>
/// <remarks>
/// NOTE(review): generic parameters and return types must stay in sync with the
/// declarations in <c>IElasticSearchServer</c>, which is defined outside this file.
/// </remarks>
[ExposeServices(isSingleton: true)]
public class ElasticSearchServer : IElasticSearchServer
{
    /// <summary>Strongly typed (fluent/lambda) client.</summary>
    public IElasticClient ElasticLinqClient { get; set; }

    /// <summary>Low-level client for raw JSON requests.</summary>
    public IElasticLowLevelClient ElasticJsonClient { get; set; }

    /// <summary>
    /// Builds both clients on top of one static connection pool created from the configured node URLs.
    /// </summary>
    /// <param name="esConfig">Configuration providing the node URLs and the request timeout in seconds.</param>
    /// <exception cref="ArgumentNullException"><paramref name="esConfig"/> is null.</exception>
    public ElasticSearchServer(ESConfig esConfig)
    {
        if (esConfig == null)
        {
            throw new ArgumentNullException(nameof(esConfig));
        }

        var nodeUris = esConfig.Urls.ConvertAll(url => new Uri(url));
        var connectionPool = new StaticConnectionPool(nodeUris);
        var settings = new ConnectionSettings(connectionPool)
            .RequestTimeout(TimeSpan.FromSeconds(esConfig.Timeout));

        this.ElasticJsonClient = new ElasticLowLevelClient(settings);
        this.ElasticLinqClient = new ElasticClient(settings); // fluent/lambda request client
    }

    #region Index creation (with shard settings) and deletion

    /// <summary>
    /// Creates an index with an auto-generated mapping for <typeparamref name="T"/>;
    /// returns immediately when the index already exists.
    /// </summary>
    /// <typeparam name="T">Document type used to auto-map the index.</typeparam>
    /// <param name="indexName">Name of the index.</param>
    /// <param name="numberOfReplicas">Number of replicas.</param>
    /// <param name="numberOfShards">Number of primary shards.</param>
    /// <param name="cancellationToken">Token used to cancel the requests.</param>
    /// <returns>True when the index already exists or was created successfully.</returns>
    public async Task<bool> CreateIndexAsync<T>(string indexName, int numberOfReplicas, int numberOfShards, CancellationToken cancellationToken = default) where T : class
    {
        var existsResponse = await this.ElasticLinqClient.Indices.ExistsAsync(
            indexName,
            item => item.IgnoreUnavailable(false),
            cancellationToken);

        // Exists (not IsValid) tells whether the index is present: the response of an
        // exists check can be valid even when the index was not found.
        if (existsResponse.Exists)
        {
            return true;
        }

        CreateIndexResponse createResponse = await this.ElasticLinqClient.Indices.CreateAsync(
            indexName,
            descriptor => descriptor
                .Map<T>(mapping => mapping.AutoMap())
                .Settings(indexSettings => indexSettings
                    .NumberOfShards(numberOfShards)
                    .NumberOfReplicas(numberOfReplicas)
                    .RefreshInterval(TimeSpan.FromSeconds(10))),
            cancellationToken);

        return createResponse.IsValid;
    }

    /// <summary>
    /// Deletes the index when it exists; does nothing otherwise.
    /// </summary>
    /// <param name="indexName">Name of the index to delete.</param>
    /// <param name="cancellationToken">Token used to cancel the requests.</param>
    public async Task DeleteIndexAsync(string indexName, CancellationToken cancellationToken = default)
    {
        var existsResponse = await this.ElasticLinqClient.Indices.ExistsAsync(
            indexName,
            item => item.IgnoreUnavailable(false),
            cancellationToken);

        if (existsResponse.Exists) // the index is present
        {
            await this.ElasticLinqClient.Indices.DeleteAsync(
                indexName,
                item => item.IgnoreUnavailable(false),
                cancellationToken);
        }
    }

    #endregion

    #region Document insert, bulk insert and delete-by-query

    /// <summary>
    /// Indexes a single document.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexName">Target index.</param>
    /// <param name="t">Document to index.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>True when Elasticsearch reports the request as valid; false on an invalid response or an exception.</returns>
    public async Task<bool> InsertdocumentAsync<T>(string indexName, T t, CancellationToken cancellationToken = default) where T : class
    {
        try
        {
            var response = await this.ElasticLinqClient.IndexAsync(t, i => i.Index(indexName), cancellationToken);

            // The client reports most failures through IsValid rather than by throwing,
            // so the response has to be inspected to know whether indexing succeeded.
            return response.IsValid;
        }
        catch (Exception)
        {
            // Best effort by contract: callers only receive a success flag.
            return false;
        }
    }

    /// <summary>
    /// Indexes many documents through the BulkAll helper and blocks until it finishes.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexName">Target index.</param>
    /// <param name="list">Documents to index.</param>
    /// <param name="size">Number of documents per bulk request.</param>
    /// <param name="cancellationToken">Token used to cancel the operation and the wait.</param>
    /// <returns>True when the bulk run completed without an error; false otherwise.</returns>
    public bool InsertBulkdocuments<T>(string indexName, IEnumerable<T> list, int size = 1000, CancellationToken cancellationToken = default) where T : class
    {
        try
        {
            var observableBulk = this.ElasticLinqClient.BulkAll(
                list,
                f => f.MaxDegreeOfParallelism(8)
                      .BackOffTime(TimeSpan.FromSeconds(10))
                      .BackOffRetries(2)
                      .Size(size)
                      .RefreshOnCompleted()
                      .Index(indexName)
                      .BufferToBulk((r, buffer) => r.IndexMany(buffer)),
                cancellationToken);

            Exception bulkError = null;

            // Intentionally not disposed: a late observer callback (for example after the
            // wait below was cancelled) may still call Signal on it.
            var finished = new CountdownEvent(1);

            var bulkAllObserver = new BulkAllObserver(
                onNext: response => { },
                onError: ex =>
                {
                    // Remember the failure so it can be reported to the caller.
                    bulkError = ex;
                    finished.Signal();
                },
                onCompleted: () => finished.Signal());

            observableBulk.Subscribe(bulkAllObserver);
            finished.Wait(cancellationToken);

            return bulkError == null;
        }
        catch (Exception)
        {
            // Best effort by contract: callers only receive a success flag.
            return false;
        }
    }

    /// <summary>
    /// Deletes every document whose field matches the given value (match query).
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexName">Index to delete from.</param>
    /// <param name="deleteProperty">Name of the field to match.</param>
    /// <param name="deletevalue">Value the field must match.</param>
    /// <param name="clusterName">Not used by this implementation; kept for signature compatibility.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>True when Elasticsearch reports the request as valid.</returns>
    public async Task<bool> DeleteBulkdocument<T>(string indexName, string deleteProperty, string deletevalue, string clusterName = "lcn_elasticsearch", CancellationToken cancellationToken = default) where T : class
    {
        var response = await this.ElasticLinqClient.DeleteByQueryAsync<T>(
            p => p.Index(indexName)
                  .Query(op => op.Match(x => x.Field(deleteProperty).Query(deletevalue))),
            cancellationToken);

        return response.IsValid;
    }

    #endregion

    #region Search

    /// <summary>
    /// Runs a match query on one field across the given indices.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexNameList">Names of the indices to search.</param>
    /// <param name="searchProperty">Name of the field to match.</param>
    /// <param name="searchValue">Text to match.</param>
    /// <param name="clusterName">Not used by this implementation; kept for signature compatibility.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>The documents of the search response.</returns>
    public async Task<IReadOnlyCollection<T>> SearchAsync<T>(List<string> indexNameList, string searchProperty, string searchValue, string clusterName = "lcn_elasticsearch", CancellationToken cancellationToken = default) where T : class
    {
        Indices indices = ToIndices(indexNameList);

        var response = await this.ElasticLinqClient.SearchAsync<T>(
            p => p.Index(indices)
                  .Query(op => op.Match(x => x.Field(searchProperty).Query(searchValue))),
            cancellationToken);

        return response.Documents;
    }

    /// <summary>
    /// Runs a match query on one field across the given indices and returns a single page.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexNameList">Names of the indices to search.</param>
    /// <param name="pageSize">Number of documents per page.</param>
    /// <param name="pageIndex">
    /// Page number, treated as zero-based (0 is the first page).
    /// NOTE(review): confirm that callers do not pass one-based page numbers.
    /// </param>
    /// <param name="searchProperty">Name of the field to match.</param>
    /// <param name="searchValue">Text to match.</param>
    /// <param name="clusterName">Not used by this implementation; kept for signature compatibility.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>The documents of the requested page.</returns>
    public async Task<IReadOnlyCollection<T>> SearchPagingAsync<T>(List<string> indexNameList, int pageSize, int pageIndex, string searchProperty, string searchValue, string clusterName = "lcn_elasticsearch", CancellationToken cancellationToken = default) where T : class
    {
        Indices indices = ToIndices(indexNameList);

        // From is a document offset, not a page number; passing the page number
        // directly would make consecutive pages overlap.
        int offset = pageIndex * pageSize;

        var response = await this.ElasticLinqClient.SearchAsync<T>(
            p => p.Index(indices)
                  .From(offset)
                  .Size(pageSize)
                  .Query(op => op.Match(x => x.Field(searchProperty).Query(searchValue))),
            cancellationToken);

        return response.Documents;
    }

    /// <summary>Converts plain index names into the index selector used by search requests.</summary>
    private static Indices ToIndices(List<string> indexNameList)
    {
        if (indexNameList == null)
        {
            throw new ArgumentNullException(nameof(indexNameList));
        }

        IndexName[] names = indexNameList.ConvertAll(name => (IndexName)name).ToArray();
        return names;
    }

    #endregion
}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。