Project

General

Profile

Revision 072f2a16

ID072f2a16b076178218f5a968bd38c6aaca25b418
Parent eab949e2
Child 00d2e1d1

Added by Francois POIROTTE over 14 years ago

Ménage dans les tests de VigiBoard + légères corrections pour pylint.

git-svn-id: https://vigilo-dev.si.c-s.fr/svn@2171 b22e2e97-25c9-44ff-b637-2e5ceca36478

View differences:

vigiboard/controllers/root.py
217 217
            events.items.c.hostname,
218 218
            events.items.c.servicename,
219 219
        )
220
        events.add_join((EVENTSAGGREGATE_TABLE, EVENTSAGGREGATE_TABLE.c.idevent == Event.idevent))
221
        events.add_join((CorrEvent, CorrEvent.idcorrevent == EVENTSAGGREGATE_TABLE.c.idcorrevent))
220
        events.add_join((EVENTSAGGREGATE_TABLE, \
221
            EVENTSAGGREGATE_TABLE.c.idevent == Event.idevent))
222
        events.add_join((CorrEvent, CorrEvent.idcorrevent == \
223
            EVENTSAGGREGATE_TABLE.c.idcorrevent))
222 224
        events.add_join((events.items, 
223 225
            Event.idsupitem == events.items.c.idsupitem))
224 226
        events.add_filter(Event.idevent != CorrEvent.idcause)
......
292 294
            events.items.c.hostname,
293 295
            events.items.c.servicename,
294 296
        )
295
        events.add_join((EVENTSAGGREGATE_TABLE, EVENTSAGGREGATE_TABLE.c.idevent == Event.idevent))
296
        events.add_join((CorrEvent, CorrEvent.idcorrevent == EVENTSAGGREGATE_TABLE.c.idcorrevent))
297
        events.add_join((EVENTSAGGREGATE_TABLE, \
298
            EVENTSAGGREGATE_TABLE.c.idevent == Event.idevent))
299
        events.add_join((CorrEvent, CorrEvent.idcorrevent == \
300
            EVENTSAGGREGATE_TABLE.c.idcorrevent))
297 301
        events.add_join((events.items, 
298 302
            Event.idsupitem == events.items.c.idsupitem))
299 303
        events.add_filter(Event.idevent == idevent)
vigiboard/tests/functional/test_correvents_table.py
1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test du tableau d'événements de Vigiboard
5
"""
6

  
7
from nose.tools import assert_true, assert_equal
8
from datetime import datetime
9
import transaction
10

  
11
from vigilo.models.configure import DBSession
12
from vigilo.models import Event, CorrEvent, \
13
                            Permission, StateName, \
14
                            Host, HostGroup, LowLevelService
15
from vigiboard.tests import TestController
16

  
17
def populate_DB():
18
    """ Peuple la base de données. """
19
    # On ajoute les groupes et leurs dépendances
20
    hosteditors = HostGroup(name=u'editorsgroup')
21
    DBSession.add(hosteditors)
22
    hostmanagers = HostGroup(name=u'managersgroup', parent=hosteditors)
23
    DBSession.add(hostmanagers)
24
    DBSession.flush()
25

  
26
    manage_perm = Permission.by_permission_name(u'manage')
27
    edit_perm = Permission.by_permission_name(u'edit')
28

  
29
    hostmanagers.permissions.append(manage_perm)
30
    hosteditors.permissions.append(edit_perm)
31
    DBSession.flush()
32

  
33
    # Création des hôtes de test.
34
    host_template = {
35
        'checkhostcmd': u'halt',
36
        'snmpcommunity': u'public',
37
        'hosttpl': u'/dev/null',
38
        'mainip': u'192.168.1.1',
39
        'snmpport': 42,
40
        'weight': 42,
41
    }
42

  
43
    managerhost = Host(name=u'managerhost', **host_template)
44
    editorhost = Host(name=u'editorhost', **host_template)
45
    DBSession.add(managerhost)
46
    DBSession.add(editorhost)
47

  
48
    # Affectation des hôtes aux groupes.
49
    hosteditors.hosts.append(editorhost)
50
    hostmanagers.hosts.append(managerhost)
51
    DBSession.flush()
52

  
53
    # Création des services techniques de test.
54
    service_template = {
55
        'command': u'halt',
56
        'op_dep': u'+',
57
        'weight': 42,
58
    }
59

  
60
    service1 = LowLevelService(
61
        host=managerhost,
62
        servicename=u'managerservice',
63
        **service_template
64
    )
65

  
66
    service2 = LowLevelService(
67
        host=editorhost,
68
        servicename=u'managerservice',
69
        **service_template
70
    )
71

  
72
    service3 = LowLevelService(
73
        host=managerhost,
74
        servicename=u'editorservice',
75
        **service_template
76
    )
77

  
78
    service4 = LowLevelService(
79
        host=editorhost,
80
        servicename=u'editorservice',
81
        **service_template
82
    )
83

  
84
    DBSession.add(service1)
85
    DBSession.add(service2)
86
    DBSession.add(service3)
87
    DBSession.add(service4)
88
    DBSession.flush()
89

  
90
    # Ajout des événements eux-mêmes
91
    event_template = {
92
        'message': u'foo',
93
        'current_state': StateName.statename_to_value(u'WARNING'),
94
    }
95

  
96
    event1 = Event(supitem=service1, **event_template)
97
    event2 = Event(supitem=service2, **event_template)
98
    event3 = Event(supitem=service3, **event_template)
99
    event4 = Event(supitem=service4, **event_template)
100
    event5 = Event(supitem=editorhost, **event_template)
101
    event6 = Event(supitem=managerhost, **event_template)
102

  
103
    DBSession.add(event1)
104
    DBSession.add(event2)
105
    DBSession.add(event3)
106
    DBSession.add(event4)
107
    DBSession.add(event5)
108
    DBSession.add(event6)
109
    DBSession.flush()
110

  
111
    # Ajout des événements corrélés
112
    aggregate_template = {
113
        'timestamp_active': datetime.now(),
114
        'priority': 1,
115
        'status': u'None',
116
    }
117
    aggregate1 = CorrEvent(
118
        idcause=event1.idevent, **aggregate_template)
119
    aggregate2 = CorrEvent(
120
        idcause=event4.idevent, **aggregate_template)
121
    aggregate3 = CorrEvent(
122
        idcause=event5.idevent, **aggregate_template)
123
    aggregate4 = CorrEvent(
124
        idcause=event6.idevent, **aggregate_template)
125

  
126
    aggregate1.events.append(event1)
127
    aggregate1.events.append(event3)
128
    aggregate2.events.append(event4)
129
    aggregate2.events.append(event2)
130
    aggregate3.events.append(event5)
131
    aggregate4.events.append(event6)
132
    DBSession.add(aggregate1)
133
    DBSession.add(aggregate2)
134
    DBSession.add(aggregate3)
135
    DBSession.add(aggregate4)
136
    DBSession.flush()
137
    transaction.commit()
138

  
139
class TestEventTable(TestController):
140
    """
