Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_raw_events_table.py @ e7e3d45e

History | View | Annotate | Download (9.4 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test du tableau d'événements de Vigiboard
5
"""
6

    
7
from nose.tools import assert_true, assert_equal
8
from datetime import datetime
9
import transaction
10

    
11
from vigilo.models.session import DBSession
12
from vigilo.models.tables import Event, EventHistory, CorrEvent, \
13
                            Permission, StateName, \
14
                            Host, HostGroup, LowLevelService, ServiceGroup
15
from vigiboard.tests import TestController
16

    
17
def populate_DB(caused_by_service):
18
    """ Peuple la base de données. """
19
    # On ajoute un groupe d'hôtes et un groupe de services.
20
    hostmanagers = HostGroup(name = u'managersgroup')
21
    DBSession.add(hostmanagers)
22
    servicemanagers = ServiceGroup(name = u'managersgroup')
23
    DBSession.add(servicemanagers)
24
    DBSession.flush()
25

    
26
    # On ajoute la permission 'manage' à ces deux groupes.
27
    manage_perm = Permission.by_permission_name(u'manage')
28
    hostmanagers.permissions.append(manage_perm)
29
    servicemanagers.permissions.append(manage_perm)
30
    DBSession.flush()
31

    
32
    # On crée un hôte de test, et on l'ajoute au groupe d'hôtes.
33
    managerhost = Host(
34
        name = u'managerhost',      
35
        checkhostcmd = u'halt',
36
        snmpcommunity = u'public',
37
        hosttpl = u'/dev/null',
38
        mainip = u'192.168.1.1',
39
        snmpport = 42,
40
        weight = 42,
41
    )
42
    DBSession.add(managerhost)
43
    hostmanagers.hosts.append(managerhost)
44
    DBSession.flush()
45

    
46
    # On crée un services de bas niveau, et on l'ajoute au groupe de services.
47
    managerservice = LowLevelService(
48
        host = managerhost,
49
        servicename = u'managerservice',
50
        command = u'halt',
51
        op_dep = u'+',
52
        weight = 42,
53
    )
54
    DBSession.add(managerservice)
55
    servicemanagers.services.append(managerservice)
56
    DBSession.flush()
57

    
58
    if caused_by_service:
59
        supitem = managerservice
60
    else:
61
        supitem = managerhost
62

    
63
    # Ajout d'un événement
64
    event = Event(
65
        supitem = supitem, 
66
        message = u'foo',
67
        current_state = StateName.statename_to_value(u"WARNING"),
68
        timestamp = datetime.now(),
69
    )
70
    DBSession.add(event)
71
    DBSession.flush()
72

    
73
    # Ajout des historiques
74
    DBSession.add(EventHistory(
75
        type_action=u'Nagios update state',
76
        idevent=event.idevent, 
77
        timestamp=datetime.now()))
78
    DBSession.add(EventHistory(
79
        type_action=u'Acknowlegement change state',
80
        idevent=event.idevent, 
81
        timestamp=datetime.now()))
82
    DBSession.flush()
83

    
84
    # Ajout d'un événement corrélé
85
    aggregate = CorrEvent(
86
        idcause = event.idevent, 
87
        timestamp_active = datetime.now(),
88
        priority = 1,
89
        status = u"None")
90
    aggregate.events.append(event)
91
    DBSession.add(aggregate)
92
    DBSession.flush()
93
    return aggregate.idcorrevent
94

    
95
def add_masked_event(idcorrevent):
96
    transaction.begin()
97
    hostmanagers = HostGroup.by_group_name(u'managersgroup')
98
    nb_hosts = DBSession.query(Host).count()
99

    
100
    masked_host = Host(
101
        name = u'masked host #%d' % nb_hosts,      
102
        checkhostcmd = u'halt',
103
        snmpcommunity = u'public',
104
        hosttpl = u'/dev/null',
105
        mainip = u'192.168.1.%d' % nb_hosts,
106
        snmpport = 42,
107
        weight = 42,
108
    )
109
    DBSession.add(masked_host)
110
    hostmanagers.hosts.append(masked_host)
111
    DBSession.flush()
112

    
113
    event = Event(
114
        supitem = masked_host,
115
        message = u'masked event #%d' % nb_hosts,
116
        current_state = StateName.statename_to_value(u"CRITICAL"),
117
        timestamp = datetime.now(),
118
    )
119
    DBSession.add(event)
120
    DBSession.flush()
121

    
122
    aggregate = DBSession.query(CorrEvent).filter(
123
        CorrEvent.idcorrevent == idcorrevent).one()
124
    aggregate.events.append(event)
125
    DBSession.add(aggregate)
126
    DBSession.flush()
127
    transaction.commit()
128

    
129

    
130
class TestRawEventsTableAnonymousLLS(TestController):
131
    """
132
    Teste l'affichage des événements bruts masqués par un agrégat.
133
    Dans ces tests, l'utilisateur n'est pas authentifié et l'agrégat
134
    a été causé par un LLS.
135
    """
136
    test_service = True
137

    
138
    def test_table(self):
139
        """Événements masqués d'un agrégat sur un LLS en anonyme."""
140
        # On peuple la BDD avec un hôte, un service de bas niveau,
141
        # et un groupe d'hôtes et de services associés à ces items.
142
        idcorrevent = populate_DB(self.test_service)
143
        transaction.commit()
144

    
145
        # L'utilisateur n'est pas authentifié.
146
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
147
        # demandant à l'utilisateur de s'authentifier.
148
        response = self.app.get(
149
            '/masked_events/%d' % idcorrevent,
150
            status = 401)
151

    
152
class TestRawEventsTableAnonymousHost(TestRawEventsTableAnonymousLLS):
153
    """Idem que TestRawEventsTableAnonymousLLS mais avec un hôte."""
154
    test_service = False
155

    
156
    def test_table(self):
157
        """Événements masqués d'un agrégat sur un hôte en anonyme."""
158
        super(TestRawEventsTableAnonymousHost, self).test_table()
159

    
160

    
161
class TestRawEventsTableWithoutPermsLLS(TestController):
162
    """
163
    Teste l'affichage des événements bruts masqués par un agrégat.
164
    Dans ces tests, l'utilisateur n'a pas les bonnes permissions
165
    et l'agrégat a été causé par un LLS.
166
    """
167
    test_service = True
168

    
169
    def test_table(self):
170
        """Événements masqués d'un agrégat sur un LLS sans permissions."""
171
        # On peuple la BDD avec un hôte, un service de bas niveau,
172
        # et un groupe d'hôtes et de services associés à ces items.
173
        idcorrevent = populate_DB(self.test_service)
174
        transaction.commit()
175

    
176
        environ = {'REMOTE_USER': 'editor'}
177

    
178
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
179
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
180
        # n'a pas accès aux informations concernant cet évènement.
181
        response = self.app.get(
182
            '/masked_events/%d' % idcorrevent,
183
            status = 302, 
184
            extra_environ = environ)
185
        response = response.follow(status = 200, extra_environ = environ)
186
        assert_true(len(response.lxml.xpath(
187
            '//div[@id="flash"]/div[@class="error"]')))
188

    
189
        # L'erreur 302 peut venir de l'absence de permissions
190
        # ou bien simplement de l'absence d'événements masqués.
191
        # Pour vérifier qu'il s'agit bien d'un problème de permissions,
192
        # on ajoute un événement masqué. On doit à nouveau obtenir
193
        # une erreur 302.
194
        add_masked_event(idcorrevent)
195
        response = self.app.get(
196
            '/masked_events/%d' % idcorrevent,
197
            status = 302, 
198
            extra_environ = environ)
199
        response = response.follow(status = 200, extra_environ = environ)
200
        assert_true(len(response.lxml.xpath(
201
            '//div[@id="flash"]/div[@class="error"]')))
202

    
203
class TestRawEventsTableWithoutPermsHost(TestRawEventsTableWithoutPermsLLS):
204
    """Idem que TestRawEventsTableWithoutPermsLLS mais avec un hôte."""
205
    test_service = False
206

    
207
    def test_table(self):
208
        """Événements masqués d'un agrégat sur un hôte sans permissions."""
209
        super(TestRawEventsTableWithoutPermsHost, self).test_table()
210

    
211
class TestRawEventsTableWithPermsLLS(TestController):
212
    """
213
    Teste l'affichage d'une table des événements bruts
214
    rattachés à un agrégat, hormis la cause de cet agrégat.
215
    """
216
    test_service = True
217

    
218
    def test_table(self):
219
        """Événements masqués d'un agrégat sur un LLS avec permissions."""
220
        # On peuple la BDD avec un hôte, un service de bas niveau,
221
        # et un groupe d'hôtes et de services associés à ces items.
222
        idcorrevent = populate_DB(True)
223
        transaction.commit()
224

    
225
        environ = {'REMOTE_USER': 'manager'}
226

    
227
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
228
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
229
        # n'a pas accès aux informations concernant cet évènement.
230
        response = self.app.get(
231
            '/masked_events/%d' % idcorrevent,
232
            status = 302,
233
            extra_environ = environ)
234
        response = response.follow(status = 200, extra_environ = environ)
235
        assert_true(len(response.lxml.xpath(
236
            '//div[@id="flash"]/div[@class="error"]')))
237

    
238
        # L'erreur 302 peut venir de l'absence de permissions
239
        # ou bien simplement de l'absence d'événements masqués.
240
        # Pour vérifier qu'il s'agit bien d'un problème de permissions,
241
        # on ajoute un événement masqué. On doit avoir accès à exactement
242
        # 1 événement masqué à présent.
243
        add_masked_event(idcorrevent)
244
        response = self.app.get(
245
            '/masked_events/%d' % idcorrevent,
246
            status = 200,
247
            extra_environ = environ)
248

    
249
        # On s'attend à trouver exactement 1 événement masqué.
250
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
251
        # de rechercher directement une valeur dans une liste. Elle devrait
252
        # néanmoins suffire pour les besoins des tests.
253
        rows = response.lxml.xpath('//table[contains(@class, "vigitable")]/tbody/tr')
254
        assert_equal(len(rows), 1)
255

    
256
class TestRawEventsTableWithPermsHost(TestRawEventsTableWithPermsLLS):
257
    """Idem que TestRawEventsTableWithPermsLLS mais avec un hôte."""
258
    test_service = False
259

    
260
    def test_table(self):
261
        """Événements masqués d'un agrégat sur un hôte avec permissions."""
262
        super(TestRawEventsTableWithPermsHost, self).test_table()
263