Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / vigiboard_plugin / test_plugin_hls.py @ 6ab72614

History | View | Annotate | Download (8.21 KB)

1
# -*- coding: utf-8 -*-
2
""" Test du plugin listant les services de haut niveau impactés. """
3

    
4
from datetime import datetime
5
import transaction
6
from vigiboard.tests import TestController
7
from nose.tools import assert_equal
8
from vigilo.models import Permission, StateName, \
9
                            HostGroup, Host, HighLevelService, \
10
                            Event, CorrEvent, ImpactedPath, ImpactedHLS
11
from vigilo.models.session import DBSession
12

    
13

    
14
def populate_DB():
15
    """ Peuple la base de données. """
16

    
17
    # On ajoute des noms d'états.
18
    DBSession.add(StateName(statename = u'OK', order=0))
19
    DBSession.add(StateName(statename = u'WARNING', order=2))
20
    DBSession.flush()
21
    transaction.commit()
22

    
23
    # On ajoute un groupe d'hôtes
24
    hostmanagers = HostGroup(name = u'managersgroup')
25
    DBSession.add(hostmanagers)
26
    DBSession.flush()
27

    
28
    # On lui octroie les permissions
29
    manage_perm = Permission.by_permission_name(u'manage')
30
    hostmanagers.permissions.append(manage_perm)
31
    DBSession.flush()
32

    
33
    # On crée un hôte de test.
34
    host = Host(
35
        name = u'host', 
36
        checkhostcmd = u'halt',
37
        snmpcommunity = u'public',
38
        hosttpl = u'/dev/null',
39
        mainip = u'192.168.1.1',
40
        snmpport = 42,
41
        weight = 42,
42
        )
43
    DBSession.add(host)
44

    
45
    # On affecte cet hôte au groupe précédemment créé.
46
    hostmanagers.hosts.append(host)
47
    DBSession.flush()
48

    
49
    # On ajoute un évènement causé par cet hôte.
50
    event1 = Event(
51
        supitem = host,
52
        message = u'foo',
53
        current_state = StateName.statename_to_value(u'WARNING'))
54
    DBSession.add(event1)
55
    DBSession.flush()
56

    
57
    # On ajoute un évènement corrélé causé par cet évènement 'brut'.
58
    aggregate = CorrEvent(
59
        idcause = event1.idevent, 
60
        timestamp_active = datetime.now(),
61
        priority = 1,
62
        status = u'None')
63
    aggregate.events.append(event1)
64
    DBSession.add(aggregate)
65
    DBSession.flush()
66
    
67
    transaction.commit()
68

    
69
    return aggregate
70

    
71
def add_paths(path_number, path_length, idsupitem):
72
    """ 
73
    Ajoute path_number chemins de services de haut niveau impactés
74
    dans la base de donnée. Leur longeur sera égale à path_length.
75
    La 3ème valeur passée en paramètre est l'id du supitem impactant.
76
     
77
    path_number * path_length services de 
78
    haut niveau sont créés dans l'opération.
79
    """
80

    
81
    # Création de services de haut niveau dans la BDD.
82
    hls_template = {
83
        'op_dep': u'&',
84
        'message': u'Bar',
85
        'warning_threshold': 60,
86
        'critical_threshold': 80,
87
        'weight': None,
88
        'priority': 2,
89
    }
90

    
91
    # Création des chemins de services de haut niveau impactés.
92
    for j in range(path_number):
93
        
94
        # On crée le chemin en lui-même
95
        path = ImpactedPath(idsupitem = idsupitem)
96
        DBSession.add(path)
97
        DBSession.flush()
98
        
99
        # Pour chaque étage du chemin,
100
        for i in range(path_length):
101
            # on ajoute un service de haut niveau dans la BDD,
102
            hls = HighLevelService(
103
                servicename = u'HLS' + str(j + 1) + str(i + 1), 
104
                **hls_template)
105
            DBSession.add(hls)
106
            # et on ajoute un étage au chemin contenant ce service. 
107
            DBSession.add(
108
                ImpactedHLS(
109
                    path = path,
110
                    hls = hls,
111
                    distance = i + 1,
112
                    ))
113
    
114
    DBSession.flush()
115
    transaction.commit()
116

    
117

    
118
class TestHLSPlugin(TestController):
119
    """
120
    Classe de test du contrôleur listant les services 
121
    de haut niveau impactés par un évènement corrélé.
122
    """
123
    
124
    def test_no_impacted_hls(self):
125
        """
126
        Retour du plugin SHN pour 0 SHN impacté
127
        Teste la valeur de retour du plugin lorsque
128
        aucun service de haut niveau n'est impacté.
129
        """
130
        
131
        # On peuple la base de données avant le test.
132
        aggregate = populate_DB()
133
        DBSession.add(aggregate)
134
        add_paths(0, 0, aggregate.events[0].idsupitem)
