Python-高并发测试

使用 gevent 对注册等接口进行并发量为 200 的测试。

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
#设置路径:Default Settings---Editor--File and Code Templates
# Author:肖遥
import time
import json
import random
from urllib.error import URLError
from urllib import request
import http.client
import requests
import gevent
from gevent import monkey

# Apply gevent's monkey patch (patch_all).
# NOTE(review): this runs after `requests` and `http.client` were imported
# above; gevent's documentation recommends patching as early as possible,
# so consider moving it ahead of those imports — confirm before changing.
monkey.patch_all()

# Target URL that every request is sent to.
url = 'http://172.17.0.128:15124/mdm/public/query'
# HTTP headers sent with every request.
headers = {'Content-Type': 'application/json'}

# Shared list of failure markers: run() appends "0" for a non-200 response
# and "re0" for an exception; call_gevent() counts both kinds.
error=[]


def make_data(num):
    """Build the request payload sent to the service.

    Args:
        num: Currently unused; every call returns the same fixed payload.

    Returns:
        A new dict with a single "Request" entry holding the "Head" and
        "Body" sections, in that order.
    """
    head = {
        "LicId": "",
        "ContentType": "text/json",
        "TranCode": "DeptInfo",
        "ServiceVersion": "5.0.2",
        "ContentEncoding": "",
        "Timestamp": "2018-10-19 17:09:55.137",
        "TransferType": "",
        "SecurityContent": "",
        "OrgId": "01",
        "Callback": "",
        "AppId": "HIS0311",
        "Version": "1.1",
        "SecurityPolicy": "",
        "AppType": "PC",
        "MessageId": "EA455467-A04A-4E73-8BE4-190078D3C258",
    }
    body = {
        "Page": 1,
        "PageSize": 30,
        "Params": [{"key": "deptCode", "value": "T234"}],
    }
    return {"Request": {"Head": head, "Body": body}}


def run(timeout=30):
    """Send one POST request to ``url`` and record any failure in ``error``.

    The script illustrates three ways of issuing the request (urllib,
    http.client, requests); only the ``requests`` variant is active, the
    other two are kept as commented-out references.

    Args:
        timeout: Seconds handed to ``requests.post`` as its ``timeout``.
            Without a timeout a stalled server would block this greenlet,
            and therefore the whole test run, indefinitely. Pass ``None``
            to wait forever (the previous behaviour).

    Side effects:
        Prints the response object, its headers and its decoded body.
        Appends "0" to ``error`` when the status code is not 200 and "re0"
        when any exception other than ``URLError`` is raised.
    """
    num = random.randint(1, 9)
    data = make_data(num)
    # No `global` statement needed: `error` is only mutated, never rebound.
    try:
        # s1: urllib.request variant (disabled)
        # req = request.Request(url=url, data=data, headers=headers, method="POST")
        # response = request.urlopen(req)
        # resp = response.read()
        # print(resp.decode('utf-8'))

        # s2: http.client variant (disabled)
        # httpclient = http.client.HTTPConnection(host='127.0.0.1', port=8001)
        # httpclient.request("POST", '/insert', data, headers)
        # response = httpclient.getresponse()
        # print(response.read().decode())

        # s3: requests variant (active)
        resp = requests.post(url=url, json=data, headers=headers, timeout=timeout)
        print("状态:\n", resp)
        print("请求头:\n", resp.headers)
        print("服务器返回值为:\n", resp.content.decode())
        if resp.status_code != 200:
            error.append("0")
    except URLError as e:
        # Presumably only relevant to the urllib variant above; it is
        # deliberately not recorded in `error`, as in the original code.
        print('请求', e)
    except Exception as e:
        # Top-level boundary of one simulated request: count the failure
        # (including timeouts) and keep the remaining greenlets running.
        error.append("re0")
        print('请求错误:', e)


def call_gevent(count):
    """Run ``count`` requests concurrently with gevent and print a summary.

    Args:
        count: Number of greenlets (requests) to spawn.

    Side effects:
        Prints a banner per spawned greenlet, then the average and total
        elapsed time and the number of "0" / "re0" markers found in the
        module-level ``error`` list.
    """
    begin_time = time.time()
    run_gevent_list = []
    for i in range(count):
        print('--------------%d--Test-------------' % i)
        # Pass the callable itself. `gevent.spawn(run())` would execute the
        # request right here, one after another, and hand spawn its None
        # return value, so nothing would actually run concurrently.
        run_gevent_list.append(gevent.spawn(run))
    gevent.joinall(run_gevent_list)
    elapsed = time.time() - begin_time
    # Avoid ZeroDivisionError when no requests were asked for.
    average = elapsed / count if count else 0.0
    print('单次测试时间(平均)s:', average)
    print('累计测试时间 s:', elapsed)
    print("运行失败用例数:",error.count("0"))
    print("运行报错数:",error.count("re0") )


if __name__ == '__main__':
    # Number of requests to launch in this run (200).
    test_count = 200
    call_gevent(count=test_count)

 

上一篇:Python协程之Gevent模块


下一篇:38 协程 greenlet模块实现并发 Gevent