Querying the Computer Startup Record Table

Published: 2023-04-16 05:16 Source: reposted

from:cnblogs.com/shanml/p/16350127.htm

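NameServer is a registry. When a Broker starts, it registers with every NameServer. Producers and Consumers can fetch the list of all registered Brokers from a NameServer and pick Brokers from that list for sending and consuming messages.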
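The registry idea can be illustrated with a small in-memory sketch. This is not RocketMQ code: `NameServerRegistrySketch`, `SimpleNameServer`, `registerWithAll`, and the sample addresses are hypothetical names invented for illustration, and the real route table holds far more than a name-to-address map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class NameServerRegistrySketch {

    // Hypothetical, simplified stand-in for one name server instance.
    static class SimpleNameServer {
        // broker name -> broker address
        private final Map<String, String> brokers = new ConcurrentHashMap<>();

        void register(String brokerName, String address) {
            brokers.put(brokerName, address);
        }

        // Returns the registered broker addresses in sorted order.
        List<String> brokerAddresses() {
            List<String> addrs = new ArrayList<>(brokers.values());
            Collections.sort(addrs);
            return addrs;
        }
    }

    // A broker registers with every name server, so each one holds the full list.
    static void registerWithAll(List<SimpleNameServer> nameServers, String brokerName, String address) {
        for (SimpleNameServer ns : nameServers) {
            ns.register(brokerName, address);
        }
    }

    public static void main(String[] args) {
        List<SimpleNameServer> nameServers = Arrays.asList(new SimpleNameServer(), new SimpleNameServer());
        registerWithAll(nameServers, "broker-a", "192.168.0.1:10911");
        registerWithAll(nameServers, "broker-b", "192.168.0.2:10911");
        // A producer or consumer can ask any single name server for the broker list.
        System.out.println(nameServers.get(1).brokerAddresses());
    }
}
```

Because each Broker registers with all name servers, a client in this sketch gets the same list no matter which instance it asks.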

The startup class of NameServer is NamesrvStartup, which mainly does two things:

1. Call the createNamesrvController method to create a NamesrvController, which is the core of NameServer.
2. Call the start method to start NameServer.

    public class NamesrvStartup {

        private static InternalLogger log;
        private static Properties properties = null;
        private static CommandLine commandLine = null;

        public static void main(String[] args) {
            // Entry point
            main0(args);
        }

        public static NamesrvController main0(String[] args) {
            try {
                // Create the NamesrvController
                NamesrvController controller = createNamesrvController(args);
                // Start the NameServer
                start(controller);
                String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
                log.info(tip);
                System.out.printf("%s%n", tip);
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
            return null;
        }
    }

Creating NamesrvController

The createNamesrvController method mainly processes configuration:

1. Create a NamesrvConfig, which, as the name suggests, holds the NameServer configuration.
2. Create a NettyServerConfig, which holds the Netty server configuration; the listen port defaults to 9876.
3. Check whether the startup command specifies a configuration file with -c. If it does, load the file from the given path, parse it, and store the settings in NamesrvConfig and NettyServerConfig.
4. Verify that the RocketMQ home directory is set. It can be specified with -Drocketmq.home.dir=path in the startup command or through the ROCKETMQ_HOME environment variable of the operating system.
5. Apply the logging settings.
6. Create the NamesrvController and return it.

    public class NamesrvStartup {

        public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
            System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
                return null;
            }

            // NameServer configuration
            final NamesrvConfig namesrvConfig = new NamesrvConfig();
            // Netty server connection configuration
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            // Set the listen port
            nettyServerConfig.setListenPort(9876);

            // If the startup command specifies a configuration file
            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    // Open the file
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);
                    // Parse the configuration file into both config objects
                    MixAll.properties2Object(properties, namesrvConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    namesrvConfig.setConfigStorePath(file);
                    System.out.printf("load config properties file OK, %s%n", file);
                    in.close();
                }
            }

            // If the startup command carries -p, print the NameServer configuration and exit
            if (commandLine.hasOption('p')) {
                InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
                MixAll.printObjectProperties(console, namesrvConfig);
                MixAll.printObjectProperties(console, nettyServerConfig);
                System.exit(0);
            }

            // Copy settings given on the command line into namesrvConfig
            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

            // If the RocketMQ home directory is not set, print an error and exit
            if (null == namesrvConfig.getRocketmqHome()) {
                System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
                System.exit(-2);
            }

            // Logging settings
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

            log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
            MixAll.printObjectProperties(log, namesrvConfig);
            MixAll.printObjectProperties(log, nettyServerConfig);

            // Create the NamesrvController
            final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
            controller.getConfiguration().registerConfig(properties);
            return controller;
        }
    }

Starting NameServer

NameServer is started mainly through NamesrvController. The logic is as follows:

1. Call the initialize method of NamesrvController to initialize it.
2. Register a JVM shutdown hook that calls the shutdown method of NamesrvController to release resources when the JVM shuts down.
3. Call the start method of NamesrvController to start NameServer.

    public class NamesrvStartup {

        public static NamesrvController start(final NamesrvController controller) throws Exception {
            if (null == controller) {
                throw new IllegalArgumentException("NamesrvController is null");
            }

            // Initialize the NamesrvController
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }

            // Register the JVM shutdown hook
            Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    controller.shutdown();
                    return null;
                }
            }));

            // Start the NameServer
            controller.start();
            return controller;
        }
    }

NamesrvController initialization

The initialization method of NamesrvController mainly does the following:

1. Load configuration, mainly from kvConfig.json.
2. Create a NettyRemotingServer for network communication.
3. Create the thread pool for the Netty service, sized by the configured number of worker threads.
4. Register the processor DefaultRequestProcessor, which handles incoming requests such as registration requests from Brokers.
5. Register the scheduled heartbeat-check task, which periodically scans for inactive Brokers and removes them.
6. Register the scheduled task that prints the KV configuration.

    public class NamesrvController {

        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        private final NamesrvConfig namesrvConfig;         // NameServer configuration
        private final NettyServerConfig nettyServerConfig; // Netty server configuration
        // Thread pool for scheduled tasks
        private final ScheduledExecutorService scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("NSScheduledThread"));
        private final KVConfigManager kvConfigManager;
        private final RouteInfoManager routeInfoManager;   // Route table
        private RemotingServer remotingServer;             // Remoting server; the implementation is NettyRemotingServer
        private BrokerHousekeepingService brokerHousekeepingService;
        private ExecutorService remotingExecutor;          // Thread pool for the Netty service

        // ...

        public boolean initialize() {
            // Load configuration
            this.kvConfigManager.load();
            // Create the NettyRemotingServer
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
            // Create the thread pool for the Netty service
            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
            // Register the processor
            this.registerProcessor();

            // Scheduled task: scan Brokers (first run after 5 seconds, then every 10 seconds)
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    // Heartbeat check: scan for inactive Brokers
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);

            // Scheduled task: print the KV configuration (first run after 1 minute, then every 10 minutes)
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    NamesrvController.this.kvConfigManager.printAllPeriodically();
                }
            }, 1, 10, TimeUnit.MINUTES);

            // ....
            return true;
        }

        // Register the processor
        private void registerProcessor() {
            if (namesrvConfig.isClusterTest()) {
                this.remotingServer.registerDefaultProcessor(
                    new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                    this.remotingExecutor);
            } else {
                // Register DefaultRequestProcessor to handle requests
                this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
            }
        }
    }

Startup

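The start method mainly calls the start method of RemotingServer to start the service. As the initialization method of NamesrvController shows, the implementation class in use is NettyRemotingServer, so this goes on to start the Netty service: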

    public void start() throws Exception {
        // Start the Netty service
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

Netty startup

The start method of NettyRemotingServer mainly configures Netty, then binds the port and starts the service:

    public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {

        @Override
        public void start() {
            // ...
            prepareSharableHandlers();

            // Netty settings
            ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) // Set the EventLoopGroup thread groups
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) // Set the channel type
                    .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) // Set the port
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // Set up the ChannelHandlers
                            ch.pipeline()
                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                .addLast(defaultEventExecutorGroup,
                                    encoder,
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                    connectionManageHandler,
                                    serverHandler);
                        }
                    });

            if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
                log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
                // Set the socket send buffer size
                childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
            }
            if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
                log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
                // Set the socket receive buffer size
                childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
            }
            if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
                log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
                    nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
                childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                    nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
            }

            if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
                childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            }

            try {
                // Bind the port and start the service
                ChannelFuture sync = this.serverBootstrap.bind().sync();
                InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
                this.port = addr.getPort();
            } catch (InterruptedException e1) {
                throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
            }

            if (this.channelEventListener != null) {
                this.nettyEventExecutor.start();
            }

            // Scan the response table every second, starting after 3 seconds
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        NettyRemotingServer.this.scanResponseTable();
                    } catch (Throwable e) {
                        log.error("scanResponseTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
        }
    }
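As a rough analogy for the bind step, the sketch below uses only java.net rather than Netty, so it is not how RocketMQ does it. It configures a listening socket, binds it, and then reads back the port that was actually bound, which is what the `this.port = addr.getPort()` assignment does after `bind().sync()`. `BindPortSketch` and `bind` are hypothetical names, and the backlog value is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

class BindPortSketch {

    // Binds a listening socket; requesting port 0 asks the OS for any free port.
    static ServerSocket bind(int requestedPort, int backlog) throws IOException {
        ServerSocket server = new ServerSocket();  // created unbound so options can be set first
        server.setReuseAddress(true);              // plain-socket counterpart of SO_REUSEADDR
        // The second argument is the accept backlog, the counterpart of SO_BACKLOG
        server.bind(new InetSocketAddress(requestedPort), backlog);
        return server;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = bind(0, 1024)) {
            // Read back the port that was actually bound
            int port = server.getLocalPort();
            System.out.println("listening on port " + port);
        }
    }
}
```

Reading the port back after binding matters mostly when the requested port is 0; with a fixed port such as 9876 the value simply matches the request.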

Reference: Ding Wei and Zhou Jifeng, 《RocketMQ技术内幕》

RocketMQ version: 4.9.3
