> 文章列表 > elasticsearch MySQL 数据同步。

elasticsearch MySQL 数据同步。

elasticsearch  MySQL 数据同步。

elasticsearch & MySQL 数据同步。


文章目录

    • elasticsearch & MySQL 数据同步。
      • 3. 数据同步。
          • 3.1. 思路分析。
            • 3.1.1. 同步调用。
            • 3.1.2. 异步通知。
            • 3.1.3. 监听 binlog。
            • 3.1.4. 选择。
          • 3.2. 实现数据同步。
            • 3.2.1. 思路。
            • 3.2.2. 导入 demo。
            • 3.2.3. 声明交换机、队列
            • 1)引入依赖。
            • 2)声明队列交换机名称。
            • 3)声明队列交换机。
            • 3.2.4. 发送 MQ 消息
            • 3.2.5. 接收 MQ 消息。

3. 数据同步。

elasticsearch 中的酒店数据来自于 mysql 数据库,因此 mysql 数据发生改变时,elasticsearch 也必须跟着改变,这个就是 elasticsearch 与 mysql 之间的数据同步

在微服务中,负责酒店管理(操作 MySQL)的业务与负责酒店搜索(操作 elasticsearch)的业务可能在两个不同的微服务上,两者数据该如克如何保持同步?


3.1. 思路分析。

常见的数据同步方案有三种。

  • 同步调用。

  • 异步通知。

  • 监听 binlog。


3.1.1. 同步调用。

方案一:同步调用。

elasticsearch  MySQL 数据同步。
基本步骤如下。

  • hotel-demo 对外提供接口,用来修改 elasticsearch 中的数据。

  • 酒店管理服务在完成数据库操作后,直接调用 hotel-demo 提供的接口,


3.1.2. 异步通知。

方案二:异步通知。

elasticsearch  MySQL 数据同步。
流程如下。

  • hotel-admin 对 mysql 数据库数据完成增、删、改后,发送 MQ 消息。

  • hotel-demo 监听 MQ,接收到消息后完成 elasticsearch 数据修改。


3.1.3. 监听 binlog。

方案三:监听 binlog。

elasticsearch  MySQL 数据同步。
流程如下。

  • 给 mysql 开启 binlog 功能。

  • mysql 完成增、删、改操作都会记录在 binlog 中。

  • hotel-demo 基于 canal 监听 binlog 变化,实时更新 elasticsearch 中的内容。


3.1.4. 选择。

方式一:同步调用。

  • 优点:实现简单,粗暴。

  • 缺点:业务耦合度高。

方式二:异步通知。

  • 优点:低耦合,实现难度一般。

  • 缺点:依赖 mq 的可靠性。

方式三:监听 binlog。

  • 优点:完全解除服务间耦合。

  • 缺点:开启 binlog 增加数据库负担、实现复杂度高。


3.2. 实现数据同步。
3.2.1. 思路。

开发 hotel-admin 项目作为酒店管理的微服务。当酒店数据发生增、删、改时,要求对 elasticsearch 中数据也要完成相同操作。

步骤。

  • 导入 hotel-admin 项目,启动并测试酒店数据的 CRUD。

  • 声明 exchange、queue、routingKey。

  • 在 hotel-admin 中的增、删、改业务中完成消息发送。

  • 在 hotel-demo 中完成消息监听,并更新 elasticsearch 中数据。

  • 启动并测试数据同步功能。


3.2.2. 导入 demo。

导入 hotel-admin 项目。

运行后,访问 http://localhost:8099。

其中包含了酒店的 CRUD 功能。


3.2.3. 声明交换机、队列。

MQ 结构如图。

elasticsearch  MySQL 数据同步。


1)引入依赖。

在 hotel-admin、hotel-demo 中引入 rabbitmq 的依赖。

<!-- amqp。-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2)声明队列交换机名称。

在 hotel-admin 和 hotel-demo 中的 com.geek.elasticsearchgeek.hotel.constatnts 包下新建一个类 MqConstants

package com.geek.elasticsearchgeek.hotel.constant;/* @author geek*/
public interface IMqConstants {/* 交换机。*/String TOPIC_EXCHANGE_HOTEL = "topic.exchange.hotel";/* 监听新增和修改的队列。*/String QUEUE_HOTEL_INSERT = "queue.hotel.insert";/* 监听删除的队列。*/String QUEUE_HOTEL_DELETE = "queue.hotel.delete";/* 新增或修改的 RoutingKey。*/String ROUTING_KEY_HOTEL_INSERT = "routing.key.hotel.insert";/* 删除的 RoutingKey。*/String ROUTING_KEY_HOTEL_DELETE =  "routing.key.hotel.delete";}

3)声明队列交换机。

在 hotel-demo 中,定义配置类,声明队列、交换机。

