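This post collects my notes from recently reading the JStorm source code. It covers three parts: tasks, network communication, and flow control (throttling and backpressure).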
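A task is what a unit of work in Storm really is: the carrier of the business logic. Task implements the Runnable interface, so we can guess that a task essentially keeps executing some logic inside a thread. Here is its overridden run method: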
```java
public void run() {
    try {
        taskShutdownDameon = this.execute();
    } catch (Throwable e) {
        LOG.error("init task error", e);
        if (reportErrorDie != null) {
            reportErrorDie.report(e);
        } else {
            throw new RuntimeException(e);
        }
    }
}
```
run is essentially a wrapper around execute, so let's continue with execute:
```java
public TaskShutdownDameon execute() throws Exception {
    taskSendTargets = echoToSystemBolt();
    // Create the taskTransfer instance, which serializes tuples and sends them out via Netty
    taskTransfer = mkTaskSending(workerData);
    // Create the RunnableCallback that holds the task's actual execution logic
    RunnableCallback baseExecutor = prepareExecutor();
    setBaseExecutors((BaseExecutors) baseExecutor);
    // Create the thread that runs the business logic
    AsyncLoopThread executor_threads = new AsyncLoopThread(baseExecutor, false, Thread.MAX_PRIORITY, true);
    // Create the taskReceiver instance, which receives tuples, deserializes them
    // and pushes them into the execution queue
    taskReceiver = mkTaskReceiver();
    List<AsyncLoopThread> allThreads = new ArrayList<>();
    allThreads.add(executor_threads);
    LOG.info("Finished loading task " + componentId + ":" + taskId);
    taskShutdownDameon = getShutdown(allThreads, baseExecutor);
    return taskShutdownDameon;
}
```
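The "keep running something in a thread" idea can be illustrated with a minimal sketch of an AsyncLoopThread-style wrapper, assuming it simply calls its RunnableCallback over and over on a dedicated thread until told to stop. LoopCallback and LoopThread are hypothetical names; the real AsyncLoopThread takes extra flags (the false/true arguments above) that this sketch ignores.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RunnableCallback: one unit of work per invocation.
interface LoopCallback {
    void runOnce();
}

// Hypothetical stand-in for AsyncLoopThread: calls the callback over and over
// on its own thread until stop() is invoked.
class LoopThread {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread thread;

    LoopThread(LoopCallback callback, String name, int priority) {
        thread = new Thread(() -> {
            while (running.get()) {
                callback.runOnce();
            }
        }, name);
        thread.setPriority(priority);
        thread.setDaemon(true);
        thread.start();
    }

    // Let the current iteration finish, then wait for the thread to exit.
    void stop() {
        running.set(false);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Sleep helper that swallows interrupts, similar in spirit to JStormUtils.sleepMs.
    static void sleepMs(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AtomicInteger ticks = new AtomicInteger();
        LoopThread executor = new LoopThread(() -> {
            ticks.incrementAndGet();   // stands in for "process one tuple"
            sleepMs(1);
        }, "task-executor", Thread.MAX_PRIORITY);
        sleepMs(50);
        executor.stop();
        System.out.println("iterations: " + ticks.get());
    }
}
```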
The method returns a TaskShutdownDameon instance. Let's look at getShutdown first:
```java
public TaskShutdownDameon getShutdown(List<AsyncLoopThread> allThreads, RunnableCallback baseExecutor) {
    AsyncLoopThread ackerThread;
    // For spouts, fetch the acker thread and add it to the thread list
    if (baseExecutor instanceof SpoutExecutors) {
        ackerThread = ((SpoutExecutors) baseExecutor).getAckerRunnableThread();
        if (ackerThread != null) {
            allThreads.add(ackerThread);
        }
    }
    // Add the deserialization threads to the thread list
    List<AsyncLoopThread> recvThreads = taskReceiver.getDeserializeThread();
    for (AsyncLoopThread recvThread : recvThreads) {
        allThreads.add(recvThread);
    }
    // Add the serialization threads to the thread list
    List<AsyncLoopThread> serializeThreads = taskTransfer.getSerializeThreads();
    allThreads.addAll(serializeThreads);
    TaskHeartbeatTrigger taskHeartbeatTrigger = ((BaseExecutors) baseExecutor).getTaskHbTrigger();
    // Build the TaskShutdownDameon
    return new TaskShutdownDameon(
            taskStatus, topologyId, taskId, allThreads, zkCluster, taskObj, this, taskHeartbeatTrigger);
}
```
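This method mainly gathers threads that live inside other instances into the thread list, then constructs a TaskShutdownDameon, which presumably contains the logic for cleaning up all resources. TaskShutdownDameon implements ShutdownableDameon; of the three methods it overrides, the most important is shutdown, which performs the cleanup: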
```java
@Override
public void shutdown() {
    if (isClosing.compareAndSet(false, true)) {
        LOG.info("Begin to shut down task " + topologyId + ":" + taskId);
        TopologyContext userContext = task.getUserContext();
        // Clean up the task hooks
        for (ITaskHook iTaskHook : userContext.getHooks())
            iTaskHook.cleanup();
        // Run the cleanup that matches the task type (IBolt/ISpout)
        closeComponent(taskObj);
        // Update the task status held by the heartbeat trigger
        taskHeartbeatTrigger.updateExecutorStatus(TaskStatus.SHUTDOWN);
        // wait 50ms for executor thread to shutdown to make sure to send shutdown info to TM
        try {
            Thread.sleep(50);
        } catch (InterruptedException ignored) {
        }
        // all thread will check the taskStatus
        // once it has been set to SHUTDOWN, it will quit
        // Update the task status
        taskStatus.setStatus(TaskStatus.SHUTDOWN);
        // Clean up every thread
        for (AsyncLoopThread thr : allThreads) {
            LOG.info("Begin to shutdown " + thr.getThread().getName());
            thr.cleanup();
            JStormUtils.sleepMs(10);
            thr.interrupt();
            // try {
            //     //thr.join();
            //     thr.getThread().stop(new RuntimeException());
            // } catch (Throwable e) {
            // }
            LOG.info("Successfully shutdown " + thr.getThread().getName());
        }
        taskHeartbeatTrigger.unregister();
        LOG.info("Successfully shutdown task heartbeat trigger for task:{}", taskId);
        try {
            zkCluster.disconnect();
        } catch (Exception e) {
            LOG.error("Failed to disconnect zk for task-" + taskId);
        }
        LOG.info("Successfully shutdown task " + topologyId + ":" + taskId);
    }
}
```
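The core of this shutdown pattern, a compareAndSet guard so the body runs only once plus a shared status flag that every loop thread polls, can be sketched in isolation. This is a hypothetical, heavily simplified model (ShutdownSketch and TaskState are made-up names); hooks, heartbeats, the 50 ms grace period and ZooKeeper are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

enum TaskState { RUNNING, SHUTDOWN }

// Hypothetical, simplified model of TaskShutdownDameon.shutdown().
class ShutdownSketch {
    private final AtomicBoolean isClosing = new AtomicBoolean(false);
    private volatile TaskState status = TaskState.RUNNING;
    private final List<Thread> workers = new ArrayList<>();
    private int cleanupRuns = 0;

    // Each worker re-checks the shared status flag, like the task's loop threads.
    void addWorker(Runnable body) {
        Thread t = new Thread(() -> {
            while (status != TaskState.SHUTDOWN) {
                body.run();
            }
        });
        t.setDaemon(true);
        workers.add(t);
        t.start();
    }

    void shutdown() {
        // Only the first caller wins the CAS; later or concurrent calls return immediately.
        if (!isClosing.compareAndSet(false, true)) {
            return;
        }
        cleanupRuns++;
        status = TaskState.SHUTDOWN;   // loops observe this and exit
        for (Thread t : workers) {
            t.interrupt();             // wake workers that are blocked in sleep/wait
            try {
                t.join(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    TaskState status() { return status; }

    int cleanupRuns() { return cleanupRuns; }

    boolean allStopped() {
        for (Thread t : workers) {
            if (t.isAlive()) return false;
        }
        return true;
    }
}
```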
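That roughly covers the JStorm task itself. Next, let's look at the two most important instances in a task besides the business logic: taskTransfer and taskReceiver, i.e. the classes TaskTransfer and TaskReceiver. The two classes are quite similar; in each, the important part is a private inner class that extends RunnableCallback.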
TaskTransfer's inner class
```java
protected class TransferRunnable extends RunnableCallback implements EventHandler {
    ...
    @Override
    public void run() {
        while (!shutdown.get()) {
            // Keep consuming events
            serializeQueue.multiConsumeBatchWhenAvailable(this);
        }
    }
    ...
    @Override
    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        if (event == null) {
            return;
        }
        // Handle the consumed event
        serialize(serializer, event);
    }
    ...
}
```
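Focus on the inner class's two methods, run and onEvent. run keeps consuming events with the class itself acting as the EventHandler; onEvent is the EventHandler callback, and it calls the serialization method.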
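A rough sketch of this consume loop, assuming a plain java.util.concurrent.BlockingQueue in place of JStorm's Disruptor-based DisruptorQueue: the consumer waits for at least one event, drains whatever else is already queued, and hands each event to an onEvent-style callback with an endOfBatch flag. BatchHandler and BatchConsumer are hypothetical and only approximate multiConsumeBatchWhenAvailable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical callback with the same shape as the onEvent method above.
interface BatchHandler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch);
}

// Sketch of a batch consumer; it only approximates multiConsumeBatchWhenAvailable.
class BatchConsumer<T> {
    private final BlockingQueue<T> queue;
    private long sequence = 0;

    BatchConsumer(BlockingQueue<T> queue) {
        this.queue = queue;
    }

    // Wait up to timeoutMs for one event, then drain everything else already
    // queued and deliver the whole batch, flagging the last event as endOfBatch.
    int consumeBatch(BatchHandler<T> handler, long timeoutMs) {
        List<T> batch = new ArrayList<>();
        try {
            T first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first == null) {
                return 0;
            }
            batch.add(first);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 0;
        }
        queue.drainTo(batch);
        for (int i = 0; i < batch.size(); i++) {
            handler.onEvent(batch.get(i), sequence++, i == batch.size() - 1);
        }
        return batch.size();
    }
}
```

A TransferRunnable-like run() would call consumeBatch inside a `while (!shutdown.get())` loop; the timeout keeps the loop responsive to the shutdown flag even when no events arrive.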
Here is the serialize method:
```java
protected void serialize(KryoTupleSerializer serializer, Object event) {
    long start = serializeTimer.getTime();
    try {
        ITupleExt tuple = (ITupleExt) event;
        int targetTaskId = tuple.getTargetTaskId();
        // Look up the IConnection for the target taskId
        IConnection conn = getConnection(targetTaskId);
        if (conn != null) {
            // Serialize the tuple into a byte array
            byte[] tupleMessage = serializer.serialize((TupleExt) tuple);
            //LOG.info("Task-{} sent msg to task-{}, data={}", task.getTaskId(), taskid,
            //        JStormUtils.toPrintableString(tupleMessage));
            // Wrap it in a TaskMessage that the custom Netty encoder can handle, then send it
            TaskMessage taskMessage = new TaskMessage(taskId, targetTaskId, tupleMessage);
            conn.send(taskMessage);
        } else {
            LOG.error("Can not find connection for task-{}", targetTaskId);
        }
    } finally {
        if (MetricUtils.metricAccurateCal) {
            serializeTimer.updateTime(start);
        }
    }
}
```
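Stripped of metrics and Kryo, serialize boils down to: route by target task, serialize, wrap, send. The sketch below models that with hypothetical WireMessage and Connection types standing in for TaskMessage and IConnection, and plain UTF-8 string encoding standing in for KryoTupleSerializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical message: source task, target task, serialized payload
// (mirrors the three arguments passed to new TaskMessage(...) above).
final class WireMessage {
    final int sourceTask;
    final int targetTask;
    final byte[] payload;

    WireMessage(int sourceTask, int targetTask, byte[] payload) {
        this.sourceTask = sourceTask;
        this.targetTask = targetTask;
        this.payload = payload;
    }
}

// Hypothetical stand-in for IConnection.
interface Connection {
    void send(WireMessage msg);
}

class TransferSketch {
    private final int taskId;
    private final Map<Integer, Connection> connections = new HashMap<>();
    private int missingConnections = 0;

    TransferSketch(int taskId) {
        this.taskId = taskId;
    }

    void register(int targetTask, Connection conn) {
        connections.put(targetTask, conn);
    }

    // Same shape as serialize(): find the connection for the target task,
    // turn the tuple into bytes (UTF-8 here, Kryo in JStorm), wrap and send.
    void serialize(int targetTask, String tuple) {
        Connection conn = connections.get(targetTask);
        if (conn == null) {
            missingConnections++;   // JStorm logs "Can not find connection for task-..."
            return;
        }
        byte[] bytes = tuple.getBytes(StandardCharsets.UTF_8);
        conn.send(new WireMessage(taskId, targetTask, bytes));
    }

    int missingConnections() {
        return missingConnections;
    }

    public static void main(String[] args) {
        List<WireMessage> sent = new ArrayList<>();
        TransferSketch transfer = new TransferSketch(1);
        transfer.register(7, sent::add);
        transfer.serialize(7, "hello");
        transfer.serialize(9, "no route");
        System.out.println(sent.size() + " sent, " + transfer.missingConnections() + " dropped");
    }
}
```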
TaskReceiver's inner class
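The two important methods of TaskReceiver's inner class play the same roles as above; they just consume events from a different queue. The interesting part is the deserialization method called from onEvent: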
```java
public void deserializeTuple(KryoTupleDeserializer deserializer, byte[] serMsg, DisruptorQueue queue) {
    // Deserialize
    Tuple tuple = deserializer.deserialize(serMsg);
    if (tuple != null) {
        if (JStormDebugger.isDebugRecv(tuple.getMessageId())) {
            LOG.info(idStr + " receive " + tuple.toString());
        }
        //queue.publish(tuple); push the deserialized tuple into the execution queue
        if (isBackpressureEnable) {
            if (backpressureStatus) {
                while (queue.pctFull() > lowMark) {
                    JStormUtils.sleepMs(1);
                }
                queue.publish(tuple);
                backpressureStatus = false;
            } else {
                queue.publish(tuple);
                if (queue.pctFull() > highMark) {
                    backpressureStatus = true;
                }
            }
        } else {
            queue.publish(tuple);
        }
    }
}
```
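The backpressure branch above is a hysteresis loop: a publish flips backpressureStatus on once the queue passes highMark, and the next publish then waits until consumers have drained the queue below lowMark. A minimal sketch, assuming a bounded ArrayBlockingQueue in place of DisruptorQueue (BoundedQueue and HysteresisPublisher are hypothetical names):

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical bounded queue exposing pctFull(), like DisruptorQueue.
class BoundedQueue<T> {
    private final ArrayBlockingQueue<T> items;
    private final int capacity;

    BoundedQueue(int capacity) {
        this.capacity = capacity;
        this.items = new ArrayBlockingQueue<>(capacity);
    }

    float pctFull() {
        return (float) items.size() / capacity;
    }

    void publish(T item) {
        try {
            items.put(item);   // blocks when the queue is full
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    T poll() {
        return items.poll();
    }

    int size() {
        return items.size();
    }
}

// The high/low watermark logic from deserializeTuple, isolated.
class HysteresisPublisher<T> {
    private final BoundedQueue<T> queue;
    private final float highMark;
    private final float lowMark;
    private boolean backpressure = false;

    HysteresisPublisher(BoundedQueue<T> queue, float highMark, float lowMark) {
        this.queue = queue;
        this.highMark = highMark;
        this.lowMark = lowMark;
    }

    void publish(T item) {
        if (backpressure) {
            // Hold the producer until consumers drain the queue below the low mark.
            while (queue.pctFull() > lowMark) {
                sleepMs(1);
            }
            queue.publish(item);
            backpressure = false;
        } else {
            queue.publish(item);
            if (queue.pctFull() > highMark) {
                backpressure = true;
            }
        }
    }

    boolean inBackpressure() {
        return backpressure;
    }

    private static void sleepMs(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Using two marks instead of one keeps the state from flapping when the fill level hovers around a single threshold.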
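JStorm abstracts network communication into the IConnection interface, which both the sending and the receiving side implement. The main implementations are NettyServer, NettyClient and NettyClientAsync, where NettyClientAsync extends NettyClient.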
NettyServer
Here are the important parts of its constructor:
```java
...
// Create the thread pools for accepting TCP connections (boss) and for I/O (worker)
ThreadFactory bossFactory = new NettyRenameThreadFactory("server" + "-boss");
ThreadFactory workerFactory = new NettyRenameThreadFactory("server" + "-worker");
if (maxWorkers > 0) {
    factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
            Executors.newCachedThreadPool(workerFactory), maxWorkers);
} else {
    factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
            Executors.newCachedThreadPool(workerFactory));
}
// Set the TCP connection options
bootstrap = new ServerBootstrap(factory);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.receiveBufferSize", buffer_size);
bootstrap.setOption("child.keepAlive", true);
// Set up the pipeline factory. The handlers contain the actual processing logic
bootstrap.setPipelineFactory(new StormServerPipelineFactory(this, stormConf));
...
```
Next, the method that sets up the handlers:
```java
public ChannelPipeline getPipeline() throws Exception {
    // Create a default pipeline implementation.
    ChannelPipeline pipeline = Channels.pipeline();
    // Decoder
    pipeline.addLast("decoder", new MessageDecoder(true, conf));
    // Encoder
    pipeline.addLast("encoder", new MessageEncoder());
    // business logic.
    pipeline.addLast("handler", new StormServerHandler(server));
    return pipeline;
}
```
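The first two handlers deserialize and serialize messages; only the last one does the actual processing. For the server-side StormServerHandler, what matters is what it does after receiving a message, so let's look at its overridden messageReceived method: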
```java
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    Object msg = e.getMessage();
    if (msg == null)
        return;
    // end of batch?
    if (msg == ControlMessage.EOB_MESSAGE) {
        return;
    } else if (msg instanceof ControlMessage) {
        // LOG.debug("Receive ...{}", msg);
        return;
    }
    // enqueue the received message for processing
    try {
        // The key logic
        server.enqueue((TaskMessage) msg, ctx.getChannel());
    } catch (Exception e1) {
        LOG.warn("Failed to enqueue a request message " + e.toString(), e1);
        // Channel channel = ctx.getChannel();
        // incFailureCounter(channel);
    }
}
```
Everything else is boilerplate; the key logic is the single statement server.enqueue((TaskMessage) msg, ctx.getChannel());.
Let's look at the enqueue method:
```java
public void enqueue(TaskMessage message, Channel channel) {
    if (isClosed()) {
        return;
    }
    // lots of messages may be lost when deserialize queue hasn't finished init operation;
    // loop until every task's DisruptorQueue has been initialized
    while (!bstartRec) {
        LOG.debug("check whether deserialize queues have already been created");
        boolean isFinishInit = true;
        for (Integer task : workerTasks) {
            if (deserializeQueues.get(task) == null) {
                isFinishInit = false;
                JStormUtils.sleepMs(10);
                break;
            }
        }
        if (isFinishInit) {
            bstartRec = isFinishInit;
        }
    }
    short type = message.get_type();
    // Normal message: push it into the target task's deserialize queue, where
    // TaskReceiver's consumer thread will later take it out and process it
    if (type == TaskMessage.NORMAL_MESSAGE) {
        // enqueue a received message
        int task = message.task();
        DisruptorQueue queue = deserializeQueues.get(task);
        if (queue == null) {
            LOG.warn("Received invalid message directed at task {}. Dropping...", task);
            LOG.debug("Message data: {}", JStormUtils.toPrintableString(message.message()));
            return;
        }
        if (!isBackpressureEnable) {
            queue.publish(message.message());
        } else {
            // Backpressure is enabled
            flowCtrlHandler.flowCtrl(channel, queue, task, message.message());
        }
    // Control message: push it into the control queue
    } else if (type == TaskMessage.CONTROL_MESSAGE) {
        // enqueue a control message
        if (recvControlQueue == null) {
            LOG.info("Can not find the recvControlQueue. Dropping this control message");
            return;
        }
        recvControlQueue.publish(message);
    } else {
        LOG.warn("Unexpected message (type={}) was received from task {}", type, message.task());
    }
}
```
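Ignoring the startup wait loop and backpressure, the dispatch in enqueue can be sketched as follows. TypedMessage and its NORMAL/CONTROL values are hypothetical stand-ins for TaskMessage and its NORMAL_MESSAGE/CONTROL_MESSAGE constants (whose actual values are not shown here), and plain queues replace DisruptorQueue.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical message with a type code; the real TaskMessage constants may differ.
final class TypedMessage {
    static final short NORMAL = 0;
    static final short CONTROL = 1;

    final short type;
    final int task;
    final byte[] body;

    TypedMessage(short type, int task, byte[] body) {
        this.type = type;
        this.task = task;
        this.body = body;
    }
}

// Type-based dispatch from enqueue(), without the init wait and backpressure.
class EnqueueSketch {
    final Map<Integer, Queue<byte[]>> deserializeQueues = new HashMap<>();
    final Queue<TypedMessage> controlQueue = new ArrayDeque<>();
    int dropped = 0;

    void enqueue(TypedMessage m) {
        if (m.type == TypedMessage.NORMAL) {
            Queue<byte[]> q = deserializeQueues.get(m.task);
            if (q == null) {
                dropped++;            // "Received invalid message directed at task ..."
                return;
            }
            q.add(m.body);            // only the payload goes to the per-task queue
        } else if (m.type == TypedMessage.CONTROL) {
            controlQueue.add(m);      // control messages are queued whole
        } else {
            dropped++;                // "Unexpected message (type=...)"
        }
    }
}
```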
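Roughly, enqueue pushes each message into the queue that matches its type and target taskId. One variable deserves attention, though: isBackpressureEnable, which controls whether JStorm's backpressure is turned on. Let's see how the backpressure method works.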
First, here is how JStorm's throttling mechanism works:
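When a task becomes blocked, it sends the execution time of its executing thread to the topology master. Once blocking is triggered, the topology master passes this execution time on to the spouts, and each spout then waits for that long after every tuple it sends. People in the Storm community wanted to achieve this effect by dynamically adjusting max_pending, but that approach simply does not work. (Quoted from the JStorm documentation.)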
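On the spout side, the documented mechanism amounts to "wait N ms after each tuple". A minimal sketch, assuming the topology master's command simply sets that N (ThrottledSpout and onThrottle are hypothetical names; the wait is injected as a LongConsumer so the behavior can be checked without relying on timing):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical spout-side throttle: the topology master pushes a per-tuple
// wait time, and the spout waits that long after every emitted tuple.
class ThrottledSpout {
    private volatile long waitMsPerTuple = 0;
    private final LongConsumer sleeper;   // e.g. a Thread.sleep wrapper in real use
    private final List<String> emitted = new ArrayList<>();

    ThrottledSpout(LongConsumer sleeper) {
        this.sleeper = sleeper;
    }

    // Called when a throttle command arrives with the blocked task's execution time.
    void onThrottle(long waitMs) {
        waitMsPerTuple = waitMs;
    }

    void emit(String tuple) {
        emitted.add(tuple);
        long wait = waitMsPerTuple;
        if (wait > 0) {
            sleeper.accept(wait);
        }
    }

    int emittedCount() {
        return emitted.size();
    }
}
```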
```java
public void flowCtrl(Channel channel, DisruptorQueue queue, int taskId, byte[] message) {
    boolean initFlowCtrl = false;
    final Set<String> remoteAddrs = flowCtrlClients.get(taskId);
    synchronized (remoteAddrs) {
        // A non-empty remoteAddrs means this task is already under backpressure
        if (!remoteAddrs.isEmpty()) {
            // Put the message into the queue's cache
            queue.publishCache(message);
            // Record the message's source address; if it was not in the set yet,
            // also send a backpressure request back to that source
            if (remoteAddrs.add(channel.getRemoteAddress().toString()))
                JStormUtils.sendFlowControlRequest(channel, taskId, 1);
        } else {
            queue.publish(message);
            // If the queue's fill level exceeds the high watermark, send a backpressure request
            if (queue.pctFull() > highMark) {
                remoteAddrs.add(channel.getRemoteAddress().toString());
                JStormUtils.sendFlowControlRequest(channel, taskId, 1);
                initFlowCtrl = true;
            }
        }
    }
    if (initFlowCtrl)
        // Register a callback on the queue. This step is important; its purpose becomes clear later
        queue.publishCallback(new BackpressureCallback(this, taskId));
}
```
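The bookkeeping in flowCtrl can be modeled without Netty: a per-task set of source addresses, where the source whose message pushes the queue over highMark turns backpressure on, and every later new source gets its own request while messages go to the cache. In this hypothetical sketch addresses are plain strings, each request is recorded as "address/task/flag" instead of calling JStormUtils.sendFlowControlRequest, and TaskQueue is a made-up stand-in for DisruptorQueue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-task queue with a main area and an overflow cache (cf. publishCache).
class TaskQueue {
    final List<byte[]> items = new ArrayList<>();
    final List<byte[]> cache = new ArrayList<>();
    final int capacity;

    TaskQueue(int capacity) {
        this.capacity = capacity;
    }

    float pctFull() {
        return (float) items.size() / capacity;
    }
}

class FlowCtrlSketch {
    private final Map<Integer, Set<String>> flowCtrlClients = new ConcurrentHashMap<>();
    private final float highMark;
    // Every flow-control request that would be sent, as "address/task/flag".
    final List<String> requests = Collections.synchronizedList(new ArrayList<>());

    FlowCtrlSketch(float highMark) {
        this.highMark = highMark;
    }

    // Returns true when this call switched the task into backpressure
    // (the point where JStorm registers its BackpressureCallback).
    boolean flowCtrl(String remoteAddr, TaskQueue queue, int taskId, byte[] message) {
        boolean initFlowCtrl = false;
        // JStorm looks up a pre-created set; computeIfAbsent keeps the sketch self-contained.
        Set<String> remoteAddrs = flowCtrlClients.computeIfAbsent(taskId, k -> new HashSet<>());
        synchronized (remoteAddrs) {
            if (!remoteAddrs.isEmpty()) {
                queue.cache.add(message);
                if (remoteAddrs.add(remoteAddr)) {
                    requests.add(remoteAddr + "/" + taskId + "/1");
                }
            } else {
                queue.items.add(message);
                if (queue.pctFull() > highMark) {
                    remoteAddrs.add(remoteAddr);
                    requests.add(remoteAddr + "/" + taskId + "/1");
                    initFlowCtrl = true;
                }
            }
        }
        return initFlowCtrl;
    }
}
```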
Next, here is how throttling is released:
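After the spouts slow down, each task that sent a blocking command checks its queue watermark; once it has been below 0.05 four times in a row, the task sends a release command to the topology master. The topology master sends a speed-up command to all spouts, and each spout decrements its per-tuple wait time. When a spout's wait time drops to 0, the spout keeps sending "release throttling" commands to the topology master; once the topology master has confirmed that every throttled spout has sent the release command, it sets the topology status back to normal, which marks the real end of throttling. (Quoted from the JStorm documentation.)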
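Two pieces of this description can be sketched directly: the task-side rule "below 0.05 four times in a row" and the spout-side countdown of the per-tuple wait. Both classes are hypothetical illustrations of the documented behavior, not JStorm code; in particular, firing the release only once per low streak is an assumption of the sketch.

```java
// Hypothetical task-side check: report "release" once the queue has been
// below the threshold for `required` consecutive checks (0.05 and 4 per the docs).
class ReleaseDetector {
    private final double threshold;
    private final int required;
    private int consecutiveLow = 0;

    ReleaseDetector(double threshold, int required) {
        this.threshold = threshold;
        this.required = required;
    }

    // Returns true exactly once per streak, on the check that completes it.
    boolean check(double pctFull) {
        if (pctFull < threshold) {
            consecutiveLow++;
            return consecutiveLow == required;
        }
        consecutiveLow = 0;   // any reading at or above the threshold resets the streak
        return false;
    }
}

// Hypothetical spout-side counterpart: each speed-up command decrements the
// per-tuple wait; at zero the spout should report "release" to the master.
class SpoutDelay {
    private long waitMs;

    SpoutDelay(long waitMs) {
        this.waitMs = waitMs;
    }

    boolean onSpeedUp() {
        if (waitMs > 0) {
            waitMs--;
        }
        return waitMs == 0;
    }

    long waitMs() {
        return waitMs;
    }
}
```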
NettyServerFlowCtrlHandler keeps a dedicated thread that sends the backpressure-release messages:
```java
private class FlowCtrlChecker implements Runnable {
    public void run() {
        while (true) {
            int taskId;
            try {
                // Take the taskId whose backpressure should be released
                taskId = eventQueue.take();
                // Look up the remote addresses that must receive the release message
                Set<String> remoteAddrs = flowCtrlClients.get(taskId);
                if (remoteAddrs == null)
                    continue;
                // Send the release request (last argument 0) to each address, then clear the set
                synchronized (remoteAddrs) {
                    if (!remoteAddrs.isEmpty()) {
                        for (String remoteAddr : remoteAddrs) {
                            Channel channel = allChannels.getChannel(remoteAddr);
                            if (channel == null) {
                                continue;
                            }
                            // send back backpressure flow control request to source client
                            JStormUtils.sendFlowControlRequest(channel, taskId, 0);
                        }
                        remoteAddrs.clear();
                    }
                }
            } catch (InterruptedException e) {
                LOG.warn("Failed to take flow control event", e);
            }
        }
    }
}
```
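A runnable model of this checker thread, assuming string addresses and a recorded "address/task/0" entry in place of looking up the Channel in allChannels and calling JStormUtils.sendFlowControlRequest. ReleaseSketch and addThrottledSource are hypothetical; unlike the original, which logs InterruptedException and keeps looping, this sketch exits on interrupt so it can be stopped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class ReleaseSketch {
    private final BlockingQueue<Integer> eventQueue = new LinkedBlockingQueue<>();
    final Map<Integer, Set<String>> flowCtrlClients = new ConcurrentHashMap<>();
    final List<String> sent = Collections.synchronizedList(new ArrayList<>());
    private final Thread checker = new Thread(this::checkLoop, "flow-ctrl-checker");

    void start() {
        checker.setDaemon(true);
        checker.start();
    }

    // Record that a source was throttled for a task (flowCtrl does this in JStorm).
    void addThrottledSource(int taskId, String remoteAddr) {
        Set<String> addrs = flowCtrlClients.computeIfAbsent(taskId, k -> new HashSet<>());
        synchronized (addrs) {
            addrs.add(remoteAddr);
        }
    }

    // Counterpart of releaseFlowCtrl: just hand the taskId to the checker thread.
    void releaseFlowCtrl(int taskId) {
        eventQueue.offer(taskId);
    }

    private void checkLoop() {
        while (true) {
            int taskId;
            try {
                taskId = eventQueue.take();
            } catch (InterruptedException e) {
                return;   // the original logs and keeps looping
            }
            Set<String> remoteAddrs = flowCtrlClients.get(taskId);
            if (remoteAddrs == null) {
                continue;
            }
            synchronized (remoteAddrs) {
                for (String addr : remoteAddrs) {
                    sent.add(addr + "/" + taskId + "/0");   // stands in for the flag-0 request
                }
                remoteAddrs.clear();
            }
        }
    }

    void stop() {
        checker.interrupt();
    }

    // Poll until at least n requests were recorded or the timeout expires.
    boolean awaitSent(int n, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (sent.size() < n) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```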
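NettyServerFlowCtrlHandler provides a method called releaseFlowCtrl that pushes a taskId into eventQueue. So somewhere in the task there must be a check on the fill level of the task's queue that releases backpressure once the level drops below the low watermark. Searching for callers of releaseFlowCtrl, the only one is the callback method of BackpressureCallback, which explains the callback that flowCtrl registers on the queue at its end. This makes sense: a queue callback is only invoked after events have been taken (consumed) from the queue, and consuming events is the only thing that lowers the queue's fill level. Putting the watermark check and the backpressure release into that callback is therefore a natural fit, and much cheaper than what I first assumed, namely a dedicated thread that polls the queue continuously and wastes a lot of CPU.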
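The callback-after-consume design can be sketched with a small queue that runs its registered callbacks every time events are taken out of it. CallbackQueue, ConsumeCallback and BackpressureCallbackSketch are hypothetical names, and unregistering the callback once it fires is an assumption made for the sketch, not something confirmed by the code above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical callback run after events are consumed; returning true unregisters it.
interface ConsumeCallback {
    boolean afterConsume(CallbackQueue<?> queue);
}

// Hypothetical queue that fires registered callbacks after each consume,
// standing in for DisruptorQueue.publishCallback.
class CallbackQueue<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final List<ConsumeCallback> callbacks = new ArrayList<>();
    private final int capacity;   // only used to compute pctFull in this sketch

    CallbackQueue(int capacity) {
        this.capacity = capacity;
    }

    synchronized void publish(T item) {
        items.add(item);
    }

    synchronized void publishCallback(ConsumeCallback cb) {
        callbacks.add(cb);
    }

    synchronized float pctFull() {
        return (float) items.size() / capacity;
    }

    // Consume up to max events, then let callbacks inspect the now lower fill level.
    synchronized List<T> consume(int max) {
        List<T> out = new ArrayList<>();
        while (out.size() < max && !items.isEmpty()) {
            out.add(items.poll());
        }
        Iterator<ConsumeCallback> it = callbacks.iterator();
        while (it.hasNext()) {
            if (it.next().afterConsume(this)) {
                it.remove();
            }
        }
        return out;
    }
}

// Hypothetical BackpressureCallback: once the queue drops below lowMark,
// ask for the task's backpressure to be released, then unregister.
class BackpressureCallbackSketch implements ConsumeCallback {
    private final int taskId;
    private final float lowMark;
    private final IntConsumer releaseFlowCtrl;

    BackpressureCallbackSketch(int taskId, float lowMark, IntConsumer releaseFlowCtrl) {
        this.taskId = taskId;
        this.lowMark = lowMark;
        this.releaseFlowCtrl = releaseFlowCtrl;
    }

    @Override
    public boolean afterConsume(CallbackQueue<?> queue) {
        if (queue.pctFull() < lowMark) {
            releaseFlowCtrl.accept(taskId);
            return true;
        }
        return false;
    }
}
```

No thread ever polls the queue here: the check runs only when consumption has actually happened, which is exactly when the fill level can have dropped.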
Reposted from: http://ligmi.baihongyu.com/