IoService是对通信双方所进行的I/O操作的抽象,那么无论是在服务器端还是在客户端,都要进行I/O的读写操作,它们有一些共性,可以抽象出来。这里,我们主要详细说明IoAccectpr和IoConnector以及所基于的IoService抽象服务,都提供哪些操作和数据结构,都是如何构建的。首先,提供一个IoService服务接口相关的继承层次关系的类图,如图所示:
最终使用的Acceptor和Connector是上面继承层次中最下层的实现类。
IoService抽象
实际上,支持I/O操作服务的内容,集中在两个类中:IoService和AbstractIoService,看一下类图:
根据上图中IoService接口定义,我们给出接口中定义的方法,如下所示:
public interface IoService { void addListener(IoServiceListener listener); void removeListener(IoServiceListener listener); boolean isDisposing(); boolean isDisposed(); void dispose(); void dispose(boolean awaitTermination); IoHandler getHandler(); void setHandler(IoHandler handler); Map<Long, IoSession> getManagedSessions(); int getManagedSessionCount(); IoSessionConfig getSessionConfig(); IoFilterChainBuilder getFilterChainBuilder(); void setFilterChainBuilder(IoFilterChainBuilder builder); DefaultIoFilterChainBuilder getFilterChain(); boolean isActive(); long getActivationTime(); Set<WriteFuture> broadcast(Object message); IoSessionDataStructureFactory getSessionDataStructureFactory(); void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory); int getScheduledWriteBytes(); int getScheduledWriteMessages(); IoServiceStatistics getStatistics(); }
我们可以看到,IoService主要定义了两类服务,一类是提供I/O操作相关服务,另一类是会话 (IoSession)相关服务,这两类服务,无论是在服务端还是在客户端,都会提供,以此来保证双方通信。那么,具体地这两类服务中都包括哪些内容,我们总结如下:
- 管理IoService元数据,描述IoService本身,这些元数据都封装在TransportMetadata中,例如I/O 服务类型(如NIO,APR或RXTX),连接类型(如无连接接),地址类型等。
- 管理IoServiceListener,它是用来监听与一个IoService服务相关的事件的,比如服务的激活、会话的建立等 等,当然,这些监听服务不是提供给外部进行开发使用的,而是Mina内部使用的。
- 管理IoHandler,从Mina框架的架构我们知道,IoHandler的具体实现是与业务逻辑处理相关的,也是最靠近应用层的。
- 管理IoSession,即管理与一个IoService服务交互的会话对象,可以有一组会话同时使用该IoService服务。
- 管理IoFilter链,IoFilter链基于事件拦截模式,它位于IoHandler与IoService两层之间,Mina为 了方便使用IoFilter链,直接内置了一个IoFilterChainBuilder(具体实现为 DefaultIoFilterChainBuilder)。
- 管理一些相关的统计信息,如读写字节数、读写消息数、读写时间等。
上面类图中,AbstractIoService实现了IoService接口中定义的操作,同时增加了一些属性字段,可以通过这些字段看出,Mina框架IoService抽象服务层设计了哪些数据结构,用来辅助有关I/O操作的服务。我们通过如下几个方面来详述:
- IoServiceListener列表
管理服务于IoService的IoServiceListener,主要是通过IoServiceListenerSupport类,这 个类中定义了如下结构:
private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>(); private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>(); private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);
当我们创建一个IoService实例时,可能是服务器端的IoAccectpr,也可能是客户端的IoConnector,可以分别通过调用如下两个方法来增加或者移除一个IoServiceListener:
void addListener(IoServiceListener listener); void removeListener(IoServiceListener listener);
一个IoServiceListener定义如下操作:
public interface IoServiceListener extends EventListener { void serviceActivated(IoService service) throws Exception; void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception; void serviceDeactivated(IoService service) throws Exception; void sessionCreated(IoSession session) throws Exception; void sessionDestroyed(IoSession session) throws Exception; }
通过接口中定义的方法名,可以了解到,一个IoService监听器都负责监听哪些事件。
- 构建IoFilter链
就像上面IoServiceListener与IoServiceListenerSupport的关系一样,IoFilter是通过另一 个工具类IoFilterChainBuilder来聚合起来,形成一个IoFilter链。通过实现IoFilterChainBuilder 接口的DefaultIoFilterChainBuilder可以对一组IoFilter进行创建。包含的数据结构如下所示:
private final List<Entry> entries; public DefaultIoFilterChainBuilder() { entries = new CopyOnWriteArrayList<Entry>(); }
其中Entry包装了一个IoFilter以及为其定义的名称。从IoFilterChainBuilder的名称来看,它只是关注一个 IoFilterChain如何创建,而不关心一组注册的IoFilter调用顺序,也不关心被指定事件被触发时调用哪个操作,这些逻辑是由 IoFilterChain来定义,并通过实现这个接口的DefaultIoFilterChain类实现的。当我们调用DefaultIoFilterChainBuilder 实例的有关操作IoFilter的方法,如下所示(在DefaultIoFilterChainBuilder中实现):
public synchronized void addFirst(String name, IoFilter filter); public synchronized void addLast(String name, IoFilter filter); public synchronized void addBefore(String baseName, String name, IoFilter filter); public synchronized void addAfter(String baseName, String name, IoFilter filter);
实际上最终在调用构建的方法buildFilterChain的时候,将已经组织到DefaultIoFilterChainBuilder 实例中的多个IoFilter实例添加到已经构造的IoFilterChain中(如默认的DefaultIoFilterChain),一 个IoFilterChain实例可以在IoService实例运行时被使用,下面是buildFilterChain方法的逻辑:
public void buildFilterChain(IoFilterChain chain) throws Exception { for (Entry e : entries) { chain.addLast(e.getName(), e.getFilter()); } }
也就是说,IoFilterChainBuilder是供使用Mina框架的开发网络应用程序的人员组织IoFilter链的,它只是一个运行前构建工具;而IoFilterChain是Mina框架运行服务所需要的,即是一个运行时辅助管理IoFilter链调用的工具。
- IoSession内存数据结构
每当有一个新的会话被创建,及使用了IoService提供的服务,就对应创建了一个IoSession实例,而且,与IoSession 相关的一些实时数据需要在内存中保存,以便IoService实例能够随时访问并对该会话实例提供需要的I/O读写服务。Mina定义了 IoSessionDataStructureFactory,来保存会话相关数据,这个结构提供了如下两个方法:
public interface IoSessionDataStructureFactory { IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception; WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception; }
可以看出,上面方法中的IoSessionAttributeMap和WriteRequestQueue都是与一个IoSession相 关的数据对象,我们可以看一下,这几个类之间的关系,如图所示:
与一个IoSession有关的数据,都在上面的结构中保存着。其中主要包含两类:一类是用户在启动会话时定义的属性集合,另一类是会话期 间可能需要进行读写操作。每个IoSession实例调用write方法的时候,都会对应这一个WriteRequest对象,封装了写请求数据。而提供I/O服务的IoService实例在运行时会把对应的WriteRequest对象放入/移出IoSessionDataStructureFactory 结构所持有的队列。
- Executor:处理I/O事件的执行
每个IoService都对应这一个Executor,用来处理被触发的I/O事件。
IoAcceptor与IoConnector抽象
IoAcceptor和IoConnector已经区分I/O操作相关的不同服务了,作为通信的服务器端和客户端,必然存在一些差异服务来维持各自在通信过程中的角色,比如,IoAcceptor需要监听指定服务端口,等待客户端的连接到服务器端,而IoConnector与服务器端进行通信,首先应该连接到服务器端Socket暴露的服务地址。下面,我们分别根据通信双方的这两种不同角色,来深入讨论一些细节。
- IoAcceptor抽象
从IoAcceptor接口定义,可以很好地看出它具有的一些基本操作,如下所示:
public interface IoAcceptor extends IoService { SocketAddress getLocalAddress(); Set<SocketAddress> getLocalAddresses(); SocketAddress getDefaultLocalAddress(); List<SocketAddress> getDefaultLocalAddresses(); void setDefaultLocalAddress(SocketAddress localAddress); void setDefaultLocalAddresses(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses); void setDefaultLocalAddresses(Iterable<? extends SocketAddress> localAddresses); void setDefaultLocalAddresses(List<? extends SocketAddress> localAddresses); boolean isCloseOnDeactivation(); void setCloseOnDeactivation(boolean closeOnDeactivation); void bind() throws IOException; void bind(SocketAddress localAddress) throws IOException; void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException; void bind(SocketAddress... addresses) throws IOException; void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException; void unbind(); void unbind(SocketAddress localAddress); void unbind(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses); void unbind(Iterable<? extends SocketAddress> localAddresses); IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress); }
可以看到上面定义的方法中,主要是与IP地址相关的操作,主要包括绑定和解绑定,这些操作的实现是在该接口的抽象实现类AbstractIoAcceptor中给予实现的,在AbstractIoAcceptor中并没有涉及到有关SocketChannel的I/O操作,有关如何基于轮询的策略去检查SocketChannel是否有相应的事件被触发,这些I/O相关的操作被封装到AbstractPollingIoAcceptor类中。以基于TCP的NIO通信为例,具体接收客户端到来的连接请求,这些逻辑是在AbstractPollingIoAcceptor的实现类NioSocketAcceptor中实现的,这里创建了用来管理与客户端通信的NioSocketSession对象(它是IoSession的NIO实现)。
- IoConnector抽象
IoConnector的接口定义,如下所示:
public interface IoConnector extends IoService { int getConnectTimeout(); long getConnectTimeoutMillis(); void setConnectTimeout(int connectTimeout); void setConnectTimeoutMillis(long connectTimeoutInMillis); SocketAddress getDefaultRemoteAddress(); void setDefaultRemoteAddress(SocketAddress defaultRemoteAddress); ConnectFuture connect(); ConnectFuture connect(IoSessionInitializer<? extends ConnectFuture> sessionInitializer); ConnectFuture connect(SocketAddress remoteAddress); ConnectFuture connect(SocketAddress remoteAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer); ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress); ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer); }
IoConnector定义的操作基本是与连接到服务端的。同样,AbstractIoConnector实现了Connector接口定义的基本操作。以基于TCP的NIO通信为例,客户端和服务端有部分操作非常类似,如轮询SocketChannel检查是否有事件触发,读写请求等,所以,客户端在AbstractIoConnector的抽象实现类AbstractPollingIoConnector中处理于此相关的逻辑。与NioSocketAcceptor对应,客户端有一个NioSocketConnector实现类。
通过上面IoAcceptor和IoConnector的说明,我们还不知道具体I/O操作是由谁来处理的。实际上,无论是服务端还是客户端,在处理轮询通道的抽象服务中,封装了一个IoProcessor抽象,它才是实际处理I/O操作的抽象部分。为了将通信的宏观抽象过程与通信过程中的处理细节分开,将IoProcessor独立出来,与宏观通信过程的逻辑解耦合。以基于TCP的NIO通信为例,在AbstractPollingIoAcceptor和AbstractPollingIoConnector中都有一个IoProcessor实例(这里是实现类NioProcessor的实例),通过调用它提供的处理操作来完成实际的I/O操作。
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。
玩玩儿