> 文章列表 > SpringCloud RocketMQ

SpringCloud RocketMQ

SpringCloud RocketMQ

目录

0. 课程视频地址

0.1 撮合引擎课程

0.2 RocketMQ安装

0.3 RocketMQ  搭建

1. docker 配置rocketmq

2. Disruptor 和 SpringBoot 整合

2.1生产者往RingBuffer中放数据 Disruptor把数据推给消费者

2.2 Disruptor配置文件 // ps:可以配置是否支持多生产者

2.3 Disruptor异常处理类

2.4 事件携带的数据

2.5 Order 减少耦合 新建实体类 

2.6 Bean 相同价格不同时间订单的订单类 // 不同价格订单 相同价格不同时间订单

2.7 Order 转换成OrderEvent

2.8 创建一个RingBuffer / 无锁高效的等待策略 / 创建DisruptorTemplate


0. 课程视频地址


0.1 撮合引擎课程

https://www.bilibili.com/video/BV13v4y1A7qH?p=48&vd_source=ff8b7f852278821525f11666b36f180a

0.2 RocketMQ安装

https://www.bilibili.com/video/BV173411P7M2?p=10&vd_source=ff8b7f852278821525f11666b36f180a

0.3 RocketMQ  搭建

http://rocket-server:8080/#/


1. docker 配置rocketmq

cd /usr/local
mkdir rocketmq
cd rocketmq
vi broker.conf
// 配置 broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 公网ip

2. Disruptor 和 SpringBoot 整合

2.1生产者往RingBuffer中放数据 Disruptor把数据推给消费者

2.2 Disruptor配置文件 // ps:可以配置是否支持多生产者

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;@Data
@ConfigurationProperties(prefix = "spring.disruptor")
// Settings holder for the Disruptor ring buffer. The defaults below apply
// whenever no value is supplied from configuration.
public class DisruptorProperties {

    /** Size of the ring buffer; defaults to 1024 * 1024 slots. */
    private Integer ringBufferSize = 1 << 20;

    /** Whether multiple producers are supported; defaults to {@code false}. */
    private boolean isMultiProducer;
}

2.3 Disruptor异常处理类

package org.example.disruptor;import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LifecycleAware;
import lombok.extern.slf4j.Slf4j;/*** DisruptorHandlerException 的异常处理*/
@Slf4j
public class DisruptorHandlerException implements ExceptionHandler {/*** <p>Strategy for handling uncaught exceptions when processing an event.</p>** <p>If the strategy wishes to terminate further processing by the {@link BatchEventProcessor}* then it should throw a {@link RuntimeException}.</p>** @param ex       the exception that propagated from the {@link EventHandler}.* @param sequence of the event which cause the exception.* @param event    being processed when the exception occurred.  This can be null.*/@Overridepublic void handleEventException(Throwable ex, long sequence, Object event) {log.info("handleEventException Exception===>{} , sequence==> {} ,event===>{}",ex.getMessage(),sequence,event);}/*** Callback to notify of an exception during {@link LifecycleAware#onStart()}** @param ex throw during the starting process.*/@Overridepublic void handleOnStartException(Throwable ex) {log.info("OnStartHandler Exception===>{} ",ex.getMessage());}/*** Callback to notify of an exception during {@link LifecycleAware#onShutdown()}** @param ex throw during the shutdown process.*/@Overridepublic void handleOnShutdownException(Throwable ex) {log.info("OnShutdownHandler Exception===>{} ",ex.getMessage());}
}

2.4 事件携带的数据

import lombok.Data;import java.io.Serializable;@Data
// Event object carried through the ring buffer: a creation timestamp plus an
// arbitrary payload.
public class OrderEvent implements Serializable {

    /** Timestamp taken from {@link System#currentTimeMillis()} when this instance is constructed. */
    private final long timestamp;

    /** Payload carried by the event; declared {@code transient}. */
    protected transient Object source;

