摘要:传统IO是通过流的形式建立内存与外设之间的传输。而NIO则以通道和缓存的方式实现新的数据传输方式以解决传统IO的一些不足,NIO产生自JDK1.4,使用NIO代替传统IO会大大提高数据传输效率,掌握Java NIO的基本知识和使用方法也是对传统IO的一种补充,是一种同步非阻塞IO模式。

NIO概述

Java NIO产生自JDK1.4,主要由以下几个部分组成:

  • Channel 双向交流的通道
  • Buffer 缓冲区,包含数据且用于读写的线性表结构
  • Charset 字符集,提供Unicode字符串映射到字节序列以及逆映射的操作,解决字符编码与解码的问题
  • Selector 选择器,将多元异步IO操作集中到一个或多个线程中

NIO中的通道有点类似于传统IO中的流,首先建立好连接外设的通道,数据可以从Channel读入到Buffer,也可以从Buffer写入Channel进而流向外设。Channel是接口,而Buffer是抽象类,Java NIO中Channel的主要实现包括:

  • FileChannel 文件通道
  • DatagranChannel UAP数据包通道
  • SocketChannel TCP客户端通道
  • ServerSocketChannel TCP服务器端通道

Buffer的实现主要包括:

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer
  • MappedByteBuffer

Selector允许单线程处理多个Channel,如果你的应用打开了多个连接(通道),但每个连接的流量都很低,使用Selector就很方便。

Channel 和 Buffer 的使用

Channel 的实例化是从传统IO流当中获取的,也可以从RandomAccessFile类获取,因为该类集成了InputStream和OutputStream。Buffer的实例化,主要是针对其实现类ByteBuffer,IntBuffer等:

  • FileChannel fc = new FileInputStream(new File(“./src.txt”)).getChannel();
  • FileChannel fc = new RandomAccessFile(“./src.txt”,”rw”).getChannel();
  • ByteBuffer buf = ByteBuffer.allocate(1024); 表示分配1024个字节的缓冲区
  • ByteBuffer buf = ByteBuffer.wrap(new String(“hello”).getBytes()); //指定缓冲区包裹字符串流,且内容初始化为字符串

以下展示一个基本的文件拷贝的例子来展示Channel和Buffer的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void copyFile(File srcFile, File desFile){
FileChannel inChannel = null;
FileChannel outChannel = null;
ByteBuffer buf = null;

try{
inChannel = new FileInputStream(srcFile).getChannel();
outChannel = new FileOutputStream(desFile).getChannel();
buf = ByteBuffer.allocate(1024);
int len = 0;
while((len = inChannel.read(buf)) != -1){
buf.flip();
outChannel.write(buf);
outChannel.force(true);
buf.clear();
}
}catch(Exception e){
e.printStackTrace();
}finally{
try{
outChannel.close();
inChannel.close();
}catch(Exception e){
e.printStackTrace();
}
}
}

buf.flip()用来反转buf,将buf从写模式切换到读模式,先从通道中读取数据到buf,然后反转,从buf中写入数据到通道,进而输出到外设,FileChannel 的force方法强制将缓冲区的内容全部写进文件,类似于传统IO中的flush方法。

Buffer 详解

Buffer的基本使用步骤

Buffer的使用一般遵循以下步骤:

  1. 从Channel中读取数据到Buffer;
  2. 调用flip()方法反转Buffer;
  3. 从Buffer中写入数据到Channel;
  4. 调用clear()方法清空缓冲区.

Buffer的基本属性

Buffer的基本属性包括:capacitypositionlimit

参数 写模式 读模式
capacity Buffer的容量 Buffer的容量
position 当前写入单位数据的数量 当前读入的单位数据的数量
limit 最多能写入多少单位的数据量,默认和capacity一致 代表最多能读入多少单位的数据量,和之前写入的数据量一致

Buffer的分配和模式切换

前面已经提及,Buffer的分配方式主要有两种,一种是allocate分配指定字节大小的内存区;另一种则是通过已有的字节数组来分配:wrap(字节数组).flip方法用于反转,表示将写模式切换到读模式,positon设为0,limit设置成之前写入到Buffer的数据量。

文件锁定

