项目: 基于Python socket模块实现的简单 ftp 项目:

需要 自己创建一个 info 文件 用来存储用户信息

服务器:

 import socket
import pickle
import struct
import os
import time ''.startswith('vip') class Mysocket(socket.socket):
"""此类 重写socket类的reve和send方法, 表示如果发送的是字符串 直接调用这两个方法即可""" def __init__(self, encoding='utf-8'):
self.encoding = encoding
super().__init__() def my_reve(self, num):
msg = self.recv(num).decode(self.encoding)
return msg def my_send(self, msg): return self.send(msg.encode(self.encoding)) class ServerFTP(socket.socket):
def __init__(self):
self.flag = True
self.path_dir = None
self.vip = ''
super().__init__() def up_vip(self,dict_client, login_dict_2):
try:
if not ('vip' in self.path_dir):
print(dict_client['username'])
old_name = dict_client['username']
self.vip = 'vip'
print(7758521)
self.new_name = self.vip + dict_client['username']
os.rename(old_name,self.new_name)
login_dict_2['vip_ret'] = '升级成功'
except Exception:login_dict_2['vip_ret'] ='您已经是vip了,扩容失败' def upload(self, path_dir, dict_client_2, login_dict_2,dict_client):
file_name = dict_client_2['file_name']
file_size = dict_client_2['file_size']
now_dir_name = os.path.join(path_dir, file_name)
with open(now_dir_name, 'wb') as f:
while file_size>0:
content = self.conn.recv(1024)
f.write(content)
file_size -= len(content)
login_dict_2['listdir'] = os.listdir(path_dir)
print(1) def download(self, path_dir, dict_client_2, login_dict_2):
file_path = os.path.join(path_dir, dict_client_2['file_name'])
dict_client_2['file_size'] = os.path.getsize(file_path)
file_size = dict_client_2['file_size']
self.conn.send(struct.pack('i', file_size))
with open(file_path, 'rb') as f:
while file_size:
if file_size<1024:content = f.read(file_size)
else:content = f.read(1024)
self.conn.send(content)
file_size -= len(content)
login_dict_2['listdir'] = os.listdir(path_dir) def login(self, dict_client):
login_dict = {}
with open('../db/info', encoding='utf-8') as f:
for line in f:
if dict_client['username'] + '\t' + dict_client['password'] == line.strip():
login_dict['opt'] = '登录成功'
try:
name = 'vip'+dict_client['username']
self.path_dir = os.path.join(os.path.dirname(__file__), name)
login_dict['listdir'] = os.listdir(self.path_dir)
except Exception:
self.path_dir = os.path.join(os.path.dirname(__file__), dict_client['username'])
login_dict['listdir'] = os.listdir(self.path_dir)
try:
login_dict['容量'] = '%s/%s (单位:字节)' % (
os.path.getsize(os.path.dirname(__file__) + '/vip' +dict_client['username']), 1024 * 10000)
except Exception:
login_dict['容量'] = '%s/%s (单位:字节)' % (
os.path.getsize(os.path.dirname(__file__) + '/' + dict_client['username']), 1024 * 100)
self.conn.send(struct.pack('i', len(pickle.dumps(login_dict))) + pickle.dumps(login_dict))
while self.flag:
self.main_2(dict_client) return
login_dict['opt'] = '登录失败'
self.conn.send(struct.pack('i', len(pickle.dumps(login_dict))) + pickle.dumps(login_dict))
return def register(self, dict_client):
register_dict = {}
with open('../db/info', 'r+', encoding='utf-8') as f:
for line in f:
if dict_client['username'] == line.strip().split('\t')[0]:
register_dict['opt'] = '用户名已存在'
self.conn.send(struct.pack('i', len(pickle.dumps(register_dict))) + pickle.dumps(register_dict))
return
register_dict['opt'] = '注册成功'
f.seek(0, 2)
f.write(dict_client['username'] + '\t' + dict_client['password'] + '\n')
os.mkdir(dict_client['username'])
path_user = os.path.join(os.path.dirname(__file__), dict_client['username'])
for i in range(3):
os.mkdir(os.path.join(path_user, str(i)))
self.conn.send(struct.pack('i', len(pickle.dumps(register_dict))) + pickle.dumps(register_dict))
time.sleep(5)
return def main_2(self, dict_client):
login_dict_2 = {}
dict_client_2 = pickle.loads(self.conn.recv(struct.unpack('i', self.conn.recv(4))[0])) # reve_2 接受 选择和文件内容
if dict_client_2['opt'] == '':
self.path_dir = os.path.join(os.path.dirname(self.path_dir),self.vip+os.path.basename(self.path_dir))
self.path_dir = os.path.join(self.path_dir, dict_client_2['filename'])
login_dict_2['listdir'] = os.listdir(self.path_dir)
print('进入下一层了')
elif dict_client_2['opt'] == '':
print('想访问上一层了')
print(self.path_dir)
self.path_dir_2 = os.path.dirname(self.path_dir)
print(self.path_dir_2)
if self.path_dir_2 == os.path.dirname(__file__):
login_dict_2['listdir'] = '已经到底了 没有内容了哦'
else:
self.path_dir = self.path_dir_2
login_dict_2['listdir'] = os.listdir(self.path_dir)
elif dict_client_2['opt'] == '':
print('想传东西了')
self.upload(self.path_dir, dict_client_2, login_dict_2, dict_client)
print('传完了')
elif dict_client_2['opt'] == '':
self.download(self.path_dir, dict_client_2, login_dict_2)
elif dict_client_2['opt'] == '':
self.up_vip(dict_client,login_dict_2)
elif dict_client_2['opt'] == '':
self.flag = False
return
self.conn.send(struct.pack('i', len(pickle.dumps(login_dict_2))) + pickle.dumps(login_dict_2)) def main(self, conn):
while 1:
head_client = conn.recv(4)
try:
dict_client = pickle.loads(conn.recv(struct.unpack('i', head_client)[0]))
print('来看看他发的啥哈哈哈')
if dict_client['opt'] == '':
self.login(dict_client)
elif dict_client['opt'] == '':
self.register(dict_client)
except struct.error:
return def __call__(self, *args, **kwargs):
sk = socket.socket()
sk.bind(('192.168.19.15', 8888))
sk.listen()
while 1:
self.conn, addr = sk.accept()
print('连上了')
self.main(self.conn)
self.conn.close() if __name__ == '__main__':
ftp_s = ServerFTP()
ftp_s()

