天翼开放平台

请先登录 天翼开放平台

搜索
热搜: 活动
查看: 1795|回复: 0

#技术分享#基于Java、Kafka、ElasticSearch的搜索框架的设计与实 [复制链接]

Rank: 7Rank: 7Rank: 7

发表于 2017-9-11 09:00:09 |显示全部楼层


Jkes是一个基于Java、Kafka、ElasticSearch的搜索框架。Jkes提供了注解驱动的JPA风格的对象/文档映射,使用REST API用于文档搜索。

项目主页:https://github.com/chaokunyang/jkes

安装

可以参考jkes-integration-test项目快速掌握jkes框架的使用方法。jkes-integration-test是我们用来测试功能完整性的一个Spring Boot Application。

安装jkes-index-connectorjkes-delete-connector到Kafka Connect类路径

安装 Smart ChineseAnalysis Plugin

sudobin/elasticsearch-plugin install analysis-smartcn

配置

引入jkes-spring-data-jpa依赖

添加配置

@EnableAspectJAutoProxy

@EnableJkes

@Configuration

public class JkesConfig {

  @Bean

  publicPlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport){

    returnnew SearchPlatformTransactionManager(new JpaTransactionManager(factory),eventSupport);

  }

}

提供JkesPropertiesBean

@Component

@Configuration

public class JkesConf extends DefaultJkesPropertiesImpl {

    @PostConstruct

    publicvoid setUp() {

       Config.setJkesProperties(this);

    }

    @Override

    publicString getKafkaBootstrapServers() {

       return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292";

    }

    @Override

    publicString getKafkaConnectServers() {

       return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084";

    }

    @Override

    publicString getEsBootstrapServers() {

       return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200";

    }

    @Override

    public String getDocumentBasePackage() {

       return "com.timeyang.jkes.integration_test.domain";

    }

    @Override

    publicString getClientId() {

       return "integration_test";

    }

}

这里可以很灵活,如果使用SpringBoot,可以使用@ConfigurationProperties提供配置

增加索引管理端点 因为我们不知道客户端使用的哪种web技术,所以索引端点需要在客户端添加。比如在Spring MVC中,可以按照如下方式添加索引端点

@RestController

@RequestMapping("/api/search")

public class SearchEndpoint {

    privateIndexer indexer;

    @Autowired

    publicSearchEndpoint(Indexer indexer) {

       this.indexer = indexer;

    }

    @RequestMapping(value = "/start_all", method = RequestMethod.POST)

    publicvoid startAll() {

       indexer.startAll();

    }

    @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST)

    public void start(@PathVariable("entityClassName") StringentityClassName) {

       indexer.start(entityClassName);

    }

    @RequestMapping(value = "/stop_all", method = RequestMethod.PUT)

    publicMap<String, Boolean> stopAll() {

       return indexer.stopAll();

    }

    @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT)

    publicBoolean stop(@PathVariable("entityClassName") String entityClassName) {

       return indexer.stop(entityClassName);

    }

    @RequestMapping(value = "/progress", method = RequestMethod.GET)

    publicMap<String, IndexProgress> getProgress() {

       return indexer.getProgress();

    }

}

快速开始

索引API

使用com.timeyang.jkes.core.annotation包下相关注解标记实体

@lombok.Data

@Entity

@Document

public class Person extends AuditedEntity {

    // @Id will be identified automatically

    // @Field(type = FieldType.Long)

    @Id

    @GeneratedValue(strategy =GenerationType.IDENTITY)

    privateLong id;

    @MultiFields(

           mainField = @Field(type = FieldType.Text),

           otherFields = {

                    @InnerField(suffix = "raw", type =FieldType.Keyword),

                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")

           }

    )

    privateString name;

    @Field(type =FieldType.Keyword)

    privateString gender;

    @Field(type =FieldType.Integer)

    privateInteger age;

    // don't add @Field to test whether ignored

    // @Field(type = FieldType.Text)

    privateString description;

    @Field(type =FieldType.Object)

    @ManyToOne(fetch =FetchType.EAGER)

    @JoinColumn(name = "group_id")

    privatePersonGroup personGroup;

}

@lombok.Data

@Entity

@Document(type = "person_group", alias = "person_group_alias")

public class PersonGroup extends AuditedEntity {

    @Id

    @GeneratedValue(strategy =GenerationType.IDENTITY)

    privateLong id;

    privateString name;

    privateString interests;

