bb两句

这个实验是我海带洪老师为计网课程设计的大实验,还是比较具有挑战性的。当时设置的赋分要求大致为:

标准 得分
RDT 2.0 3
RDT 2.2 6
RDT 3.0 8
GB/SR/TCP 13
TCP Tahoe 15
TCP Reno 16
未完成关键困难 2
迭代开发 1
已解决主要问题 1
实验系统建议 1
合计 20

事实上,拿到手的实验环境jre包中,已经完成到了RDT2.1版本(但仍需要手动补充校验和的计算类),因此头3分是白送滴。

我在这个实验中拿了满分,也就是我实现了TCP Reno版本,并不容易。我粗略估计了一下,完成到RDT4.0之前的部分,我大约花费了10h左右,由于这部分还未涉及到滑动窗口的引入,因此迭代的过程相对容易;但到了引入滑动窗口的版本时,花费我大约又是15h左右才算最终完成整个实验,这是因为我选择的SR到Tahoe的迭代过程几乎是推翻了前面几个版本,重新搭建类的框架。

照猫画虎来倒也轻松,难在复习周的时间匀兑,分奴嘛,生来是牛马,用肝子换分子,不亏。

实验环境

  • OS: Windows 11
  • Jdk: jdk-6u45-windows-i586
  • Jre: jre-6
  • IDE: Eclipse 4.29.0
  • TCP-TestSys: TCP_TestSys_Linux.jar(并没有提供windows版本,但该版本在windows上可以正常运行)

从可靠信道到不可靠信道的迭代开发过程

RDT 2.0+2.1: 信道上可能出现位错、管理出错的ACK/NAK

设计方案

  • 错误检测:校验和检查

  • 从错误中恢复:ACKs、NAKs;若NAK则重传

  • ACK/NAK出错——管理重复:sequence number

校验和检查函数CheckSum.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CheckSum {

/*计算TCP报文段校验和:只需校验TCP首部中的seq、ack和sum,以及TCP数据字段*/
public static short computeChkSum(TCP_PACKET tcpPack) {
long checkSum = 0;

CRC32 crc = new CRC32();

/*从首部获取seq、ack和sum*/
TCP_HEADER header = tcpPack.getTcpH();
crc.update(header.getTh_seq());
crc.update(header.getTh_ack());
//crc.update(header.getTh_sum()); //加上sum反而会导致错误

/*从数据部获取数据*/
TCP_SEGMENT segment = tcpPack.getTcpS();
int[] data = segment.getData();
for(int i = 0;i < data.length; i++) {
crc.update(data[i]);
}
/*计算校验和*/
checkSum = crc.getValue();

return (short) checkSum;
}

}

Sender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Override
//可靠发送(应用层调用):封装应用层数据,产生TCP数据报;需要修改
public void rdt_send(int dataIndex, int[] appData) {

//生成TCP数据报(设置序号和数据字段/校验和),注意打包的顺序
tcpH.setTh_seq(dataIndex * appData.length + 1);//包序号设置为字节流号:
tcpS.setData(appData);
tcpPack = new TCP_PACKET(tcpH, tcpS, destinAddr);
//更新带有checksum的TCP 报文头
tcpH.setTh_sum(CheckSum.computeChkSum(tcpPack));
tcpPack.setTcpH(tcpH);

//发送TCP数据报
udt_send(tcpPack);
flag = 0;

//等待ACK报文
//waitACK();
while (flag==0);
}
@Override
//需要修改
public void waitACK() {
//循环检查ackQueue
//循环检查确认号对列中是否有新收到的ACK
if(!ackQueue.isEmpty()){
int currentAck=ackQueue.poll();
// System.out.println("CurrentAck: "+currentAck);
if (currentAck == tcpPack.getTcpH().getTh_seq()){
System.out.println("Clear: "+tcpPack.getTcpH().getTh_seq());
flag = 1;
//break;
}else{ //RDT2.1中有NAK(-1),收到-1(!=currentAck)则重发
System.out.println("Retransmit: "+tcpPack.getTcpH().getTh_seq());
udt_send(tcpPack);
flag = 0;
}
}
}

@Override
//接收到ACK报文:检查校验和,将确认号插入ack队列;NACK的确认号为-1;不需要修改
public void recv(TCP_PACKET recvPack) {
System.out.println("Receive ACK Number: "+ recvPack.getTcpH().getTh_ack());
ackQueue.add(recvPack.getTcpH().getTh_ack());
System.out.println();

//处理ACK报文
waitACK();

}

Receiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
//接收到数据报:检查校验和,设置回复的ACK报文段
public void rdt_recv(TCP_PACKET recvPack) {
//检查校验码,生成ACK
if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum()) {
//生成ACK报文段(设置确认号)
tcpH.setTh_ack(recvPack.getTcpH().getTh_seq());
ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());
tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));
//回复ACK报文段
reply(ackPack);

//将接收到的正确有序的数据插入data队列,准备交付
dataQueue.add(recvPack.getTcpS().getData());
sequence++;
}else{
System.out.println("Recieve Computed: "+CheckSum.computeChkSum(recvPack));
System.out.println("Recieved Packet"+recvPack.getTcpH().getTh_sum());
System.out.println("Problem: Packet Number: "+recvPack.getTcpH().getTh_seq()+" + InnerSeq: "+sequence);
tcpH.setTh_ack(-1); //RDT2.1时仍需要区分ACK和NAK
ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());
tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));
//回复ACK报文段
reply(ackPack);
}

System.out.println();

//交付数据(每20组数据交付一次)
if(dataQueue.size() == 20)
deliver_data();
}

测试Log

  • Sender_eFlag = Receiver_eFlag = 0: 假设信道无差错

  • Sender_eFlag = 1 && Receiver_eFlag = 0: 假设个别数据包某些字节可能位错,但ACK/NAK不出错:

  • Sender_eFlag = 0 && Receiver_eFlag = 1: 假设ACK/NAK出错:

RDT2.2: 无NAK的协议

设计方案

  • 接收方若收到错误包则重发上一正确包seq对应的ACK

  • 发送方收到重复seq的ACK则重发当前包

Sender(无改动)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void waitACK() {
//循环检查ackQueue
//循环检查确认号对列中是否有新收到的ACK
if(!ackQueue.isEmpty()){
int currentAck=ackQueue.poll();
// System.out.println("CurrentAck: "+currentAck);
if (currentAck == tcpPack.getTcpH().getTh_seq()){
System.out.println("Clear: "+tcpPack.getTcpH().getTh_seq());
flag = 1;
//break;
}else{ //RDT2.2中没有NAK,若未收到currentAck而是重复收到相同seq的ACK,则重发
System.out.println("Retransmit: "+tcpPack.getTcpH().getTh_seq());
udt_send(tcpPack);
flag = 0;
}
}

Receiver(增加类成员、修改rdt_recv)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class TCP_Receiver extends TCP_Receiver_ADT {
private TCP_PACKET ackPack; //回复的ACK报文段
int sequence=1;//用于记录当前待接收的包序号,注意包序号不完全是
int pre_seq=-1; //用于记录上一个接收成功的包序号

/*构造函数*/
public TCP_Receiver() {
super(); //调用超类构造函数
super.initTCP_Receiver(this); //初始化TCP接收端
}

@Override
//接收到数据报:检查校验和,设置回复的ACK报文段
public void rdt_recv(TCP_PACKET recvPack) {
//由于sender发送的seq是字节流序号,因此需要求出data长度
int[] data = recvPack.getTcpS().getData();
int datalen = data.length;
//检查校验码,生成ACK
if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum()) {
//生成ACK报文段(设置确认号)
tcpH.setTh_ack(recvPack.getTcpH().getTh_seq());
ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());
tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));
//回复ACK报文段
reply(ackPack);