    /** Creates an event without a payload. */
    public OrderEvent() {
        this(null);
    }

    /**
     * Creates an event that carries the given payload.
     *
     * @param source the data attached to this event
     */
    public OrderEvent(Object source) {
        this.timestamp = System.currentTimeMillis();
        this.source = source;
    }
}

2.5 Order 减少耦合 新建实体类 

import lombok.Data;
import lombok.NoArgsConstructor;
import org.example.enums.OrderDirection;import java.io.Serializable;
import java.math.BigDecimal;/*** 委托单*/
/**
 * An entrusted order submitted for matching.
 */
@Data
@NoArgsConstructor
public class Order implements Serializable {

    /** Id of this order. */
    private String orderId;

    /** Id of the user/member. */
    private Long userId;

    /** Supported coin-to-coin trading pair. */
    private String symbol;

    /** Amount to buy or sell. */
    private BigDecimal amount = BigDecimal.ZERO;

    /** Amount already traded. */
    private BigDecimal tradedAmount = BigDecimal.ZERO;

    /** Turnover of the trades. */
    private BigDecimal turnover = BigDecimal.ZERO;

    /** Coin unit. */
    private String coinSymbol;

    /** Settlement unit. */
    private String baseSymbol;

    /** Status of the order. */
    private Integer orderStatus;

    /** Direction of the order. */
    private OrderDirection orderDirection;

    /** Price at which the order was placed. */
    private BigDecimal price = BigDecimal.ZERO;

    /** Time the order was placed. */
    private Long time;

    /** Time the trade completed. */
    private Long completedTime;

    /** Time the trade was cancelled. */
    private Long cancelTime;

    private boolean isCancelOrder;

    // Successfully matched order details (disabled in the original source):
    // private List<OrderDetail> details;

    /**
     * Criterion for deciding whether the order is completed.
     *
     * @return {@code true} when {@code amount} is less than or equal to {@code tradedAmount}
     */
    public boolean isCompleted() {
        return amount.compareTo(tradedAmount) <= 0;
    }
}

2.6 Bean 相同价格不同时间订单的订单类 // 不同价格订单 相同价格不同时间订单

import lombok.Data;import java.math.BigDecimal;@Data
// Detail record of a single deal belonging to an order.
public class OrderDetail {

    /** Id of the order. */
    private String orderId;

    /** Deal price. */
    private BigDecimal price;

    /** Deal quantity. */
    private BigDecimal amount;

    /** Deal turnover. */
    private BigDecimal turnover;

    /** Fee rate. */
    private BigDecimal fee;

    /** Time the deal succeeded. */
    private Long dealTime;
}

2.7 Order 转换成OrderEvent

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import org.example.model.Order;/*** 在boot里面使用它发送消息*/
/**
 * Helper used from the Spring Boot application to publish orders into the ring buffer.
 */
public class DisruptorTemplate {

    /** Copies the incoming {@link Order} into the claimed {@link OrderEvent} slot. */
    private static final EventTranslatorOneArg<OrderEvent, Order> TRANSLATOR =
            (event, sequence, input) -> event.setSource(input);

    private final RingBuffer<OrderEvent> ringBuffer;