141
    Test du tableau d'événements de Vigiboard
142
    """
143

  
144
    def test_event_table(self):
145
        """
146
        Test du tableau d'évènements de la page d'accueil de Vigiboard.
147
        """
148

  
149
        populate_DB()
150

  
151
        ### 1er cas : L'utilisateur utilisé pour
152
        # se connecter à Vigiboard est 'editor'.
153
        environ = {'REMOTE_USER': 'editor'}
154
        response = self.app.get('/', extra_environ=environ)
155

  
156
        # Il doit y avoir 2 lignes de résultats.
157
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
158
        print "There are %d rows in the result set" % len(rows)
159
        assert_equal(len(rows), 2)
160

  
161
        # Il doit y avoir plusieurs colonnes dans la ligne de résultats.
162
        cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td')
163
        print "There are %d columns in the result set" % len(cols)
164
        assert_true(len(cols) > 1)
165

  
166
        ### 2nd cas : L'utilisateur utilisé pour
167
        # se connecter à Vigiboard est 'manager'.
168
        environ = {'REMOTE_USER': 'manager'}
169
        response = self.app.get('/', extra_environ=environ)
170

  
171
        # Il doit y avoir 4 lignes de résultats.
172
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
173
        print "There are %d rows in the result set" % len(rows)
174
        assert_equal(len(rows), 4)
175

  
176
        # Il doit y avoir plusieurs colonnes dans la ligne de résultats.
177
        cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td')
178
        print "There are %d columns in the result set" % len(cols)
179
        assert_true(len(cols) > 1)
180

  
vigiboard/tests/functional/test_details_plugin.py
1
# -*- coding: utf-8 -*-
2
"""
3
Teste le formulaire donnant les liens vers les outils extérieurs
4
et les données de l'historique.
5
"""
6
from nose.tools import assert_true, assert_equal
7
from datetime import datetime
8
import transaction
9

  
10
from vigiboard.tests import TestController
11
from vigilo.models.configure import DBSession
12
from vigilo.models import ServiceGroup, HostGroup, \
13
                            Host, Permission, StateName, \
14
                            LowLevelService, Event, CorrEvent
15

  
16
def insert_deps(return_service):
17
    """
18
    Insère les dépendances nécessaires aux tests.
19

  
20
    @param return_service: Indique si les événements générés
21
        concernent un hôte (False) ou un service de bas niveau (True).
22
    @type return_service: C{bool}
23
    @return: Renvoie un tuple avec le groupe d'hôte créé,
24
        l'identifiant du L{CorrEvent} généré et enfin,
25
        l'identifiant de l'L{Event} généré.
26
    @rtype: C{tuple}
27
    """
28
    timestamp = datetime.now()
29

  
30
    hostgroup = HostGroup(
31
        name=u'foo',
32
    )
33
    DBSession.add(hostgroup)
34

  
35
    host = Host(
36
        name=u'bar',
37
        checkhostcmd=u'',
38
        description=u'',
39
        hosttpl=u'',
40
        mainip=u'127.0.0.1',
41
        snmpport=42,
42
        snmpcommunity=u'public',
43
        snmpversion=u'3',
44
        weight=42,
45
    )
46
    DBSession.add(host)
47
    DBSession.flush()
48

  
49
    hostgroup.hosts.append(host)
50
    DBSession.flush()
51

  
52
    servicegroup = ServiceGroup(
53
        name=u'foo',
54
    )
55
    DBSession.add(servicegroup)
56

  
57
    service = LowLevelService(
58
        host=host,
59
        command=u'',
60
        weight=42,
61
        servicename=u'baz',
62
        op_dep=u'&',
63
    )
64
    DBSession.add(service)
65
    DBSession.flush()
66

  
67
    servicegroup.services.append(service)
68
    DBSession.flush()
69

  
70
    event = Event(
71
        timestamp=timestamp,
72
        current_state=StateName.statename_to_value(u'WARNING'),
73
        message=u'Hello world',
74
    )
75
    if return_service:
76
        event.supitem = service
77
    else:
78
        event.supitem = host
79
    DBSession.add(event)
80
    DBSession.flush()
81

  
82
    correvent = CorrEvent(
83
        impact=42,
84
        priority=42,
85
        trouble_ticket=None,
86
        status=u'None',
87
        occurrence=42,
88
        timestamp_active=timestamp,
89
        cause=event,
90
    )
91
    correvent.events.append(event)
92
    DBSession.add(correvent)
93
    DBSession.flush()
94

  
95
    return (hostgroup, correvent.idcorrevent, event.idevent)
96

  
97
class TestDetailsPlugin(TestController):
98
    """Teste le dialogue pour l'accès aux historiques."""
99

  
100
    def test_details_plugin_LLS_alert_when_allowed(self):
101
        """Dialogue des détails avec un LLS et les bons droits."""
102
        hostgroup, idcorrevent, idcause = insert_deps(True)
103
        manage = Permission.by_permission_name(u'manage')
104
        manage.hostgroups.append(hostgroup)
105
        DBSession.flush()
106
        transaction.commit()
107

  
108
        response = self.app.post('/get_plugin_value', {
109
                'idcorrevent': idcorrevent,
110
                'plugin_name': 'details',
111
            }, extra_environ={'REMOTE_USER': 'manager'})
112
        json = response.json
113

  
114
        # Le contenu de "eventdetails" varie facilement.
115
        # On le teste séparément.
116
        json.pop('eventdetails', None)
117
        assert_true('eventdetails' in response.json)
118

  
119
        assert_equal(json, {
120
            "idcorrevent": idcorrevent,
121
            "idcause": idcause,
122
            "service": "baz",
123
            "peak_state": "WARNING",
124
            "current_state": "WARNING",
125
            "host": "bar",
126
            "initial_state": "WARNING"
127
        })
128

  
129
    def test_details_plugin_host_alert_when_allowed(self):
130
        """Dialogue des détails avec un hôte et les bons droits."""
131
        hostgroup, idcorrevent, idcause = insert_deps(False)
132
        manage = Permission.by_permission_name(u'manage')
133
        manage.hostgroups.append(hostgroup)
134
        DBSession.flush()
135
        transaction.commit()
136

  
137
        response = self.app.post('/get_plugin_value', {
138
                'idcorrevent': idcorrevent,
139
                'plugin_name': 'details',
140
            }, extra_environ={'REMOTE_USER': 'manager'})
141
        json = response.json
142

  
143
        # Le contenu de "eventdetails" varie facilement.
144
        # On le teste séparément.
145
        json.pop('eventdetails', None)
146
        assert_true('eventdetails' in response.json)