135
        DBSession.add(aggregate)
136
        
137
        ### 1er cas : l'utilisateur n'est pas connecté.
138
        # On vérifie que le plugin retourne bien une erreur 404.
139
        resp = self.app.post(
140
            '/get_plugin_value', 
141
            {"idcorrevent" : str(aggregate.idcorrevent),
142
             "plugin_name" : "shn"},
143
            status = 401,)
144
        
145
        ### 2ème cas : l'utilisateur n'a pas les
146
        ### droits sur l'hôte ayant causé le correvent.
147
        # On vérifie que le plugin retourne bien une erreur 404.
148
        resp = self.app.post(
149
            '/get_plugin_value', 
150
            {"idcorrevent" : str(aggregate.idcorrevent),
151
             "plugin_name" : "shn"},
152
            status = 404,
153
            extra_environ={'REMOTE_USER': 'editor'},)
154
        
155
        ### 3ème cas : l'utilisateur a cette fois les droits.        
156
        resp = self.app.post(
157
            '/get_plugin_value', 
158
            {"idcorrevent" : str(aggregate.idcorrevent),
159
             "plugin_name" : "shn"},
160
            extra_environ={'REMOTE_USER': 'manager'})
161
        # On vérifie que le plugin ne retourne toujours rien.
162
        assert_equal(resp.json, {"services": []})
163
    
164
    def test_1_impacted_hls_path(self):
165
        """
166
        Retour du plugin SHN pour 1 chemin impacté
167
        Teste la valeur de retour du plugin lorsqu'un
168
        chemin de services de haut niveau est impacté.
169
        """
170
        
171
        # On peuple la base de données avant le test.
172
        aggregate = populate_DB()
173
        DBSession.add(aggregate)
174
        add_paths(1, 2, aggregate.events[0].idsupitem)
175
        DBSession.add(aggregate)
176
        
177
        ### 1er cas : l'utilisateur n'est pas connecté.
178
        # On vérifie que le plugin retourne bien une erreur 404.
179
        resp = self.app.post(
180
            '/get_plugin_value', 
181
            {"idcorrevent" : str(aggregate.idcorrevent),
182
             "plugin_name" : "shn"},
183
            status = 401,)
184
        
185
        ### 2ème cas : l'utilisateur n'a pas les
186
        ### droits sur l'hôte ayant causé le correvent.
187
        resp = self.app.post(
188
            '/get_plugin_value', 
189
            {"idcorrevent" : str(aggregate.idcorrevent),
190
             "plugin_name" : "shn"},
191
            status = 404,
192
            extra_environ={'REMOTE_USER': 'editor'})
193
        
194
        ### 3ème cas : l'utilisateur a cette fois les droits.        
195
        resp = self.app.post(
196
            '/get_plugin_value', 
197
            {"idcorrevent" : str(aggregate.idcorrevent),
198
             "plugin_name" : "shn"},
199
            extra_environ={'REMOTE_USER': 'manager'})
200
        # On vérifie que le plugin retourne bien les 2 SHN impactés..
201
        assert_equal(resp.json, {"services": ['HLS12']})
202
    
203
    def test_2_impacted_hls_path(self):
204
        """
205
        Retour du plugin SHN pour 2 chemins impactés
206
        Teste la valeur de retour du plugin lorsque deux
207
        chemins de services de haut niveau sont impactés.
208
        """
209
        
210
        # On peuple la base de données avant le test.
211
        aggregate = populate_DB()
212
        DBSession.add(aggregate)
213
        add_paths(2, 2, aggregate.events[0].idsupitem)
214
        DBSession.add(aggregate)
215
        
216
        ### 1er cas : l'utilisateur n'est pas connecté.
217
        # On vérifie que le plugin retourne bien une erreur 404.
218
        resp = self.app.post(
219
            '/get_plugin_value', 
220
            {"idcorrevent" : str(aggregate.idcorrevent),
221
             "plugin_name" : "shn"},
222
            status = 401,)
223
        
224
        ### 2ème cas : l'utilisateur n'a pas les
225
        ### droits sur l'hôte ayant causé le correvent.
226
        resp = self.app.post(
227
            '/get_plugin_value', 
228
            {"idcorrevent" : str(aggregate.idcorrevent),
229
             "plugin_name" : "shn"},
230
            status = 404,
231
            extra_environ={'REMOTE_USER': 'editor'})
232
        
233
        ### 3ème cas : l'utilisateur a cette fois les droits.        
234
        resp = self.app.post(
235
            '/get_plugin_value', 
236
            {"idcorrevent" : str(aggregate.idcorrevent),
237
             "plugin_name" : "shn"},
238
            extra_environ={'REMOTE_USER': 'manager'})
239
        # On vérifie que le plugin retourne bien les 4 SHN impactés..
240
        assert_equal(resp.json, {"services": ['HLS12', 'HLS22']})
241