博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
jstorm源码阅读汇总(一)
阅读量:4223 次
发布时间:2019-05-26

本文共 14409 字,大约阅读时间需要 48 分钟。

jstorm源码阅读汇总(一)

将最近阅读jstorm的源码笔记汇总一下,主要包括jstorm的task,jstorm网络通讯,jstorm限流部分的代码

jstrorm task

task是storm中任务的实质,也就是业务逻辑的载体,首先Task实现了Runnable接口,那我们大致可以猜到task实际是在一个线程中不断了执行某些程序,看一下重写的run方法

public void run() {    try {        taskShutdownDameon = this.execute();    } catch (Throwable e) {        LOG.error("init task error", e);        if (reportErrorDie != null) {            reportErrorDie.report(e);        } else {            throw new RuntimeException(e);        }    }}

run方法实质是对execute方法的一个封装,继续看execute方法

public TaskShutdownDameon execute() throws Exception {    taskSendTargets = echoToSystemBolt();    // 创建taskTransfer实例,该实例是用于序列化元组(tuple)并通过netty发送出去的    taskTransfer = mkTaskSending(workerData);    // 创建实际的task的执行逻辑的RunnableCallback实例    RunnableCallback baseExecutor = prepareExecutor();    setBaseExecutors((BaseExecutors) baseExecutor);    // 创建业务逻辑的执行线程    AsyncLoopThread executor_threads = new AsyncLoopThread(baseExecutor, false, Thread.MAX_PRIORITY, true);    // 创建taskReceiver实例,该实例用于接受从元组(tuple)并解码后将它推入执行队列    taskReceiver = mkTaskReceiver();    List
allThreads = new ArrayList<>(); allThreads.add(executor_threads); LOG.info("Finished loading task " + componentId + ":" + taskId); taskShutdownDameon = getShutdown(allThreads, baseExecutor); return taskShutdownDameon;}

该方法返回的是一个TaskShutdownDameon实例,我们先看一下getShutdown方法