package com.geek.elasticsearchgeek.hotel.config;import com.geek.elasticsearchgeek.hotel.constant.IMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/* @author geek*/
@Configuration
public class MqConfig {@Beanpublic TopicExchange topicExchange() {return new TopicExchange(IMqConstants.TOPIC_EXCHANGE_HOTEL, true, false);}@Beanpublic Queue queueHotelInsert() {return new Queue(IMqConstants.QUEUE_HOTEL_INSERT, true);}@Beanpublic Queue queueHotelDelete() {return new Queue(IMqConstants.QUEUE_HOTEL_DELETE, true);}@Beanpublic Binding hotelInsertQueueRoutingKeyBinding() {return BindingBuilder.bind(queueHotelInsert()).to(topicExchange()).with(IMqConstants.ROUTING_KEY_HOTEL_INSERT);}@Beanpublic Binding hotelDeleteQueueRoutingKeyBinding() {return BindingBuilder.bind(queueHotelDelete()).to(topicExchange()).with(IMqConstants.ROUTING_KEY_HOTEL_DELETE);}}

3.2.4. 发送 MQ 消息。

在 hotel-admin 中的增、删、改业务中分别发送 MQ 消息。

package com.geek.elasticsearch.hotel.admin.controller;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.geek.elasticsearch.hotel.admin.constant.IMqConstants;
import com.geek.elasticsearch.hotel.admin.dataobject.Hotel;
import com.geek.elasticsearch.hotel.admin.dto.PageResult;
import com.geek.elasticsearch.hotel.admin.service.IHotelService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.security.InvalidParameterException;/* @author geek*/
@Slf4j
@RestController
@RequestMapping("/hotel")
public class HotelController {@Autowiredprivate IHotelService hotelService;@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/{id}")public Hotel queryById(@PathVariable("id") Long id) {return this.hotelService.getById(id);}@GetMapping("/list")public PageResult hotelList(@RequestParam(value = "page", defaultValue = "1") Integer page,@RequestParam(value = "size", defaultValue = "1") Integer size) {Page<Hotel> result = this.hotelService.page(new Page<>(page, size));return new PageResult(result.getTotal(), result.getRecords());}@PostMappingpublic void saveHotel(@RequestBody Hotel hotel) {this.hotelService.save(hotel);this.rabbitTemplate.convertAndSend(IMqConstants.TOPIC_EXCHANGE_HOTEL,IMqConstants.ROUTING_KEY_HOTEL_INSERT,hotel.getId());}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {this.hotelService.removeById(id);this.rabbitTemplate.convertAndSend(IMqConstants.TOPIC_EXCHANGE_HOTEL,IMqConstants.ROUTING_KEY_HOTEL_DELETE,id);}@PutMappingpublic void updateById(@RequestBody Hotel hotel) {if (hotel.getId() == null) {throw new InvalidParameterException("id 不能为空。");}this.hotelService.updateById(hotel);this.rabbitTemplate.convertAndSend(IMqConstants.TOPIC_EXCHANGE_HOTEL,IMqConstants.ROUTING_KEY_HOTEL_INSERT,hotel.getId());}}

3.2.5. 接收 MQ 消息。

hotel-demo 接收到 MQ 消息要做的事情包括。

  • 新增消息:根据传递的 hotel 的 id 查询 hotel 信息,然后新增一条数据到索引库。

  • 删除消息:根据传递的 hotel 的 id 删除索引库中的一条数据。

1)首先在 hotel-demo 的 com.geek.elasticsearchgeek.hotel.service 包下的 IHotelService 中新增新增、删除业务。

void deleteById(Long id);void insertById(Long id);

2)给 hotel-demo 中的 com.geek.elasticsearchgeek.hotel.service.impl 包下的 HotelService 中实现业务。

@Overridepublic void insertById(Long id) {// 根据 id 查询酒店数据。Hotel hotel = getById(id);// 转换为文档类型。HotelDoc hotelDoc = new HotelDoc(hotel);// 准备 Request 对象。IndexRequest indexRequest = new IndexRequest("hotel").id(hotel.getId().toString());// 准备 Json 文档。indexRequest.source(JSON.toJSONString(hotelDoc), XContentType.JSON);// 发送请求。try {this.restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void deleteById(Long id) {// 准备 Request。DeleteRequest deleteRequest = new DeleteRequest("hotel", id.toString());// 发送请求。try {this.restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}

3)编写监听器。

在 hotel-demo 中的com.geek.elasticsearchgeek.hotel.mq包新增一个类。

package com.geek.elasticsearchgeek.hotel.mq;import com.geek.elasticsearchgeek.hotel.constant.IMqConstants;
import com.geek.elasticsearchgeek.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/* @author geek*/
@Component
public class HotelMqListener {@Autowiredprivate IHotelService hotelService;/* 监听酒店新增或修改的业务。 @param id 酒店 id。*/@RabbitListener(queues = IMqConstants.QUEUE_HOTEL_INSERT)public void listenHotelInsertOrUpdate(Long id) {this.hotelService.insertById(id);}/* 监听酒店删除的业务。 @param id 酒店 id。*/@RabbitListener(queues = IMqConstants.QUEUE_HOTEL_DELETE)public void listenHotelDelete(Long id) {this.hotelService.deleteById(id);}}

拓道资本