Mina框架IoSession详解

通过Mina官 网文档,我们可以看到,有如下几个状态:

  • Connected : the session has been created and is available
  • Idle : the session hasn’t processed any request for at least a period of time (this period is configurable)Closing : the session is being closed (the remaining messages are being flushed, cleaning up is not terminated)
    • Idle for read : no read has actually been made for a period of time
    • Idle for write : no write has actually been made for a period of time
    • Idle for both : no read nor write for a period of time
  • Closed : The session is now closed, nothing else can be done to revive it.

对应的状态迁移图,如图所示:
session-state
通过上面的状态图,我们可以看出,是哪个事件的发生使得IoSession进入哪个状态,比较直观明了。下面,我们看一下IoSession对应的设计,类继承关系如下所示:
Diagram_Mina_IoSession
对于IoSession接口类,我在上图把具有不同类型功能的操作进行了分类,说明如下:

  • 一个IoSession实例可以访问/持有哪些数据:前半部分以get开头的方法,能够返回对应的数据对象。
  • 一个IoSession实例可以检测哪些状态数据:中间部分以is开头的一些方法。
  • 一个IoSession实例可以执行哪些方法调用:后半部分以动词开头命名的方法。
  • 一个IoSession实例还可以获取通信过程相关的统计信息,如读取字节数/消息数、写入字节数/消息数,等等,在上面类图中省略 了这些方法。

可见,IoSession在Mina框架中的位置是相当重要的。

根据上面的类图,我们分析一下NioSocketSession类的源代码。
AbstractIoSession实现了IoSession接口中定义的大多数方法,我们关注读和写两个重要的方法,因为他们最终也被NioSocketSession类所继承。
先看读数据请求方法read,如下所示:

    public final ReadFuture read() {
        if (!getConfig().isUseReadOperation()) {
            throw new IllegalStateException("useReadOperation is not enabled.");
        }

        Queue<ReadFuture> readyReadFutures = getReadyReadFutures(); // 获取到read请求就绪队列
        ReadFuture future;
        synchronized (readyReadFutures) { // 这个对就绪队列是共享的,对于读请求之间需要进行同步
            future = readyReadFutures.poll(); // 出队
            if (future != null) { // 如果队列中有就绪的read请求
                if (future.isClosed()) { // 如果与该IoSession相关的ReadFuture已经关闭(读取完成)
                    readyReadFutures.offer(future); // 还要将这个ReadFuture放入到队列,等待该IoSession下次可能的读请求
                }
            } else {
                future = new DefaultReadFuture(this); // 如果是与该IoSession相关的第一次读请求,目前读就绪队列肯定没有一个ReadFuture实例,则需要创建一个
                getWaitingReadFutures().offer(future); // 将新创建的ReadFuture实例放入到等待读队列
            }
        }

        return future; // 返回一个ReadFuture实例,无论是第一次发出读请求,还是上一次读请求已经完成,对于本次读请求都将返回这个ReadFuture实例
    }

