Locust 使用自定义客户端测试其他系统

🌙
手机阅读
本文目录结构

使用自定义客户端测试其他系统

Locust以HTTP为主要目标而构建。但是,通过编写触发 request_successrequest_failure 事件的自定义客户端,可以轻松扩展它,以对任何基于请求/响应的系统进行负载测试 。

示例XML-RPC Locust 客户端

这是Locust 类 XmlRpcLocust 的示例,该类提供了XML-RPC客户端 XmlRpcClient 并跟踪所有发出的请求:

import time
import xmlrpclib

from locust import Locust, TaskSet, events, task


class XmlRpcClient(xmlrpclib.ServerProxy):
    """
    Simple, sample XML RPC client implementation that wraps xmlrpclib.ServerProxy and
    fires locust events on request_success and request_failure, so that all requests
    gets tracked in locust's statistics.
    """
    def __getattr__(self, name):
        func = xmlrpclib.ServerProxy.__getattr__(self, name)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
            except xmlrpclib.Fault as e:
                total_time = int((time.time() - start_time) * 1000)
                events.request_failure.fire(request_type="xmlrpc", name=name, response_time=total_time, exception=e)
            else:
                total_time = int((time.time() - start_time) * 1000)
                events.request_success.fire(request_type="xmlrpc", name=name, response_time=total_time, response_length=0)
                # In this example, I've hardcoded response_length=0. If we would want the response length to be
                # reported correctly in the statistics, we would probably need to hook in at a lower level

        return wrapper


class XmlRpcLocust(Locust):
    """
    This is the abstract Locust class which should be subclassed. It provides an XML-RPC client
    that can be used to make XML-RPC requests that will be tracked in Locust's statistics.
    """
    def __init__(self, *args, **kwargs):
        super(XmlRpcLocust, self).__init__(*args, **kwargs)
        self.client = XmlRpcClient(self.host)


class ApiUser(XmlRpcLocust):

    host = "http://127.0.0.1:8877/"
    min_wait = 100
    max_wait = 1000

    class task_set(TaskSet):
        @task(10)
        def get_time(self):
            self.client.get_time()

        @task(5)
        def get_random_number(self):
            self.client.get_random_number(0, 100)

如果您以前写过 Locust 的测试;

您会认识到名为 ApiUser 的类,它是普通的 Locust 类,在 task_set 属性中具有 TaskSet 带有 tasks 的类。

但是,ApiUser 继承自 XmlRpcLocust ,您可以在 ApiUser 上方看到它。

该 XmlRpcLocust 类下提供 XmlRpcClient 的实例客户端属性。

该 XmlRpcClient 是围绕标准库的一个 xmlrpclib.ServerProxy包装 。

它基本上只是作为代理函数调用,但重要的是增加了触发 locust.events.request_successlocust.events.request_failure 事件,这将使所有调用都报告在Locust的统计信息中。

下面是XML-RPC服务器的实现,该服务器可用作上述代码的服务器:

import random
import time
from SimpleXMLRPCServer import SimpleXMLRPCServer


def get_time():
    time.sleep(random.random())
    return time.time()

def get_random_number(low, high):
    time.sleep(random.random())
    return random.randint(low, high)

server = SimpleXMLRPCServer(("localhost", 8877))
print("Listening on port 8877...")
server.register_function(get_time, "get_time")
server.register_function(get_random_number, "get_random_number")
server.serve_forever()

AXIHE / 精选资源

浏览全部教程

面试题

学习网站

前端培训
自己甄别

前端书籍

关于朱安邦

我叫 朱安邦,阿西河的站长,在杭州。

以前是一名平面设计师,后来开始接接触前端开发,主要研究前端技术中的JS方向。

业余时间我喜欢分享和交流自己的技术,欢迎大家关注我的 Bilibili

关注我: Github / 知乎

于2021年离开前端领域,目前重心放在研究区块链上面了

我叫朱安邦,阿西河的站长

目前在杭州从事区块链周边的开发工作,机械专业,以前从事平面设计工作。

2014年底脱产在老家自学6个月的前端技术,自学期间几乎从未出过家门,最终找到了满意的前端工作。更多>

于2021年离开前端领域,目前从事区块链方面工作了