//逆向计算seq:dataIndex * appData.length + 1
sequence = (recvPack.getTcpH().getTh_seq() - 1 ) / datalen;
if(sequence != this.pre_seq) { //重复包丢弃,不交付
this.pre_seq = sequence; //更新pre_seq
//将接收到的正确有序的数据插入data队列,准备交付
dataQueue.add(data);
sequence++;
}
}else{ //RDT2.2:对于收到错误包,重新发送上一接收成功的包对应的ACK
System.out.println("Recieve Computed: "+CheckSum.computeChkSum(recvPack));
System.out.println("Recieved Packet"+recvPack.getTcpH().getTh_sum());
System.out.println("Problem: Packet Number: "+recvPack.getTcpH().getTh_seq()+" + InnerSeq: "+sequence);
// tcpH.setTh_ack(-1); //RDT2.2不需特别标识出NAK
tcpH.setTh_ack(pre_seq * datalen + 1); //重发上一接收成功的包对应的ACK
ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());
tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));
//回复ACK报文段
reply(ackPack);
}

System.out.println();

//交付数据(每20组数据交付一次)
if(dataQueue.size() == 20)
deliver_data();
}
public void deliver_data() {…
public void reply(TCP_PACKET replyPack) {…
}

说明:

  • 需要记录上一成功接收的包的序号,因此在类中添加成员pre_seq初始化-1

  • 对于重复数据包丢弃,需要判断rdt_rcv收到的包序号是否重复,因此应当计算出当前sequence = (recvPack.getTcpH().getTh_seq() - 1 ) / datalen;(由于sender发送的数据包序号是字节流信号,所以接收方计算seq时需要-1再除以数据长度),并将接收正确的包的序号更新在pre_seq中、交付上层。

  • 对于校验失败的错误包,重新发送上接受正确的包seq的ACK。

测试Log(Sender_eFlag = Receiver_eFlag = 1):

  • Receiver以重复ACK(37801、39001)代替NAK:

  • Sender成功识别出重复ACK(37801、39001)并重发37901、39101:

RDT3.0: 通道上可能出错和丢失数据

设计方案:

  • 丢包:发送方使用计时器等待ACK,若倒计时间内未收到ACK则重发

  • 延迟:接收方对重复seq数据包丢弃

Sender(增添Timer成员、reTrans成员,修改rdt_send、waitACK())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class TCP_Sender extends TCP_Sender_ADT {

private TCP_PACKET tcpPack; //待发送的TCP数据报
private volatile int flag = 0;
private UDT_Timer timer; //计时器
private UDT_RetransTask reTrans; //重传任务

public TCP_Sender() {…

@Override
//可靠发送(应用层调用):封装应用层数据,产生TCP数据报;需要修改
public void rdt_send(int dataIndex, int[] appData) {

//生成TCP数据报(设置序号和数据字段/校验和),注意打包的顺序
tcpH.setTh_seq(dataIndex * appData.length + 1);//包序号设置为字节流号:
tcpS.setData(appData);
tcpPack = new TCP_PACKET(tcpH, tcpS, destinAddr);
//更新带有checksum的TCP 报文头
tcpH.setTh_sum(CheckSum.computeChkSum(tcpPack));
tcpPack.setTcpH(tcpH);

timer = new UDT_Timer(); //RDT3.0:实例化计时器,发送后启动
reTrans = new UDT_RetransTask(client, tcpPack); //实例化重传任务,timeout则重传

//RDT3.0: 每隔3秒执行重传,直到收到ACK
timer.schedule(reTrans, 3000, 3000);

//发送TCP数据报
udt_send(tcpPack);
flag = 0;

//等待ACK报文
//waitACK();
while (flag==0);
}

public void udt_send(TCP_PACKET stcpPack) {…

@Override
//需要修改
public void waitACK() {
//循环检查ackQueue
//循环检查确认号对列中是否有新收到的ACK
if(!ackQueue.isEmpty()){
int currentAck=ackQueue.poll();
// System.out.println("CurrentAck: "+currentAck);
if (currentAck == tcpPack.getTcpH().getTh_seq()){
System.out.println("Clear: "+tcpPack.getTcpH().getTh_seq());
timer.cancel(); //RDT3.0: 收到对应ACK则停止计时
flag = 1;
//break;
}


// else{ //RDT3.0: 超时重传,因此对于收到的错误ACK不需回应
// System.out.println("Retransmit: "+tcpPack.getTcpH().getTh_seq());
// udt_send(tcpPack);
// flag = 0;
// }
}
}

public void recv(TCP_PACKET recvPack) {…

说明:

  • 为Sender类设置UDT_Timer计时器成员、UDT_RetransTask重传任务成员

  • 在sender进行rdt_send时初始化计时器和重传任务,设置重传时间为3s(实际运行时发现时间有点长了),在timeout时执行重传任务

  • 在weitACK时,若收到正确的ACK则暂停timer,否则不做操作(等待timeout重传)

Receiver(对延迟导致的乱序进行处理)

1
2
3
4
5
6
if(sequence > this.pre_seq) {     //RDT3.0: 对于延迟送达的数据包丢弃
this.pre_seq = sequence; //更新pre_seq
//将接收到的正确有序的数据插入data队列,准备交付
dataQueue.add(data);
sequence++;
}

测试Log

  • Sender_eFlag = Receiver_eFlag = 2 (丢包)

    发送方在计时器时间内等待ACK,超时对应的可能情况:数据包丢失、接收方根本没收到于是没有ACK、数据包没丢失并且接收方收到后发送了ACK但是该ACK丢失。发送方并不关心是什么情况,只要等待的ACK超时未收到则重发。

    但是在接收方视角,如果ACK丢失,就会收到发送方重复的数据报,于是丢弃该包后重发ACK。

  • Sender_eFlag = Receiver_eFlag = 3 (延迟)

    对于发送方,依旧是并不关心ACK为什么没收到,只要timeout,就重发,若收到错误的ACK(例如接收方ACK延迟送达、接收方对发送方延迟包的ACK)都不做任何操作。

    而对于接收方,若发送方的数据包延迟:对重发的数据包和延迟到达的数据包都进行正常回应,但是对延迟到达的乱序数据包丢弃;若接收方ACK延迟,同样丢弃发送方重发的数据包。

​ 至此,RDT已经可以解决不可靠信道上可能出现的出错、丢包、延迟的所有问题,尽管使用的停止等待使得效率较低。

效率提高的流水线协议迭代开发过程

Select-Response:选择响应协议

设计方案

滑动窗口设计:循环队列。

  • 接收方逐个对所有正确收到的分组进行应答:对接收到的(失序)分组进行缓存,以便最后对上层进行有效递交。

  • 发送方仅对未收到应答的分组进行重发:为每个分组设置计时器。

Sender:

image-20240219192400371

发送方的窗口SenderWindow:

  • 数据结构:

    • 将窗口内的每个块划分为4个状态:

      0: not usable //表示该块尚未缓存数据包

      1: usable, not yet sent //表示该块已缓存数据,但尚未发送

      2: sent, not yet ack’ed //表示该块缓存的数据已经发送,但尚未收到对应的ACK

      3: already ack’ed //表示该块已发送的数据收到应答

    • 为窗口设定三个指针:

      queue_base //窗口基址,即左端

      queue_rear //窗口末端

      next_to_send //下一待发送数据地址

  • 代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    /***************************Selective-Response: 选择响应协议
    **************************** 郝文轩; 2023-12-21*/

    package com.ouc.tcp.test;


    import com.ouc.tcp.test.CheckSum;
    import com.ouc.tcp.client.TCP_Sender_ADT;
    import com.ouc.tcp.client.UDT_RetransTask;
    import com.ouc.tcp.client.UDT_Timer;

    import java.util.Arrays;

    import com.ouc.tcp.client.Client;
    import com.ouc.tcp.message.*;
    import com.ouc.tcp.tool.TCP_TOOL;

    public class SenderWindow {
    private int WinSize = 16; //窗口大小
    private TCP_PACKET[] tcpPacks = new TCP_PACKET[WinSize]; //待发送的TCP数据报窗口(循环队列)
    private UDT_Timer[] timers = new UDT_Timer[WinSize]; //为窗口内每个包分配一个计时器
    private int queue_base = 0; //窗口始端指针
    private int next_to_send = 0; //待发送包指针
    private int queue_rear = 0; //队末指针
    private int[] flag; //0:have no pkt; 1:usable not yet send; 2:sent not yet ack'ed; 3: already ack'ed
    Client client;

    public SenderWindow(Client client, int WinSize) {
    this.client = client;
    this.WinSize = WinSize;
    flag = new int[WinSize];
    Arrays.fill(flag, 0);
    }

    //窗口满判断
    public boolean isFull() {
    return (queue_rear - queue_base) == WinSize;
    }

    //在队末处缓冲包
    public void pushPack(TCP_PACKET packet) {
    int rear_index = queue_rear % WinSize; //求队末相对索引
    tcpPacks[rear_index] = packet;
    flag[rear_index] = 1; //标记为可发的包
    queue_rear++;
    int[] data = packet.getTcpS().getData();
    int datalen = data.length;
    int sequence = (packet.getTcpH().getTh_seq() - 1) / datalen;
    //System.out.println(sequence);
    }

    //对ACK处理
    public void rcvAck(int sequence) {
    //求各项相对索引
    int seq_index = sequence % WinSize;
    int rear_index = queue_rear % WinSize;
    int base_index = queue_base % WinSize;
    if (sequence >= queue_base && sequence <= queue_rear) {
    if (flag[seq_index] == 2) { //若未收到ACK
    timers[seq_index].cancel(); //计时器暂停
    timers[seq_index] = null;
    flag[seq_index] = 3; //标记为已收到ACK
    }
    else { //收到重复/错误ACK
    return;
    }
    if(sequence == queue_base) { //若窗口左端的包收到ACK
    while(flag[base_index] == 3) { //检索最大已ACK位置索引
    tcpPacks[base_index] = null;
    flag[base_index] = 0; //标记为空(可放入包)
    queue_base++;
    base_index = queue_base % WinSize;
    }
    }
    }
    else {
    return;
    }
    }

    //获取可发送包
    public int getSendIndex() {
    int send_index = next_to_send % WinSize;
    if(flag[send_index] == 1 ) {
    flag[send_index] = 2; //标识为已发送但未ACK
    next_to_send++;
    //System.out.println(send_index);
    return send_index;
    }
    else {
    return -1;
    }
    }

    //可发包预备工作
    public TCP_PACKET getPack(int index) {
    UDT_RetransTask retrans = new UDT_RetransTask(client, tcpPacks[index]);
    timers[index] = new UDT_Timer();
    timers[index].schedule(retrans, 1000, 1000); //计时开始,超时重发
    return tcpPacks[index];
    }
    }
  • 发送方窗口类存在五个行为:

    • ifFull(): 判断窗口是否缓存满,用来控制应用层的写入节奏,在满时需要进入忙等(直到窗口左端已发数据包收到ACK);

    • pushPack(): 将应用层提供的数据包缓存到窗口队列中,在该行为中,将块的状态由0转换到1;

    • getSendIndex(): 为TCP_Sender类提供预备发送包的接口,即提供该块的索引地址;

    • getPack(): 完成包的发送工作,在该行为中初始化计时器并设定计时开始,同时将窗口中该块的状态由1转换到2;

    • rcvAck(): 对当前收到的ACK进行解析,若该ACK的序号为窗口中unACK’ed块所对应的应答,则将状态由2转换到3,否则不做处理;同时,若对应为窗口最左端的块的应答,则将窗口基址移动到下一未收到ACK的块的位置,并将此过程中已收到ACK的块的状态由3转换到0。

TCP_Sender.java:

修改了rdt_end()函数及waitACK()函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/***************************2.1: ACK/NACK
**************************** Feng Hong; 2015-12-09*/

/***************************Selective-Response: 选择响应协议
**************************** 郝文轩; 2023-12-21*/

package com.ouc.tcp.test;

import

public class TCP_Sender extends TCP_Sender_ADT {

private TCP_PACKET tcpPack; //待发送的TCP数据报
private volatile int flag = 1;
//private UDT_Timer timer; //计时器
//private UDT_RetransTask reTrans; //重传任务
private SenderWindow window = new SenderWindow(client, 16);

/*构造函数*/
public TCP_Sender() {…}

@Override
//可靠发送(应用层调用):封装应用层数据,产生TCP数据报;需要修改
public void rdt_send(int dataIndex, int[] appData) {

//生成TCP数据报(设置序号和数据字段/校验和),注意打包的顺序
tcpH.setTh_seq(dataIndex * appData.length + 1);//包序号设置为字节流号:
tcpS.setData(appData);
tcpPack = new TCP_PACKET(tcpH, tcpS, destinAddr);
//更新带有checksum的TCP 报文头
tcpH.setTh_sum(CheckSum.computeChkSum(tcpPack));
tcpPack.setTcpH(tcpH);
//System.out.println(dataIndex);
//若未装满则载入包
if(window.isFull()) {
flag = 0;
}

//装满则等待
while(flag == 0);

try {
window.pushPack(tcpPack.clone());
} catch (CloneNotSupportedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}

//发送TCP数据报
int index = window.getSendIndex();
if(index != -1) {
udt_send(window.getPack(index));
}
}

public void udt_send(TCP_PACKET stcpPack) {…}

@Override
//需要修改
public void waitACK() {
//循环检查ackQueue
//循环检查确认号对列中是否有新收到的ACK
if(!ackQueue.isEmpty()){
int currentAck=(ackQueue.poll() - 1) / 100; //?此处已知字节流长度100
window.rcvAck(currentAck); //交于窗口处理
if(!window.isFull()) { //释放锁
flag = 1;
}
}
}
public void recv(TCP_PACKET recvPack) {…}
  • 当窗口未装满的情况下从上层装入包,否则进入加锁忙等;当next_to_send指针所指向的块可以发送,则发送;收到ACK后交于窗口类处理,当窗口存在空闲块时解锁。

Receiver:

接收方的ReceiverWindow.java

  • 数据结构:

    窗口块的状态简化为两类:

    0: Expected, not yet received //表示该位序的包尚未收到

    1: already ack’ed //收到该位的包并应答

  • 代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    /***************************Selective-Response: 选择响应协议
    **************************** 郝文轩; 2023-12-21*/

    package com.ouc.tcp.test;

    import com.ouc.tcp.test.CheckSum;

    import java.util.Arrays;

    import com.ouc.tcp.client.Client;
    import com.ouc.tcp.client.TCP_Sender_ADT;
    import com.ouc.tcp.client.UDT_RetransTask;
    import com.ouc.tcp.client.UDT_Timer;
    import com.ouc.tcp.message.*;
    import com.ouc.tcp.tool.TCP_TOOL;

    public class ReceiverWindow {
    private int WinSize = 16; //窗口大小
    private TCP_PACKET[] tcpPacks; //接收TCP数据报窗口(循环队列)
    private int queue_base = 0; //窗口始端指针
    private int queue_rear = WinSize-1; //队末指针
    private int[] flag; //0:expected not yet received; 1: already ack'ed
    Client client;

    public ReceiverWindow(Client client, int WinSize) {
    this.client = client;
    this.WinSize = WinSize;
    tcpPacks = new TCP_PACKET[WinSize];
    flag = new int[WinSize];
    Arrays.fill(flag, 0);
    }

    //判断当前seq是否为窗口最左端
    public boolean isBase(int seq) {
    return queue_base == seq;
    }

    //处理收到的包
    public int pullPack(TCP_PACKET packet) {
    //计算seq
    int[] data = packet.getTcpS().getData();
    int datalen = data.length;
    int sequence = (packet.getTcpH().getTh_seq() - 1) / datalen;

    //求各项相对索引
    int seq_index = sequence % WinSize;
    int rear_index = queue_rear % WinSize;
    int base_index = queue_base % WinSize;
    if (sequence >= queue_base && sequence <= queue_rear) {
    if (flag[seq_index] == 0) { //若该块未收到packet
    tcpPacks[seq_index] = packet;
    flag[seq_index] = 1; //标记为已收到
    }
    }
    else if (sequence > queue_rear) { //对于超前收到的包不做应答
    return -1;
    }
    return sequence; //对已收到的包(包括历史窗口)进行应答
    }

    //包交付处理(对有序包交付)
    public int seq_deliver() {
    int rear_index = queue_rear % WinSize;
    int base_index = queue_base % WinSize;
    if(flag[base_index] == 1) {
    queue_base++;
    queue_rear++;
    flag[base_index] = 0;
    return base_index;
    }
    else {
    return -1;
    }
    }

    //包接口
    public TCP_PACKET getPack(int index) {
    TCP_PACKET pack = tcpPacks[index];
    tcpPacks[index] = null;
    return pack;
    }

    }
  • 该类存在以下四个行为:

    • isBase(): 返回当前收到的包序号是否为窗口最左端,用以在TCP_Receiver中进行交付判定条件;
    • pullPack(): 对收到的包进行处理,其中若该包序号为窗口对应块所期望的,则将块状态由0转换为1,以及序号为历史窗口收到过的(即重复包),返回该包序号,准备应答,而对于序号超前(即还未被窗口收纳的)包,则不做应答直接丢弃;
    • seq_deliver(): 包交付处理,用来返回当前窗口最左端块是否已完成接收,对最左端起始的连续块进行有序交付;
    • getPack(): 包返回接口。

TCP_Receiver.java:

修改了rdt_recv函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/***************************2.1: ACK/NACK*****************/
/***** Feng Hong; 2015-12-09******************************/

/***************************Selective-Response: 选择响应协议
**************************** 郝文轩; 2023-12-21*/

package com.ouc.tcp.test;

import

public class TCP_Receiver extends TCP_Receiver_ADT {

private TCP_PACKET ackPack; //回复的ACK报文段
int sequence=1;//用于记录当前待接收的包序号,注意包序号不完全是
int pre_seq=-1; //用于记录上一个接收成功的包序号
private ReceiverWindow window = new ReceiverWindow(client, 16);

/*构造函数*/
public TCP_Receiver() {…}

@Override
//接收到数据报:检查校验和,设置回复的ACK报文段
public void rdt_recv(TCP_PACKET recvPack) {
//检查校验码,生成ACK
if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum()) {
int[] data = recvPack.getTcpS().getData();
int datalen = data.length;
int sequence = (recvPack.getTcpH().getTh_seq() - 1) / datalen;
int seq_to_ACK = -1;

//获取当前应答ACK报文的seq
try {
seq_to_ACK = window.pullPack(recvPack.clone());
} catch (CloneNotSupportedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}

if(seq_to_ACK != -1) { //需要应答(非超前包)
tcpH.setTh_ack(seq_to_ACK * datalen + 1);
ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());
tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));
reply(ackPack);
if(window.isBase(sequence)) { //该包恰为最左端块
int index = window.seq_deliver();
while (index != -1) { //则将连续已接收块的包交付上层
TCP_PACKET pack = window.getPack(index);
dataQueue.add(pack.getTcpS().getData());
// int seq = (pack.getTcpH().getTh_seq() - 1) / pack.getTcpS().getData().length;
// System.out.println(seq);
index = window.seq_deliver();
}
}
}

}
//else{} //对于收到错误包,直接丢弃

System.out.println();


//交付数据(每20组数据交付一次)
deliver_data();
}

public void deliver_data() {…}

public void reply(TCP_PACKET replyPack) {…}

}

首先判断对接收到的包是否需要应答(超前收到的包不做应答),同时判断若为期望包是否为最左端块所期望的包,若为最左端则交付连续的有序包给上层。

测试Log:

对于发送方,其不在乎NO_ACK的具体原因,只要没有收到ACK则等待倒计时结束时重发;由于倒计时时间大于RTT,因此在重发前窗口便已满,因此无法进一步缓存包、发送包,直到刚才未收到ACK(位于窗口最左端)的包倒计时结束、重发后才会有空闲块允许继续发送。

对于失序的问题,Receiver的窗口也很好地完成了任务,保证了接收数据的有序性:

TCP Tahoe:拥塞控制方法——慢开始和快重传

设计方案

发送方维持拥塞窗口cwnd状态变量

  • 发送方让自己的发送窗口等于拥塞窗口,因此,在SR协议的基础上用cwnd取代滑动窗口。
  • TCP连接进行初始化时,将拥塞窗口cwnd置为1。慢开始门限ssthresh初始化为16。
  • 发送方每收到一个确认,就把cwnd加1。
  • 在慢开始阶段,cwnd随传输轮次RTT指数增长,当cwnd增长到慢开始门限值时,执行拥塞避免算法

拥塞避免算法:

  • cwnd随传输轮次RTT线性增长。

快重传:

  • 当网络出现超时(发送端计时器结束),判定为网络拥塞,更新ssthresh为当前cwnd值的一半,拥塞窗口重新设置为1,并执行慢开始算法。

  • Tahoe协议中,接收到3个重复Ack同样判定为拥塞,同样更新ssthresh为当前cwnd值的一半,拥塞窗口重新设置为1,并执行慢开始算法。

Sender

SenderWindow.java

  • 数据结构:

    • reAck: 记录重复包的个数,用以判定网络状况
    • lastAck: 记录上一ACK序号
    • cwnd: 拥塞窗口大小
    • ssthresh: 慢开始门限
    • curAcked: 拥塞窗口中已收到Ack的数量
    • RTT: 一个传输轮次(以包为单位)
  • 代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    /***************************TCP-Tahoe
    **************************** 郝文轩; 2024-01-02*/

    package com.ouc.tcp.test;


    import com.ouc.tcp.test.CheckSum;
    import com.ouc.tcp.client.TCP_Sender_ADT;
    import com.ouc.tcp.client.UDT_RetransTask;
    import com.ouc.tcp.client.UDT_Timer;

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Hashtable;
    import java.util.logging.FileHandler;
    import java.util.logging.Logger;
    import java.util.logging.SimpleFormatter;

    import com.ouc.tcp.client.Client;
    import com.ouc.tcp.message.*;
    import com.ouc.tcp.tool.TCP_TOOL;

    public class SenderWindow {
    private int cwnd = 1; //拥塞窗口
    private volatile int ssthresh = 16; //慢开始门限
    private Hashtable<Integer, TCP_PACKET> tcpPacks = new Hashtable<Integer, TCP_PACKET>(); //待发送的TCP数据报窗口(循环队列)
    private Hashtable<Integer, UDT_Timer> timers = new Hashtable<Integer, UDT_Timer>(); //为窗口内每个包分配一个计时器
    private int reAck = 0; //重复包个数
    private int lastAck = -1; //上一ACK序号
    private int curAcked = 0; //窗口中已收到Ack的数量(等于cwnd时表示一个RTT结束)
    private int RTT = 1;

    Logger logger;
    Client client;

    private void resetCwndAndRtt() { //慢开始初始化
    cwnd = 1;
    curAcked = 0;
    RTT = 1;
    }

    class Tahoe_Timer extends UDT_RetransTask {
    int seq;
    private TCP_PACKET packet;

    public Tahoe_Timer(Client client, TCP_PACKET packet) {
    super(client, packet);
    int[] data = packet.getTcpS().getData();
    int datalen = data.length;
    seq = (packet.getTcpH().getTh_seq() - 1) / datalen;
    this.packet = packet;
    }

    @Override
    public void run() {
    logger.info("############### 拥塞慢开始 ###############");
    ssthresh = Math.max(cwnd / 2, 2);
    resetCwndAndRtt();
    super.run();

    logger.info("[慢开始阶段]拥塞窗口信息:cwnd = " + cwnd + " ssthresh = " + ssthresh);
    }
    }

    private void initLogger(){ //设置log文件
    logger= Logger.getLogger(SenderWindow.class.getName());

    logger.setUseParentHandlers(false);
    FileHandler fh = null;
    try {
    fh = new FileHandler("RDTSender.log",false);
    fh.setFormatter(new SimpleFormatter());//输出格式

    } catch (IOException e) {
    e.printStackTrace();
    }
    // fh.setFormatter();
    logger.addHandler(fh);
    }

    public SenderWindow(Client client) {
    initLogger();
    this.client = client;
    }

    public boolean isFull() {
    return cwnd <= tcpPacks.size();
    }

    public void pushPack(TCP_PACKET packet) { //传包
    int[] data = packet.getTcpS().getData();
    int datalen = data.length;
    int seq = (packet.getTcpH().getTh_seq() - 1) / datalen;
    timers.put(seq, new UDT_Timer());
    timers.get(seq).schedule(new Tahoe_Timer(client, packet), 1000, 1000);
    tcpPacks.put(seq, packet);
    }

    //对ACK处理
    public void rcvAck(int sequence) {
    logger.info("接收到ack: "+ sequence);
    // logger.info("cwnd = " + cwnd + " curAcked = " + curAcked);
    if(sequence == lastAck) { //收到重复包
    reAck++;
    logger.info("############### 收到重复包:sequence = "
    + sequence + " 重复计数: " + reAck + " ###############");
    if(reAck >= 3) { //收到3个重复包,快重传
    int resendSeq = sequence + 1;
    TCP_PACKET pack = tcpPacks.get(resendSeq);
    UDT_Timer timer = timers.get(resendSeq);
    if(pack != null && timer != null) {
    // System.out.println("############### 快重传");
    client.send(pack);
    timer.cancel();
    timers.put(resendSeq, new UDT_Timer());
    timers.get(resendSeq).schedule(new Tahoe_Timer(client, pack), 1000, 1000);
    logger.info("############### 快重传:seq = " + resendSeq + " ###############");
    }
    System.out.println("############### Tahoe重传慢开始 ###############");
    ssthresh = Math.max(cwnd / 2, 2); //乘法减小(但不能小于2)
    resetCwndAndRtt();
    logger.info("[慢开始阶段]拥塞窗口信息:cwnd = " + cwnd + " ssthresh = " + ssthresh);
    }
    }
    else {
    curAcked++;
    for(int i = lastAck +1; i <= sequence; i++) { //累积确认
    tcpPacks.remove(i);
    if (timers.containsKey(i)) {
    timers.get(i).cancel();
    timers.remove(i);
    }
    }
    lastAck = sequence; //记录
    reAck = 0;
    if(cwnd < ssthresh) { //慢开始阶段
    cwnd++;
    if(curAcked >= RTT) {
    logger.info("[一个传输轮次]拥塞窗口信息:cwnd = " + cwnd + " ssthresh = " + ssthresh);
    curAcked = 0;
    RTT *= 2;
    }
    }
    else { //拥塞控制阶段
    // logger.info("############### cwnd = " + cwnd + "curAcked = " + curAcked + " ###############");
    if(curAcked >= RTT) {
    logger.info("############### 拥塞避免 ###############");
    curAcked = 0;
    cwnd ++;
    RTT ++;
    logger.info("[拥塞控制阶段]拥塞窗口信息:cwnd = " + cwnd);
    }
    }
    }
    }
    }

TCP_Sender.java(仅对rdt_send进行修改):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//可靠发送(应用层调用):封装应用层数据,产生TCP数据报;需要修改

public void rdt_send(int dataIndex, int[] appData) {

//生成TCP数据报(设置序号和数据字段/校验和),注意打包的顺序
tcpH.setTh_seq(dataIndex * appData.length + 1);//包序号设置为字节流号:
tcpS.setData(appData);
tcpPack = new TCP_PACKET(tcpH, tcpS, destinAddr);
//更新带有checksum的TCP 报文头
tcpH.setTh_sum(CheckSum.computeChkSum(tcpPack));
tcpPack.setTcpH(tcpH);
//System.out.println(dataIndex);
//若未装满则载入包
if(window.isFull()) {
// System.out.println("++++++++++++ 发送窗口满");
flag = 0;
}

//装满则等待
while(flag == 0);
try {
window.pushPack(tcpPack.clone());
} catch (CloneNotSupportedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}

//发送TCP数据报
udt_send(tcpPack);
}

Receiver

简化接收端的窗口,仅需实现对包次序的判断:

  • 收到期望包则按序将已收到的有序包交付

  • 收到失序包(包序大于期望序),不做丢弃,提前缓存,并立即发送ACK(快重传)等待该序号之前的包悉数收到后,按有序包交付。

TCP_Receiver.java(对rdt_recv进行修改):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//接收到数据报:检查校验和,设置回复的ACK报文段
public void rdt_recv(TCP_PACKET recvPack) {
//检查校验码,生成ACK
if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum()) {
int[] data = recvPack.getTcpS().getData();
int datalen = data.length;
int sequence = (recvPack.getTcpH().getTh_seq() - 1) / datalen;
if(sequence == excSeq) { //若收到期望包
dataQueue.add(data);
excSeq ++;
while(packets.containsKey(excSeq)) { //将有序包逐个上传,并从窗口中剔除
dataQueue.add(packets.get(excSeq).getTcpS().getData());
packets.remove(excSeq);
excSeq ++;
}
if(dataQueue.size() >= 20 || excSeq >= 999)
deliver_data();
}
else { //若收到失序包
if(!packets.contains(sequence) && sequence > excSeq) {
packets.put(sequence, recvPack);
}
}
tcpH.setTh_ack((excSeq - 1) * datalen + 1);//生成ACK报文段
ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());
tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));
reply(ackPack);
}