    @OneToMany(fetch =FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval =true)

    privateList<Person> persons;

    privateString description;

    @DocumentId

    @Field(type =FieldType.Long)

    publicLong getId() {

       return id;

    }

    @MultiFields(

           mainField = @Field(type = FieldType.Text),

           otherFields = {

                    @InnerField(suffix = "raw", type =FieldType.Keyword),

                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")

           }

    )

    publicString getName() {

       return name;

    }

    @Field(type =FieldType.Text)

    publicString getInterests() {

       return interests;

    }

    @Field(type =FieldType.Nested)

    publicList<Person> getPersons() {

       return persons;

    }

    /**

    * 不加Field注解,测试序列化时是否忽略

    */

    publicString getDescription() {

       return description;

    }

}

当更新实体时,文档会被自动索引到ElasticSearch;删除实体时,文档会自动从ElasticSearch删除。

搜索API

启动搜索服务jkes-search-service,搜索服务是一个Spring Boot Application,提供rest搜索api,默认运行在9000端口。

URIquery

curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10

Nestedquery

integration_test_person_group/person_group/_search?from=0&size=10

{

  "query": {

    "nested": {

     "path": "persons",

     "score_mode": "avg",

     "query": {

       "bool": {

         "must": [

           {

              "range": {

                "persons.age": {

                  "gt": 5

                }

              }

           }

       }

     }

    }

  }

}

matchquery

integration_test_person_group/person_group/_search?from=0&size=10

{

  "query": {

     "match": {

       "interests": "Hadoop"

     }

    }

}

boolquery

{

  "query":{

    "bool": {

     "must" : {

       "match" : { "interests" : "Hadoop" }

     },

     "filter": {

       "term" : { "name.raw" : "name0" }

     },

     "should" : [

       { "match" : { "interests" : "Flink" } },

       {

           "nested" : {

                "path" : "persons",

                "score_mode" : "avg",

                "query" : {

                    "bool" : {

                        "must" : [

                        { "match" : {"persons.name": "name40"} },

                        { "match" : {"persons.interests": "interests"} }

                       ],

                        "must_not" :{

                            "range" :{

                              "age" :{ "gte" : 50, "lte" : 60 }

                            }

                          }

                    }

                }

           }

       }

     ],

     "minimum_should_match" : 1,

     "boost" : 1.0

    }

  }

}

Sourcefiltering

integration_test_person_group/person_group/_search

{

    "_source": false,

    "query" : {

       "match" : { "name" : "name17" }

    }

}

integration_test_person_group/person_group/_search

{

    "_source": {

           "includes": [ "name", "persons.*" ],

           "excludes": [ "date*", "version", "persons.age" ]

       },

    "query" : {

       "match" : { "name" : "name17" }

    }

}

prefix

integration_test_person_group/person_group/_search

{

  "query": {

    "prefix" : { "name" : "name" }

  }

}

wildcard

integration_test_person_group/person_group/_search

{

    "query": {

       "wildcard" : { "name" : "name*" }

    }

}

regexp

integration_test_person_group/person_group/_search

{

    "query": {

       "regexp":{

           "name": "na.*17"

       }

    }

}

Jkes工作原理

索引工作原理:

应用启动时,Jkes扫描所有标注@Document注解的实体,为它们构建元数据。

基于构建的元数据,创建indexmappingJson格式的配置,然后通过ElasticSearch Java RestClient将创建/更新index配置。

为每个文档创建/更新Kafka ElasticSearch Connector,用于创建/更新文档

为整个项目启动/更新Jkes Deleter Connector,用于删除文档

拦截数据操作方法。将* save(*)方法返回的数据包装为SaveEvent保存到EventContainer;使用(* delete*(..)方法的参数,生成一个DeleteEvent/DeleteAllEvent保存到EventContainer

拦截事务。在事务提交后使用JkesKafkaProducer发送SaveEvent中的实体到Kafka,Kafka会使用我们提供的JkesJsonSerializer序列化指定的数据,然后发送到Kafka。

SaveEvent不同,DeleteEvent会直接被序列化,然后发送到Kafka,而不是只发送一份数据

SaveEventDeleteEvent不同,DeleteAllEvent不会发送数据到Kafka,而是直接通过ElasticSearch Java Rest Client删除相应的index,然后重建该索引,重启Kafka ElasticSearch Connector

查询工作原理:

查询服务通过rest api提供

我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序API的接入难度

查询服务是一个SpringBoot Application,使用docker打包为镜像

查询服务提供多版本API,用于API进化和兼容

查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。

为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。

流程图

模块介绍

jkes-core

jkes-core是整个jkes的核心部分。主要包括以下功能:

annotation包提供了jkes的核心注解

elasticsearch包封装了elasticsearch相关的操作,如为所有的文档创建/更新索引,更新mapping

kafka包提供了Kafka 生产者,Kafka Json Serializer,Kafka Connect Client

metadata包提供了核心的注解元数据的构建与结构化模型

event包提供了事件模型与容器

exception包提供了常见的Jkes异常

http包基于Apache Http Client封装了常见的http json请求

support包暴露了Jkes核心配置支持

util包提供了一些工具类,便于开发。如:Asserts, ClassUtils, DocumentUtils,IOUtils, JsonUtils, ReflectionUtils, StringUtils

jkes-boot

jkes-boot用于与一些第三方开源框架进行集成。

当前,我们通过jkes-spring-data-jpa,提供了与spring data jpa的集成。通过使用Spring的AOP机制,对Repository方法进行拦截,生成SaveEvent/DeleteEvent/DeleteAllEvent保存到EventContainer。通过使用我们提供的SearchPlatformTransactionManager,对常用的事务管理器(如JpaTransactionManager)进行包装,提供事务拦截功能。

在后续版本,我们会提供与更多框架的集成。

jkes-spring-data-jpa说明:

ContextSupport类用于从bean工厂获取Repository Bean

@EnableJkes让客户端能够轻松开启Jkes的功能,提供了与Spring一致的配置模型

EventSupport处理事件的细节,在保存和删除数据时生成相应事件存放到EventContainer,在事务提交和回滚时处理相应的事件

SearchPlatformTransactionManager包装了客户端的事务管理器,在事务提交和回滚时加入了回调hook

audit包提供了一个简单的AuditedEntity父类,方便添加审计功能,版本信息可用于结合ElasticSearch的版本机制保证不会索引过期文档数据

exception包封装了常见异常

intercept包提供了AOP切点和切面

index包提供了全量索引功能。当前,我们提供了基于线程池的索引机制和基于ForkJoin的索引机制。在后续版本,我们会重构代码,增加基于阻塞队列生产者-消费者模式,提供并发性能

jkes-services

jkes-services主要用来提供一些服务。 目前,jkes-services提供了以下服务:

jkes-delete-connector

jkes-delete-connector是一个Kafka Connector,用于从kafka集群获取索引删除事件(DeleteEvent),然后使用Jest Client删除ElasticSearch中相应的文档。

借助于Kafka Connect的rest admin api,我们轻松地实现了多租户平台上的文档删除功能。只要为每个项目启动一个jkes-delete-connector,就可以自动处理该项目的文档删除工作。避免了每启动一个新的项目,我们都得手动启动一个Kafka Consumer来处理该项目的文档删除工作。尽管可以通过正则订阅来减少这样的工作,但是还是非常不灵活

jkes-search-service

jkes-search-service是一个restful的搜索服务,提供了多版本的rest query api。查询服务提供多版本API,用于API进化和兼容

jkes-search-service目前支持URI风格的搜索和JSON请求体风格的搜索。

我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序的接入难度

查询服务是一个SpringBoot Application,使用docker打包为镜像

查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。

为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。

后续,我们将会基于zookeeper构建索引集群,提供集群索引管理功能

jkes-integration-test

jkes-integration-test是一个基于Spring Boot集成测试项目,用于进行功能测试。同时测量一些常见操作的吞吐率

开发

Tobuild a development version you’ll need a recent version of Kafka. You canbuild jkes with Maven using the standard lifecycle phases.

Contribute

SourceCode: https://github.com/chaokunyang/jkes

IssueTracker: https://github.com/chaokunyang/jkes/issues

LICENSE

Thisproject is licensed under Apache License 2.0.

文章转自:http://www.codeceo.com/article/j ... csearch-search.html

小编寄语

截止当前,天翼开放平台共上线API数达260个,能力以中国电信八大基地、两大专业公司及增值业务运营中心的众多内容和信息能力,尤其对外全面开放了模板短信、“应用内计费web版、“web支付接口,希望对各位开发者有帮助,欢迎移动互联网开发者使用平台能力,感谢对平台的支持!


天翼开放平台
汇众智 翼起创未来
http://open.189.cn

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

回顶部