目录
TCP协议特点
编写TCP服务器
服务端实现
类方法:初始化服务器
类方法:启动服务器
部署服务器:
服务端代码
客户端实现
1.创建套接字socket
2.发起链接请求connect
3.收发消息
客户端代码:
线程池代码:
定义任务类代码:
有连接:需要提前通过三次握手建立连接
可靠传输:接收方没有收到数据会重新发送数据包,直到收到确认信息为止。
面向字节流:数据是连续的字节流,没有消息边界,接收方需要根据协议等来解析数据。
支持全双工:可以读写使用同一个套接字,因为服务端和客户端的TCP底层都具有接收缓冲区和发送缓冲区
封装成类
类成员:(倘若要实现线程池版本,可以加入线程池指针)
private:int listenSock_; // 监听套接字uint16_t port_; // 端口号string ip_; // ip地址
1.创建套接字:
domain:AF_INET使用IPv4进行网络通信
type套接字类型:SOCK_STREAM表示流式套接
protocol:0表示使用默认协议类型
2.绑定网络信息
2.1填充信息到结构体,这步与UDP服务器同
填充协议家族,填充端口号,填充IP地址
2.2bind,将本地socket的信息写入socket对应的内核区域
3.监听socket
因为Tcp是面向连接的,所以在通信之前需要先进行连接
使用接口:
sockfd:进行监听的套接字
backlog:TCP底层所能维护的未被accpet获取的最大连接长度(已经三次握手进入ESTABLISHED状态)
循环进行
1.获取链接:
sockfd:从监听的套接字获取连接,没有连接就阻塞等待,它的核心工作是获取新的连接
后两个参数,一个是输出型参数,另一个是输入输出型参数。 可以用于提取客户端的IP地址或端口号
返回值:accept返回一个新的socketfd,是主要为用户提供网络服务的套接字,主要是进行IO
2.提供服务
多种方法:
(1)单进程:单进程只能为一个客户端提供服务
(2)多进程版本1:将SIGCHLD信号处理方法设置为忽略,创建子进程提供服务
(3)多进程版本2:创建孙子进程提供服务,退出子进程,让父进程等待子进程,
(4)多线程版本:创建新线程提供服务
(5)线程池版本:创建线程池,构建任务放入任务队列,线程池自动取任务处理
也就是让服务器以守护进程方式运行
一般服务器工作对外提供服务,一旦启动,除非主动关闭,否则一直在运行,比如sshd提供Linux的登录服务,其父进程是1,是自成进程组,自成新的会话,其做的工作不受用户登录和注销的影响。
守护进程化步骤:
1.调用setsid:setsid将调用进程设置一个新的会话,调用函数的进程不能是进程组的组长,常规做法fork,因为子进程不是组长进程
2.重定向0,1,2文件描述符到Linux下的垃圾桶/dev/null,从垃圾桶中的读写一概会被丢弃3.命令行用命令:nohup ./a.out &来启动程序,nohup表示不被挂起,&是后台的意思
或者可以使用系统调用daemon进行守护进程化
可以选择忽略sigpipe信号,因为写端一直在写时,读端关闭,写端会被sigpipe信号信号终止,要更改进程的工作目录则使用chdir()函数。
主函数:1.守护进程化 2.创建tcp服务器对象 3.初始化服务 4.启动服务
#include "log.hpp"
#include "threadPool.hpp"
#include "task.hpp"
#include "daemonize.hpp"
class ThreadData
{
public:ThreadData(int sockfd, string ip, uint16_t port): sockfd_(sockfd),ip_(ip),port_(port){}int sockfd_;string ip_;uint16_t port_;
};void transService(int sockfd, string ip, uint16_t port)//转换大小写服务
{assert(sockfd > 0);assert(!ip.empty());assert(port >= 1024);char buffer[BUFFER_SIZE];while (1){ssize_t s = read(sockfd, buffer, sizeof(buffer) - 1);if (s > 0){buffer[s] = 0;if (strcasecmp(buffer, "quit") == 0){logMessage(DEBUG, "client quit -- %s[%d]", ip.c_str(), port);break;}logMessage(DEBUG, "trans before:%s[%d]>>>%s", ip.c_str(), port, buffer);for (int i = 0; i < s; i++)buffer[i] = toupper(buffer[i]);logMessage(DEBUG, "trans after:%s[%d]>>>%s", ip.c_str(), port, buffer);write(sockfd, buffer, strlen(buffer));}else if (s == 0){logMessage(DEBUG, "client quit:%s[%d]", ip.c_str(), port);break;}else{logMessage(DEBUG, "%s[%d] read:%s", ip.c_str(), port, strerror(errno));break;}}close(sockfd);logMessage(DEBUG, "server close %d done", sockfd);
}void execCommand(int sock, const std::string &clientIp, uint16_t clientPort)//执行指令服务
{assert(sock >= 0);assert(!clientIp.empty());assert(clientPort >= 1024);char command[BUFFER_SIZE];while (1){ssize_t s = read(sock, command, sizeof(command) - 1);if (s > 0){command[s] = 0;string safe = command;if ((safe.find("rm") != string::npos) || (safe.find("unlink") != string::npos) || (safe.find("mv") != string::npos))break;FILE *pf = popen(command, "r");if (pf == nullptr){logMessage(WARINING, "exec %s failed ,because : %s", command, strerror(errno));break;}char line[BUFFER_SIZE];while (fgets(line, sizeof(line) - 1, pf) != nullptr) // fgets自动填充\0{write(sock, line, strlen(line));}pclose(pf);logMessage(DEBUG, "[%s:%d]exec %s done", clientIp.c_str(), clientPort, command);}else if (s == 0){logMessage(DEBUG, "client[%s:%d] quit", clientIp.c_str(), clientPort);break;}else{logMessage(DEBUG, "[%s:%d] read : %s", clientIp.c_str(), clientPort, strerror(errno));break;}}close(sock);logMessage(DEBUG, "server close %d done", sock);
}void *threadRoutine(void *argc)
{ThreadData *td = static_cast(argc);transService(td->sockfd_, td->ip_, td->port_);delete td;return nullptr;
}class tcpServer
{
public:tcpServer(uint16_t port, string ip): port_(port), ip_(ip), listenSock_(-1){}~tcpServer() {}void init(){// 1.创建套接字listenSock_ = socket(AF_INET, SOCK_STREAM, 0);if (listenSock_ < 0){logMessage(FATAL, "socket: %s ", strerror(errno));exit(SOCKET_ERR);}logMessage(DEBUG, "socket: %s ", strerror(errno));// 2. bind// 2.1 填充信息到结构体struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = AF_INET;local.sin_port = htons(port_);ip_.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(ip_.c_str(), &local.sin_addr));// 2.2 bind,本地sock写入内核sockif (bind(listenSock_, (const struct sockaddr *)&local, sizeof(local)) < 0){logMessage(FATAL, "bind: %s", strerror(errno));exit(BIND_ERR);}logMessage(DEBUG, "bind: %s,%d", strerror(errno), listenSock_);// 3.监听if (listen(listenSock_, 5) < 0){logMessage(FATAL, "listen: %s", strerror(errno));exit(LISTEN_ERR);}logMessage(DEBUG, "listen: %s,%d", strerror(errno), listenSock_);// 初始化线程池tp_ = ThreadPool::getInstance();tp_->start();}void start(){while (1){// signal(SIGCHLD,SIG_IGN);// 4.连接struct sockaddr_in peer;socklen_t len = sizeof peer;int serviceSock = accept(listenSock_, (struct sockaddr *)&peer, &len);if (serviceSock < 0){logMessage(WARINING, "accept:%s[%d]", strerror(errno), serviceSock);continue;}// 4.1获取客户端基本信息uint16_t peerPort = ntohs(peer.sin_port);string ip = inet_ntoa(peer.sin_addr);logMessage(DEBUG, "accept :%s |%s[%d],sockfd:%d", strerror(errno), ip.c_str(), peerPort, serviceSock);// 5.提供服务:转成大写// 单进程版本// transService(serviceSock, ip, peerPort);// 多进程版本 忽略SIGCHLD信号// pid_t id=fork();// assert(id!=-1);// if(id==0)// {// close(listenSock_);// transService(serviceSock, ip, peerPort);// exit(0);// }// close(serviceSock);//避免文件描述符资源泄漏// 多进程版本 创建孙子进程,退出父进程,让系统回收孤儿进程// pid_t id = fork();// assert(id != -1);// if (id == 0)// {// close(listenSock_);// if (fork() > 0)// exit(0);// // 孙子进程提供服务// transService(serviceSock, ip, peerPort);// exit(0);// }// pid_t ret=waitpid(id,nullptr,0);// assert(ret>0);// (void )ret;// 多线程版本// ThreadData *td = new ThreadData(serviceSock, ip, peerPort);// pthread_t tid;// pthread_create(&tid, nullptr, threadRoutine, (void *)td);// 线程池版本// task t(serviceSock,ip,peerPort// ,std::bind(transService,placeholders::_1,placeholders::_2,placeholders::_3));// tp_->push(t);task t(serviceSock, ip, peerPort, execCommand);tp_->push(t);}}private:int listenSock_; // 监听套接字uint16_t port_; // 端口号string ip_; // ip地址ThreadPool *tp_; // 线程池
};
int main(int argc, char *argv[])
{if (argc != 2 && argc != 3){cout << "Usage:\n\t" << argv[0] << " port [ip]" << endl;exit(USAGE_ERR);}uint16_t port = atoi(argv[1]);string ip;if (argc == 3)ip = argv[2];daemonize();tcpServer ts(port, ip);ts.init();ts.start();return 0;
}
守护进程函数实现:
#include"log.hpp"
#include
void daemonize()
{int fd;signal(SIGPIPE,SIG_IGN);// 忽略SIGPIPE信号,避免写端一直在写,读端关闭,写端被信号终止if(fork()>0)exit(0);// 让自己不成为进程组的组长,可以调用setsidsetsid();//设置自己为独立会话组//重定向0 1 2 到 /dev/null linux下的垃圾桶if((fd = open("/dev/null",O_RDWR))!=-1){dup2(fd,0);dup2(fd,1);dup2(fd,2);}//close(fd);
}
不需要显式绑定bind,避免客户端端口号固定,因为端口号可能被其他客户端占用,不需要监听Listen,不需要accept
先填充需要连接的远端主机的基本信息 ,再调用connect连接服务端
注意:发起请求时,connect自动帮我们绑定端口号
不能使用recvfrom和sendto(UDP专用的),可以使用read和write,或recv和send
#include "log.hpp"
volatile bool quit = false;
int main(int argc, char *argv[])
{if (argc != 3){cout << "Usage\n\t" << argv[0] << " ip port" << endl;exit(USAGE_ERR);}string ip = argv[1];uint16_t port = atoi(argv[2]);// 1.创建套接字int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){logMessage(FATAL, "sockfd :%s", strerror(errno));exit(SOCKET_ERR);}// 2.发起连接请求// 2.1填充服务器基本信息struct sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(port);inet_aton(ip.c_str(), &server.sin_addr);// 2.2发起请求if (connect(sockfd, (const struct sockaddr *)&server, sizeof server) != 0){logMessage(FATAL, "connect :%s", strerror(errno));exit(CONN_ERR);}logMessage(DEBUG, "connect :%s,sockfd:%d ", strerror(errno), sockfd);string message;while (!quit){message.clear();cout << "请输入你的信息:";getline(cin, message);if (strcasecmp("quit", message.c_str()) == 0)quit = true;ssize_t s = write(sockfd, message.c_str(), message.size());if (s > 0){message.resize(1024);ssize_t t = read(sockfd, (void *)message.c_str(), 1024);if (t > 0){message[t] = '\0';cout << "server echo#" << message.c_str() << endl;}}else if (s <= 0){break;}}close(sockfd);return 0;
}
其他头文件代码:
#include
#include
#include
#include
#include
using namespace std;int gThreadNum = 5;template
class ThreadPool
{
private:ThreadPool(int threadNum = gThreadNum): threadNum_(threadNum), isStart_(false){pthread_mutex_init(&mutex_,nullptr);pthread_cond_init(&cond_,nullptr);}ThreadPool(ThreadPool &) = delete;void operator=(ThreadPool &) = delete;public:// 线程安全地创建单例static ThreadPool *getInstance(){if (instance == nullptr) // 过滤掉不满足条件的{static pthread_mutex_t mutex_s = PTHREAD_MUTEX_INITIALIZER;pthread_mutex_lock(&mutex_s);if (instance == nullptr){instance = new ThreadPool();}pthread_mutex_unlock(&mutex_s);}return instance;}static void *threadRoutine(void *args){pthread_detach(pthread_self());ThreadPool *tp = static_cast *>(args);//prctl(PR_SET_NAME, "follower");while(1){//用条件变量保证线程间的互斥和同步tp->lockQueue();while(tp->queueEmpty()){tp->waitFortask();}//从任务队列中取出任务;T t = tp->pop();tp->unlockQueue();t();}return nullptr;}void start(){assert(!isStart_);for (int i = 0; i < threadNum_; i++){pthread_t p;pthread_create(&p, nullptr, threadRoutine, this); // 传入this,可以访问类内的方法}isStart_ = true;}~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}void push(const T&in)//将任务放入任务队列中{lockQueue();taskQueue_.push(in);unlockQueue();wakeThread();}
private:void lockQueue(){pthread_mutex_lock(&mutex_);}void unlockQueue(){pthread_mutex_unlock(&mutex_);}bool queueEmpty(){return taskQueue_.empty();}void waitFortask(){pthread_cond_wait(&cond_,&mutex_);}void wakeThread(){pthread_cond_signal(&cond_);}//依次唤醒等待队列中的线程T pop()//从任务队列中取出任务{T temp = taskQueue_.front();taskQueue_.pop();return temp;}
private:bool isStart_;int threadNum_;queue taskQueue_;pthread_mutex_t mutex_;pthread_cond_t cond_;// 单例模式static ThreadPool *instance;
};template
ThreadPool * ::ThreadPool::instance = nullptr;
#pragma once
#include
#include "log.hpp"
class task
{
public:// 等价于typedefusing callBack_t = function;task(int sock,string ip,uint16_t port,callBack_t func): sock_(sock),ip_(ip), port_(port), func_(func){}~task(){}void operator()(){logMessage(DEBUG,"线程[%p],处理[%s:%d]的请求开始了",pthread_self(),ip_.c_str(),port_);func_(sock_,ip_,port_);logMessage(DEBUG,"线程[%p],处理[%s:%d]的请求结束了",pthread_self(),ip_.c_str(),port_);}private:int sock_;string ip_;uint16_t port_;callBack_t func_;
};
效果演示: