基于select的socket FTP 服务端

由于时间问题,没有实现并发上传下载的功能。后面再完善。
import select,socket,queue,json,os,threading,time,gevent,hashlib
from gevent import monkey
monkey.patch_all()

class MyselectTCP(object):
    '''
    Single-threaded FTP-style server multiplexed with select().

    Clients send one JSON object per request, e.g.
    ``{"action": "get", "file": "a.txt"}`` or
    ``{"action": "put", "file": "a.txt", "size": "123"}``; the request is
    dispatched to the ``com_<action>`` method. File transfers run
    synchronously inside the select loop, so only one transfer is served at
    a time.

    NOTE(review): a request is assumed to arrive in a single recv() of at
    most 1024 bytes; the wire protocol has no message framing.
    '''
    socket_familly = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    socket_request_numb = 1000
    # Class-level placeholders kept for backward compatibility only;
    # __init__ rebinds fresh containers on every instance so that two servers
    # never share connection state.
    inputs = []
    outputs = []
    readerable, writeable, exceptinal = None, None, None
    msg_dict = {}
    # Root directory for served/uploaded files, built with os.path.join so it
    # is valid on every platform (the old "..\log\home" literal relied on
    # invalid escape sequences and a Windows-only separator).
    BASEADDR = os.path.abspath(os.path.join("..", "log", "home"))
    # Number of bytes moved per read/recv during a transfer.
    CHUNK_SIZE = 1024
    # Length of the hex MD5 digest that terminates every transfer.
    MD5_HEX_LEN = 32
    # Error code sent when a requested file is not available.
    ERROR_NOT_FOUND = '777'
    # Error code sent when a request could not be processed.
    ERROR_BAD_REQUEST = '500'

    def __init__(self, ip, port):
        '''
        Bind the listening socket and start serving.

        This call blocks: it ends by entering the select loop in
        connection().

        :param ip: address to bind to
        :param port: TCP port to listen on
        '''
        self.inputs = []
        self.outputs = []
        self.msg_dict = {}
        self.readerable, self.writeable, self.exceptinal = [], [], []
        self.sever = socket.socket(self.socket_familly, self.socket_type)
        try:
            self.sever.bind((ip, port))
            self.sever.listen(self.socket_request_numb)
            self.sever.setblocking(False)
        except OSError:
            # Do not leak the descriptor when the address is unavailable.
            self.sever.close()
            raise
        self.inputs.append(self.sever)
        self.connection()

    def _resolve_path(self, filename):
        '''
        Map a client-supplied file name to a path inside BASEADDR.

        :param filename: file name taken from the request
        :return: absolute path located below BASEADDR
        :raises ValueError: if the name is not a string or points outside
            BASEADDR (absolute path, "..", other drive)
        '''
        if not isinstance(filename, str):
            raise ValueError("file name must be a string")
        base = os.path.abspath(self.BASEADDR)
        target = os.path.abspath(os.path.join(base, filename))
        # commonpath itself raises ValueError for paths on different drives.
        if target == base or os.path.commonpath([base, target]) != base:
            raise ValueError("illegal file name: %r" % filename)
        return target

    @staticmethod
    def _recv_exact(conn, count):
        '''
        Read exactly ``count`` bytes from ``conn``.

        :param conn: connected socket
        :param count: number of bytes wanted
        :return: the bytes read; shorter than ``count`` only if the peer
            closed the connection first
        '''
        buf = b''
        while len(buf) < count:
            piece = conn.recv(count - len(buf))
            if not piece:
                break
            buf += piece
        return buf

    def _drop_connection(self, conn):
        '''
        Forget a socket everywhere and close it; safe to call repeatedly.

        :param conn: socket to remove from inputs/outputs/msg_dict
        '''
        if conn in self.inputs:
            self.inputs.remove(conn)
        if conn in self.outputs:
            self.outputs.remove(conn)
        self.msg_dict.pop(conn, None)
        conn.close()

    def com_get(self, w_conn, bcak_dict):
        '''
        Handle a download request.

        Sends the request dict back as JSON, extended with "size" (and the
        resolved "file" path) on success or with "error" when the file is
        unavailable. After the client answers ``b'ok'`` the file content is
        streamed, followed by its hex MD5 digest.

        :param w_conn: client socket
        :param bcak_dict: request dict; must contain "file"
        :return: None
        '''
        filename = bcak_dict["file"]
        try:
            fileaddr = self._resolve_path(filename)
        except ValueError:
            # Names escaping BASEADDR are reported as "not found".
            fileaddr = None
        found = fileaddr is not None and os.path.isfile(fileaddr)
        if found:
            bcak_dict["file"] = fileaddr
            bcak_dict["size"] = str(os.stat(fileaddr).st_size)
        else:
            bcak_dict["error"] = self.ERROR_NOT_FOUND

        print("back data is [%s]" % bcak_dict)
        # Tell the client what is about to be sent.
        w_conn.sendall(json.dumps(bcak_dict).encode())
        if not found:
            return
        # The client confirms with b'ok' before the payload is streamed.
        if self._recv_exact(w_conn, 2) != b'ok':
            return

        md5 = hashlib.md5()
        with open(fileaddr, 'rb') as file_obj:
            # Fixed-size chunks keep memory bounded for binary files that
            # contain no newline; sendall guarantees nothing is truncated.
            for chunk in iter(lambda: file_obj.read(self.CHUNK_SIZE), b''):
                md5.update(chunk)
                w_conn.sendall(chunk)
        w_conn.sendall(md5.hexdigest().encode())

    def com_put(self, w_conn, bcak_dict):
        '''
        Handle an upload request.

        Replies ``b'ok'``, receives exactly "size" bytes into a file below
        BASEADDR (suffixing ``_new`` until the name is unused), then reads
        the client's hex MD5 digest and reports whether it matches.

        :param w_conn: client socket
        :param bcak_dict: request dict; must contain "file" and "size"
        :return: None
        :raises ValueError: on an illegal file name or size
        :raises ConnectionResetError: if the client disconnects mid-upload
        '''
        fileaddr = self._resolve_path(bcak_dict["file"])
        filesize = int(bcak_dict["size"])
        if filesize < 0:
            raise ValueError("negative file size: %d" % filesize)
        # Never overwrite an existing file.
        while os.path.isfile(fileaddr):
            fileaddr += '_new'

        md5 = hashlib.md5()
        recv_size = 0
        with open(fileaddr, 'wb') as file_obj:
            # Signal the client to start sending.
            w_conn.sendall(b'ok')
            while recv_size < filesize:
                # Never ask for more than the remaining payload, otherwise
                # the trailing digest would end up in the file.
                size = min(self.CHUNK_SIZE, filesize - recv_size)
                recv_date = w_conn.recv(size)
                if not recv_date:
                    # recv() returning b'' means the peer is gone; without
                    # this check the loop would spin forever.
                    raise ConnectionResetError(
                        "client closed the connection during upload")
                recv_size += len(recv_date)
                file_obj.write(recv_date)
                md5.update(recv_date)
                less = int(recv_size / filesize * 100)
                print("file get now  %s " % str(less))

        # Integrity check: read exactly one digest so that a following
        # request is not swallowed.
        check_md5 = self._recv_exact(w_conn, self.MD5_HEX_LEN)
        print("\033[31;1m [%s] \033[0m  \033[30;1m [%s] \033[0m" % (md5.hexdigest(), check_md5))
        if md5.hexdigest().encode() == check_md5:
            print("put the file successful!")
        else:
            print("put the file failed: md5 mismatch")

    def _dispatch(self, w_conn, recv_date):
        '''
        Run the ``com_<action>`` handler for one decoded request.

        Requests without a known action are ignored. When the request is
        malformed or the handler fails, an ``{"error": ...}`` JSON reply is
        sent instead of crashing the server. Socket errors propagate to the
        caller, which drops the connection.

        :param w_conn: client socket
        :param recv_date: decoded JSON request
        '''
        try:
            if not isinstance(recv_date, dict):
                raise ValueError("request must be a JSON object")
            action = recv_date.get("action")
            if not isinstance(action, str):
                return
            func = getattr(self, 'com_%s' % action, None)
            if not callable(func):
                return
            func(w_conn, recv_date)
        except OSError:
            raise
        except Exception as e:
            print("request failed: %r" % e)
            reply = dict(recv_date) if isinstance(recv_date, dict) else {}
            reply["error"] = self.ERROR_BAD_REQUEST
            w_conn.sendall(json.dumps(reply).encode())

    def send(self):
        '''
        Serve one queued request for every socket reported writable.

        A socket whose queue is empty is removed from the output list; a
        socket that fails with a socket error is dropped entirely.

        :return: None
        '''
        for w_conn in self.writeable:
            pending = self.msg_dict.get(w_conn)
            if pending is None:
                # Already dropped earlier in this select round.
                continue
            try:
                try:
                    recv_date = pending.get_nowait()
                except queue.Empty:
                    print('{} is no data'.format(w_conn.getpeername()))
                    print("**********************************************")
                    if w_conn in self.outputs:
                        self.outputs.remove(w_conn)
                else:
                    # threading.Thread / gevent.spawn could run this
                    # concurrently; for now transfers are handled inline.
                    self._dispatch(w_conn, recv_date)
            except OSError as e:
                print(e)
                self._drop_connection(w_conn)

    def _accept_client(self):
        '''
        Accept a pending connection and start tracking it.
        '''
        try:
            conn, addr = self.sever.accept()
        except OSError as e:
            # The listening socket is non-blocking; the peer may have gone
            # away between select() and accept().
            print(e)
            return
        print('new connect [%s]' % str(addr))
        self.inputs.append(conn)
        self.msg_dict[conn] = queue.Queue()

    def _read_request(self, r_conn):
        '''
        Read one JSON request from a readable client and queue it.

        The connection is dropped when the client has disconnected, a socket
        error occurs, or the payload is not valid UTF-8 JSON.

        :param r_conn: readable client socket
        '''
        try:
            recv_data = r_conn.recv(1024)
            if not recv_data:
                print("client lost")
                self._drop_connection(r_conn)
                return
            print("recv data \033[31;1m [%s] \033[0m from [%s]" % (recv_data, r_conn.getpeername()))
        except OSError as e:
            print(e)
            self._drop_connection(r_conn)
            return

        try:
            request = json.loads(recv_data.decode())
        except ValueError as e:
            # Covers both UnicodeDecodeError and json.JSONDecodeError. The
            # stream cannot be re-synchronised without framing, so give up
            # on this client rather than crash the server.
            print("bad request: %s" % e)
            self._drop_connection(r_conn)
            return

        self.msg_dict[r_conn].put(request)
        # Schedule the reply.
        if r_conn not in self.outputs:
            self.outputs.append(r_conn)

    def connection(self):
        '''
        Run the select loop forever: accept clients, queue their requests,
        answer them via send() and clean up failed sockets.

        :return: does not return normally
        :raises RuntimeError: if the listening socket itself fails
        '''
        while True:
            self.readerable, self.writeable, self.exceptinal = select.select(
                self.inputs, self.outputs, self.inputs)

            for r_conn in self.readerable:
                if r_conn is self.sever:
                    self._accept_client()
                elif r_conn in self.msg_dict:
                    self._read_request(r_conn)

            self.send()

            for e in self.exceptinal:
                print("err", e)
                self._drop_connection(e)
                if e is self.sever:
                    raise RuntimeError("listening socket reported an error")

if __name__ == "__main__":
    # Script entry point. The constructor binds the socket and then enters
    # the select loop itself, so this call blocks while the server runs.
    listen_addr = ('localhost', 9999)
    tcp = MyselectTCP(*listen_addr)

  

上一篇:SYN_RECV处理方案


下一篇:PythonDay05---from DFZY