FileChannel提供两种方法获得FileLock:

  1. FileLock lock();
  2. FileLock lock(long position, long size, boolean shared);

要获取文件一部分上的锁,需要调用FileChannel的第二种lock方法,如果要获取一个排他锁,必须以写方式打开文件。在拥有锁之后,您可以执行需要的任何敏感操作,然后再释放锁。

1
2
3
4
5
RandomAccessFile aFile = new RandomAccessFile("./src.txt","rw");
FileChannel fc = aFile.getChannel();
FileLock lock = fc.lock(position, size, false);
//operations on locked region
lock.release();

Scatter/Gather

Scatter是指将Channel中的数据分散读入到多个Buffer中;Gather是指将多个Buffer数据聚合写入到Channel中。读取和写入的时候都是按顺序写入或读取到Buffer中。

1
2
3
4
5
6
7
8
9
10
11
//Scatter
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer bufferArray[] = {header, body};
channel.read(bufferArray);

//Gather
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer bufferArray[] = {header, body};
channel.write(bufferArray);

Selector与Channel的搭配使用

Selector允许单线程管理多个通道,通过Selector.open方法可以创建一个Selector对象。为了使Selector能够管理多个通道,需要将这些通道注册到Selector上。需要注意的是与Selector一起使用的时候,Channel必须处于非阻塞模式下,由于FileChannel不能切换到非阻塞模式,所以不能被Selector管理,套接字通道都可以。注册通道的register方法有两个参数,第一个是要注册的Selector对象,第二个参数实际上是一个interest集,涵盖要监听的事件类型。该方法返回SelectionKey对象,该对象包含了你感兴趣的属性:

  • interest集合 感兴趣的事件集合
  • ready集合 已准备的就绪操作集合
  • Selector和Channel 从SelectionKey访问Selector和Channel变得非常简单,直接key.selector()和key.channel()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Selector selector = Selector.open();
InetSocketAddress socketAddress = new InetSocketAddress(“www.baidu.com”,80);
SocketChannel sc = SocketChannel.open(socketAddress);
sc.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
while(true){
int readyChannels = selector.select();
if(readyChannels == 0) continue;
Set selectedKeys = selector.selectedKeys();
Iterator iter = selectedKeys.iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
if(key.isAcceptable()){
//a connection was accepted by a ServerSocketChannel
}else if(key.isConnectable()){
//a connection was established with a remote server
}else if(key.isReadable()){
//a channel is ready for reading
}else if(key.isWritable()){
//a channel is ready for writing
}
iter.remove();
}
}

综合案例

接下来的这个案例综合Java NIO,多线程,字符串,正则表达式以及Java类集的使用。该案例的问题背景如下:

统计一个单词可重复的英文文件(假设4G)中每个单词出现的次数,输出出现最多单词的次数以及这些单词(可能有多个),并把所有单词出现次数结果按照英文排序放入一个文件中。并能够检索特定单词的出现次数。由于文件过大,不重复单词总数有限,需要考虑到执行速度和内存使用情况。

分析:

  1. 大文件IO,使用Java NIO代替传统IO
  2. 将该文件拆分成若干部分,对每个部分进行锁定,并开启若干线程进行处理
  3. 单词和次数配对出现,因此,我们需要使用一对值操作集合Map
  4. 最后要按英文排序,并能够检索特定单词出现的次数,显然单词是key,次数是value,且按key有序,所以使用TreeMap进行管理。
  5. 另外,从英文文件中提取每一个单词需要采用Java正则表达式,或StringTokenizer实现

下面是程序示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


