> 文章列表 > elasticsearch——数据同步

elasticsearch——数据同步

elasticsearch——数据同步

目录

数据同步思路分析

方案一:同步调用

方案二:异步通知

方案三:监听binlog 

区别

关于elasticsearch与数据库数据同步

导入课前资料提供的hotel-admin项目,启动并测试酒店数据的CRUD

声明exchange、queue、RoutingKey

导依赖

加配置

定义静态类

在hotel-admin中的增、删、改业务中完成消息发送

在hotel-demo中完成消息监听,并更新elasticsearch中数据


数据同步思路分析

elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步。

在微服务中,负责酒店管理(操作mysql )的业务与负责酒店搜索(操作elasticsearch )的业务可能在两个不同的微服务上,数据同步该如何实现呢?

方案一:同步调用

方案二:异步通知

方案三:监听binlog 

区别

方式一:同步调用

         优点:实现简单,粗暴

         缺点:业务耦合度高

方式二:异步通知

         优点:低耦合,实现难度一般

         缺点:依赖mq的可靠性

方式三:监听binlog

优点:完全解除服务间耦合

缺点:开启binlog增加数据库负担、实现复杂度高 

关于elasticsearch与数据库数据同步

导入课前资料提供的hotel-admin项目,启动并测试酒店数据的CRUD

声明exchange、queue、RoutingKey

一般是在消费者服务声明这些东西,也就是hotel-dome服务,hotel-admin服务时消息发送者

导依赖

        <!--        amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

加配置

  rabbitmq:port: 5672host: 8.130.89.67virtual-host: /username: itcastpassword: 123#    listener:#      simple:#        prefetch: 1

定义静态类

package cn.itcast.hotel.constants;public class MyContantsMq {/* 交换机*/public final static String HOTEL_EXCHANGE="hotel.topic";/* 修改和插入队列*/public final static String HOTEL_INSERT_QUEUE="hotel.insert.queue";/* 删除队列*/public final static String HOTEL_DELETE_QUEUE="hotel.delete.queue";/* 修改和新增RoutingKey*/public final static String HOTEL_INSERT_KEY="hotel.insert";/* 删除RoutingKey*/public final static String HOTEL_DELETE_KEY="hotel.delete";
}

在hotel-admin中的增、删、改业务中完成消息发送

也就是消息生产者的配置

还是导依赖和添加配置

添加静态类

然后找到controller层的mysql操作代码

需要先声明 RabbitTemplate这个bean,通过自动注入

    @PostMappingpublic void saveHotel(@RequestBody Hotel hotel){rabbitTemplate.convertAndSend(MyContantsMq.HOTEL_EXCHANGE,MyContantsMq.HOTEL_INSERT_KEY,hotel.getId());hotelService.save(hotel);}@PutMapping()public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}rabbitTemplate.convertAndSend(MyContantsMq.HOTEL_EXCHANGE,MyContantsMq.HOTEL_INSERT_KEY,hotel.getId());hotelService.updateById(hotel);}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {rabbitTemplate.convertAndSend(MyContantsMq.HOTEL_EXCHANGE,MyContantsMq.HOTEL_DELETE_KEY,id);hotelService.removeById(id);}

在hotel-demo中完成消息监听,并更新elasticsearch中数据

@Configuration
public class MqListener {@Autowiredprivate IHotelService service;/* Topic* 新增和修改监听*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MyContantsMq.HOTEL_INSERT_QUEUE),exchange = @Exchange(name = MyContantsMq.HOTEL_EXCHANGE,type = ExchangeTypes.TOPIC),key = {MyContantsMq.HOTEL_INSERT_KEY}))public void listenTopicInsertQueue(Long id){service.insert(id);}/* Topic* 删除监听*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MyContantsMq.HOTEL_DELETE_QUEUE),exchange = @Exchange(name = MyContantsMq.HOTEL_EXCHANGE,type = ExchangeTypes.TOPIC),key = {MyContantsMq.HOTEL_DELETE_KEY}))public void listenTopicDeleteQueue(Long id){service.delete(id);}
}

这是监听类,现在就去service层写代码

    @Overridepublic void delete(Long id) {DeleteRequest request=new DeleteRequest("hotel",id.toString());try {restHighLevelClient.delete(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void insert(Long id) {Hotel hotel = getById(id);HotelDoc hotelDoc=new HotelDoc(hotel);IndexRequest request=new IndexRequest("hotel").id(id.toString());request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);try {restHighLevelClient.index(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}

可以把之前的测试类代码拿来改改就行

齐鲁人百科