elasticsearch——数据同步
目录
数据同步思路分析
方案一:同步调用
方案二:异步通知
方案三:监听binlog
区别
关于elasticsearch与数据库数据同步
导入课前资料提供的hotel-admin项目,启动并测试酒店数据的CRUD
声明exchange、queue、RoutingKey
导依赖
加配置
定义静态类
在hotel-admin中的增、删、改业务中完成消息发送
在hotel-demo中完成消息监听,并更新elasticsearch中数据
数据同步思路分析
elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步。
在微服务中,负责酒店管理(操作mysql )的业务与负责酒店搜索(操作elasticsearch )的业务可能在两个不同的微服务上,数据同步该如何实现呢?
方案一:同步调用
方案二:异步通知
方案三:监听binlog
区别
方式一:同步调用
优点:实现简单,粗暴
缺点:业务耦合度高
方式二:异步通知
优点:低耦合,实现难度一般
缺点:依赖mq的可靠性
方式三:监听binlog
优点:完全解除服务间耦合
缺点:开启binlog增加数据库负担、实现复杂度高
关于elasticsearch与数据库数据同步
导入课前资料提供的hotel-admin项目,启动并测试酒店数据的CRUD
声明exchange、queue、RoutingKey
一般是在消费者服务声明这些东西,也就是hotel-dome服务,hotel-admin服务时消息发送者
导依赖
<!-- amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
加配置
rabbitmq:port: 5672host: 8.130.89.67virtual-host: /username: itcastpassword: 123# listener:# simple:# prefetch: 1
定义静态类
package cn.itcast.hotel.constants;public class MyContantsMq {/* 交换机*/public final static String HOTEL_EXCHANGE="hotel.topic";/* 修改和插入队列*/public final static String HOTEL_INSERT_QUEUE="hotel.insert.queue";/* 删除队列*/public final static String HOTEL_DELETE_QUEUE="hotel.delete.queue";/* 修改和新增RoutingKey*/public final static String HOTEL_INSERT_KEY="hotel.insert";/* 删除RoutingKey*/public final static String HOTEL_DELETE_KEY="hotel.delete";
}
在hotel-admin中的增、删、改业务中完成消息发送
也就是消息生产者的配置
还是导依赖和添加配置
添加静态类
然后找到controller层的mysql操作代码
需要先声明 RabbitTemplate这个bean,通过自动注入
@PostMappingpublic void saveHotel(@RequestBody Hotel hotel){rabbitTemplate.convertAndSend(MyContantsMq.HOTEL_EXCHANGE,MyContantsMq.HOTEL_INSERT_KEY,hotel.getId());hotelService.save(hotel);}@PutMapping()public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}rabbitTemplate.convertAndSend(MyContantsMq.HOTEL_EXCHANGE,MyContantsMq.HOTEL_INSERT_KEY,hotel.getId());hotelService.updateById(hotel);}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {rabbitTemplate.convertAndSend(MyContantsMq.HOTEL_EXCHANGE,MyContantsMq.HOTEL_DELETE_KEY,id);hotelService.removeById(id);}
在hotel-demo中完成消息监听,并更新elasticsearch中数据
@Configuration
public class MqListener {@Autowiredprivate IHotelService service;/* Topic* 新增和修改监听*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MyContantsMq.HOTEL_INSERT_QUEUE),exchange = @Exchange(name = MyContantsMq.HOTEL_EXCHANGE,type = ExchangeTypes.TOPIC),key = {MyContantsMq.HOTEL_INSERT_KEY}))public void listenTopicInsertQueue(Long id){service.insert(id);}/* Topic* 删除监听*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MyContantsMq.HOTEL_DELETE_QUEUE),exchange = @Exchange(name = MyContantsMq.HOTEL_EXCHANGE,type = ExchangeTypes.TOPIC),key = {MyContantsMq.HOTEL_DELETE_KEY}))public void listenTopicDeleteQueue(Long id){service.delete(id);}
}
这是监听类,现在就去service层写代码
@Overridepublic void delete(Long id) {DeleteRequest request=new DeleteRequest("hotel",id.toString());try {restHighLevelClient.delete(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void insert(Long id) {Hotel hotel = getById(id);HotelDoc hotelDoc=new HotelDoc(hotel);IndexRequest request=new IndexRequest("hotel").id(id.toString());request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);try {restHighLevelClient.index(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}
可以把之前的测试类代码拿来改改就行