Browse Source

整合elasticsearch

459242451@qq.com 4 năm trước cách đây
mục cha
commit
4ec8c52c70

+ 9 - 1
ruoyi-admin/src/main/resources/application-test.yml

@@ -92,7 +92,15 @@ spring:
                 max-active: 8
                 # #连接池最大阻塞等待时间(使用负值表示没有限制)
                 max-wait: -1ms
-
+elasticsearch:
+    port: 9200
+    ip: 200.200.19.122
+    type: http  #访问方式
+    numberOfReplicas: 2  #副本数
+    numberOfShards: 5  #分片数
+    defaultIndex: materials  #默认加载索引
+    searchTimeOut: 10    #搜索超时时长,单位:秒
+    masterShardTimeOut: 1 #主分片执行更新的超时时长 ,单位:秒
 
 # Swagger配置
 swagger:

+ 485 - 0
ruoyi-framework/src/main/java/com/ruoyi/framework/config/ElasticSearchClient.java

@@ -0,0 +1,485 @@
+package com.ruoyi.framework.config;
+
+import cn.hutool.core.util.StrUtil;
+import com.ruoyi.framework.config.properties.ElasticSearchConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.DistanceUnit;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.GeoDistanceQueryBuilder;
+import org.elasticsearch.index.query.MultiMatchQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import javax.annotation.PostConstruct;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ES客户端
+ */
+@Component
+@Slf4j
+public class ElasticSearchClient {
+
+    @Autowired
+    ElasticSearchConfig elasticSearchConfig;
+
+    public static RestHighLevelClient client;
+
+    /**
+     * 设置创建索引的相关参数
+     */
+    @PostConstruct
+    public void init() {
+        try {
+            if (client != null) {
+                client.close();
+            }
+            //创建
+            HttpHost httpHost = new HttpHost(elasticSearchConfig.getIp(),
+                    elasticSearchConfig.getPort(), elasticSearchConfig.getType());
+            client = new RestHighLevelClient(RestClient.builder(httpHost));
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 如果注释以下代码,默认访问127.0.0.1:9200 或者添加下面代码,访问的是yml文件配置的IP,端口,类型
+     *
+     * @return
+     */
+    @Bean
+    public RestHighLevelClient client() {
+        return new RestHighLevelClient(RestClient.builder(new HttpHost(elasticSearchConfig.getIp(),
+                elasticSearchConfig.getPort(), elasticSearchConfig.getType())));
+    }
+
+    /**
+     * 创建索引
+     *
+     * @param index
+     * @return
+     */
+    public boolean createIndex(String index) throws IOException {
+        if (isIndexExist(index)) {
+            log.error("Index is  exits!");
+            return false;
+        }
+        //1.创建索引请求
+        CreateIndexRequest request = new CreateIndexRequest(index);
+        //2.执行客户端请求
+        CreateIndexResponse response = client.indices()
+                .create(request, RequestOptions.DEFAULT);
+        return response.isAcknowledged();
+    }
+
+    /**
+     * 删除索引
+     *
+     * @param index
+     * @return
+     */
+    public boolean deleteIndex(String index) throws IOException {
+        if (!isIndexExist(index)) {
+            log.error("Index is not exits!");
+            return false;
+        }
+        DeleteIndexRequest request = new DeleteIndexRequest(index);
+        AcknowledgedResponse delete = client.indices()
+                .delete(request, RequestOptions.DEFAULT);
+        return delete.isAcknowledged();
+    }
+
+    /**
+     * 新增单条文档数据
+     *
+     * @param indexName 索引名称
+     * @param id
+     * @throws Exception
+     */
+    public void createDocument(String indexName, String id, Map<String, Object> paramMap) throws Exception {
+        // 指定单条文档数据,最终会转化成Json格式
+        XContentBuilder builder = XContentFactory.jsonBuilder();
+        builder.startObject();
+        {
+            if (!CollectionUtils.isEmpty(paramMap)) {
+                paramMap.forEach((key, value) -> {
+                    try {
+                        builder.field(key, value);
+                    } catch (IOException ignored) {
+                    }
+                });
+            }
+        }
+        builder.endObject();
+        // 创建新增文档数据的请求
+        IndexRequest indexRequest = new IndexRequest(indexName).id(id).source(builder);
+        // 手动指定路由的key,文档查询时可提高性能
+        indexRequest.routing("route_" + indexName);
+        // 等待主分片保存的超时时长
+        indexRequest.timeout(TimeValue.timeValueSeconds(elasticSearchConfig.getMasterShardTimeOut()));
+        // 刷新策略,WAIT_UNTIL设置则表示刷新使此请求的内容对搜索可见为止
+        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+        // 操作类型为新增
+        indexRequest.opType(DocWriteRequest.OpType.CREATE);
+
+        // 异步执行新增文档数据请求
+        client.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
+            @Override
+            public void onResponse(IndexResponse indexResponse) {
+                log.info("新增单条文档数据,结果:{}", indexResponse.toString());
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                e.printStackTrace();
+            }
+        });
+    }
+
+
+    /**
+     * 更新文档内容
+     *
+     * @param indexName 索引名称
+     * @param id
+     */
+    public void updateDocument(String indexName, String id, Map<String, Object> paramMap) {
+
+        // 创建更新文档请求并设置参数
+        UpdateRequest updateRequest = new UpdateRequest(indexName, id);
+        updateRequest.doc(paramMap);
+        // 主分片执行更新的超时时长
+        updateRequest.timeout(TimeValue.timeValueSeconds(elasticSearchConfig.getMasterShardTimeOut()));
+        // 刷新策略
+        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+        // 重试更新操作多少次
+        updateRequest.retryOnConflict(3);
+
+        // 异步执行更新文档的请求
+        client.updateAsync(updateRequest, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
+            @Override
+            public void onResponse(UpdateResponse updateResponse) {
+                log.info("更新文档内容,结果:{}", updateResponse.toString());
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                e.printStackTrace();
+            }
+        });
+    }
+
+    /**
+     * 删除文档数据
+     *
+     * @param indexName 索引名称
+     * @param id
+     * @throws Exception
+     */
+    public void deleteDocument(String indexName, String id) throws Exception {
+        DeleteRequest deleteRequest = new DeleteRequest(indexName, id);
+        // 主分片执行删除的超时时长
+        deleteRequest.timeout(TimeValue.timeValueMinutes(2));
+        // 刷新策略
+        deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+        // 异步执行删除
+        client.deleteAsync(deleteRequest, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
+            @Override
+            public void onResponse(DeleteResponse deleteResponse) {
+                log.info("删除成功:" + deleteResponse.toString());
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                log.info("删除失败:" + e.getMessage());
+            }
+        });
+    }
+
+
+    /**
+     * 文档数据批量插入
+     *
+     * @param mapList   行数据
+     * @param indexName 索引
+     * @param idName    主键名称
+     * @return
+     * @throws Exception
+     */
+    public void importDocument(List<Map<String, Object>> mapList, String indexName, String idName) {
+        IndexRequest indexRequest;
+        BulkRequest bulkRequest = new BulkRequest();
+        try {
+            for (Map<String, Object> mp : mapList) {
+                indexRequest = new IndexRequest(indexName).id(mp.get(idName).toString()).source(mp);
+                bulkRequest.add(indexRequest);
+            }
+            BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
+            log.info("批量插入结果:{}", bulkResponse.toString());
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+    }
+
+    /**
+     * 根据经纬度查询范围查找location 经纬度字段,distance 距离中心范围KM,lat  lon 圆心经纬度
+     *
+     * @param index
+     * @param longitude
+     * @param latitude
+     * @param distance
+     * @return
+     */
+    public List<Map<String, Object>> geoDistanceQuery(String index, Float longitude, Float latitude, String distance) throws IOException {
+        if (longitude == null || latitude == null) {
+            return null;
+        }
+        //拼接条件
+        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+//        QueryBuilder isdeleteBuilder = QueryBuilders.termQuery("isdelete", false);
+        // 以某点为中心,搜索指定范围
+        GeoDistanceQueryBuilder distanceQueryBuilder = new GeoDistanceQueryBuilder("location");
+        distanceQueryBuilder.point(latitude, longitude);
+        //查询单位:km
+        distanceQueryBuilder.distance(distance, DistanceUnit.KILOMETERS);
+        boolQueryBuilder.filter(distanceQueryBuilder);
+//        boolQueryBuilder.must(isdeleteBuilder);
+
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.query(boolQueryBuilder);
+
+        SearchRequest searchRequest = new SearchRequest(index);
+        searchRequest.source(searchSourceBuilder);
+
+        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+        // 返回结果
+        SearchHits searchHitArray = searchResponse.getHits();
+        List<Map<String, Object>> mapList = new ArrayList<>();
+        for (SearchHit searchHit : searchHitArray.getHits()) {
+            Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
+            mapList.add(sourceAsMap);
+        }
+        return mapList;
+    }
+
+    /**
+     * 通过id查询数据
+     *
+     * @param indexName
+     * @param id
+     * @param fields    需要显示的字段,逗号分隔(缺省为全部字段)
+     * @return
+     */
+    public Map<String, Object> getDocById(String indexName, String id, String fields) {
+        Map<String, Object> getDocByIdMap = new HashMap<>();
+        GetRequest getRequest = new GetRequest(indexName, id);
+        if (StrUtil.isNotBlank(fields)) {
+            //只查询特定字段。如果需要查询所有字段则不设置该项。
+            getRequest.fetchSourceContext(new FetchSourceContext(true, fields.split(","), Strings.EMPTY_ARRAY));
+        }
+        GetResponse getResponse;
+        try {
+            getResponse = client.get(getRequest, RequestOptions.DEFAULT);
+            if (getResponse.isExists()) {
+                getDocByIdMap.put("msg", "操作成功!");
+                getDocByIdMap.put("data", getResponse.getSource());
+                getDocByIdMap.put("code", 200);
+            } else {
+                getDocByIdMap.put("msg", "数据不存在");
+                getDocByIdMap.put("code", -1);
+                getDocByIdMap.put("data", null);
+            }
+            return getDocByIdMap;
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+    /**
+     * 检索、分页
+     *
+     * @param indexName    索引名称
+     * @param mpParams     查询参数
+     * @param from         起始页
+     * @param size         每页数量
+     * @param fields       返回列
+     * @param preciseQuery 1:精确查询 2:模糊查询
+     * @return
+     */
+    public List<Map<String, Object>> searchDocument(String indexName, Map<String, Object> mpParams,
+                                                    int from, int size, String fields, String sorts, Integer preciseQuery) {
+        SearchRequest searchRequest = new SearchRequest(indexName);
+        // 大多数搜索参数添加到searchSourceBuilder
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        List<Map<String, Object>> mapList = new ArrayList<>();
+        try {
+            // 组合字段查询
+            BoolQueryBuilder boolQueryBuilder = this.getBoolQueryBuilder(mpParams, preciseQuery);
+            searchSourceBuilder.query(boolQueryBuilder);
+            // 自定义返回列
+            if (StrUtil.isNotBlank(fields)) {
+                searchSourceBuilder.fetchSource(new FetchSourceContext(true, fields.split(","), Strings.EMPTY_ARRAY));
+            }
+            // 排序
+            if (StrUtil.isNotBlank(sorts)) {
+                searchSourceBuilder.sort(sorts + ".keyword", SortOrder.DESC);
+            }
+            // 分页
+            searchSourceBuilder.from(from);
+            searchSourceBuilder.size(size);
+            // 允许搜索的超时时长,10s
+            searchSourceBuilder.timeout(new TimeValue(elasticSearchConfig.getSearchTimeOut(), TimeUnit.SECONDS));
+            searchRequest.source(searchSourceBuilder);
+            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+            // 返回结果
+            SearchHits searchHitArray = searchResponse.getHits();
+            for (SearchHit searchHit : searchHitArray.getHits()) {
+                Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
+                mapList.add(sourceAsMap);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return mapList;
+    }
+
+
+    /**
+     * 组合字段查询条件
+     *
+     * @param mpParams
+     * @param preciseQuery 1:精确查询 2:模糊查询 3.匹配多个字段的模糊查询(使用IK中文分词器)
+     * @return
+     */
+    public BoolQueryBuilder getBoolQueryBuilder(Map<String, Object> mpParams, Integer preciseQuery) {
+        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+        try {
+            if (null != mpParams.get("search") && null != mpParams.get("fields")) {
+                if (mpParams.get("fields") instanceof List) {
+                    String[] fields = (String[]) ((List) mpParams.get("fields")).
+                            toArray(new String[((List) mpParams.get("fields")).size()]);
+                    MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(mpParams.get("search"), fields);
+                    boolQueryBuilder = boolQueryBuilder.must(multiMatchQueryBuilder);
+                }
+            }
+//            if (mpParams != null) {
+//
+//                if (preciseQuery != null && preciseQuery == 1) {
+//                    for (Map.Entry<String, Object> entry : mpParams.entrySet()) {
+//                        // 精确匹配
+//                        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(entry.getKey() + ".keyword", entry.getValue());
+//                        boolQueryBuilder = boolQueryBuilder.must(termQueryBuilder);
+//                    }
+//
+//                }
+//            } else if (preciseQuery != null && preciseQuery == 2) {
+//                for (Map.Entry<String, Object> entry : mpParams.entrySet()) {
+//                    // 模糊匹配
+//                    MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery(entry.getKey(), entry.getValue());
+//                    boolQueryBuilder = boolQueryBuilder.must(matchQueryBuilder);
+//                }
+//
+//            } else if (preciseQuery != null && preciseQuery == 3) {
+//                //多词模糊匹配
+////                List<Object> values = ((MultiValueMap) mpParams).get("fields");
+//
+//               /* for (String value : values) {
+//
+//                    log.info(key + ": " + value);
+//
+//                }*/
+//                ((MultiValueMap) mpParams).entrySet().forEach(entry -> {
+//
+//                    MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery("新品", entry.).minimumShouldMatch("50%");
+//                    boolQueryBuilder = boolQueryBuilder.must(multiMatchQueryBuilder);
+//                });
+//
+//            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return boolQueryBuilder;
+    }
+
+
+    /**
+     * 索引创建前,判断索引是否存在
+     */
+    public static Boolean isIndexExist(String checkIndex) {
+        Boolean flag = false;
+        try {
+            GetIndexRequest getIndexRequest = new GetIndexRequest(checkIndex);
+            getIndexRequest.local(false);
+            getIndexRequest.humanReadable(true);
+            getIndexRequest.includeDefaults(false);
+            flag = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return flag;
+    }
+
+    /**
+     * 通过ID判断文档是否存在
+     *
+     * @param index 索引,类似数据库
+     * @param id    数据ID
+     * @return
+     */
+    public boolean existsById(String index, String id) throws IOException {
+        GetRequest request = new GetRequest(index, id);
+        //不获取返回的_source的上下文
+        request.fetchSourceContext(new FetchSourceContext(false));
+        request.storedFields("_none_");
+        return client.exists(request, RequestOptions.DEFAULT);
+    }
+
+
+}

+ 32 - 0
ruoyi-framework/src/main/java/com/ruoyi/framework/config/properties/ElasticSearchConfig.java

@@ -0,0 +1,32 @@
+package com.ruoyi.framework.config.properties;
+
+import lombok.Data;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * 读取yml中es的配置信息和设置存储索引的分片数量和副本数量  :es 配置类
+ */
+@Data
+@Configuration
+@ConditionalOnExpression("!'${elasticsearch}'.isEmpty()")
+@ConfigurationProperties(prefix = "elasticsearch")
+public class ElasticSearchConfig {
+    //ES地址
+    private String ip;
+    //端口
+    private Integer port;
+    //访问方式 http 或 tcp
+    private String type;
+    //默认全局索引
+    private String defaultIndex;
+    //副本数
+    private int numberOfReplicas;
+    //分片数
+    private int numberOfShards;
+    //搜索超时时间
+    private int searchTimeOut;
+    //主分片设置的超时时长
+    private int masterShardTimeOut;
+}