客户端:

 import socket
import hashlib
import pickle
import struct
import os class Mysocket(socket.socket):
"""此类 重写socket类的reve和send方法, 表示如果发送的是字符串 直接调用这两个方法即可""" def __init__(self, encoding='utf-8'):
self.encoding = encoding
super().__init__() def my_reve(self, num):
msg = self.recv(num).decode(self.encoding)
return msg def my_send(self, msg): return self.send(msg.encode(self.encoding)) def upload(login_dict):
file_dir = input('>>>请输入文件名:').strip()
file_name = os.path.basename(file_dir)
login_dict['file_name'] = file_name
file_size = os.path.getsize(file_dir)
login_dict['file_size'] = file_size
sk.send(struct.pack('i', len(pickle.dumps(login_dict))) + pickle.dumps(login_dict)) # send_2 : 发送选择 或者 选择和文件
with open(file_dir, 'rb') as f:
while file_size:
content = f.read(1024)
sk.send(content)
file_size -= len(content) def download(login_dict):
file_name = input('>>>下载文件的名字:').strip()
down_path = input('>>>本地下载文件路径:').strip()
login_dict['file_name'] = file_name
sk.send(struct.pack('i', len(pickle.dumps(login_dict))) + pickle.dumps(login_dict))
down_file_size = struct.unpack('i', sk.recv(4))[0]
with open(down_path, 'wb') as f:
while down_file_size:
if down_file_size < 1024:
content = sk.recv(down_file_size)
else:content = sk.recv(1024)
f.write(content)
down_file_size -= len(content) def up_vip(usn): pass def login(dict_server, usn):
login_dict = {}
print(dict_server['listdir'])
while 1:
print('1, 进入下级文件目录\n'
'2, 返回上层目录\n'
'3, 在该目录上传文件\n'
'4, 下载该目录下的某个文件\n'
'5, 升级为VIP\n'
'6, 退出程序')
opt = input('>>>请选择功能:').strip()
login_dict['opt'] = opt
if opt == '':
login_dict['filename'] = input('>>>文件名')
sk.send(
struct.pack('i', len(pickle.dumps(login_dict))) + pickle.dumps(login_dict)) # send_2 : 发送选择 或者 选择和文件 elif opt == '':
upload(login_dict)
print('上传成功')
elif opt == '':
download(login_dict)
print('下载成功')
elif opt == '': # 升级VIP还待补充.
print('将扩大100倍的容量')
up_vip(usn)
sk.send(
struct.pack('i', len(pickle.dumps(login_dict))) + pickle.dumps(login_dict)) # send_2 : 发送选择 或者 选择和文件 if opt == '':
global flag
flag = False
return
dict_server_2 = pickle.loads(sk.recv(struct.unpack('i', sk.recv(4))[0]))
try:
print(dict_server_2['listdir'])
except KeyError:
print(dict_server_2['vip_ret']) def main():
while flag:
print('1, 登录\n2, 注册')
opt = input('>>>选择功能:').strip()
if opt != '' and opt != '':
print('输入错误,程序结束')
return
dic['opt'] = opt
usn = input('>>>Username:').strip()
dic['username'] = usn
pwd = input('>>>Password:').strip()
md5_obj = hashlib.md5(usn.encode('utf-8'))
md5_obj.update(pwd.encode('utf-8'))
pwd = md5_obj.hexdigest()
dic['password'] = pwd
dic_pic = pickle.dumps(dic)
head = struct.pack('i', len(dic_pic))
sk.send(head + dic_pic) # send 1: 把登录/注册 + 用户名 + 密码发送给服务器
head_server = sk.recv(4)
dict_server = pickle.loads(sk.recv(struct.unpack('i', head_server)[0]))
print(dict_server['opt'])
if dict_server['opt'] == '登录成功':
print(dict_server['容量'])
login(dict_server, usn) if __name__ == '__main__':
flag = True
dic = {'opt': '请求的功能', } sk = Mysocket()
sk.connect_ex(('192.168.19.15', 8888)) main() sk.close()
上一篇:Qt5.8以上版本编译Oracle数据库的OCI驱动教程


下一篇:【阿里聚安全·安全周刊】互联网时代人类还有被遗忘的权利吗 | Android与中兴