System.out.println();

//交付数据(每20组数据交付一次)
deliver_data();
}

验证(Log+Sender.log)

  • 慢启动:

TCP初连时设置cwnd为1且ssthresh为16,因此cwnd呈指数增长。

  • 拥塞避免(加法增大):

当cwnd大于ssthresh时,执行拥塞避免算法,cwnd随RTT呈线性增长。

  • 收到3个重复ACK后执行乘法减小、快重传:

发送方收到三个重复的162序号ACK执行快重传163,并乘法减小,使ssthresh变为当前cwnd大小(22)的一半,然后重置cwnd为1,执行慢开始:

其中可以看到这样的现象:发送方有些包no_ack但是并没有进行重传,这是因为接收方的累积确认,如果发送方(在timer之内)收到了比no_ack报文段序号更高序号的ACK,则证明接收方已经收到了那个包,no_ack的原因可能是ack丢失或者延迟,因此发送方没必要再重发这个包。这也体现了TCP累积确认的重要意义。

TCP Reno: 快恢复

TCP Reno与TCP Tahoe的唯一区别,就是对发送端连续收到收到三个重复ACK并执行快重传之后的处理方式。正如上面所实现的,Tahoe方式是将cwnd直接重置为1,并且以慢开始方式增长cwnd;而Reno则是在ssthresh执行完“乘法减小”之后,令cwnd恢复到ssthresh的大小,因而是执行拥塞避免(加法增大)算法。