147

  
148
        assert_equal(json, {
149
            "idcorrevent": idcorrevent,
150
            "idcause": idcause,
151
            "service": None,
152
            "peak_state": "WARNING",
153
            "current_state": "WARNING",
154
            "host": "bar",
155
            "initial_state": "WARNING"
156
        })
157

  
158

  
159
    def test_details_plugin_LLS_when_forbidden(self):
160
        """Dialogue des détails avec un LLS et des droits insuffisants."""
161
        idcorrevent = insert_deps(True)[1]
162
        DBSession.flush()
163
        transaction.commit()
164

  
165
        # Le contrôleur renvoie une erreur 404 (HTTPNotFound)
166
        # lorsque l'utilisateur n'a pas les permissions nécessaires sur
167
        # les données ou qu'aucun événement correspondant n'est trouvé.
168
        self.app.post('/get_plugin_value', {
169
                'idcorrevent': idcorrevent,
170
                'plugin_name': 'details',
171
            }, extra_environ={'REMOTE_USER': 'manager'},
172
            status=404)
173

  
174
    def test_details_plugin_host_when_forbidden(self):
175
        """Dialogue des détails avec un hôte et des droits insuffisants."""
176
        idcorrevent = insert_deps(False)[1]
177
        DBSession.flush()
178
        transaction.commit()
179

  
180
        # Le contrôleur renvoie une erreur 404 (HTTPNotFound)
181
        # lorsque l'utilisateur n'a pas les permissions nécessaires sur
182
        # les données ou qu'aucun événement correspondant n'est trouvé.
183
        self.app.post('/get_plugin_value', {
184
                'idcorrevent': idcorrevent,
185
                'plugin_name': 'details',
186
            }, extra_environ={'REMOTE_USER': 'manager'},
187
            status=404)
188

  
189
    def test_details_plugin_LLS_anonymous(self):
190
        """Dialogue des détails avec un LLS et en anonyme."""
191
        idcorrevent = insert_deps(True)[1]
192
        DBSession.flush()
193
        transaction.commit()
194

  
195
        # Le contrôleur renvoie une erreur 401 (HTTPUnauthorized)
196
        # lorsque l'utilisateur n'est pas authentifié.
197
        self.app.post('/get_plugin_value', {
198
                'idcorrevent': idcorrevent,
199
                'plugin_name': 'details',
200
            }, status=401)
201

  
202
    def test_details_plugin_host_anonymous(self):
203
        """Dialogue des détails avec un hôte et en anonyme."""
204
        idcorrevent = insert_deps(False)[1]
205
        DBSession.flush()
206
        transaction.commit()
207

  
208
        # Le contrôleur renvoie une erreur 401 (HTTPUnauthorized)
209
        # lorsque l'utilisateur n'est pas authentifié.
210
        self.app.post('/get_plugin_value', {
211
                'idcorrevent': idcorrevent,
212
                'plugin_name': 'details',
213
            }, status=401)
214

  
vigiboard/tests/functional/test_event_table.py
1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test du tableau d'événements de Vigiboard
5
"""
6

  
7
from nose.tools import assert_true, assert_equal
8
from datetime import datetime
9
import transaction
10

  
11
from vigilo.models.configure import DBSession
12
from vigilo.models import Event, CorrEvent, \
13
                            Permission, StateName, \
14
                            Host, HostGroup, LowLevelService
15
from vigiboard.tests import TestController
16

  
17
def populate_DB():
18
    """ Peuple la base de données. """
19
    # On ajoute les groupes et leurs dépendances
20
    hosteditors = HostGroup(name=u'editorsgroup')
21
    DBSession.add(hosteditors)
22
    hostmanagers = HostGroup(name=u'managersgroup', parent=hosteditors)
23
    DBSession.add(hostmanagers)
24
    DBSession.flush()
25

  
26
    manage_perm = Permission.by_permission_name(u'manage')
27
    edit_perm = Permission.by_permission_name(u'edit')
28

  
29
    hostmanagers.permissions.append(manage_perm)
30
    hosteditors.permissions.append(edit_perm)
31
    DBSession.flush()
32

  
33
    # Création des hôtes de test.
34
    host_template = {
35
        'checkhostcmd': u'halt',
36
        'snmpcommunity': u'public',
37
        'hosttpl': u'/dev/null',
38
        'mainip': u'192.168.1.1',
39
        'snmpport': 42,
40
        'weight': 42,
41
    }
42

  
43
    managerhost = Host(name=u'managerhost', **host_template)
44
    editorhost = Host(name=u'editorhost', **host_template)
45
    DBSession.add(managerhost)
46
    DBSession.add(editorhost)
47

  
48
    # Affectation des hôtes aux groupes.
49
    hosteditors.hosts.append(editorhost)
50
    hostmanagers.hosts.append(managerhost)
51
    DBSession.flush()
52

  
53
    # Création des services techniques de test.
54
    service_template = {
55
        'command': u'halt',
56
        'op_dep': u'+',
57
        'weight': 42,
58
    }
59

  
60
    service1 = LowLevelService(
61
        host=managerhost,
62
        servicename=u'managerservice',
63
        **service_template
64
    )
65

  
66
    service2 = LowLevelService(
67
        host=editorhost,
68
        servicename=u'managerservice',
69
        **service_template
70
    )
71

  
72
    service3 = LowLevelService(
73
        host=managerhost,
74
        servicename=u'editorservice',
75
        **service_template
76
    )
77

  
78
    service4 = LowLevelService(
79
        host=editorhost,
80
        servicename=u'editorservice',
81
        **service_template
82
    )
83

  
84
    DBSession.add(service1)
85
    DBSession.add(service2)
86
    DBSession.add(service3)
87
    DBSession.add(service4)
88
    DBSession.flush()
89

  
90
    # Ajout des événements eux-mêmes
91
    event_template = {
92
        'message': u'foo',
93
        'current_state': StateName.statename_to_value(u'WARNING'),
94
    }
95

  
96
    event1 = Event(supitem=service1, **event_template)
97
    event2 = Event(supitem=service2, **event_template)
98
    event3 = Event(supitem=service3, **event_template)
99
    event4 = Event(supitem=service4, **event_template)
100
    event5 = Event(supitem=editorhost, **event_template)
101
    event6 = Event(supitem=managerhost, **event_template)
102

  
103
    DBSession.add(event1)
104
    DBSession.add(event2)
105
    DBSession.add(event3)
106
    DBSession.add(event4)
107
    DBSession.add(event5)
108
    DBSession.add(event6)
109
    DBSession.flush()
110

  
111
    # Ajout des événements corrélés
112
    aggregate_template = {
113
        'timestamp_active': datetime.now(),
114
        'priority': 1,
115
        'status': u'None',
116
    }
117
    aggregate1 = CorrEvent(
118
        idcause=event1.idevent, **aggregate_template)
119
    aggregate2 = CorrEvent(
120
        idcause=event4.idevent, **aggregate_template)
121
    aggregate3 = CorrEvent(
122
        idcause=event5.idevent, **aggregate_template)
123
    aggregate4 = CorrEvent(
124
        idcause=event6.idevent, **aggregate_template)
125

  
126
    aggregate1.events.append(event1)
127
    aggregate1.events.append(event3)
128
    aggregate2.events.append(event4)
129
    aggregate2.events.append(event2)
130
    aggregate3.events.append(event5)
131
    aggregate4.events.append(event6)
132
    DBSession.add(aggregate1)
133
    DBSession.add(aggregate2)
134
    DBSession.add(aggregate3)
135
    DBSession.add(aggregate4)
136
    DBSession.flush()
137
    transaction.commit()
138

  
139
class TestEventTable(TestController):
140
    """
