阿西河

所有教程

公众号
🌙
阿西河前端的公众号

我的收藏

    最近访问  (文章)

      教程列表

      抓包专区
      测试专区

      Locust 使用自定义客户端测试其他系统

      使用自定义客户端测试其他系统

      Locust以HTTP为主要目标而构建。但是,通过编写触发 request_successrequest_failure 事件的自定义客户端,可以轻松扩展它,以对任何基于请求/响应的系统进行负载测试 。

      示例XML-RPC Locust 客户端

      这是Locust 类 XmlRpcLocust 的示例,该类提供了XML-RPC客户端 XmlRpcClient 并跟踪所有发出的请求:

      import time
      import xmlrpclib
      
      from locust import Locust, TaskSet, events, task
      
      
      class XmlRpcClient(xmlrpclib.ServerProxy):
          """
          Simple, sample XML RPC client implementation that wraps xmlrpclib.ServerProxy and
          fires locust events on request_success and request_failure, so that all requests
          gets tracked in locust's statistics.
          """
          def __getattr__(self, name):
              func = xmlrpclib.ServerProxy.__getattr__(self, name)
              def wrapper(*args, **kwargs):
                  start_time = time.time()
                  try:
                      result = func(*args, **kwargs)
                  except xmlrpclib.Fault as e:
                      total_time = int((time.time() - start_time) * 1000)
                      events.request_failure.fire(request_type="xmlrpc", name=name, response_time=total_time, exception=e)
                  else:
                      total_time = int((time.time() - start_time) * 1000)
                      events.request_success.fire(request_type="xmlrpc", name=name, response_time=total_time, response_length=0)
                      # In this example, I've hardcoded response_length=0. If we would want the response length to be
                      # reported correctly in the statistics, we would probably need to hook in at a lower level
      
              return wrapper
      
      
      class XmlRpcLocust(Locust):
          """
          This is the abstract Locust class which should be subclassed. It provides an XML-RPC client
          that can be used to make XML-RPC requests that will be tracked in Locust's statistics.
          """
          def __init__(self, *args, **kwargs):
              super(XmlRpcLocust, self).__init__(*args, **kwargs)
              self.client = XmlRpcClient(self.host)
      
      
      class ApiUser(XmlRpcLocust):
      
          host = "http://127.0.0.1:8877/"
          min_wait = 100
          max_wait = 1000
      
          class task_set(TaskSet):
              @task(10)
              def get_time(self):
                  self.client.get_time()
      
              @task(5)
              def get_random_number(self):
                  self.client.get_random_number(0, 100)
      
      

      如果您以前写过 Locust 的测试;

      您会认识到名为 ApiUser 的类,它是普通的 Locust 类,在 task_set 属性中具有 TaskSet 带有 tasks 的类。

      但是,ApiUser 继承自 XmlRpcLocust ,您可以在 ApiUser 上方看到它。

      该 XmlRpcLocust 类下提供 XmlRpcClient 的实例客户端属性。

      该 XmlRpcClient 是围绕标准库的一个 xmlrpclib.ServerProxy包装 。

      它基本上只是作为代理函数调用,但重要的是增加了触发 locust.events.request_successlocust.events.request_failure 事件,这将使所有调用都报告在Locust的统计信息中。

      下面是XML-RPC服务器的实现,该服务器可用作上述代码的服务器:

      import random
      import time
      from SimpleXMLRPCServer import SimpleXMLRPCServer
      
      
      def get_time():
          time.sleep(random.random())
          return time.time()
      
      def get_random_number(low, high):
          time.sleep(random.random())
          return random.randint(low, high)
      
      server = SimpleXMLRPCServer(("localhost", 8877))
      print("Listening on port 8877...")
      server.register_function(get_time, "get_time")
      server.register_function(get_random_number, "get_random_number")
      server.serve_forever()
      
      
      目录
      目录