因此这个版本的修改较为简单,仅需要在SenderWindow类中修改收到三个ACK之后的执行方式为拥塞避免算法即可。

SenderWindow.java(仅修改handleDeuplicateAck函数)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void handleDuplicateAck(int sequence) {     //重复包处理

reAck++;
logger.info("############### 收到重复包:sequence = " + sequence + " 重复计数: " + reAck + " ###############");
if (reAck >= 3) {
performFastRetransmit(sequence);
logger.info("[慢开始阶段]拥塞窗口信息:cwnd = " + cwnd + " ssthresh = " + ssthresh);
}
}

private void performCongestionAvoidance() { //拥塞避免
if (curAcked >= RTT) {
logger.info("############### 拥塞避免 ###############");
curAcked = 0;
cwnd++;
RTT++;
logger.info("[拥塞控制阶段]拥塞窗口信息:cwnd = " + cwnd);
}
}

由于这部分是我最终的实验设计,我将会把TCP-Reno的完整代码、Log文件及验证说明放在下一部分——实验结果中说明。

实验结果——TCP Reno

完整代码

CheckSum.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.ouc.tcp.test;

import java.util.Arrays;
import java.util.zip.CRC32;

import com.ouc.tcp.message.TCP_HEADER;
import com.ouc.tcp.message.TCP_SEGMENT;
import com.ouc.tcp.message.TCP_PACKET;