141
    Test du tableau d'événements de Vigiboard
142
    """
143

  
144
    def test_event_table(self):
145
        """
146
        Test du tableau d'évènements de la page d'accueil de Vigiboard.
147
        """
148

  
149
        populate_DB()
150

  
151
        ### 1er cas : L'utilisateur utilisé pour
152
        # se connecter à Vigiboard est 'editor'.
153
        environ = {'REMOTE_USER': 'editor'}
154
        response = self.app.get('/', extra_environ=environ)
155

  
156
        # Il doit y avoir 2 lignes de résultats.
157
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
158
        print "There are %d rows in the result set" % len(rows)
159
        assert_equal(len(rows), 2)
160

  
161
        # Il doit y avoir plusieurs colonnes dans la ligne de résultats.
162
        cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td')
163
        print "There are %d columns in the result set" % len(cols)
164
        assert_true(len(cols) > 1)
165

  
166
        ### 2nd cas : L'utilisateur utilisé pour
167
        # se connecter à Vigiboard est 'manager'.
168
        environ = {'REMOTE_USER': 'manager'}
169
        response = self.app.get('/', extra_environ=environ)
170

  
171
        # Il doit y avoir 4 lignes de résultats.
172
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
173
        print "There are %d rows in the result set" % len(rows)
174
        assert_equal(len(rows), 4)
175

  
176
        # Il doit y avoir plusieurs colonnes dans la ligne de résultats.
177
        cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td')
178
        print "There are %d columns in the result set" % len(cols)
179
        assert_true(len(cols) > 1)
180

  
vigiboard/tests/functional/test_history_form.py
1
# -*- coding: utf-8 -*-
2
"""
3
Teste le formulaire donnant les liens vers les outils extérieurs
4
et les données de l'historique.
5
"""
6
from nose.tools import assert_true, assert_equal
7
from datetime import datetime
8
import transaction
9

  
10
from vigiboard.tests import TestController
11
from vigilo.models.configure import DBSession
12
from vigilo.models import ServiceGroup, HostGroup, \
13
                            Host, Permission, StateName, \
14
                            LowLevelService, Event, CorrEvent
15

  
16
def insert_deps(return_service):
17
    """Insère les dépendances nécessaires aux tests."""
18
    timestamp = datetime.now()
19

  
20
    hostgroup = HostGroup(
21
        name=u'foo',
22
    )
23
    DBSession.add(hostgroup)
24

  
25
    host = Host(
26
        name=u'bar',
27
        checkhostcmd=u'',
28
        description=u'',
29
        hosttpl=u'',
30
        mainip=u'127.0.0.1',
31
        snmpport=42,
32
        snmpcommunity=u'public',
33
        snmpversion=u'3',
34
        weight=42,
35
    )
36
    DBSession.add(host)
37
    DBSession.flush()
38

  
39
    hostgroup.hosts.append(host)
40
    DBSession.flush()
41

  
42
    servicegroup = ServiceGroup(
43
        name=u'foo',
44
    )
45
    DBSession.add(servicegroup)
46

  
47
    service = LowLevelService(
48
        host=host,
49
        command=u'',
50
        weight=42,
51
        servicename=u'baz',
52
        op_dep=u'&',
53
    )
54
    DBSession.add(service)
55
    DBSession.flush()
56

  
57
    servicegroup.services.append(service)
58
    DBSession.flush()
59

  
60
    event = Event(
61
        timestamp=timestamp,
62
        current_state=StateName.statename_to_value(u'WARNING'),
63
        message=u'Hello world',
64
    )
65
    if return_service:
66
        event.supitem = service
67
    else:
68
        event.supitem = host
69
    DBSession.add(event)
70
    DBSession.flush()
71

  
72
    correvent = CorrEvent(
73
        impact=42,
74
        priority=42,
75
        trouble_ticket=None,
76
        status=u'None',
77
        occurrence=42,
78
        timestamp_active=timestamp,
79
        cause=event,
80
    )
81
    correvent.events.append(event)
82
    DBSession.add(correvent)
83
    DBSession.flush()
84

  
85
    return (hostgroup, correvent.idcorrevent)
86

  
87
class TestHistoryForm(TestController):
88
    """Teste le dialogue pour l'accès aux historiques."""
89

  
90
    def test_history_form_LLS_alert_when_allowed(self):
91
        """Teste le formulaire d'historique avec un LLS (alerte visible)."""
92
        hostgroup, idcorrevent = insert_deps(True)
93
        manage = Permission.by_permission_name(u'manage')
94
        manage.hostgroups.append(hostgroup)
95
        DBSession.flush()
96
        transaction.commit()
97

  
98
        response = self.app.post('/get_plugin_value', {
99
                'idcorrevent': idcorrevent,
100
                'plugin_name': 'history',
101
            }, extra_environ={'REMOTE_USER': 'manager'})
102
        json = response.json
103

  
104
        # Le contenu de "eventdetails" varie facilement.
105
        # On le teste séparément.
106
        json.pop('eventdetails', None)
107
        assert_true('eventdetails' in response.json)
108

  
109
        assert_equal(json, {
110
            "idcorrevent": idcorrevent,
111
            "service": "baz",
112
            "peak_state": "WARNING",
113
            "current_state": "WARNING",
114
            "host": "bar",
115
            "initial_state": "WARNING"
116
        })
117

  
118
    def test_history_form_host_alert_when_allowed(self):
119
        """Teste le formulaire d'historique avec un hôte (alerte visible)."""
120
        hostgroup, idcorrevent = insert_deps(False)
121
        manage = Permission.by_permission_name(u'manage')
122
        manage.hostgroups.append(hostgroup)
123
        DBSession.flush()
124
        transaction.commit()
125

  
126
        response = self.app.post('/get_plugin_value', {
127
                'idcorrevent': idcorrevent,
128
                'plugin_name': 'history',
129
            }, extra_environ={'REMOTE_USER': 'manager'})
130
        json = response.json
131

  
132
        # Le contenu de "eventdetails" varie facilement.
133
        # On le teste séparément.