public TaskShutdownDameon getShutdown(List
allThreads, RunnableCallback baseExecutor) { AsyncLoopThread ackerThread; // 创建ack的执行线程,并添加到线程List里 if (baseExecutor instanceof SpoutExecutors) { ackerThread = ((SpoutExecutors) baseExecutor).getAckerRunnableThread(); if (ackerThread != null) { allThreads.add(ackerThread); } } // 将解码的执行线程添加到线程List里 List
recvThreads = taskReceiver.getDeserializeThread(); for (AsyncLoopThread recvThread : recvThreads) { allThreads.add(recvThread); } // 将编码的执行线程添加到线程List里 List
serializeThreads = taskTransfer.getSerializeThreads(); allThreads.addAll(serializeThreads); TaskHeartbeatTrigger taskHeartbeatTrigger = ((BaseExecutors) baseExecutor).getTaskHbTrigger(); // 创建TaskShutdownDameon线程 return new TaskShutdownDameon( taskStatus, topologyId, taskId, allThreads, zkCluster, taskObj, this, taskHeartbeatTrigger);}

可以看到该方法主要是将一些藏在其它实例里的线程给加入线程List里,然后构造了一个TaskShutdownDameon实例,该实例应该包含用于清理所有资源的方法,看一下TaskShutdownDameon,该类实现了ShutdownableDameon,而其重写的三个方法最主要的就是shutdown方法,该方法中做了资源的清理

@Overridepublic void shutdown() {    if (isClosing.compareAndSet(false, true)) {        LOG.info("Begin to shut down task " + topologyId + ":" + taskId);        TopologyContext userContext = task.getUserContext();        // 清理钩子        for (ITaskHook iTaskHook : userContext.getHooks())            iTaskHook.cleanup();        // 根据task的类型(IBolt/ISpout)调用相应的清理操作        closeComponent(taskObj);        // 更新心跳触发器的任务状态        taskHeartbeatTrigger.updateExecutorStatus(TaskStatus.SHUTDOWN);        // wait 50ms for executor thread to shutdown to make sure to send shutdown info to TM        try {            Thread.sleep(50);        } catch (InterruptedException ignored) {        }        // all thread will check the taskStatus        // once it has been set to SHUTDOWN, it will quit        // 更新任务状态        taskStatus.setStatus(TaskStatus.SHUTDOWN);        // 将所有线程都进行清理        for (AsyncLoopThread thr : allThreads) {            LOG.info("Begin to shutdown " + thr.getThread().getName());            thr.cleanup();            JStormUtils.sleepMs(10);            thr.interrupt();            // try {            // //thr.join();            // thr.getThread().stop(new RuntimeException());            // } catch (Throwable e) {            // }            LOG.info("Successfully shutdown " + thr.getThread().getName());        }        taskHeartbeatTrigger.unregister();        LOG.info("Successfully shutdown task heartbeat trigger for task:{}", taskId);        try {            zkCluster.disconnect();        } catch (Exception e) {            LOG.error("Failed to disconnect zk for task-" + taskId);        }        LOG.info("Successfully shutdown task " + topologyId + ":" + taskId);    }}

至此jstorm的task大致也解析完了,接下来看一下task中除了处理业务的外最为重要的其它两个实例——taskTransfer与taskReceiver,看这两个类TaskTransfer和TaskReceiver,这两个类大致是差不多的,重要的都是类内部的两个实现了RunnableCallback的私有类。

  • TaskTransfer的内部类

    protected class TransferRunnable extends RunnableCallback implements EventHandler {
    ... @Override public void run() { while (!shutdown.get()) { // 不断的消费事件 serializeQueue.multiConsumeBatchWhenAvailable(this); } } ... @Override public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception { if (event == null) { return; } // 用于处理消费的事件的方法 serialize(serializer, event); } ...}

    主要关注内部类里的两个方法,run和onEvent,run方法不断的用该类作为EventHandler消费事件,onEvent就是实现类EventHandler的具体方法,方法里调用了解码方法

    看一下serialize方法

    protected void serialize(KryoTupleSerializer serializer, Object event) {    long start = serializeTimer.getTime();    try {        ITupleExt tuple = (ITupleExt) event;        int targetTaskId = tuple.getTargetTaskId();        // 获取根据taskId获取IConnection实例        IConnection conn = getConnection(targetTaskId);        if (conn != null) {            // 解码成byte数组            byte[] tupleMessage = serializer.serialize((TupleExt) tuple);            //LOG.info("Task-{} sent msg to task-{}, data={}", task.getTaskId(), taskid,            // JStormUtils.toPrintableString(tupleMessage));            // 创建能被定义的netty的解码器解码的消息并发送            TaskMessage taskMessage = new TaskMessage(taskId, targetTaskId, tupleMessage);            conn.send(taskMessage);        } else {            LOG.error("Can not find connection for task-{}", targetTaskId);        }    } finally {        if (MetricUtils.metricAccurateCal) {            serializeTimer.updateTime(start);        }    }}
  • TaskReceiver的内部类

    TaskReceiver的内部类重要的两个方法和上面的作用也差不多,只不过消费的是不同队列的事件而已,重点看一些onEvent里的反序列化方法

    public void deserializeTuple(KryoTupleDeserializer deserializer, byte[] serMsg, DisruptorQueue queue) {    // 解码    Tuple tuple = deserializer.deserialize(serMsg);    if (tuple != null) {        if (JStormDebugger.isDebugRecv(tuple.getMessageId())) {            LOG.info(idStr + " receive " + tuple.toString());        }        //queue.publish(tuple);将解码后的元组推送至执行的队列        if (isBackpressureEnable) {            if (backpressureStatus) {                while (queue.pctFull() > lowMark) {                    JStormUtils.sleepMs(1);                }                queue.publish(tuple);                backpressureStatus = false;            } else {                queue.publish(tuple);                if (queue.pctFull() > highMark) {                    backpressureStatus = true;                }            }        } else {            queue.publish(tuple);        }    }}

jstorm的网络通讯部分(netty)

jstorm中网络通讯部分被抽象成一个IConnection接口,收发消息部分都实现了该接口,主要看它的三个实现类,NettyServer,NettyClient与NettyClientAsync。其中NettyClientAsync基础NettyClient。

  • NettyServer

    主要看它的构造方法中重要的部分

    ...    // 创建TCP监听线程池与IO工作线程池    ThreadFactory bossFactory = new NettyRenameThreadFactory("server" + "-boss");    ThreadFactory workerFactory = new NettyRenameThreadFactory("server" + "-worker");    if (maxWorkers > 0) {        factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory), maxWorkers);    } else {        factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory));    }    // 设置TCP链接的选项    bootstrap = new ServerBootstrap(factory);    bootstrap.setOption("reuseAddress", true);    bootstrap.setOption("child.tcpNoDelay", true);    bootstrap.setOption("child.receiveBufferSize", buffer_size);    bootstrap.setOption("child.keepAlive", true);    // Set up the pipeline factory. 设置handler,handler里是真正的处理逻辑    bootstrap.setPipelineFactory(new StormServerPipelineFactory(this, stormConf));...

    如何看一下设置handler的函数

    public ChannelPipeline getPipeline() throws Exception {    // Create a default pipeline implementation.    ChannelPipeline pipeline = Channels.pipeline();    // Decoder    pipeline.addLast("decoder", new MessageDecoder(true, conf));    // Encoder    pipeline.addLast("encoder", new MessageEncoder());    // business logic.    pipeline.addLast("handler", new StormServerHandler(server));    return pipeline;}

    前两个是消息的序列化以及反序列化,最后一个才是处理函数,看一下StormServerHandler,server的handler最主要的还是看其接受到消息后做了什么,那就看其复写的messageReceived方法

    @Overridepublic void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {    Object msg = e.getMessage();    if (msg == null)        return;    // end of batch?    if (msg == ControlMessage.EOB_MESSAGE) {        return;    } else if (msg instanceof ControlMessage) {        // LOG.debug("Receive ...{}", msg);        return;    }    // enqueue the received message for processing    try {        // 重点逻辑        server.enqueue((TaskMessage) msg, ctx.getChannel());    } catch (Exception e1) {        LOG.warn("Failed to enqueue a request message " + e.toString(), e1);        // Channel channel = ctx.getChannel();        // incFailureCounter(channel);    }}

    其它的都是废话,重点逻辑就是server.enqueue((TaskMessage) msg, ctx.getChannel()); 这句

    看enqueue方法

    public void enqueue(TaskMessage message, Channel channel) {    if (isClosed()) {        return;    }    // lots of messages may be lost when deserialize queue hasn't finished init operation,做循环直到所有任务对应的DisruptorQueue都初始化成功    while (!bstartRec) {        LOG.debug("check whether deserialize queues have already been created");        boolean isFinishInit = true;        for (Integer task : workerTasks) {            if (deserializeQueues.get(task) == null) {                isFinishInit = false;                JStormUtils.sleepMs(10);                break;            }        }        if (isFinishInit) {            bstartRec = isFinishInit;        }    }    short type = message.get_type();    // 正常消息,推入任务对应都解码queue,随后会被TaskReceiver都消费线程读取出来并消费掉    if (type == TaskMessage.NORMAL_MESSAGE) {        // enqueue a received message        int task = message.task();        DisruptorQueue queue = deserializeQueues.get(task);        if (queue == null) {            LOG.warn("Received invalid message directed at task {}. Dropping...", task);            LOG.debug("Message data: {}", JStormUtils.toPrintableString(message.message()));            return;        }        if (!isBackpressureEnable) {            queue.publish(message.message());        } else {            // 如果激活了反压配置            flowCtrlHandler.flowCtrl(channel, queue, task, message.message());        }        // 控制消息,推入控制队列    } else if (type == TaskMessage.CONTROL_MESSAGE) {        // enqueue a control message        if (recvControlQueue == null) {            LOG.info("Can not find the recvControlQueue. Dropping this control message");            return;        }        recvControlQueue.publish(message);    } else {        LOG.warn("Unexpected message (type={}) was received from task {}", type, message.task());    }}

    该方法大致逻辑也就是将消息根据消息类型以及消息所属的taskId推入相应的队列,不过要主要一该变量isBackpressureEnable,该变量是控制jstorm是否开启里反压的,我们看一下做反压的方法是怎么做的

    首先我们看一下jstorm的限流机制

    当task出现阻塞时,他会将自己的执行线程的执行时间, 传给topology master, 当触发阻塞后, topology master会把这个执行时间传给spout, 于是, spout每发送一个tuple,就会等待这个执行时间。storm 社区的人想通过动态调整max_pending达到这种效果,其实这种做法根本无效。————摘自jstorm文档
    public void flowCtrl(Channel channel, DisruptorQueue queue, int taskId, byte[] message) {    boolean initFlowCtrl = false;    final Set
    remoteAddrs = flowCtrlClients.get(taskId); synchronized (remoteAddrs) { // 如果remoteAddrs不为空,代表这个task已经处于压力状态 if (!remoteAddrs.isEmpty()) { // 消息发送进入队列的cache里 queue.publishCache(message); // 将消息的来源地址添加进入set里,如果set里不存在过该地址,则也将反压消息传递给消息源地址 if(remoteAddrs.add(channel.getRemoteAddress().toString())) JStormUtils.sendFlowControlRequest(channel, taskId, 1); } else { queue.publish(message); // 如果队列里的水位超过了指定的水位,则发送反压消息 if (queue.pctFull() > highMark) { remoteAddrs.add(channel.getRemoteAddress().toString()); JStormUtils.sendFlowControlRequest(channel, taskId, 1); initFlowCtrl = true; } } } if (initFlowCtrl) // 将一个callback对象加入队列,这一步是很重要的,具体有什么作用,到了后面就知道了 queue.publishCallback(new BackpressureCallback(this, taskId));}

    然后我们看一下限流解除

    当spout降速后, 发送过阻塞命令的task 检查队列水位连续4次低于0.05时, 发送解除反应命令到topology master, topology master 发送提速命令给所有的spout, 于是spout 每发送一个tuple的等待时间--, 当spout的等待时间降为0时, spout会不断发送“解除限速”命令给 topology master, 而topology master确定所有的降速的spout都发了解除限速命令时, 将topology状态设置为正常,标志真正解除限速————摘自jstorm文档

    NettyServerFlowCtrlHandler类中维持了一个线程来发送反压解除消息

    private class FlowCtrlChecker implements Runnable {
    public void run() { while(true) { int taskId; try { // 从队列中获取要解除反压的taskId taskId = eventQueue.take(); // 根据taskId获取要发送反压解除消息的远程地址 Set
    remoteAddrs = flowCtrlClients.get(taskId); if (remoteAddrs == null) continue; // 依次发送反压消息,并清理远程地址的set synchronized (remoteAddrs) { if (!remoteAddrs.isEmpty()) { for (String remoteAddr : remoteAddrs) { Channel channel = allChannels.getChannel(remoteAddr); if (channel == null) { continue; } // send back backpressure flow control request to source client JStormUtils.sendFlowControlRequest(channel, taskId, 0); } remoteAddrs.clear(); } } } catch (InterruptedException e) { LOG.warn("Failed to take flow control event", e); } } }}

    NettyServerFlowCtrlHandler类提供了一个叫releaseFlowCtrl的方法用于将taskId推入eventQueue,所以大致可以了解task中有一个地方在检测task队列的水位,并且在水位低于低水位时解除反压。接下来查找一下该方法在哪里被调用。该方法唯一被调用的地方就是BackpressureCallback的回调方法中,现在知道刚刚flowCtrl方法最后的在队列里添加了一个callback有什么用了。其实很好理解,关于队列的callback,唯一会被调用的地方就是在获取/消费了队列中的事件之后,而会造成水位会下降的只有获取/消费队列中的事件,所以将检测task的队列以及释放反压至于callback中很合理,而不用像我一开始想的那样启动一个线程,如果不断检查,那样大大浪费了计算资源

转载地址:http://ligmi.baihongyu.com/

你可能感兴趣的文章
Kafka 入门三问
查看>>
c/c++ 内存泄漏检测,开源工具valgrind使用整理
查看>>
h264 sps pps详解
查看>>
AAC的ADTS头信息介绍
查看>>
Coroutine,你究竟干了什么?
查看>>
代码宏的一点小知识
查看>>
Sweet Snippet系列 之 随机选择
查看>>
名人•牛人•我们这些普通人
查看>>
小话游戏脚本(一)
查看>>
使用VS2010在项目中编写C++头文现出"PCH 警告:标头停止点不能位于宏或#if块中"
查看>>
统计源期刊
查看>>
多线程解码并保存为yuv
查看>>
使用信号量控制线程执行顺序,进而控制不同视频流的解码顺序
查看>>
解码单个视频及保存yuv数据到文件中
查看>>
为什么基类中的析构函数要声明为虚析构函数?
查看>>
对象切割 - 常量引用传递
查看>>
北邮同学面经
查看>>
Effective C++条款16:成对使用new和delete时要采取相同形式
查看>>
sizeof与strlen
查看>>
一个递归+二分法的洗牌程序
查看>>