public class CheckSum {

/*计算TCP报文段校验和:只需校验TCP首部中的seq、ack和sum,以及TCP数据字段*/
public static short computeChkSum(TCP_PACKET tcpPack) {
long checkSum = 0;

CRC32 crc = new CRC32();

/*从首部获取seq、ack和sum*/
TCP_HEADER header = tcpPack.getTcpH();
crc.update(header.getTh_seq());
crc.update(header.getTh_ack());
// crc.update(header.getTh_sum());

/*从数据部获取数据*/
TCP_SEGMENT segment = tcpPack.getTcpS();
int[] data = segment.getData();
for(int i = 0;i < data.length; i++) {
crc.update(data[i]);
}

/*计算校验和*/
checkSum = crc.getValue();
return (short) checkSum;
}

}

TCP_Sender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/***************************2.1: ACK/NACK
**************************** Feng Hong; 2015-12-09*/

/***************************TCP-Reno
**************************** 郝文轩; 2024-01-02*/

package com.ouc.tcp.test;

import com.ouc.tcp.test.CheckSum;
import com.ouc.tcp.client.TCP_Sender_ADT;
import com.ouc.tcp.client.UDT_RetransTask;
import com.ouc.tcp.client.UDT_Timer;
import com.ouc.tcp.message.*;
import com.ouc.tcp.tool.TCP_TOOL;