134
        json.pop('eventdetails', None)
135
        assert_true('eventdetails' in response.json)
136

  
137
        assert_equal(json, {
138
            "idcorrevent": idcorrevent,
139
            "service": None,
140
            "peak_state": "WARNING",
141
            "current_state": "WARNING",
142
            "host": "bar",
143
            "initial_state": "WARNING"
144
        })
145

  
146

  
147
    def test_history_form_LLS_when_forbidden(self):
148
        """Teste le formulaire d'historique avec un LLS (alerte invisible)."""
149
        idcorrevent = insert_deps(True)[1]
150
        DBSession.flush()
151
        transaction.commit()
152

  
153
        self.app.post('/get_plugin_value', {
154
                'idcorrevent': idcorrevent,
155
                'plugin_name': 'history',
156
            }, extra_environ={'REMOTE_USER': 'manager'},
157
            status=302)
158

  
159
    def test_history_form_host_when_forbidden(self):
160
        """Teste le formulaire d'historique avec un hôte (alerte invisible)."""
161
        idcorrevent = insert_deps(False)[1]
162
        DBSession.flush()
163
        transaction.commit()
164

  
165
        self.app.post('/get_plugin_value', {
166
                'idcorrevent': idcorrevent,
167
                'plugin_name': 'history',
168
            }, extra_environ={'REMOTE_USER': 'manager'},
169
            status=302)
170

  
vigiboard/tests/functional/test_history_table.py
1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Vérifie que la page qui affiche l'historique des actions sur un événement
5
brut fonctionne correctement.
6
"""
7

  
8
from nose.tools import assert_true, assert_equal
9
from datetime import datetime
10
import transaction
11

  
12
from vigilo.models.configure import DBSession
13
from vigilo.models import Event, EventHistory, CorrEvent, \
14
                            Permission, StateName, \
15
                            Host, HostGroup, LowLevelService, ServiceGroup
16
from vigiboard.tests import TestController
17

  
18
def populate_DB():
19
    """ Peuple la base de données. """
20
    # On ajoute un groupe d'hôtes et un groupe de services.
21
    hostmanagers = HostGroup(name = u'managersgroup')
22
    DBSession.add(hostmanagers)
23
    servicemanagers = ServiceGroup(name = u'managersgroup')
24
    DBSession.add(servicemanagers)
25
    DBSession.flush()
26

  
27
    # On ajoute la permission 'manage' à ces deux groupes.
28
    manage_perm = Permission.by_permission_name(u'manage')
29
    hostmanagers.permissions.append(manage_perm)
30
    servicemanagers.permissions.append(manage_perm)
31
    DBSession.flush()
32

  
33
    # On crée un hôte de test, et on l'ajoute au groupe d'hôtes.
34
    managerhost = Host(
35
        name = u'managerhost',      
36
        checkhostcmd = u'halt',
37
        snmpcommunity = u'public',
38
        hosttpl = u'/dev/null',
39
        mainip = u'192.168.1.1',
40
        snmpport = 42,
41
        weight = 42,
42
    )
43
    DBSession.add(managerhost)
44
    hostmanagers.hosts.append(managerhost)
45
    DBSession.flush()
46

  
47
    # On crée un services de bas niveau, et on l'ajoute au groupe de services.
48
    managerservice = LowLevelService(
49
        host = managerhost,
50
        servicename = u'managerservice',
51
        command = u'halt',
52
        op_dep = u'+',
53
        weight = 42,
54
    )
55
    DBSession.add(managerservice)
56
    servicemanagers.services.append(managerservice)
57
    DBSession.flush()
58
    
59
    return (managerhost, managerservice)
60

  
61
def add_correvent_caused_by(supitem):
62
    """
63
    Ajoute dans la base de données un évènement corrélé causé 
64
    par un incident survenu sur l'item passé en paramètre.
65
    Génère un historique pour les tests.
66
    """
67

  
68
    # Ajout d'un événement
69
    event = Event(
70
        supitem = supitem, 
71
        message = u'foo',
72
        current_state = StateName.statename_to_value(u"WARNING"),
73
    )
74
    DBSession.add(event)
75
    DBSession.flush()
76

  
77
    # Ajout des historiques
78
    DBSession.add(EventHistory(
79
        type_action=u'Nagios update state',
80
        idevent=event.idevent, 
81
        timestamp=datetime.now()))
82
    DBSession.add(EventHistory(
83
        type_action=u'Acknowlegement change state',
84
        idevent=event.idevent, 
85
        timestamp=datetime.now()))
86
    DBSession.flush()
87

  
88
    # Ajout d'un événement corrélé
89
    aggregate = CorrEvent(
90
        idcause = event.idevent, 
91
        timestamp_active = datetime.now(),
92
        priority = 1,
93
        status = u"None")
94
    aggregate.events.append(event)
95
    DBSession.add(aggregate)
96
    DBSession.flush()
97
    
98
    return event.idevent
99
    
100

  
101
class TestHistoryTable(TestController):
102
    """
103
    Teste .
104
    """
105

  
106
    def setUp(self):
107
        super(TestEventTable, self).setUp()
108

  
109
    def test_cause_host_history(self):
110
        """Historique de la cause d'un événement corrélé sur un hôte."""
111

  
112
        # On peuple la BDD avec un hôte, un service de bas niveau,
113
        # et un groupe d'hôtes et de services associés à ces items.
114
        (managerhost, managerservice) = populate_DB()
115
        
116
        # On ajoute un évènement corrélé causé par l'hôte
117
        idevent = add_correvent_caused_by(managerhost)
118
        transaction.commit()
119

  
120
        # L'utilisateur n'est pas authentifié.
121
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
122
        # demandant à l'utilisateur de s'authentifier.
123
        response = self.app.get(
124
            '/event/%d' % idevent,
125
            status = 401)
126

  
127
        # L'utilisateur N'A PAS les bonnes permissions.
128
        environ = {'REMOTE_USER': 'editor'}
129
        
130
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
131
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
132
        # n'a pas accès aux informations concernant cet évènement.
133
        response = self.app.get(
134
            '/event/%d' % idevent,
135
            status = 302, 
136
            extra_environ = environ)
137

  
138
        # On suit la redirection.
139
        response = response.follow(status = 200, extra_environ = environ)
140
        assert_true(len(response.lxml.xpath(
141
            '//div[@id="flash"]/div[@class="error"]')))
142

  
143
        # L'utilisateur a les bonnes permissions.
144
        environ = {'REMOTE_USER': 'manager'}
145
        
146
        # On s'attend à ce que le statut de la requête soit 200.
147
        response = self.app.get(
148
            '/event/%d' % idevent,
149
            status = 200, 
150
            extra_environ = environ)
151

  
152
        # Il doit y avoir 2 lignes de résultats.
153
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
154
        assert_equal(len(rows), 2)
