Locust est basé sur le mécanisme de coroutine de Python, qui brise les limitations des processus de thread et peut exécuter une concurrence élevée sur une machine de test
1. Vitesse : mesurer l'efficacité de traitement du système : temps de réponse
2 . Combien : Mesurer les capacités de traitement du système : combien de transactions (tps) peuvent être traitées par unité de temps
Les tests de performances sont le plus souvent divisés en trois catégories suivantes en fonction des exigences de test
1 Test de charge
Pressuriser en continu le serveur, des indicateurs dignes de réservation ou certaines ressources du système ont atteint des goulots d'étranglement. Le but est de trouver la capacité de charge maximale du système
2 Test de stress
Vérifier si le système est stable grâce à une charge élevée qui dure longtemps
3 Test de concurrence :
Soumettre au serveur en même temps Demande, le but est de savoir s'il y a des conflits de transactions ou des mises à niveau de verrouillage dans le système
Modèle de charge de performance
Là S'il y a des problèmes avec l'installation, vous pouvez le télécharger via la source Douban
pip install locust
Basiquement, la plupart Nous pouvons apporter des modifications à la scène en fonction de ce modèle read.py
from locust import HttpUser, TaskSet, task, tag, events # 启动locust时运行 @events.test_start.add_listener def setup(environment, **kwargs): # print("task setup") # 停止locust时运行 @events.test_stop.add_listener def teardown(environment, **kwargs): print("task teardown") class UserBehavor(TaskSet): #虚拟用户启用task运行 def on_start(self): print("start") locusts_spawned.wait() #虚拟用户结束task运行 def on_stop(self): print("stop") @tag('test1') @task(2) def index(self): self.client.get('/yetangjian/p/17320268.html') @task(1) def info(self): self.client.get("/yetangjian/p/17253215.html") class WebsiteUser(HttpUser): def setup(self): print("locust setup") def teardown(self): print("locust teardown") host = "https://www.cnblogs.com" task_set = task(UserBehavor) min_wait = 3000 max_wait = 5000
Remarque : Ici, nous avons donné un hébergeur afin que nous peut ouvrir le criquet directement dans le navigateur
Bien sûr, nous pouvons mettre l'opération du point de rendez-vous Entrez la configuration du modèle ci-dessus pour l'exécuter
locusts_spawned = Semaphore() locusts_spawned.acquire() def on_hatch_complete(**kwargs): """ select_task类的钩子函数 :param kwargs: :return: """ locusts_spawned.release() events.spawning_complete.add_listener(on_hatch_complete) n = 0 class UserBehavor(TaskSet): def login(self): global n n += 1 print(f"第{n}个用户登陆") def on_start(self): self.login() locusts_spawned.wait() @task def test1(self): #catch_response获取返回 with self.client.get("/yetangjian/p/17253215.html",catch_response=True): print("查询结束") class WebsiteUser(HttpUser): host = "https://www.cnblogs.com" task_set = task(UserBehavor) wait_time = between(1,3) if __name__ == '__main__': os.system('locust -f read.py --web-host="127.0.0.1"')
Dans les deux exemples ci-dessus, nous en avons vu , comme le décorateur events.test_start.add_listener; events.test_stop.add_listener est utilisé avant et après le test de charge. Effectuez certaines opérations, telles que on_start et on_stop, avant et après l'exécution de la tâche. Par exemple, la tâche peut être utilisée pour allouer la tâche. poids de la tâche
Temps d'attente
# wait between 3.0 and 10.5 seconds after each task #wait_time = between(3.0, 10.5) #固定时间等待 # wait_time = constant(3) #确保每秒运行多少次 constant_throughput(task_runs_per_second) #确保每多少秒运行一次 constant_pacing(wait_time)
Vous pouvez également réécrire wait_time dans la classe User pour obtenir une définition automatique du
tag mark
@tag('test1') @task(2) def index(self): self.client.get('/yetangjian/p/17320268.html')
En marquant des tâches, vous pouvez exécuter certaines tâches au moment de l'exécution :
#只执行标记test1 os.system('locust -f read.py --tags test1 --web-host="127.0.0.1"') #不执行标记过的 os.system('locust -f read.py --exclude-tags --web-host="127.0.0.1"') #除去test1执行所有 os.system('locust -f read.py --exclude-tags test1 --web-host="127.0.0.1"')
Échec de la personnalisation
#定义响应时间超过0.1就为失败 with self.client.get("/yetangjian/p/17253215.html", catch_response=True) as response: if response.elapsed.total_seconds() > 0.1: response.failure("Request took too long") #定义响应码是200就为失败 with self.client.get("/yetangjian/p/17320268.html", catch_response=True) as response: if response.status_code == 200: response.failure("响应码200,但我定义为失败")
Forme de chargement personnalisée
Auto-définition d'un shape.py en héritant de LoadTestShape et en remplaçant tick
Cette classe de forme augmentera le nombre d'utilisateurs par unités de 100 blocs à un rythme de 20 , puis arrêtez le test de charge après 10 minutes (à partir de la 51e seconde de l'exécution, user_count sera arrondi à 100)
from locust import LoadTestShape class MyCustomShape(LoadTestShape): time_limit = 600 spawn_rate = 20 def tick(self): run_time = self.get_run_time() if run_time < self.time_limit: # User count rounded to nearest hundred. user_count = round(run_time, -2) return (user_count, self.spawn_rate) return None
Le graphique en cours est le suivant
Exemples de déclenchement
os.system('locust -f read.py,shape.py --web-host="127.0.0.1"')
à différentes étapes temporelles via la ligne de commande
from locust import LoadTestShape class StagesShapeWithCustomUsers(LoadTestShape): stages = [ {"duration": 10, "users": 10, "spawn_rate": 10}, {"duration": 30, "users": 50, "spawn_rate": 10}, {"duration": 60, "users": 100, "spawn_rate": 10}, {"duration": 120, "users": 100, "spawn_rate": 10}] def tick(self): run_time = self.get_run_time() for stage in self.stages: if run_time < stage["duration"]: tick_data = (stage["users"], stage["spawn_rate"]) return tick_data return None
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!