public class TestNIO{
public static TreeMap<String, Integer> map = new TreeMap<String, Integer>();

public static void main(String[] args) throws Exception{
File srcFile = new File("./src.txt");
File desFile = new File("./des.txt");

long fileLength = srcFile.length();

/*
long size = 25;
int num = 0;

if(fileLength < size){
num = 1;
size = fileLength;
}else{
num = (int)(fileLength / size);
}*/


LineNumberReader lnr = new LineNumberReader(new InputStreamReader(new FileInputStream(srcFile)));
long lineSize = lnr.readLine().length();
lnr.skip(fileLength);
long lineNum = lnr.getLineNumber()+1;
System.out.println("The file has "+lineNum+" lines");

int numBlock = 10;
long size = numBlock*lineSize;
long num = 0;

if(fileLength < size){
num = 1;
size = fileLength;
}else{
num = lineNum / numBlock;
}

for(int i=0;i<num;i++)
{
long pos = i*size;
if(i == (num-1)){
size = fileLength - pos;
}
CountWords cw = new CountWords(srcFile, pos, size);
Thread t = new Thread(cw);
t.start();
t.join();
}

getMaxWords(map);
map2File(map,desFile);

}

public static void getMaxWords(Map<String, Integer> map){
TreeMap<Integer, ArrayList<String>> result = new TreeMap<Integer, ArrayList<String>>();

Set<Map.Entry<String, Integer>> entrys = map.entrySet();
Iterator<Map.Entry<String, Integer>> iter = entrys.iterator();

while(iter.hasNext()){
Map.Entry<String, Integer> me = iter.next();
String str = me.getKey();
Integer num = me.getValue();
ArrayList<String> al = new ArrayList<String>();
if(null != result.get(num)){
al = result.get(num);
al.add(str);
}else{
al.clear();
al.add(str);
}
result.put(num,al);
}

Map.Entry<Integer, ArrayList<String>> last = result.lastEntry();

System.out.println("The max repeat num is: "+last.getKey());
System.out.print("The corresponding str is: ");
ArrayList<String> al = last.getValue();

for(int i=0;i<al.size();i++){
System.out.print(al.get(i));
if(i<al.size()-1){
System.out.print(",");
}
}
System.out.println();

}

public static void map2File(Map<String, Integer> map, File des){
FileChannel fc = null;
try{
fc = new FileOutputStream(des).getChannel();
Set<Map.Entry<String, Integer>> entrys = map.entrySet();
Iterator<Map.Entry<String, Integer>> iter = entrys.iterator();
while(iter.hasNext()){
Map.Entry<String, Integer> me = iter.next();
String key = me.getKey();
int value = me.getValue();

ByteBuffer buf = ByteBuffer.wrap((key+": "+value+"\r\n").getBytes());
fc.write(buf);
buf.clear();
}
}catch(Exception e){
e.printStackTrace();
}finally{
try{
fc.close();
System.out.println("map to file done");
}catch(Exception e){
}
}
}
}



class CountWords implements Runnable {

private FileChannel fc = null;
private FileLock fl = null;
private MappedByteBuffer mbBuf = null;
private HashMap<String, Integer> hm = null;

public CountWords(File src, long pos, long size){
try{
fc = new RandomAccessFile(src,"rw").getChannel();
fl = fc.lock(pos, size, false);
mbBuf = fc.map(FileChannel.MapMode.READ_ONLY, pos, size);
hm = new HashMap<String ,Integer>();
}catch(Exception e){
e.printStackTrace();
}
}

public void run(){
String str = Charset.forName("UTF-8").decode(mbBuf).toString();
String word = null;

//use regex to obtain word
Pattern pattern = Pattern.compile("\\b[\\w-']+\\b");
Matcher matcher = pattern.matcher(str);
while (matcher.find()) {
word = matcher.group();
if(null != hm.get(word)){
hm.put(word, hm.get(word)+1);
}else{
hm.put(word, 1);
}
}
/*use stringtokenizer to obtain word
StringTokenizer token = new StringTokenizer(str);
while(token.hasMoreTokens()){
word = token.nextToken();
if(null != hm.get(word)){
hm.put(word, hm.get(word)+1);
}else{
hm.put(word, 1);
}
}*/

TestNIO.map.putAll(hm);

try{
fl.release();
fc.close();
}catch(Exception e){
e.printStackTrace();
}
}
}

该案例问题总结如下:

  • 其中文件的每个部分的划分,开始是按文件大小进行分割,但是发现,这种分割方式会将文件中的单词截断,所以后面选择采用对文件中的行数进行分割
  • 文件的每一部分分配给指定线程进行处理,先锁定该部分内容,处理完成之后,添加到全局map中,然后,再释放锁。
  • 使用StringTokenizer提取单词,会包含标点符号,因此使用正则表达式的方式是最好的。
  • 另外本示例若分配到多个线程处理,好像还存在问题,希望共同探讨。