155

  
156
    def test_cause_service_history(self):
157
        """Historique de la cause d'un événement corrélé sur un LLS."""
158

  
159
        # On peuple la BDD avec un hôte, un service de bas niveau,
160
        # et un groupe d'hôtes et de services associés à ces items.
161
        (managerhost, managerservice) = populate_DB()
162
        
163
        # On ajoute un évènement corrélé causé par le service
164
        idevent = add_correvent_caused_by(managerservice)
165
        
166
        transaction.commit()
167

  
168
        # L'utilisateur n'est pas authentifié.
169
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
170
        # demandant à l'utilisateur de s'authentifier.
171
        response = self.app.get(
172
            '/event/%d' % idevent,
173
            status = 401)
174

  
175
        # L'utilisateur N'A PAS les bonnes permissions.
176
        environ = {'REMOTE_USER': 'editor'}
177
        
178
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
179
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
180
        # n'a pas accès aux informations concernant cet évènement.
181
        response = self.app.get(
182
            '/event/%d' % idevent,
183
            status = 302, 
184
            extra_environ = environ)
185

  
186
        # On suit la redirection.
187
        response = response.follow(status = 200, extra_environ = environ)
188
        assert_true(len(response.lxml.xpath(
189
            '//div[@id="flash"]/div[@class="error"]')))
190

  
191
        # L'utilisateur a les bonnes permissions.
192
        environ = {'REMOTE_USER': 'manager'}
193
        
194
        # On s'attend à ce que le statut de la requête soit 200.
195
        response = self.app.get(
196
            '/event/%d' % idevent,
197
            status = 200, 
198
            extra_environ = environ)
199

  
200
        # Il doit y avoir 2 lignes de résultats.
201
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
202
        assert_equal(len(rows), 2)
203

  
vigiboard/tests/functional/test_history_tables.py
1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test du tableau d'événements de Vigiboard
5
"""
6

  
7
from nose.tools import assert_true, assert_equal
8
from datetime import datetime
9
import transaction
10

  
11
from vigilo.models.configure import DBSession
12
from vigilo.models import Event, EventHistory, CorrEvent, \
13
                            Permission, StateName, \
14
                            Host, HostGroup, LowLevelService, ServiceGroup
15
from vigiboard.tests import TestController
16

  
17
def populate_DB():
18
    """ Peuple la base de données. """
19
    # On ajoute un groupe d'hôtes et un groupe de services.
20
    hostmanagers = HostGroup(name = u'managersgroup')
21
    DBSession.add(hostmanagers)
22
    servicemanagers = ServiceGroup(name = u'managersgroup')
23
    DBSession.add(servicemanagers)
24
    DBSession.flush()
25

  
26
    # On ajoute la permission 'manage' à ces deux groupes.
27
    manage_perm = Permission.by_permission_name(u'manage')
28
    hostmanagers.permissions.append(manage_perm)
29
    servicemanagers.permissions.append(manage_perm)
30
    DBSession.flush()
31

  
32
    # On crée un hôte de test, et on l'ajoute au groupe d'hôtes.
33
    managerhost = Host(
34
        name = u'managerhost',      
35
        checkhostcmd = u'halt',
36
        snmpcommunity = u'public',
37
        hosttpl = u'/dev/null',
38
        mainip = u'192.168.1.1',
39
        snmpport = 42,
40
        weight = 42,
41
    )
42
    DBSession.add(managerhost)
43
    hostmanagers.hosts.append(managerhost)
44
    DBSession.flush()
45

  
46
    # On crée un services de bas niveau, et on l'ajoute au groupe de services.
47
    managerservice = LowLevelService(
48
        host = managerhost,
49
        servicename = u'managerservice',
50
        command = u'halt',
51
        op_dep = u'+',
52
        weight = 42,
53
    )
54
    DBSession.add(managerservice)
55
    servicemanagers.services.append(managerservice)
56
    DBSession.flush()
57
    
58
    return (managerhost, managerservice)
59

  
60
def add_correvent_caused_by(supitem, 
61
        correvent_status=u"None", event_status=u"WARNING"):
62
    """
63
    Ajoute dans la base de données un évènement corrélé causé 
64
    par un incident survenu sur l'item passé en paramètre.
65
    Génère un historique pour les tests.
66
    """
67

  
68
    # Ajout d'un événement
69
    event = Event(
70
        supitem = supitem, 
71
        message = u'foo',
72
        current_state = StateName.statename_to_value(event_status),
73
    )
74
    DBSession.add(event)
75
    DBSession.flush()
76

  
77
    # Ajout des historiques
78
    DBSession.add(EventHistory(
79
        type_action=u'Nagios update state',
80
        idevent=event.idevent, 
81
        timestamp=datetime.now()))
82
    DBSession.add(EventHistory(
83
        type_action=u'Acknowlegement change state',
84
        idevent=event.idevent, 
85
        timestamp=datetime.now()))
86
    DBSession.flush()
87

  
88
    # Ajout d'un événement corrélé
89
    aggregate = CorrEvent(
90
        idcause = event.idevent, 
91
        timestamp_active = datetime.now(),
92
        priority = 1,
93
        status = correvent_status)
94
    aggregate.events.append(event)
95
    DBSession.add(aggregate)
96
    DBSession.flush()
97
    
98
    return aggregate.idcorrevent
99
    
100

  
101
class TestEventTable(TestController):
102
    """
103
    Test des historiques de Vigiboard.