public class TCP_Sender extends TCP_Sender_ADT {

private TCP_PACKET tcpPack; //待发送的TCP数据报
private volatile int flag = 1;
//private UDT_Timer timer; //计时器
//private UDT_RetransTask reTrans; //重传任务
private SenderWindow window = new SenderWindow(client);

/*构造函数*/
public TCP_Sender() {
super(); //调用超类构造函数
super.initTCP_Sender(this); //初始化TCP发送端
}

@Override
//可靠发送(应用层调用):封装应用层数据,产生TCP数据报;需要修改
public void rdt_send(int dataIndex, int[] appData) {

//生成TCP数据报(设置序号和数据字段/校验和),注意打包的顺序
tcpH.setTh_seq(dataIndex * appData.length + 1);//包序号设置为字节流号:
tcpS.setData(appData);
tcpPack = new TCP_PACKET(tcpH, tcpS, destinAddr);
//更新带有checksum的TCP 报文头
tcpH.setTh_sum(CheckSum.computeChkSum(tcpPack));
tcpPack.setTcpH(tcpH);
//System.out.println(dataIndex);
//若未装满则载入包
if(window.isFull()) {
// System.out.println("++++++++++++ 发送窗口满");
flag = 0;
}

//装满则等待
while(flag == 0);

try {
window.pushPack(tcpPack.clone());
} catch (CloneNotSupportedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}

//发送TCP数据报
udt_send(tcpPack);
}

@Override
//不可靠发送:将打包好的TCP数据报通过不可靠传输信道发送;仅需修改错误标志
public void udt_send(TCP_PACKET stcpPack) {
//设置错误控制标志
tcpH.setTh_eflag((byte)7);
//System.out.println("to send: "+stcpPack.getTcpH().getTh_seq());
//发送数据报
client.send(stcpPack);
}

@Override
//需要修改
public void waitACK() {
//循环检查ackQueue
//循环检查确认号对列中是否有新收到的ACK
if(!ackQueue.isEmpty()){
int currentAck=(ackQueue.poll() - 1) / 100; //?此处已知字节流长度100
window.rcvAck(currentAck); //交于窗口处理
if(!window.isFull()) { //释放锁
flag = 1;
}
}
}

@Override
//接收到ACK报文:检查校验和,将确认号插入ack队列;NACK的确认号为-1;不需要修改
public void recv(TCP_PACKET recvPack) {
System.out.println("Receive ACK Number: "+ recvPack.getTcpH().getTh_ack());
ackQueue.add(recvPack.getTcpH().getTh_ack());
System.out.println();

//处理ACK报文
waitACK();

}

}

SenderWindow.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/***************************TCP-Reno
**************************** 郝文轩; 2024-01-02*/

package com.ouc.tcp.test;


import com.ouc.tcp.test.CheckSum;
import com.ouc.tcp.client.TCP_Sender_ADT;
import com.ouc.tcp.client.UDT_RetransTask;
import com.ouc.tcp.client.UDT_Timer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.logging.FileHandler;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

import com.ouc.tcp.client.Client;
import com.ouc.tcp.message.*;
import com.ouc.tcp.tool.TCP_TOOL;

public class SenderWindow {
private int cwnd = 1; //拥塞窗口
private volatile int ssthresh = 16; //慢开始门限
private Hashtable<Integer, TCP_PACKET> tcpPacks = new Hashtable<Integer, TCP_PACKET>(); //待发送的TCP数据报窗口(循环队列)
private Hashtable<Integer, UDT_Timer> timers = new Hashtable<Integer, UDT_Timer>(); //为窗口内每个包分配一个计时器
// private int queue_base = 0; //窗口始端指针
// private int next_to_send = 0; //待发送包指针
// private int queue_rear = 0; //队末指针
// private int[] flag; //0:have no pkt; 1:usable not yet send; 2:sent not yet ack'ed; 3: already ack'ed
private int reAck = 0; //重复包个数
private int lastAck = -1; //上一ACK序号
private int curAcked = 0; //窗口中已收到Ack的数量(等于cwnd时表示一个RTT结束)
private int RTT = 1;

Logger logger;
Client client;

private void resetCwndAndRtt() {
cwnd = 1;
curAcked = 0;
RTT = 1;
}

class Tahoe_Timer extends UDT_RetransTask {
int seq;
private TCP_PACKET packet;

public Tahoe_Timer(Client client, TCP_PACKET packet) {
super(client, packet);
int[] data = packet.getTcpS().getData();
int datalen = data.length;
seq = (packet.getTcpH().getTh_seq() - 1) / datalen;
this.packet = packet;
}

@Override
public void run() {
logger.info("############### 拥塞慢开始 ###############");
ssthresh = Math.max(cwnd / 2, 2);
resetCwndAndRtt();
super.run();

logger.info("[慢开始阶段]拥塞窗口信息:cwnd = " + cwnd + " ssthresh = " + ssthresh);
}
}

private void initLogger(){
logger= Logger.getLogger(SenderWindow.class.getName());

logger.setUseParentHandlers(false);
FileHandler fh = null;
try {
fh = new FileHandler("Sender.log",false);
fh.setFormatter(new SimpleFormatter());//输出格式

} catch (IOException e) {
e.printStackTrace();
}
// fh.setFormatter();
logger.addHandler(fh);
}

public SenderWindow(Client client) {
initLogger();
this.client = client;
}

public boolean isFull() {
return cwnd <= tcpPacks.size();
}

private void handleDuplicateAck(int sequence) { //重复包处理
reAck++;
logger.info("############### 收到重复包:sequence = " + sequence + " 重复计数: " + reAck + " ###############");
if (reAck >= 3) {
performFastRetransmit(sequence);
logger.info("[慢开始阶段]拥塞窗口信息:cwnd = " + cwnd + " ssthresh = " + ssthresh);
}
}

private void performFastRetransmit(int sequence) { //快重传与快恢复
int resendSeq = sequence + 1;
TCP_PACKET pack = tcpPacks.get(resendSeq);
UDT_Timer timer = timers.get(resendSeq);
if (pack != null && timer != null) {
client.send(pack);
timer.cancel();
timers.put(resendSeq, new UDT_Timer());
timers.get(resendSeq).schedule(new Tahoe_Timer(client, pack), 1000, 1000);
logger.info("############### 快重传:seq = " + resendSeq + " ###############");
}
logger.info("############### Reno快恢复 ###############");
ssthresh = Math.max(cwnd / 2, 2);
cwnd = ssthresh;
RTT = cwnd;
curAcked = 0;
performCongestionAvoidance();
//logger.info("[慢开始阶段]拥塞窗口信息:cwnd = " + cwnd + " ssthresh = " + ssthresh);
}

private void performCongestionAvoidance() { //拥塞避免
if (curAcked >= RTT) {
logger.info("############### 拥塞避免 ###############");
curAcked = 0;
cwnd++;
RTT++;
logger.info("[拥塞控制阶段]拥塞窗口信息:cwnd = " + cwnd);
}
}

private void performSlowStart() { //慢开始
cwnd++;
if (curAcked >= RTT) {
logger.info("[一个传输轮次]拥塞窗口信息:cwnd = " + cwnd + " ssthresh = " + ssthresh);
curAcked = 0;
RTT *= 2;
}
}

private void handleNewAck(int sequence) { //处理新Ack
curAcked++;
for (int i = lastAck + 1; i <= sequence; i++) {
tcpPacks.remove(i);
if (timers.containsKey(i)) {
timers.get(i).cancel();
timers.remove(i);
}
}
lastAck = sequence;
reAck = 0;

if (cwnd < ssthresh) {
performSlowStart();
}
else {
performCongestionAvoidance();
}
}

public void pushPack(TCP_PACKET packet) {
int[] data = packet.getTcpS().getData();
int datalen = data.length;
int seq = (packet.getTcpH().getTh_seq() - 1) / datalen;
timers.put(seq, new UDT_Timer());
timers.get(seq).schedule(new Tahoe_Timer(client, packet), 1000, 1000);
tcpPacks.put(seq, packet);
}

//对ACK处理
public void rcvAck(int sequence) {
logger.info("接收到ack: "+ sequence);
// logger.info("cwnd = " + cwnd + " curAcked = " + curAcked);
if(sequence == lastAck) { //收到重复包
handleDuplicateAck(sequence);
}
else {
handleNewAck(sequence);
}
}
}