    /**
     * @param ringBuffer the ring buffer that published orders are written to
     */
    public DisruptorTemplate(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * Entry point for callers of this template: publishes the order as an event.
     *
     * @param input the order to publish
     */
    public void onData(Order input) {
        ringBuffer.publishEvent(TRANSLATOR, input);
    }
}

2.8 创建一个RingBuffer / 无锁高效的等待策略 / 创建DisruptorTemplate

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import net.openhft.affinity.AffinityThreadFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ThreadFactory;@Configuration
@EnableConfigurationProperties(value = DisruptorProperties.class)
public class DisruptorAutoConfiguration {public DisruptorProperties disruptorProperties;public DisruptorAutoConfiguration(DisruptorProperties disruptorProperties) {this.disruptorProperties = disruptorProperties;}@Beanpublic EventFactory<OrderEvent> eventEventFactory() {EventFactory<OrderEvent> orderEventEventFactory = new EventFactory<OrderEvent>() {@Overridepublic OrderEvent newInstance() {return new OrderEvent();}};return orderEventEventFactory;}@Beanpublic ThreadFactory threadFactory() {return new AffinityThreadFactory("Match-Handler:") ;}/*** 无锁高效的等待策略** @return*/@Beanpublic WaitStrategy waitStrategy() {return new YieldingWaitStrategy();}/*** 创建一个RingBuffer* eventFactory: 事件工厂* threadFactory:   我们执行者(消费者)的线程该怎么创建* waitStrategy : 等待策略: 当我们ringBuffer 没有数据时,我们怎么等待*/@Beanpublic RingBuffer<OrderEvent> ringBuffer(EventFactory<OrderEvent> eventFactory,ThreadFactory threadFactory,WaitStrategy waitStrategy,EventHandler<OrderEvent>[] eventHandlers) {/*** 构建disruptor*/Disruptor<OrderEvent> disruptor = null;ProducerType producerType = ProducerType.SINGLE;if (disruptorProperties.isMultiProducer()) {producerType = ProducerType.MULTI;}disruptor = new Disruptor<OrderEvent>(eventFactory, disruptorProperties.getRingBufferSize(), threadFactory, producerType, waitStrategy);disruptor.setDefaultExceptionHandler(new DisruptorHandlerException());// 设置消费者---我们的每个消费者代表我们的一个交易对,有多少个交易对,我们就有多少个eventHandlers ,事件来了后,多个eventHandlers 是并发执行的disruptor.handleEventsWith(eventHandlers);RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();disruptor.start();// 开始监听final Disruptor<OrderEvent> disruptorShutdown = disruptor;// 使用优雅的停机Runtime.getRuntime().addShutdownHook(new Thread(() -> {disruptorShutdown.shutdown();}, "DisruptorShutdownThread"));return ringBuffer;}/*** 创建DisruptorTemplate** @param ringBuffer* @return*/@Beanpublic DisruptorTemplate disruptorTemplate(RingBuffer<OrderEvent> ringBuffer) {return new DisruptorTemplate(ringBuffer);}
}

2.9 接收到了某个消息

import com.lmax.disruptor.EventHandler;
import org.example.match.MatchServiceFactory;
import org.example.match.MatchStrategy;
import org.example.model.Order;
import org.example.model.OrderBooks;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;/*** 该对象 有多个: 和Symbol的数据对应* 针对某一个OrderEventHandler ,只会同一时间有一个线程来执行它*/
@Data
@Slf4j
public class OrderEventHandler implements EventHandler<OrderEvent> {private OrderBooks orderBooks;private String symbol ;public OrderEventHandler(OrderBooks orderBooks) { // 构造器this.orderBooks = orderBooks;this.symbol =  this.orderBooks.getSymbol() ;}/*** 接收到了某个消息** @param event* @param sequence* @param endOfBatch* @throws Exception*/@Overridepublic void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {// 从ringbuffer 里面接收了某个数据Order order = (Order)event.getSource();if(!order.getSymbol().equals(symbol)){ // 我们接收到了一个不属于我们处理的数据,我们不处理return;}//        log.info("开始接收订单事件============>{}", event);/// 处理逻辑是啥?MatchServiceFactory.getMatchService(MatchStrategy.LIMIT_PRICE).match(orderBooks ,order);
////        log.info("处理完成我们的订单事件===================>{}", event);}
}

2.10 发送消息的Bean

/**
 * Creates the {@link DisruptorTemplate}.
 *
 * @param ringBuffer the ring buffer the template publishes to
 * @return a template bound to {@code ringBuffer}
 */
@Bean
public DisruptorTemplate disruptorTemplate(RingBuffer<OrderEvent> ringBuffer) {
    return new DisruptorTemplate(ringBuffer);
}