再看一下,写数据请求方法write,如下所示:

    public WriteFuture write(Object message, SocketAddress remoteAddress) {
        if (message == null) {
            throw new IllegalArgumentException("Trying to write a null message : not allowed");
        }

        if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
            throw new UnsupportedOperationException();
        }

        if (isClosing() || !isConnected()) { // 如果该次会话正在关闭,或者就没有连接过,则封装一个异常返回一个WriteFuture对象
            WriteFuture future = new DefaultWriteFuture(this);
            WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
            WriteException writeException = new WriteToClosedSessionException(request);
            future.setException(writeException);
            return future;
        }

        FileChannel openedFileChannel = null;

        try {
            if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) { // 没有写任何数据
                throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
            } else if (message instanceof FileChannel) {
                FileChannel fileChannel = (FileChannel) message;
                message = new DefaultFileRegion(fileChannel, 0, fileChannel.size()); // 如果是FileChannel,则创建一个DefaultFileRegion对象,用来被Mina操纵
            } else if (message instanceof File) {
                File file = (File) message;
                openedFileChannel = new FileInputStream(file).getChannel();
                message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size()); // 如果是File,则创建FilenameFileRegion
            }
        } catch (IOException e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);
            return DefaultWriteFuture.newNotWrittenFuture(this, e);
        }

        // 可以写message了,做写前准备
        WriteFuture writeFuture = new DefaultWriteFuture(this); 
        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);

        // Then, get the chain and inject the WriteRequest into it
        IoFilterChain filterChain = getFilterChain(); // 获取到与该IoSession相关的IoFilterChain,方法getFilterChain实现可以看NioSocketSession类中的实现:filterChain = new DefaultIoFilterChain(this);
        filterChain.fireFilterWrite(writeRequest); // 触发写事件,将WriteRequest注入到IoFilter实例链,执行注册的IoFilter的逻辑

        // 不关心FileChannel的操作,不进行处理,直接关闭掉
        if (openedFileChannel != null) {
            final FileChannel finalChannel = openedFileChannel;
            writeFuture.addListener(new IoFutureListener<WriteFuture>() {
                public void operationComplete(WriteFuture future) {
                    try {
                        finalChannel.close();
                    } catch (IOException e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
            });
        }

        return writeFuture; // 返回WriteFuture,等待写操作异步完成
    }

再看,NioSession类中增加了一个返回IoProcessor实例的抽象方法,而这个IoProcessor是在创建一个IoSession实例(例如,可以实例化一个NioSocketSession)的时候,由外部传到IoSession内部。我们知道,IoProcessor是Mina框架底层真正用来处理实际I/O操作的处理器,通过一个IoSession实例获取一个IoProcessor,可以方便地响应作用于IoSession的I/O读写请求,从而由这个IoProcessor直接去处理。
根据Mina框架架构设计,IoService->IoFilter Chain->IoHandler,我们知道在IoFilter Chain的一端(头部)之前会调用处理实际的I/O操作请求,也就是IoProcessor需要处理的逻辑,那么可以想象到,IoProcessor被调用的位置,可以查看org.apache.mina.core.filterchain.DefaultIoFilterChain类的源代码,其中定义了一个内部类,源码如下所示:

private class HeadFilter extends IoFilterAdapter {
	@SuppressWarnings("unchecked")
	@Override
	public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {

		AbstractIoSession s = (AbstractIoSession) session;

		// Maintain counters.
		if (writeRequest.getMessage() instanceof IoBuffer) {
			IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
			// I/O processor implementation will call buffer.reset()
			// it after the write operation is finished, because
			// the buffer will be specified with messageSent event.
			buffer.mark();
			int remaining = buffer.remaining();

			if (remaining == 0) {
				// Zero-sized buffer means the internal message
				// delimiter.
				s.increaseScheduledWriteMessages();
			} else {
				s.increaseScheduledWriteBytes(remaining);
			}
		} else {
			s.increaseScheduledWriteMessages();
		}

		WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();

		if (!s.isWriteSuspended()) {
			if (writeRequestQueue.size() == 0) {
				// We can write directly the message
				s.getProcessor().write(s, writeRequest);
			} else {
				s.getWriteRequestQueue().offer(s, writeRequest);
				s.getProcessor().flush(s);
			}
		} else {
			s.getWriteRequestQueue().offer(s, writeRequest);
		}
	}

	@SuppressWarnings("unchecked")
	@Override
	public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
		((AbstractIoSession) session).getProcessor().remove(session);
	}
}

最后,我们看一下NioSocketSession实例被创建的时机。其实很容易想到,当一次网络通信开始的时候,也就是客户端连接到服务器端的时候,服务器端首先进行accept,这时候一次会话才被启动,也就是在这个是被创建,拿Mina中的NioSocketAcceptor类来看,创建NioSocketSession的代码,如下所示:

    protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {

        SelectionKey key = handle.keyFor(selector);

        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
            return null;
        }

        // accept the connection from the client
        SocketChannel ch = handle.accept();

        if (ch == null) {
            return null;
        }

        return new NioSocketSession(this, processor, ch); // 创建NioSocketSession实例
    }

通过上面的分析,我们可知,IoSession在基于Mina进行网络通信的过程中,对于网络通信相关操作的请求都是基于一个IoSession实例来进行的,所以说,IoSession在Mina中是一个很重要的结构。

参考链接

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>