TCP_Receiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/***************************2.1: ACK/NACK*****************/
/***** Feng Hong; 2015-12-09******************************/

/***************************TCP-Reno
**************************** 郝文轩; 2024-01-02*/

package com.ouc.tcp.test;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Hashtable;

import com.ouc.tcp.test.CheckSum;
import com.ouc.tcp.client.TCP_Receiver_ADT;
import com.ouc.tcp.message.*;
import com.ouc.tcp.tool.TCP_TOOL;

public class TCP_Receiver extends TCP_Receiver_ADT {

private TCP_PACKET ackPack; //回复的ACK报文段
int sequence=1;//用于记录当前待接收的包序号,注意包序号不完全是
// int pre_seq=-1; //用于记录上一个接收成功的包序号
int excSeq = 0; //期望数据包seq
private Hashtable<Integer, TCP_PACKET> packets = new Hashtable<Integer, TCP_PACKET>();
// private ReceiverWindow window = new ReceiverWindow(client, 16);

/*构造函数*/
public TCP_Receiver() {
super(); //调用超类构造函数
super.initTCP_Receiver(this); //初始化TCP接收端
}

@Override
//接收到数据报:检查校验和,设置回复的ACK报文段
public void rdt_recv(TCP_PACKET recvPack) {
//检查校验码,生成ACK
if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum()) {
int[] data = recvPack.getTcpS().getData();
int datalen = data.length;
int sequence = (recvPack.getTcpH().getTh_seq() - 1) / datalen;
if(sequence == excSeq) {
dataQueue.add(data);
excSeq ++;
while(packets.containsKey(excSeq)) {
dataQueue.add(packets.get(excSeq).getTcpS().getData());
packets.remove(excSeq);
excSeq ++;
}
if(dataQueue.size() >= 20 || excSeq >= 999)
deliver_data();
}
else {
if(!packets.contains(sequence) && sequence > excSeq) {
packets.put(sequence, recvPack);
}
}
tcpH.setTh_ack((excSeq - 1) * datalen + 1);//生成ACK报文段
ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());
tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));
reply(ackPack);

}

System.out.println();


//交付数据(每20组数据交付一次)
deliver_data();
}