104
    """
105

  
106
    def setUp(self):
107
        super(TestEventTable, self).setUp()
108

  
109
    def test_host_event_history(self):
110
        """Affichage de l'historique d'un évènement corrélé pour un hôte."""
111

  
112
        # On peuple la BDD avec un hôte, un service de bas niveau,
113
        # et un groupe d'hôtes et de services associés à ces items.
114
        (managerhost, managerservice) = populate_DB()
115
        
116
        # On ajoute un évènement corrélé causé par l'hôte
117
        aggregate_id = add_correvent_caused_by(managerhost)
118
        
119
        transaction.commit()
120
        
121
        ### 1er cas : L'utilisateur utilisé pour
122
        # se connecter à Vigiboard est 'editor'.
123
        environ = {'REMOTE_USER': 'editor'}
124
        
125
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
126
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
127
        # n'a pas accès aux informations concernant cet évènement.
128
        response = self.app.get(
129
            '/event?idcorrevent=' + str(aggregate_id),
130
            status = 302, 
131
            extra_environ = environ)
132
        
133
        response = self.app.get(
134
            '/', 
135
            status = 200, 
136
            extra_environ = environ)
137
        assert_true(response.lxml.xpath(
138
            '//div[@id="flash"]/div[@class="error"]'))
139

  
140
        ### 2nd cas : L'utilisateur utilisé pour
141
        # se connecter à Vigiboard est 'manager'.
142
        environ = {'REMOTE_USER': 'manager'}
143
        
144
        # On s'attend à ce que le statut de la requête soit 200.
145
        response = self.app.get(
146
            '/event?idcorrevent=' + str(aggregate_id),
147
            status = 200, 
148
            extra_environ = environ)
149

  
150
        # Il doit y avoir 2 lignes de résultats.
151
        rows = response.lxml.xpath('//table[@class="history_table"]/tbody/tr')
152
        assert_equal(len(rows), 2)
153

  
154
    def test_service_event_history(self):
155
        """Affichage de l'historique d'un évènement corrélé pour un SBN."""
156

  
157
        # On peuple la BDD avec un hôte, un service de bas niveau,
158
        # et un groupe d'hôtes et de services associés à ces items.
159
        (managerhost, managerservice) = populate_DB()
160
        
161
        # On ajoute un évènement corrélé causé par le service
162
        aggregate_id = add_correvent_caused_by(managerservice)
163
        
164
        transaction.commit()
165
        
166
        ### 1er cas : L'utilisateur utilisé pour
167
        # se connecter à Vigiboard est 'editor'.
168
        environ = {'REMOTE_USER': 'editor'}
169
        
170
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
171
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
172
        # n'a pas accès aux informations concernant cet évènement.
173
        response = self.app.get(
174
            '/event?idcorrevent=' + str(aggregate_id),
175
            status = 302, 
176
            extra_environ = environ)
177
        
178
        response = self.app.get(
179
            '/', 
180
            status = 200, 
181
            extra_environ = environ)
182
        assert_true(response.lxml.xpath(
183
            '//div[@id="flash"]/div[@class="error"]'))
184

  
185
        ### 2nd cas : L'utilisateur utilisé pour
186
        # se connecter à Vigiboard est 'manager'.
187
        environ = {'REMOTE_USER': 'manager'}
188
        
189
        # On s'attend à ce que le statut de la requête soit 200.
190
        response = self.app.get(
191
            '/event?idcorrevent=' + str(aggregate_id),
192
            status = 200, 
193
            extra_environ = environ)
194

  
195
        # Il doit y avoir 2 lignes de résultats.
196
        rows = response.lxml.xpath('//table[@class="history_table"]/tbody/tr')
197
        assert_equal(len(rows), 2)
198

  
199
    def test_host_history(self):
200
        """Affichage de l'historique d'un hôte."""
201

  
202
        # On peuple la BDD avec un hôte, un service de bas niveau,
203
        # et un groupe d'hôtes et de services associés à ces items.
204
        (managerhost, managerservice) = populate_DB()
205
        
206
        # On ajoute deux évènements corrélés causés par l'hôte :
207
        # le premier encore ouvert, le second clos par un utilisateur.
208
        add_correvent_caused_by(managerhost)
209
        add_correvent_caused_by(managerhost, u"AAClosed", u"OK")
210
        
211
        transaction.commit()
212
        DBSession.add(managerhost)
213
        
214
        ### 1er cas : L'utilisateur utilisé pour
215
        # se connecter à Vigiboard est 'editor'.
216
        environ = {'REMOTE_USER': 'editor'}
217
        
218
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
219
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
220
        # n'a pas accès aux informations concernant cet évènement.
221
        response = self.app.get(
222
            '/host_service/' + managerhost.name,
223
            status = 302, 
224
            extra_environ = environ)
225
        
226
        response = self.app.get(
227
            '/', 
228
            status = 200, 
229
            extra_environ = environ)
230
        assert_true(response.lxml.xpath(
231
            '//div[@id="flash"]/div[@class="error"]'))
232

  
233
        ### 2nd cas : L'utilisateur utilisé pour
234
        # se connecter à Vigiboard est 'manager'.
235
        environ = {'REMOTE_USER': 'manager'}
236
        
237
        # On s'attend à ce que le statut de la requête soit 200.
238
        response = self.app.get(
239
            '/host_service/' + managerhost.name,
240
            status = 200, 
241
            extra_environ = environ)
242

  
243
        # Il doit y avoir 2 lignes d'évènements 
244
        # + 2 lignes contenant les tableaux d'historiques.
245
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
246
        assert_equal(len(rows), 2 + 2)
247
        # Et 4 lignes d'historiques dans les tableaux d'historiques.
248
        rows = response.lxml.xpath('//table[@class="history_table"]/tbody/tr')
249
        assert_equal(len(rows), 4)
250

  
251
    def test_service_history(self):
252
        """Affichage de l'historique d'un service de bas niveau."""
253

  
254
        # On peuple la BDD avec un hôte, un service de bas niveau,
255
        # et un groupe d'hôtes et de services associés à ces items.
256
        (managerhost, managerservice) = populate_DB()
257
        
258
        # On ajoute deux évènements corrélés causés par le service :
259
        # le premier encore ouvert, le second clos par un utilisateur.
260
        add_correvent_caused_by(managerservice)
261
        add_correvent_caused_by(managerservice, u"AAClosed", u"OK")
262
        
263
        transaction.commit()
264
        DBSession.add(managerhost)
265
        DBSession.add(managerservice)
266
        
267
        ### 1er cas : L'utilisateur utilisé pour
268
        # se connecter à Vigiboard est 'editor'.
269
        environ = {'REMOTE_USER': 'editor'}
270
        
271
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
272
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
273
        # n'a pas accès aux informations concernant cet évènement.
274
        response = self.app.get(
275
            '/host_service/' + managerhost.name 
276
                                + '/' + managerservice.servicename,
277
            status = 302, 
278
            extra_environ = environ)
279
        
280
        response = self.app.get(
281
            '/', 
282
            status = 200, 
283
            extra_environ = environ)
284
        assert_true(response.lxml.xpath(
285
            '//div[@id="flash"]/div[@class="error"]'))
286

  
287
        ### 2nd cas : L'utilisateur utilisé pour
288
        # se connecter à Vigiboard est 'manager'.
289
        environ = {'REMOTE_USER': 'manager'}
290
        
291
        # On s'attend à ce que le statut de la requête soit 200.
292
        response = self.app.get(
293
            '/host_service/' + managerhost.name 
294
                                + '/' + managerservice.servicename,
295
            status = 200, 
296
            extra_environ = environ)
297

  
298
        # Il doit y avoir 2 lignes d'évènements 
299
        # + 2 lignes contenant les tableaux d'historiques.
300
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
301
        assert_equal(len(rows), 2 + 2)
302
        # Et 4 lignes d'historiques dans les tableaux d'historiques.
303
        rows = response.lxml.xpath('//table[@class="history_table"]/tbody/tr')
304
        assert_equal(len(rows), 4)
305

  
vigiboard/tests/functional/test_host_vigiboardrequest.py
1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test de la classe Vigiboard Request pour des requêtes concernant les hôtes
5
"""
6

  
7
from nose.tools import assert_true
8
from datetime import datetime
9
import tg
10
import transaction
11

  
12
from vigilo.models.configure import DBSession
13
from vigilo.models import Event, EventHistory, CorrEvent, \
14
                            Permission, User, StateName, \
