Project

General

Profile

Revision 0c8b0e15

ID0c8b0e15ab89bb02fc1a6f8a30920f35f4272083
Parent 00d2e1d1
Child e069ba79

Added by Francois POIROTTE over 14 years ago

Dans le RootController : passage des paramètres entrés au template (pour réutilisation dans les liens de pagination).
Quelques corrections dans les tests unitaires. Ils restent encore des tests qui échouent (en partie liés aux thèmes).

git-svn-id: https://vigilo-dev.si.c-s.fr/svn@2181 b22e2e97-25c9-44ff-b637-2e5ceca36478

View differences:

vigiboard/controllers/root.py
248 248
            id_first_row += 1
249 249

  
250 250
        return dict(
251
            idcorrevent = idcorrevent,
251 252
            events = events.events,
252 253
            plugins = get_plugins_instances(),
253 254
            rows_info = {
......
325 326
            id_first_row += 1
326 327

  
327 328
        return dict(
329
            idevent = idevent,
328 330
            plugins = get_plugins_instances(),
329 331
            rows_info = {
330 332
                'id_first_row': id_first_row,
......
402 404
        else:
403 405
            id_first_row += 1
404 406
        
405
        item_label = host
406
        if service:
407
            item_label = "(%(hostname)s, %(servicename)s)" % \
408
                {"hostname": host, "servicename": service}
409

  
410 407
        return dict(
408
            hostname = host,
409
            servicename = service,
411 410
            events = aggregates.events,
412 411
            item_label = item_label,
413 412
            plugins = get_plugins_instances(),
vigiboard/tests/functional/plugins/__init__.py
1
"""Tests fonctionnels des plugins de Vigiboard"""
vigiboard/tests/functional/plugins/test_plugin_hls.py
1
# -*- coding: utf-8 -*-
2
""" Test du plugin listant les services de haut niveau impactés. """
3

  
4
from datetime import datetime
5
import transaction
6
from nose.tools import assert_equal
7

  
8
from vigilo.models.configure import DBSession
9
from vigilo.models import Permission, StateName, \
10
                            HostGroup, Host, HighLevelService, \
11
                            Event, CorrEvent, ImpactedPath, ImpactedHLS
12
from vigiboard.tests import TestController
13

  
14
def populate_DB():
15
    """ Peuple la base de données. """
16

  
17
    # On ajoute un groupe d'hôtes
18
    hostmanagers = HostGroup(name = u'managersgroup')
19
    DBSession.add(hostmanagers)
20
    DBSession.flush()
21

  
22
    # On lui octroie les permissions
23
    manage_perm = Permission.by_permission_name(u'manage')
24
    hostmanagers.permissions.append(manage_perm)
25
    DBSession.flush()
26

  
27
    # On crée un hôte de test.
28
    host = Host(
29
        name = u'host', 
30
        checkhostcmd = u'halt',
31
        snmpcommunity = u'public',
32
        hosttpl = u'/dev/null',
33
        mainip = u'192.168.1.1',
34
        snmpport = 42,
35
        weight = 42,
36
        )
37
    DBSession.add(host)
38

  
39
    # On affecte cet hôte au groupe précédemment créé.
40
    hostmanagers.hosts.append(host)
41
    DBSession.flush()
42

  
43
    # On ajoute un évènement causé par cet hôte.
44
    event1 = Event(
45
        supitem = host,
46
        message = u'foo',
47
        current_state = StateName.statename_to_value(u'WARNING'),
48
        timestamp = datetime.now(),
49
    )
50
    DBSession.add(event1)
51
    DBSession.flush()
52

  
53
    # On ajoute un évènement corrélé causé par cet évènement 'brut'.
54
    aggregate = CorrEvent(
55
        idcause = event1.idevent, 
56
        timestamp_active = datetime.now(),
57
        priority = 1,
58
        status = u'None')
59
    aggregate.events.append(event1)
60
    DBSession.add(aggregate)
61
    DBSession.flush()
62
    
63
    transaction.commit()
64

  
65
    return aggregate
66

  
67
def add_paths(path_number, path_length, idsupitem):
68
    """ 
69
    Ajoute path_number chemins de services de haut niveau impactés
70
    dans la base de donnée. Leur longeur sera égale à path_length.
71
    La 3ème valeur passée en paramètre est l'id du supitem impactant.
72
     
73
    path_number * path_length services de 
74
    haut niveau sont créés dans l'opération.
75
    """
76

  
77
    # Création de services de haut niveau dans la BDD.
78
    hls_template = {
79
        'op_dep': u'&',
80
        'message': u'Bar',
81
        'warning_threshold': 60,
82
        'critical_threshold': 80,
83
        'weight': None,
84
        'priority': 2,
85
    }
86

  
87
    # Création des chemins de services de haut niveau impactés.
88
    for j in range(path_number):
89
        
90
        # On crée le chemin en lui-même
91
        path = ImpactedPath(idsupitem = idsupitem)
92
        DBSession.add(path)
93
        DBSession.flush()
94
        
95
        # Pour chaque étage du chemin,
96
        for i in range(path_length):
97
            # on ajoute un service de haut niveau dans la BDD,
98
            hls = HighLevelService(
99
                servicename = u'HLS' + str(j + 1) + str(i + 1), 
100
                **hls_template)
101
            DBSession.add(hls)
102
            # et on ajoute un étage au chemin contenant ce service. 
103
            DBSession.add(
104
                ImpactedHLS(
105
                    path = path,
106
                    hls = hls,
107
                    distance = i + 1,
108
                    ))
109
    
110
    DBSession.flush()
111
    transaction.commit()
112

  
113

  
114
class TestHLSPlugin(TestController):
115
    """
116
    Classe de test du contrôleur listant les services 
117
    de haut niveau impactés par un évènement corrélé.
118
    """
119
    
120
    def test_no_impacted_hls(self):
121
        """
122
        Retour du plugin HLS pour 0 HLS impacté
123
        Teste la valeur de retour du plugin lorsque
124
        aucun service de haut niveau n'est impacté.
125
        """
126
        
127
        # On peuple la base de données avant le test.
128
        aggregate = populate_DB()
129
        DBSession.add(aggregate)
130
        add_paths(0, 0, aggregate.events[0].idsupitem)
131
        DBSession.add(aggregate)
132
        
133
        ### 1er cas : l'utilisateur n'est pas connecté.
134
        # On vérifie que le plugin retourne bien une erreur 401.
135
        resp = self.app.post(
136
            '/get_plugin_value', 
137
            {"idcorrevent" : str(aggregate.idcorrevent),
138
             "plugin_name" : "hls"},
139
            status = 401,)
140
        
141
        ### 2ème cas : l'utilisateur n'a pas les
142
        ### droits sur l'hôte ayant causé le correvent.
143
        # On vérifie que le plugin retourne bien une erreur 404.
144
        resp = self.app.post(
145
            '/get_plugin_value', 
146
            {"idcorrevent" : str(aggregate.idcorrevent),
147
             "plugin_name" : "hls"},
148
            status = 404,
149
            extra_environ={'REMOTE_USER': 'editor'})
150
        
151
        ### 3ème cas : l'utilisateur a cette fois les droits.
152
        resp = self.app.post(
153
            '/get_plugin_value', 
154
            {"idcorrevent" : str(aggregate.idcorrevent),
155
             "plugin_name" : "hls"},
156
            extra_environ={'REMOTE_USER': 'manager'})
157
        # On vérifie que le plugin ne retourne toujours rien.
158
        assert_equal(resp.json, {"services": []})
159
    
160
    def test_1_impacted_hls_path(self):
161
        """
162
        Retour du plugin HLS pour 1 chemin impacté
163
        Teste la valeur de retour du plugin lorsqu'un
164
        chemin de services de haut niveau est impacté.
165
        """
166
        
167
        # On peuple la base de données avant le test.
168
        aggregate = populate_DB()
169
        DBSession.add(aggregate)
170
        add_paths(1, 2, aggregate.events[0].idsupitem)
171
        DBSession.add(aggregate)
172
        
173
        ### 1er cas : l'utilisateur n'est pas connecté.
174
        # On vérifie que le plugin retourne bien une erreur 401.
175
        resp = self.app.post(
176
            '/get_plugin_value', 
177
            {"idcorrevent" : str(aggregate.idcorrevent),
178
             "plugin_name" : "hls"},
179
            status = 401,)
180
        
181
        ### 2ème cas : l'utilisateur n'a pas les droits
182
        ### sur l'hôte ayant causé le correvent, on doit
183
        ### obtenir une erreur 404 (pas d'événement trouvé
184
        ### avec les informations liées à cet utilisateur).
185
        resp = self.app.post(
186
            '/get_plugin_value', 
187
            {"idcorrevent" : str(aggregate.idcorrevent),
188
             "plugin_name" : "hls"},
189
            status = 404,
190
            extra_environ={'REMOTE_USER': 'editor'})
191
        
192
        ### 3ème cas : l'utilisateur a cette fois les droits.
193
        resp = self.app.post(
194
            '/get_plugin_value', 
195
            {"idcorrevent" : str(aggregate.idcorrevent),
196
             "plugin_name" : "hls"},
197
            extra_environ={'REMOTE_USER': 'manager'})
198
        # On vérifie que le plugin retourne bien les 2 HLS impactés.
199
        assert_equal(resp.json, {"services": ['HLS12']})
200
    
201
    def test_2_impacted_hls_path(self):
202
        """
203
        Retour du plugin HLS pour 2 chemins impactés
204
        Teste la valeur de retour du plugin lorsque deux
205
        chemins de services de haut niveau sont impactés.
206
        """
207
        
208
        # On peuple la base de données avant le test.
209
        aggregate = populate_DB()
210
        DBSession.add(aggregate)
211
        add_paths(2, 2, aggregate.events[0].idsupitem)
212
        DBSession.add(aggregate)
213
        
214
        ### 1er cas : l'utilisateur n'est pas connecté.
215
        # On vérifie que le plugin retourne bien une erreur 401.
216
        resp = self.app.post(
217
            '/get_plugin_value', 
218
            {"idcorrevent" : str(aggregate.idcorrevent),
219
             "plugin_name" : "hls"},
220
            status = 401,)
221
        
222
        ### 2ème cas : l'utilisateur n'a pas les
223
        ### droits sur l'hôte ayant causé le correvent.
224
        resp = self.app.post(
225
            '/get_plugin_value', 
226
            {"idcorrevent" : str(aggregate.idcorrevent),
227
             "plugin_name" : "hls"},
228
            status = 404,
229
            extra_environ={'REMOTE_USER': 'editor'})
230
        
231
        ### 3ème cas : l'utilisateur a cette fois les droits.
232
        resp = self.app.post(
233
            '/get_plugin_value', 
234
            {"idcorrevent" : str(aggregate.idcorrevent),
235
             "plugin_name" : "hls"},
236
            extra_environ={'REMOTE_USER': 'manager'})
237
        # On vérifie que le plugin retourne bien les 4 HLS impactés.
238
        assert_equal(resp.json, {"services": ['HLS12', 'HLS22']})
239
        
vigiboard/tests/functional/test_correvents_table.py
14 14
                            Host, HostGroup, LowLevelService
15 15
from vigiboard.tests import TestController
16 16

  
17
def populate_DB():
17
def populate_DB(caused_by_service):
18 18
    """ Peuple la base de données. """
19 19
    # On ajoute les groupes et leurs dépendances
20 20
    hosteditors = HostGroup(name=u'editorsgroup')
......
91 91
    event_template = {
92 92
        'message': u'foo',
93 93
        'current_state': StateName.statename_to_value(u'WARNING'),
94
        'timestamp': datetime.now(),
94 95
    }
95 96

  
96 97
    event1 = Event(supitem=service1, **event_template)
......
114 115
        'priority': 1,
115 116
        'status': u'None',
116 117
    }
118

  
117 119
    aggregate1 = CorrEvent(
118 120
        idcause=event1.idevent, **aggregate_template)
119 121
    aggregate2 = CorrEvent(
......
133 135
    DBSession.add(aggregate2)
134 136
    DBSession.add(aggregate3)
135 137
    DBSession.add(aggregate4)
138

  
136 139
    DBSession.flush()
137 140
    transaction.commit()
138 141

  
......
141 144
    Test du tableau d'événements de Vigiboard
142 145
    """
143 146

  
144
    def test_event_table(self):
147
    def test_homepage(self):
145 148
        """
146
        Test du tableau d'évènements de la page d'accueil de Vigiboard.
149
        Tableau des événements (page d'accueil).
147 150
        """
151
        populate_DB(True)
148 152

  
149
        populate_DB()
153
        # L'utilisateur n'est pas authentifié.
154
        response = self.app.get('/', status=401)
150 155

  
151
        ### 1er cas : L'utilisateur utilisé pour
152
        # se connecter à Vigiboard est 'editor'.
156
        # L'utilisateur est authentifié avec des permissions réduites.
153 157
        environ = {'REMOTE_USER': 'editor'}
154 158
        response = self.app.get('/', extra_environ=environ)
155 159

  
......
163 167
        print "There are %d columns in the result set" % len(cols)
164 168
        assert_true(len(cols) > 1)
165 169

  
166
        ### 2nd cas : L'utilisateur utilisé pour
167
        # se connecter à Vigiboard est 'manager'.
170
        # L'utilisateur est authentifié avec des permissions plus étendues.
168 171
        environ = {'REMOTE_USER': 'manager'}
169 172
        response = self.app.get('/', extra_environ=environ)
170 173

  
......
178 181
        print "There are %d columns in the result set" % len(cols)
179 182
        assert_true(len(cols) > 1)
180 183

  
184
    def test_correvents_table_for_LLS(self):
185
        """
186
        Tableau des événements corrélés pour un service de bas niveau.
187
        """
188
        populate_DB(True)
189
        url = '/item/%s/%s' % ('managerhost', 'managerservice')
190

  
191
        # L'utilisateur n'est pas authentifié.
192
        response = self.app.get(url, status=401)
193

  
194
        # L'utilisateur dispose de permissions restreintes.
195
        # Il n'a pas accès aux événements corrélés sur le service donné.
196
        # Donc, on s'attend à être redirigé avec un message d'erreur.
197
        environ = {'REMOTE_USER': 'editor'}
198
        response = self.app.get(url, extra_environ=environ, status=302)
199
        response = response.follow(status = 200, extra_environ = environ)
200
        assert_true(len(response.lxml.xpath(
201
            '//div[@id="flash"]/div[@class="error"]')))
202

  
203
        # L'utilisateur dispose de permissions plus étendues.
204
        # Il doit avoir accès à l'historique.
205
        # Ici, il n'y a qu'un seul événement corrélé pour ce service.
206
        environ = {'REMOTE_USER': 'manager'}
207
        response = self.app.get(url, extra_environ=environ, status=200)
208

  
209
        # Il doit y avoir 1 ligne de résultats.
210
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
211
        print "There are %d rows in the result set" % len(rows)
212
        assert_equal(len(rows), 1)
213

  
214
    def test_correvents_table_for_host(self):
215
        """
216
        Tableau des événements corrélés pour un hôte.
217
        """
218
        populate_DB(False)
219
        url = '/item/%s/' % ('managerhost', )
220

  
221
        # L'utilisateur n'est pas authentifié.
222
        response = self.app.get(url, status=401)
223

  
224
        # L'utilisateur dispose de permissions restreintes.
225
        # Il n'a pas accès aux événements corrélés sur le service donné.
226
        # Donc, on s'attend à être redirigé avec un message d'erreur.
227
        environ = {'REMOTE_USER': 'editor'}
228
        response = self.app.get(url, extra_environ=environ, status=302)
229
        response = response.follow(status = 200, extra_environ = environ)
230
        assert_true(len(response.lxml.xpath(
231
            '//div[@id="flash"]/div[@class="error"]')))
232

  
233
        # L'utilisateur dispose de permissions plus étendues.
234
        # Il doit avoir accès à l'historique.
235
        # Ici, il n'y a qu'un seul événement corrélé pour ce service.
236
        environ = {'REMOTE_USER': 'manager'}
237
        response = self.app.get(url, extra_environ=environ, status=200)
238

  
239
        # Il doit y avoir 1 ligne de résultats.
240
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
241
        print "There are %d rows in the result set" % len(rows)
242
        assert_equal(len(rows), 1)
243

  
vigiboard/tests/functional/test_history_table.py
70 70
        supitem = supitem, 
71 71
        message = u'foo',
72 72
        current_state = StateName.statename_to_value(u"WARNING"),
73
        timestamp = datetime.now(),
73 74
    )
74 75
    DBSession.add(event)
75 76
    DBSession.flush()
......
100 101

  
101 102
class TestHistoryTable(TestController):
102 103
    """
103
    Teste .
104
    Teste la table qui affiche l'historique des actions
105
    sur un événement brut.
104 106
    """
105 107

  
106 108
    def setUp(self):
107
        super(TestEventTable, self).setUp()
109
        super(TestHistoryTable, self).setUp()
108 110

  
109 111
    def test_cause_host_history(self):
110 112
        """Historique de la cause d'un événement corrélé sur un hôte."""
vigiboard/tests/functional/test_raw_events_table.py
14 14
                            Host, HostGroup, LowLevelService, ServiceGroup
15 15
from vigiboard.tests import TestController
16 16

  
17
def populate_DB():
17
def populate_DB(caused_by_service):
18 18
    """ Peuple la base de données. """
19 19
    # On ajoute un groupe d'hôtes et un groupe de services.
20 20
    hostmanagers = HostGroup(name = u'managersgroup')
......
54 54
    DBSession.add(managerservice)
55 55
    servicemanagers.services.append(managerservice)
56 56
    DBSession.flush()
57
    
58
    return (managerhost, managerservice)
59 57

  
60
def add_correvent_caused_by(supitem, 
61
        correvent_status=u"None", event_status=u"WARNING"):
62
    """
63
    Ajoute dans la base de données un évènement corrélé causé 
64
    par un incident survenu sur l'item passé en paramètre.
65
    Génère un historique pour les tests.
66
    """
58
    if caused_by_service:
59
        supitem = managerservice
60
    else:
61
        supitem = managerhost
67 62

  
68 63
    # Ajout d'un événement
69 64
    event = Event(
70 65
        supitem = supitem, 
71 66
        message = u'foo',
72
        current_state = StateName.statename_to_value(event_status),
67
        current_state = StateName.statename_to_value(u"WARNING"),
68
        timestamp = datetime.now(),
73 69
    )
74 70
    DBSession.add(event)
75 71
    DBSession.flush()
......
90 86
        idcause = event.idevent, 
91 87
        timestamp_active = datetime.now(),
92 88
        priority = 1,
93
        status = correvent_status)
89
        status = u"None")
94 90
    aggregate.events.append(event)
95 91
    DBSession.add(aggregate)
96 92
    DBSession.flush()
97
    
98
    return event.idevent
99
    
93
    return aggregate.idcorrevent
100 94

  
101
class TestRawEventsTable(TestController):
102
    """
103
    Test des historiques de Vigiboard.
104
    """
95
def add_masked_event(idcorrevent):
96
    transaction.begin()
97
    hostmanagers = HostGroup.by_group_name(u'managersgroup')
98
    nb_hosts = DBSession.query(Host).count()
99

  
100
    masked_host = Host(
101
        name = u'masked host #%d' % nb_hosts,      
102
        checkhostcmd = u'halt',
103
        snmpcommunity = u'public',
104
        hosttpl = u'/dev/null',
105
        mainip = u'192.168.1.%d' % nb_hosts,
106
        snmpport = 42,
107
        weight = 42,
108
    )
109
    DBSession.add(masked_host)
110
    hostmanagers.hosts.append(masked_host)
111
    DBSession.flush()
112

  
113
    event = Event(
114
        supitem = masked_host,
115
        message = u'masked event #%d' % nb_hosts,
116
        current_state = StateName.statename_to_value(u"CRITICAL"),
117
        timestamp = datetime.now(),
118
    )
119
    DBSession.add(event)
120
    DBSession.flush()
121

  
122
    aggregate = DBSession.query(CorrEvent).filter(
123
        CorrEvent.idcorrevent == idcorrevent).one()
124
    aggregate.events.append(event)
125
    DBSession.add(aggregate)
126
    DBSession.flush()
127
    transaction.commit()
105 128

  
106
    def setUp(self):
107
        super(TestEventTable, self).setUp()
108 129

  
109
    def test_host_history(self):
110
        """Affichage de l'historique d'un évènement corrélé pour un hôte."""
130
class TestRawEventsTableAnonymousLLS(TestController):
131
    """
132
    Teste l'affichage des événements bruts masqués par un agrégat.
133
    Dans ces tests, l'utilisateur n'est pas authentifié et l'agrégat
134
    a été causé par un LLS.
135
    """
136
    test_service = True
111 137

  
138
    def test_table(self):
139
        """Événements masqués d'un agrégat sur un LLS en anonyme."""
112 140
        # On peuple la BDD avec un hôte, un service de bas niveau,
113 141
        # et un groupe d'hôtes et de services associés à ces items.
114
        (managerhost, managerservice) = populate_DB()
115
        
116
        # On ajoute un évènement corrélé causé par l'hôte
117
        idevent = add_correvent_caused_by(managerhost)
142
        idcorrevent = populate_DB(self.test_service)
118 143
        transaction.commit()
119 144

  
120 145
        # L'utilisateur n'est pas authentifié.
121 146
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
122 147
        # demandant à l'utilisateur de s'authentifier.
123 148
        response = self.app.get(
124
            '/event/%d' % idevent,
149
            '/masked_events/%d' % idcorrevent,
125 150
            status = 401)
126 151

  
127
        # L'utilisateur N'A PAS les bonnes permissions.
128
        environ = {'REMOTE_USER': 'editor'}
129
        
152
class TestRawEventsTableAnonymousHost(TestRawEventsTableAnonymousLLS):
153
    """Idem que TestRawEventsTableAnonymousLLS mais avec un hôte."""
154
    test_service = False
155

  
156
    def test_table(self):
157
        """Événements masqués d'un agrégat sur un hôte en anonyme."""
158
        super(TestRawEventsTableAnonymousHost, self).test_table()
159

  
160

  
161
class TestRawEventsTableWithoutPermsLLS(TestController):
162
    """
163
    Teste l'affichage des événements bruts masqués par un agrégat.
164
    Dans ces tests, l'utilisateur n'a pas les bonnes permissions
165
    et l'agrégat a été causé par un LLS.
166
    """
167
    test_service = True
168

  
169
    def test_table(self):
170
        """Événements masqués d'un agrégat sur un LLS sans permissions."""
171
        # On peuple la BDD avec un hôte, un service de bas niveau,
172
        # et un groupe d'hôtes et de services associés à ces items.
173
        idcorrevent = populate_DB(self.test_service)
174
        transaction.commit()
175

  
176
        environ = {'REMOTE_USER': 'manager'}
177

  
130 178
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
131 179
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
132 180
        # n'a pas accès aux informations concernant cet évènement.
133 181
        response = self.app.get(
134
            '/event/%d' % idevent,
182
            '/masked_events/%d' % idcorrevent,
135 183
            status = 302, 
136 184
            extra_environ = environ)
137

  
138
        # On suit la redirection.
139 185
        response = response.follow(status = 200, extra_environ = environ)
140 186
        assert_true(len(response.lxml.xpath(
141 187
            '//div[@id="flash"]/div[@class="error"]')))
142 188

  
143
        # L'utilisateur a les bonnes permissions.
144
        environ = {'REMOTE_USER': 'manager'}
145
        
146
        # On s'attend à ce que le statut de la requête soit 200.
189
        # L'erreur 302 peut venir de l'absence de permissions
190
        # ou bien simplement de l'absence d'événements masqués.
191
        # Pour vérifier qu'il s'agit bien d'un problème de permissions,
192
        # on ajoute un événement masqué. On doit à nouveau obtenir
193
        # une erreur 302.
194
        add_masked_event(idcorrevent)
147 195
        response = self.app.get(
148
            '/event/%d' % idevent,
149
            status = 200, 
196
            '/masked_events/%d' % idcorrevent,
197
            status = 302, 
150 198
            extra_environ = environ)
199
        response = response.follow(status = 200, extra_environ = environ)
200
        assert_true(len(response.lxml.xpath(
201
            '//div[@id="flash"]/div[@class="error"]')))
151 202

  
152
        # Il doit y avoir 2 lignes de résultats.
153
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
154
        assert_equal(len(rows), 2)
203
class TestRawEventsTableWithoutPermsHost(TestRawEventsTableWithoutPermsLLS):
204
    """Idem que TestRawEventsTableWithoutPermsLLS mais avec un hôte."""
205
    test_service = False
155 206

  
156
    def test_service_history(self):
157
        """Affichage de l'historique d'un évènement corrélé pour un SBN."""
207
    def test_table(self):
208
        """Événements masqués d'un agrégat sur un hôte sans permissions."""
209
        super(TestRawEventsTableWithoutPermsHost, self).test_table()
158 210

  
211
class TestRawEventsTableWithPermsLLS(TestController):
212
    """
213
    Teste l'affichage d'une table des événements bruts
214
    rattachés à un agrégat, hormis la cause de cet agrégat.
215
    """
216
    test_service = True
217

  
218
    def test_table(self):
219
        """Événements masqués d'un agrégat sur un LLS avec permissions."""
159 220
        # On peuple la BDD avec un hôte, un service de bas niveau,
160 221
        # et un groupe d'hôtes et de services associés à ces items.
161
        (managerhost, managerservice) = populate_DB()
162
        
163
        # On ajoute un évènement corrélé causé par le service
164
        idevent = add_correvent_caused_by(managerservice)
165
        
222
        idcorrevent = populate_DB(True)
166 223
        transaction.commit()
167 224

  
168
        # L'utilisateur n'est pas authentifié.
169
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
170
        # demandant à l'utilisateur de s'authentifier.
171
        response = self.app.get(
172
            '/event/%d' % idevent,
173
            status = 401)
225
        environ = {'REMOTE_USER': 'manager'}
174 226

  
175
        # L'utilisateur N'A PAS les bonnes permissions.
176
        environ = {'REMOTE_USER': 'editor'}
177
        
178 227
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
179 228
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
180 229
        # n'a pas accès aux informations concernant cet évènement.
181 230
        response = self.app.get(
182
            '/event/%d' % idevent,
183
            status = 302, 
231
            '/masked_events/%d' % idcorrevent,
232
            status = 302,
184 233
            extra_environ = environ)
185

  
186
        # On suit la redirection.
187 234
        response = response.follow(status = 200, extra_environ = environ)
188 235
        assert_true(len(response.lxml.xpath(
189 236
            '//div[@id="flash"]/div[@class="error"]')))
190 237

  
191
        # L'utilisateur a les bonnes permissions.
192
        environ = {'REMOTE_USER': 'manager'}
193
        
194
        # On s'attend à ce que le statut de la requête soit 200.
238
        # L'erreur 302 peut venir de l'absence de permissions
239
        # ou bien simplement de l'absence d'événements masqués.
240
        # Pour vérifier qu'il s'agit bien d'un problème de permissions,
241
        # on ajoute un événement masqué. On doit avoir accès à exactement
242
        # 1 événement masqué à présent.
243
        add_masked_event(idcorrevent)
195 244
        response = self.app.get(
196
            '/event/%d' % idevent,
197
            status = 200, 
245
            '/masked_events/%d' % idcorrevent,
246
            status = 200,
198 247
            extra_environ = environ)
199 248

  
200
        # Il doit y avoir 2 lignes de résultats.
201 249
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
202
        assert_equal(len(rows), 2)
250
        assert_equal(len(rows), 1)
251

  
252
class TestRawEventsTableWithPermsHost(TestRawEventsTableWithPermsLLS):
253
    """Idem que TestRawEventsTableWithPermsLLS mais avec un hôte."""
254
    test_service = False
255

  
256
    def test_table(self):
257
        """Événements masqués d'un agrégat sur un hôte avec permissions."""
258
        super(TestRawEventsTableWithPermsHost, self).test_table()
203 259

  
vigiboard/tests/functional/test_root.py
98 98
        supitem = supitem, 
99 99
        message = u'foo',
100 100
        current_state = StateName.statename_to_value(event_status),
101
        timestamp = datetime.now(),
101 102
    )
102 103
    DBSession.add(event)
103 104
    DBSession.flush()
vigiboard/tests/functional/vigiboard_plugin/__init__.py
1
"""Tests fonctionnels des plugins de Vigiboard"""
vigiboard/tests/functional/vigiboard_plugin/test_plugin_hls.py
1
# -*- coding: utf-8 -*-
2
""" Test du plugin listant les services de haut niveau impactés. """
3

  
4
from datetime import datetime
5
import transaction
6
from vigiboard.tests import TestController
7
from nose.tools import assert_equal
8
from vigilo.models import Permission, StateName, \
9
                            HostGroup, Host, HighLevelService, \
10
                            Event, CorrEvent, ImpactedPath, ImpactedHLS
11
from vigilo.models.configure import DBSession
12

  
13
def populate_DB():
14
    """ Peuple la base de données. """
15

  
16
    # On ajoute un groupe d'hôtes
17
    hostmanagers = HostGroup(name = u'managersgroup')
18
    DBSession.add(hostmanagers)
19
    DBSession.flush()
20

  
21
    # On lui octroie les permissions
22
    manage_perm = Permission.by_permission_name(u'manage')
23
    hostmanagers.permissions.append(manage_perm)
24
    DBSession.flush()
25

  
26
    # On crée un hôte de test.
27
    host = Host(
28
        name = u'host', 
29
        checkhostcmd = u'halt',
30
        snmpcommunity = u'public',
31
        hosttpl = u'/dev/null',
32
        mainip = u'192.168.1.1',
33
        snmpport = 42,
34
        weight = 42,
35
        )
36
    DBSession.add(host)
37

  
38
    # On affecte cet hôte au groupe précédemment créé.
39
    hostmanagers.hosts.append(host)
40
    DBSession.flush()
41

  
42
    # On ajoute un évènement causé par cet hôte.
43
    event1 = Event(
44
        supitem = host,
45
        message = u'foo',
46
        current_state = StateName.statename_to_value(u'WARNING'))
47
    DBSession.add(event1)
48
    DBSession.flush()
49

  
50
    # On ajoute un évènement corrélé causé par cet évènement 'brut'.
51
    aggregate = CorrEvent(
52
        idcause = event1.idevent, 
53
        timestamp_active = datetime.now(),
54
        priority = 1,
55
        status = u'None')
56
    aggregate.events.append(event1)
57
    DBSession.add(aggregate)
58
    DBSession.flush()
59
    
60
    transaction.commit()
61

  
62
    return aggregate
63

  
64
def add_paths(path_number, path_length, idsupitem):
65
    """ 
66
    Ajoute path_number chemins de services de haut niveau impactés
67
    dans la base de donnée. Leur longeur sera égale à path_length.
68
    La 3ème valeur passée en paramètre est l'id du supitem impactant.
69
     
70
    path_number * path_length services de 
71
    haut niveau sont créés dans l'opération.
72
    """
73

  
74
    # Création de services de haut niveau dans la BDD.
75
    hls_template = {
76
        'op_dep': u'&',
77
        'message': u'Bar',
78
        'warning_threshold': 60,
79
        'critical_threshold': 80,
80
        'weight': None,
81
        'priority': 2,
82
    }
83

  
84
    # Création des chemins de services de haut niveau impactés.
85
    for j in range(path_number):
86
        
87
        # On crée le chemin en lui-même
88
        path = ImpactedPath(idsupitem = idsupitem)
89
        DBSession.add(path)
90
        DBSession.flush()
91
        
92
        # Pour chaque étage du chemin,
93
        for i in range(path_length):
94
            # on ajoute un service de haut niveau dans la BDD,
95
            hls = HighLevelService(
96
                servicename = u'HLS' + str(j + 1) + str(i + 1), 
97
                **hls_template)
98
            DBSession.add(hls)
99
            # et on ajoute un étage au chemin contenant ce service. 
100
            DBSession.add(
101
                ImpactedHLS(
102
                    path = path,
103
                    hls = hls,
104
                    distance = i + 1,
105
                    ))
106
    
107
    DBSession.flush()
108
    transaction.commit()
109

  
110

  
111
class TestHLSPlugin(TestController):
112
    """
113
    Classe de test du contrôleur listant les services 
114
    de haut niveau impactés par un évènement corrélé.
115
    """
116
    
117
    def test_no_impacted_hls(self):
118
        """
119
        Retour du plugin HLS pour 0 HLS impacté
120
        Teste la valeur de retour du plugin lorsque
121
        aucun service de haut niveau n'est impacté.
122
        """
123
        
124
        # On peuple la base de données avant le test.
125
        aggregate = populate_DB()
126
        DBSession.add(aggregate)
127
        add_paths(0, 0, aggregate.events[0].idsupitem)
128
        DBSession.add(aggregate)
129
        
130
        ### 1er cas : l'utilisateur n'est pas connecté.
131
        # On vérifie que le plugin retourne bien une erreur 401.
132
        resp = self.app.post(
133
            '/get_plugin_value', 
134
            {"idcorrevent" : str(aggregate.idcorrevent),
135
             "plugin_name" : "hls"},
136
            status = 401,)
137
        
138
        ### 2ème cas : l'utilisateur n'a pas les
139
        ### droits sur l'hôte ayant causé le correvent.
140
        # On vérifie que le plugin retourne bien une erreur 404.
141
        resp = self.app.post(
142
            '/get_plugin_value', 
143
            {"idcorrevent" : str(aggregate.idcorrevent),
144
             "plugin_name" : "hls"},
145
            status = 404,
146
            extra_environ={'REMOTE_USER': 'editor'})
147
        
148
        ### 3ème cas : l'utilisateur a cette fois les droits.
149
        resp = self.app.post(
150
            '/get_plugin_value', 
151
            {"idcorrevent" : str(aggregate.idcorrevent),
152
             "plugin_name" : "hls"},
153
            extra_environ={'REMOTE_USER': 'manager'})
154
        # On vérifie que le plugin ne retourne toujours rien.
155
        assert_equal(resp.json, {"services": []})
156
    
157
    def test_1_impacted_hls_path(self):
158
        """
159
        Retour du plugin HLS pour 1 chemin impacté
160
        Teste la valeur de retour du plugin lorsqu'un
161
        chemin de services de haut niveau est impacté.
162
        """
163
        
164
        # On peuple la base de données avant le test.
165
        aggregate = populate_DB()
166
        DBSession.add(aggregate)
167
        add_paths(1, 2, aggregate.events[0].idsupitem)
168
        DBSession.add(aggregate)
169
        
170
        ### 1er cas : l'utilisateur n'est pas connecté.
171
        # On vérifie que le plugin retourne bien une erreur 401.
172
        resp = self.app.post(
173
            '/get_plugin_value', 
174
            {"idcorrevent" : str(aggregate.idcorrevent),
175
             "plugin_name" : "hls"},
176
            status = 401,)
177
        
178
        ### 2ème cas : l'utilisateur n'a pas les droits
179
        ### sur l'hôte ayant causé le correvent, on doit
180
        ### obtenir une erreur 404 (pas d'événement trouvé
181
        ### avec les informations liées à cet utilisateur).
182
        resp = self.app.post(
183
            '/get_plugin_value', 
184
            {"idcorrevent" : str(aggregate.idcorrevent),
185
             "plugin_name" : "hls"},
186
            status = 404,
187
            extra_environ={'REMOTE_USER': 'editor'})
188
        
189
        ### 3ème cas : l'utilisateur a cette fois les droits.
190
        resp = self.app.post(
191
            '/get_plugin_value', 
192
            {"idcorrevent" : str(aggregate.idcorrevent),
193
             "plugin_name" : "hls"},
194
            extra_environ={'REMOTE_USER': 'manager'})
195
        # On vérifie que le plugin retourne bien les 2 HLS impactés.
196
        assert_equal(resp.json, {"services": ['HLS12']})
197
    
198
    def test_2_impacted_hls_path(self):
199
        """
200
        Retour du plugin HLS pour 2 chemins impactés
201
        Teste la valeur de retour du plugin lorsque deux
202
        chemins de services de haut niveau sont impactés.
203
        """
204
        
205
        # On peuple la base de données avant le test.
206
        aggregate = populate_DB()
207
        DBSession.add(aggregate)
208
        add_paths(2, 2, aggregate.events[0].idsupitem)
209
        DBSession.add(aggregate)
210
        
211
        ### 1er cas : l'utilisateur n'est pas connecté.
212
        # On vérifie que le plugin retourne bien une erreur 401.
213
        resp = self.app.post(
214
            '/get_plugin_value', 
215
            {"idcorrevent" : str(aggregate.idcorrevent),
216
             "plugin_name" : "hls"},
217
            status = 401,)
218
        
219
        ### 2ème cas : l'utilisateur n'a pas les
220
        ### droits sur l'hôte ayant causé le correvent.
221
        resp = self.app.post(
222
            '/get_plugin_value', 
223
            {"idcorrevent" : str(aggregate.idcorrevent),
224
             "plugin_name" : "hls"},
225
            status = 404,
226
            extra_environ={'REMOTE_USER': 'editor'})
227
        
228
        ### 3ème cas : l'utilisateur a cette fois les droits.
229
        resp = self.app.post(
230
            '/get_plugin_value', 
231
            {"idcorrevent" : str(aggregate.idcorrevent),
232
             "plugin_name" : "hls"},
233
            extra_environ={'REMOTE_USER': 'manager'})
234
        # On vérifie que le plugin retourne bien les 4 HLS impactés.
235
        assert_equal(resp.json, {"services": ['HLS12', 'HLS22']})
236
        

Also available in: Unified diff