@Override
//交付数据(将数据写入文件);不需要修改
public void deliver_data() {
//检查dataQueue,将数据写入文件
File fw = new File("recvData.txt");
BufferedWriter writer;

try {
writer = new BufferedWriter(new FileWriter(fw, true));

//循环检查data队列中是否有新交付数据
while(!dataQueue.isEmpty()) {
int[] data = dataQueue.poll();

//将数据写入文件
for(int i = 0; i < data.length; i++) {
writer.write(data[i] + "\n");
}

writer.flush(); //清空输出缓存
}
writer.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

@Override
//回复ACK报文段
public void reply(TCP_PACKET replyPack) {
//设置错误控制标志
tcpH.setTh_eflag((byte)7); //

//发送数据报
client.send(replyPack);
}
}

TestRun.java

1
2
3
4
5
6
7
8
9
10
11
package com.ouc.tcp.test;

import com.ouc.tcp.app.SystemStart;

public class TestRun {

public static void main(String[] args) throws InterruptedException {
SystemStart.main(null);
}

}

实验结果验证

Sender.log

  1. 初始化慢开始: cwnd=1, ssthresh=16

    根据日志文件可以明显看出,TCP在建立连接后首先执行的是慢开始算法,在这个阶段,cwnd窗口大小每收到一个ACK就扩大一个单位MSS,每经过一个RTT(传输轮次),cwnd就扩大一倍。在这个阶段中,cwnd窗口的大小与RTT呈指数相关增长。

  1. 拥塞控制阶段: cwnd>ssthresh, ssthresh=16

    在拥塞窗口cwnd长度到达慢开始门限ssthresh时,将开始执行加法增大的拥塞控制算法。在这个阶段中,每经过一个传输轮次RTT,cwnd增加一个单位MSS大小,因此cwnd关于RTT呈线性增长。

  1. (收到三个连续ACK)快重传、快恢复: cwnd=21,收到重复包:sequence=118

    在cwnd=21时,发送方连续收到三个报文段序号为118的ACK,此时立刻重传119报文段(快重传),在快重传之后,立刻执行快恢复,此时将ssthresh乘法减小为10,并执行拥塞控制算法,从cwnd=10开始加法增大、线性增长。

Log.txt

由于Log.txt可包含信息受限,我们仅能从中看出ACK接受情况和重传情况。截图部分,则是映照了上文中对119的快重传。

对应的,接收端发送了三个重复的118:

通过这两个截图我们还可以获取到这样的信息:累计确认的作用。

发送方有些包no_ack但是并没有进行重传,这是因为接收方的累积确认,如果发送方(在timer之内)收到了比no_ack报文段序号更高序号的ACK,则证明接收方已经收到了那个包,no_ack的原因可能是ack丢失或者延迟,因此发送方没必要再重发这个包。

并且,累计确认为接收端“发送三个重复ACK”的情景提供了条件,这个过程中,接收端可以照常收到发送端发送的报文段120、121,但是由于119的缺失,接收端为了保证有序而使得接收窗口即使正确收到了120、121报文段,却仅仅将其缓存下来,而回应的ACK仍然是窗口最左端的118,直到发送端重传119并被接收端收到之后,窗口才会右移到下一期望报文段序号前,并发送一个累计确认ACK序号。因此,这就是我们在Log.txt文件中的发送端看到120、121是no_ack并没有重传,而接收端连续发送了三个118的ACK的原因。

因此,我实现的累积确认功能也顺便得到了很好的验证。

RecvData.txt

RecvData的意义,就是用来验证数据是否按序交付给上层,这是可靠传输的重要需求。由于文件过大而不可能悉数展示,仅将最后部分展示如下:

可以看出,接收端成功地按序将数据传输到了上层。至此,完全证明了我所实现的TCP_Reno达到了可靠传输、拥塞控制、流水线设计的要求。

实验中遇到的问题、难点及解决方案

CRC校验和计算

最初我直接从首部获取sum,然而log中NO_ACK铺满屏幕,说明被校验为错误后出现重发死循环,也就是说我的校验和计算函数出现了问题。

解决方法则是在检验和计算中删掉sum。

延迟送达的包导致乱序

在进行RDT3.0设计时,发现原先Receiver对seq的确认仅考虑了是否与上一个包是重复的if(sequence != this.pre_seq),因此在接收到延迟的数据包时会被错误交付给上层。

解决方法则是对延迟收到的包丢弃if(sequence > this.pre_seq)。

选择响应协议中出现的死循环bug

在我最初的设计中,测试时发现这个问题:

窗口范围内的包不断重发,窗口未移动,经过排查后发现:

Sender对于未得到响应的包重发时,取代了该线程本该发送的包,使得窗口序号与数据包序号无法对应,接收方一直在等待该包、而发送方的窗口对应的序号存放了其他序号的数据,于是一直在重发“位于接收方期望的窗口位置的包但实际序号并非接收方所期望的包”,于是接收方ACK、发送方重发……

在代码中排错后发现:

我原本的代码将“窗口满”的忙等循环放在了当前数据包递交给窗口处理操作之后,导致该包被缓存后并未被发送,因而造成了死循环。解决方法则是调转顺序,在窗口处理操作之前就加锁等待空闲块。

慢开始阶段cwnd每收到一个ACK就应该+1,而非等到RTT后一次性*2

为了解决这个问题,我另外设置了一个变量RTT,用来实现对传输轮次的记录,从而保证cwnd在每次接收到Ack之后自加1的同时,curAcked变量的值还能随传输轮次进行更新。

事后🚬

一本正经地

实验设计上的必要解释

事实上由Selective-Response到TCP-Tahoe的开发过程并没有真正的迭代,因为TCP协议是结合了GBN和SR,其中Receiver对收到的包是进行累积确认的,而不同于SR的单个确认,因此在这里需要推翻原本建立的方式;尽管如此,由于TCP协议中对失序包的处理仍使用SR的方式,因此我还是认为这是一个迭代的过程。(当然如果我在RDT4的部分选择了实现TCP而非GBN或SR,将会是严格意义上的迭代开发)。

本实验中的timer也并未严格按照RTT进行更新,仅依靠主观设置时长,当然不严谨,但对于似乎并不在本实验所注重的内容中,因此自行设置的重传时间已经足够。

本实验中seq实际上的值应当是以字节为单位的序号,也即包的序号*datalen之后的值,但为了方便进行窗口大小单位和顺序标定的统一,我在实验中直接将seq粗略的设定为以包(报文段)为单位的数值,把拥塞窗口的单位设置为一个报文段长度(这是由于TCP_PACKETS作为传输单位所决定的),这并不严谨。同时,由于显而易见的在本实验中每个报文段中包含100个单位的数据,因此同样粗略的,我在某些不能计算出datalen(或者说不便于计算)的情况中,将MSS设定为了100,在这个过程中,我同样粗略的将不同报文段的实际长度与最大报文段长度、报文段序号与seq值等单位干脆混为一谈,然而在现实的实现中这几个单位应当是有着区别的。

实验总结

从我的实验报告篇幅就可以看出,我在这个实验上下了很大功夫,这不仅仅是出于教学要求,更重要的是我在其中得到了很大的满足感和获得感,迭代设计的开发原则就是这样刺激人们的头脑,如同垒高楼一样,一点一点向上铺盖,不断提出问题、解决问题,直到满足现有的需求,这很突出的体现了一个系统设计的原则——复杂性递增原则。因而,我在试验中获得的最大收获是,对计算机系统设计工程的思考与验证:

模块化

在实验设计的最一开始,现有的资源就已经为我们提供了一定的模块化,坚不可摧的基础法则下,我们对于迭代开发的过程变得更加简单。仅举例而言,我在实现RDT4的基础上实现TCP_Tahoe的过程中,仅需要修改SenderWindow模块,而基本不需要对将其调用的上层驱动模块进行修改,就可以在SR的基础上实现拥塞控制等功能;再例如,从TCP_Tahoe到TCP_Reno的迭代,更细致地模块单元变更,我只需要修改“收到重复包后的处理”中,将调用更改为执行拥塞控制算法,即可满足从Tahoe到Reno的更新。更改模块比更改模块化更容易,这其中也是强制模块化的主要体现。

抽象化

在我看来,滑动窗口的设计就是一个抽象的过程。举例而言,我所实现的SenderWindow类,就掩盖了下层对于收到ACK的不同情况的不同处理,例如:如果收到三个重复ACK,窗口将会如何活动,如果timeout,又该如何处理,但通过窗口作为间接层的设计,这些复杂性将透明于上层,不为上层所考虑,TCP_Sender只需要——实例化这个SenderWindow的间接,当收到ACK时直接交给他来处理而不用操心他是怎么处理的,再由他将下一个应当发送的报文段返回给TCP_Sender,进行发送……任何问题都可以通过增加一层间接来解决,或者说,计算机系统都可以通过移除一层间接来加快速度。抽象使得模块之间的交互很少甚至没有、任何模块都能完全根据其外部规范来处理所有其他模块而不需要了解内部发生了什么,其将接口与内部内容分离,将规范与显示内容分离。这便是间接方式解耦模块化的设计原则。

分层化

当然的,计算机网络的协议栈本身就是分层的设计思想的体现,在本实验中也有体现——接收端运输层对收到的报文进行处理,按批交付给上层应用层,这个过程便是分层化的主要思想。类似的,我要说的不止是在协议栈上的分层思想,而是通过抽象设计来减少模块互联的模块组织方法,最终形成了分层的组织方法。在使用层进行设计时,可以建立在一组已经完成的底层上,使用之创建不同的不同完整机制(上层)。减少了模块之间的互连,从一小组模块开始,并将它们组装成一个稳定的、独立的子系统,它由一个定义良好的接口。接下来,组装一小群子系统来产生一个更大的子系统。这是对间接与抽象在宏观上的表述。

迭代

迭代本质就是首先构建一个简单的、可工作的系统,它只满足需求的少量子集,然后以小的步骤发展该系统,逐渐包括越来越多的完整需求集。无论是我们在这个小的实验中所复现的运输层RDT协议的发展过程,还是纵观整个计算机网络的发展历史,我们都不难看出,一个完整的、尽可能解决所有问题的、应对一切复杂性的、随时代更新的系统,都应当被设计得容易修改,为迭代设计做好准备,这是每一位系统工程师及设计者应当具备的品质——未雨绸缪,我们不会一次性正确。这正是我们前述铺垫的模块化、抽象、分层或层级所要实现的目的。正是这些应对复杂性的方法,让我们的设计有了随着需求而更新的可能。

瞎说两句

上面的总结,我是搬来上学期计工课的内容,觉得但凡涉及到系统的设计,偏离不了这几点,改改术语、套套皮,就是一篇不错的课设总结,词汇也高大上(毕竟机翻来的生疏语感,总给人一种很专业的感觉),这叫strong,死装。

说我真从中学到东西了吗?撑死,是对计网期末考试时候涉及到运输层的内容有了很深刻的理解,动作起来也很称心应手。但要是说对我专业能力加没加点,我直言是寡淡:学会如何设计虽然重要,但像我这种半吊子选手玩网络也就是51CTO热粉,照着别人嚼过的馒头鼓捣鼓捣路由器,我更感兴趣些,因此要不是守着三瓜两枣的绩点,我对网络的学习绝不会想到要照本宣科去复现TCP。