15
                            Host, HostGroup
16
from vigiboard.tests import TestController
17
from vigiboard.controllers.vigiboardrequest import VigiboardRequest
18

  
19
class TestHostVigiboardRequest(TestController):
20
    """ Préparation de la base de données en vue des tests. """
21

  
22
    def setUp(self):
23
        super(TestHostVigiboardRequest, self).setUp()
24

  
25
        # On peuple la base de données.
26

  
27
        # On ajoute les groupes et leurs dépendances
28
        self.hosteditors = HostGroup(name=u'editorsgroup')
29
        DBSession.add(self.hosteditors)
30
        self.hostmanagers = HostGroup(name=u'managersgroup', 
31
                                        parent=self.hosteditors)
32
        DBSession.add(self.hostmanagers)
33
        DBSession.flush()
34

  
35
        manage_perm = Permission.by_permission_name(u'manage')
36
        edit_perm = Permission.by_permission_name(u'edit')
37

  
38
        self.hostmanagers.permissions.append(manage_perm)
39
        self.hosteditors.permissions.append(edit_perm)
40
        DBSession.flush()
41

  
42
        # Création des hôtes de test.
43
        host_template = {
44
            'checkhostcmd': u'halt',
45
            'snmpcommunity': u'public',
46
            'hosttpl': u'/dev/null',
47
            'mainip': u'192.168.1.1',
48
            'snmpport': 42,
49
            'weight': 42,
50
        }
51

  
52
        managerhost = Host(name=u'managerhost', **host_template)
53
        editorhost = Host(name=u'editorhost', **host_template)
54
        DBSession.add(managerhost)
55
        DBSession.add(editorhost)
56

  
57
        # Affectation des hôtes aux groupes.
58
        self.hosteditors.hosts.append(editorhost)
59
        self.hostmanagers.hosts.append(managerhost)
60
        DBSession.flush()
61

  
62
        # Ajout des événements eux-mêmes
63
        event_template = {
64
            'message': u'foo',
65
            'current_state': StateName.statename_to_value(u'WARNING'),
66
        }
67
        event1 = Event(supitem=managerhost, **event_template)
68
        event2 = Event(supitem=editorhost, **event_template)
69
        DBSession.add(event1)
70
        DBSession.add(event2)
71
        DBSession.flush()
72

  
73
        # Ajout des historiques
74
        DBSession.add(EventHistory(type_action=u'Nagios update state',
75
            idevent=event1.idevent, timestamp=datetime.now()))
76
        DBSession.add(EventHistory(type_action=u'Acknowledgement change state',
77
            idevent=event1.idevent, timestamp=datetime.now()))
78
        DBSession.add(EventHistory(type_action=u'Nagios update state',
79
            idevent=event2.idevent, timestamp=datetime.now()))
80
        DBSession.add(EventHistory(type_action=u'Acknowledgement change state',
81
            idevent=event2.idevent, timestamp=datetime.now()))
82
        DBSession.flush()
83

  
84
        # Ajout des événements corrélés
85
        aggregate_template = {
86
            'timestamp_active': datetime.now(),
87
            'priority': 1,
88
            'status': u'None',
89
        }
90
        self.aggregate1 = CorrEvent(
91
            idcause=event1.idevent, **aggregate_template)
92
        self.aggregate2 = CorrEvent(
93
            idcause=event2.idevent, **aggregate_template)
94

  
95
        self.aggregate1.events.append(event1)
96
        self.aggregate2.events.append(event2)
97
        DBSession.add(self.aggregate1)
98
        DBSession.add(self.aggregate2)
99
        DBSession.flush()
100
        transaction.commit()
101

  
102
    def tearDown(self):
103
        """ Nettoyage de la base de données après les tests. """
104
        super(TestHostVigiboardRequest, self).tearDown()
105

  
106

  
107
    def test_request_creation(self):
108
        """Génération d'une requête avec plugin et permissions."""
109

  
110
        # On indique qui on est et on envoie une requête sur 
111
        # l'index pour obtenir toutes les variables de sessions.
112
        environ = {'REMOTE_USER': 'editor'}
113
        response = self.app.get('/', extra_environ=environ)
114
        tg.request = response.request
115

  
116
        # Derrière, VigiboardRequest doit charger le plugin de tests tout seul
117
        vigi_req = VigiboardRequest(User.by_user_name(u'editor'))
118
        vigi_req.add_table(
119
            CorrEvent,
120
            vigi_req.items.c.hostname,
121
            vigi_req.items.c.servicename,
122
        )
123
        vigi_req.add_join((Event, CorrEvent.idcause == Event.idevent))
124
        vigi_req.add_join((vigi_req.items, 
125
            Event.idsupitem == vigi_req.items.c.idsupitem))
126

  
127
        # On vérifie que le nombre d'événements corrélés 
128
        # trouvés par la requête est bien égal à 1.
129
        num_rows = vigi_req.num_rows()
130
        assert_true(num_rows == 1, 
131
            msg = "One history should be available for " +
132
            "the user 'editor' but there are %d" % num_rows)
133

  
134
        vigi_req.format_events(0, 10)
135
        vigi_req.format_history()
136
        assert_true(len(vigi_req.events) == 1 + 1,
137
            msg = "One history should be available for user " +
138
            "'editor' but there are %d" % (len(vigi_req.events) - 1))
139

  
140

  
141
        # On recommence les tests précédents avec l'utilisateur
142
        # manager (qui dispose de plus de droits).
143
        environ = {'REMOTE_USER': 'manager'}
144
        response = self.app.get('/', extra_environ=environ)
145
        tg.request = response.request
146

  
147
        vigi_req = VigiboardRequest(User.by_user_name(u'manager'))
148
        vigi_req.add_plugin(MonPlugin)
149
        vigi_req.add_table(
150
            CorrEvent,
151
            vigi_req.items.c.hostname,
152
            vigi_req.items.c.servicename,
153
        )
154
        vigi_req.add_join((Event, CorrEvent.idcause == Event.idevent))
155
        vigi_req.add_join((vigi_req.items, 
156
            Event.idsupitem == vigi_req.items.c.idsupitem))
157

  
158
        # On vérifie que le nombre d'événements corrélés 
159
        # trouvés par la requête est bien égal à 2.
160
        num_rows = vigi_req.num_rows()
161
        assert_true(num_rows == 2,  
162
            msg = "2 histories should be available for " +
163
            "the user 'manager' but there are %d" % num_rows)
164

  
165
        vigi_req.format_events(0, 10)
166
        vigi_req.format_history()
167
        assert_true(len(vigi_req.events) == 2 + 1,
168
            msg = "2 histories should be available for user " +
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff