Revision 072f2a16
Ménage dans les tests de VigiBoard + légères corrections pour pylint.
git-svn-id: https://vigilo-dev.si.c-s.fr/svn@2171 b22e2e97-25c9-44ff-b637-2e5ceca36478
vigiboard/controllers/root.py | ||
---|---|---|
217 | 217 |
events.items.c.hostname, |
218 | 218 |
events.items.c.servicename, |
219 | 219 |
) |
220 |
events.add_join((EVENTSAGGREGATE_TABLE, EVENTSAGGREGATE_TABLE.c.idevent == Event.idevent)) |
|
221 |
events.add_join((CorrEvent, CorrEvent.idcorrevent == EVENTSAGGREGATE_TABLE.c.idcorrevent)) |
|
220 |
events.add_join((EVENTSAGGREGATE_TABLE, \ |
|
221 |
EVENTSAGGREGATE_TABLE.c.idevent == Event.idevent)) |
|
222 |
events.add_join((CorrEvent, CorrEvent.idcorrevent == \ |
|
223 |
EVENTSAGGREGATE_TABLE.c.idcorrevent)) |
|
222 | 224 |
events.add_join((events.items, |
223 | 225 |
Event.idsupitem == events.items.c.idsupitem)) |
224 | 226 |
events.add_filter(Event.idevent != CorrEvent.idcause) |
... | ... | |
292 | 294 |
events.items.c.hostname, |
293 | 295 |
events.items.c.servicename, |
294 | 296 |
) |
295 |
events.add_join((EVENTSAGGREGATE_TABLE, EVENTSAGGREGATE_TABLE.c.idevent == Event.idevent)) |
|
296 |
events.add_join((CorrEvent, CorrEvent.idcorrevent == EVENTSAGGREGATE_TABLE.c.idcorrevent)) |
|
297 |
events.add_join((EVENTSAGGREGATE_TABLE, \ |
|
298 |
EVENTSAGGREGATE_TABLE.c.idevent == Event.idevent)) |
|
299 |
events.add_join((CorrEvent, CorrEvent.idcorrevent == \ |
|
300 |
EVENTSAGGREGATE_TABLE.c.idcorrevent)) |
|
297 | 301 |
events.add_join((events.items, |
298 | 302 |
Event.idsupitem == events.items.c.idsupitem)) |
299 | 303 |
events.add_filter(Event.idevent == idevent) |
vigiboard/tests/functional/test_correvents_table.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# vim:set expandtab tabstop=4 shiftwidth=4: |
|
3 |
""" |
|
4 |
Test du tableau d'événements de Vigiboard |
|
5 |
""" |
|
6 |
|
|
7 |
from nose.tools import assert_true, assert_equal |
|
8 |
from datetime import datetime |
|
9 |
import transaction |
|
10 |
|
|
11 |
from vigilo.models.configure import DBSession |
|
12 |
from vigilo.models import Event, CorrEvent, \ |
|
13 |
Permission, StateName, \ |
|
14 |
Host, HostGroup, LowLevelService |
|
15 |
from vigiboard.tests import TestController |
|
16 |
|
|
17 |
def populate_DB(): |
|
18 |
""" Peuple la base de données. """ |
|
19 |
# On ajoute les groupes et leurs dépendances |
|
20 |
hosteditors = HostGroup(name=u'editorsgroup') |
|
21 |
DBSession.add(hosteditors) |
|
22 |
hostmanagers = HostGroup(name=u'managersgroup', parent=hosteditors) |
|
23 |
DBSession.add(hostmanagers) |
|
24 |
DBSession.flush() |
|
25 |
|
|
26 |
manage_perm = Permission.by_permission_name(u'manage') |
|
27 |
edit_perm = Permission.by_permission_name(u'edit') |
|
28 |
|
|
29 |
hostmanagers.permissions.append(manage_perm) |
|
30 |
hosteditors.permissions.append(edit_perm) |
|
31 |
DBSession.flush() |
|
32 |
|
|
33 |
# Création des hôtes de test. |
|
34 |
host_template = { |
|
35 |
'checkhostcmd': u'halt', |
|
36 |
'snmpcommunity': u'public', |
|
37 |
'hosttpl': u'/dev/null', |
|
38 |
'mainip': u'192.168.1.1', |
|
39 |
'snmpport': 42, |
|
40 |
'weight': 42, |
|
41 |
} |
|
42 |
|
|
43 |
managerhost = Host(name=u'managerhost', **host_template) |
|
44 |
editorhost = Host(name=u'editorhost', **host_template) |
|
45 |
DBSession.add(managerhost) |
|
46 |
DBSession.add(editorhost) |
|
47 |
|
|
48 |
# Affectation des hôtes aux groupes. |
|
49 |
hosteditors.hosts.append(editorhost) |
|
50 |
hostmanagers.hosts.append(managerhost) |
|
51 |
DBSession.flush() |
|
52 |
|
|
53 |
# Création des services techniques de test. |
|
54 |
service_template = { |
|
55 |
'command': u'halt', |
|
56 |
'op_dep': u'+', |
|
57 |
'weight': 42, |
|
58 |
} |
|
59 |
|
|
60 |
service1 = LowLevelService( |
|
61 |
host=managerhost, |
|
62 |
servicename=u'managerservice', |
|
63 |
**service_template |
|
64 |
) |
|
65 |
|
|
66 |
service2 = LowLevelService( |
|
67 |
host=editorhost, |
|
68 |
servicename=u'managerservice', |
|
69 |
**service_template |
|
70 |
) |
|
71 |
|
|
72 |
service3 = LowLevelService( |
|
73 |
host=managerhost, |
|
74 |
servicename=u'editorservice', |
|
75 |
**service_template |
|
76 |
) |
|
77 |
|
|
78 |
service4 = LowLevelService( |
|
79 |
host=editorhost, |
|
80 |
servicename=u'editorservice', |
|
81 |
**service_template |
|
82 |
) |
|
83 |
|
|
84 |
DBSession.add(service1) |
|
85 |
DBSession.add(service2) |
|
86 |
DBSession.add(service3) |
|
87 |
DBSession.add(service4) |
|
88 |
DBSession.flush() |
|
89 |
|
|
90 |
# Ajout des événements eux-mêmes |
|
91 |
event_template = { |
|
92 |
'message': u'foo', |
|
93 |
'current_state': StateName.statename_to_value(u'WARNING'), |
|
94 |
} |
|
95 |
|
|
96 |
event1 = Event(supitem=service1, **event_template) |
|
97 |
event2 = Event(supitem=service2, **event_template) |
|
98 |
event3 = Event(supitem=service3, **event_template) |
|
99 |
event4 = Event(supitem=service4, **event_template) |
|
100 |
event5 = Event(supitem=editorhost, **event_template) |
|
101 |
event6 = Event(supitem=managerhost, **event_template) |
|
102 |
|
|
103 |
DBSession.add(event1) |
|
104 |
DBSession.add(event2) |
|
105 |
DBSession.add(event3) |
|
106 |
DBSession.add(event4) |
|
107 |
DBSession.add(event5) |
|
108 |
DBSession.add(event6) |
|
109 |
DBSession.flush() |
|
110 |
|
|
111 |
# Ajout des événements corrélés |
|
112 |
aggregate_template = { |
|
113 |
'timestamp_active': datetime.now(), |
|
114 |
'priority': 1, |
|
115 |
'status': u'None', |
|
116 |
} |
|
117 |
aggregate1 = CorrEvent( |
|
118 |
idcause=event1.idevent, **aggregate_template) |
|
119 |
aggregate2 = CorrEvent( |
|
120 |
idcause=event4.idevent, **aggregate_template) |
|
121 |
aggregate3 = CorrEvent( |
|
122 |
idcause=event5.idevent, **aggregate_template) |
|
123 |
aggregate4 = CorrEvent( |
|
124 |
idcause=event6.idevent, **aggregate_template) |
|
125 |
|
|
126 |
aggregate1.events.append(event1) |
|
127 |
aggregate1.events.append(event3) |
|
128 |
aggregate2.events.append(event4) |
|
129 |
aggregate2.events.append(event2) |
|
130 |
aggregate3.events.append(event5) |
|
131 |
aggregate4.events.append(event6) |
|
132 |
DBSession.add(aggregate1) |
|
133 |
DBSession.add(aggregate2) |
|
134 |
DBSession.add(aggregate3) |
|
135 |
DBSession.add(aggregate4) |
|
136 |
DBSession.flush() |
|
137 |
transaction.commit() |
|
138 |
|
|
139 |
class TestEventTable(TestController): |
|
140 |
""" |
|
141 |
Test du tableau d'événements de Vigiboard |
|
142 |
""" |
|
143 |
|
|
144 |
def test_event_table(self): |
|
145 |
""" |
|
146 |
Test du tableau d'évènements de la page d'accueil de Vigiboard. |
|
147 |
""" |
|
148 |
|
|
149 |
populate_DB() |
|
150 |
|
|
151 |
### 1er cas : L'utilisateur utilisé pour |
|
152 |
# se connecter à Vigiboard est 'editor'. |
|
153 |
environ = {'REMOTE_USER': 'editor'} |
|
154 |
response = self.app.get('/', extra_environ=environ) |
|
155 |
|
|
156 |
# Il doit y avoir 2 lignes de résultats. |
|
157 |
rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr') |
|
158 |
print "There are %d rows in the result set" % len(rows) |
|
159 |
assert_equal(len(rows), 2) |
|
160 |
|
|
161 |
# Il doit y avoir plusieurs colonnes dans la ligne de résultats. |
|
162 |
cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td') |
|
163 |
print "There are %d columns in the result set" % len(cols) |
|
164 |
assert_true(len(cols) > 1) |
|
165 |
|
|
166 |
### 2nd cas : L'utilisateur utilisé pour |
|
167 |
# se connecter à Vigiboard est 'manager'. |
|
168 |
environ = {'REMOTE_USER': 'manager'} |
|
169 |
response = self.app.get('/', extra_environ=environ) |
|
170 |
|
|
171 |
# Il doit y avoir 4 lignes de résultats. |
|
172 |
rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr') |
|
173 |
print "There are %d rows in the result set" % len(rows) |
|
174 |
assert_equal(len(rows), 4) |
|
175 |
|
|
176 |
# Il doit y avoir plusieurs colonnes dans la ligne de résultats. |
|
177 |
cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td') |
|
178 |
print "There are %d columns in the result set" % len(cols) |
|
179 |
assert_true(len(cols) > 1) |
|
180 |
|
vigiboard/tests/functional/test_details_plugin.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
""" |
|
3 |
Teste le formulaire donnant les liens vers les outils extérieurs |
|
4 |
et les données de l'historique. |
|
5 |
""" |
|
6 |
from nose.tools import assert_true, assert_equal |
|
7 |
from datetime import datetime |
|
8 |
import transaction |
|
9 |
|
|
10 |
from vigiboard.tests import TestController |
|
11 |
from vigilo.models.configure import DBSession |
|
12 |
from vigilo.models import ServiceGroup, HostGroup, \ |
|
13 |
Host, Permission, StateName, \ |
|
14 |
LowLevelService, Event, CorrEvent |
|
15 |
|
|
16 |
def insert_deps(return_service): |
|
17 |
""" |
|
18 |
Insère les dépendances nécessaires aux tests. |
|
19 |
|
|
20 |
@param return_service: Indique si les événements générés |
|
21 |
concernent un hôte (False) ou un service de bas niveau (True). |
|
22 |
@type return_service: C{bool} |
|
23 |
@return: Renvoie un tuple avec le groupe d'hôte créé, |
|
24 |
l'identifiant du L{CorrEvent} généré et enfin, |
|
25 |
l'identifiant de l'L{Event} généré. |
|
26 |
@rtype: C{tuple} |
|
27 |
""" |
|
28 |
timestamp = datetime.now() |
|
29 |
|
|
30 |
hostgroup = HostGroup( |
|
31 |
name=u'foo', |
|
32 |
) |
|
33 |
DBSession.add(hostgroup) |
|
34 |
|
|
35 |
host = Host( |
|
36 |
name=u'bar', |
|
37 |
checkhostcmd=u'', |
|
38 |
description=u'', |
|
39 |
hosttpl=u'', |
|
40 |
mainip=u'127.0.0.1', |
|
41 |
snmpport=42, |
|
42 |
snmpcommunity=u'public', |
|
43 |
snmpversion=u'3', |
|
44 |
weight=42, |
|
45 |
) |
|
46 |
DBSession.add(host) |
|
47 |
DBSession.flush() |
|
48 |
|
|
49 |
hostgroup.hosts.append(host) |
|
50 |
DBSession.flush() |
|
51 |
|
|
52 |
servicegroup = ServiceGroup( |
|
53 |
name=u'foo', |
|
54 |
) |
|
55 |
DBSession.add(servicegroup) |
|
56 |
|
|
57 |
service = LowLevelService( |
|
58 |
host=host, |
|
59 |
command=u'', |
|
60 |
weight=42, |
|
61 |
servicename=u'baz', |
|
62 |
op_dep=u'&', |
|
63 |
) |
|
64 |
DBSession.add(service) |
|
65 |
DBSession.flush() |
|
66 |
|
|
67 |
servicegroup.services.append(service) |
|
68 |
DBSession.flush() |
|
69 |
|
|
70 |
event = Event( |
|
71 |
timestamp=timestamp, |
|
72 |
current_state=StateName.statename_to_value(u'WARNING'), |
|
73 |
message=u'Hello world', |
|
74 |
) |
|
75 |
if return_service: |
|
76 |
event.supitem = service |
|
77 |
else: |
|
78 |
event.supitem = host |
|
79 |
DBSession.add(event) |
|
80 |
DBSession.flush() |
|
81 |
|
|
82 |
correvent = CorrEvent( |
|
83 |
impact=42, |
|
84 |
priority=42, |
|
85 |
trouble_ticket=None, |
|
86 |
status=u'None', |
|
87 |
occurrence=42, |
|
88 |
timestamp_active=timestamp, |
|
89 |
cause=event, |
|
90 |
) |
|
91 |
correvent.events.append(event) |
|
92 |
DBSession.add(correvent) |
|
93 |
DBSession.flush() |
|
94 |
|
|
95 |
return (hostgroup, correvent.idcorrevent, event.idevent) |
|
96 |
|
|
97 |
class TestDetailsPlugin(TestController): |
|
98 |
"""Teste le dialogue pour l'accès aux historiques.""" |
|
99 |
|
|
100 |
def test_details_plugin_LLS_alert_when_allowed(self): |
|
101 |
"""Dialogue des détails avec un LLS et les bons droits.""" |
|
102 |
hostgroup, idcorrevent, idcause = insert_deps(True) |
|
103 |
manage = Permission.by_permission_name(u'manage') |
|
104 |
manage.hostgroups.append(hostgroup) |
|
105 |
DBSession.flush() |
|
106 |
transaction.commit() |
|
107 |
|
|
108 |
response = self.app.post('/get_plugin_value', { |
|
109 |
'idcorrevent': idcorrevent, |
|
110 |
'plugin_name': 'details', |
|
111 |
}, extra_environ={'REMOTE_USER': 'manager'}) |
|
112 |
json = response.json |
|
113 |
|
|
114 |
# Le contenu de "eventdetails" varie facilement. |
|
115 |
# On le teste séparément. |
|
116 |
json.pop('eventdetails', None) |
|
117 |
assert_true('eventdetails' in response.json) |
|
118 |
|
|
119 |
assert_equal(json, { |
|
120 |
"idcorrevent": idcorrevent, |
|
121 |
"idcause": idcause, |
|
122 |
"service": "baz", |
|
123 |
"peak_state": "WARNING", |
|
124 |
"current_state": "WARNING", |
|
125 |
"host": "bar", |
|
126 |
"initial_state": "WARNING" |
|
127 |
}) |
|
128 |
|
|
129 |
def test_details_plugin_host_alert_when_allowed(self): |
|
130 |
"""Dialogue des détails avec un hôte et les bons droits.""" |
|
131 |
hostgroup, idcorrevent, idcause = insert_deps(False) |
|
132 |
manage = Permission.by_permission_name(u'manage') |
|
133 |
manage.hostgroups.append(hostgroup) |
|
134 |
DBSession.flush() |
|
135 |
transaction.commit() |
|
136 |
|
|
137 |
response = self.app.post('/get_plugin_value', { |
|
138 |
'idcorrevent': idcorrevent, |
|
139 |
'plugin_name': 'details', |
|
140 |
}, extra_environ={'REMOTE_USER': 'manager'}) |
|
141 |
json = response.json |
|
142 |
|
|
143 |
# Le contenu de "eventdetails" varie facilement. |
|
144 |
# On le teste séparément. |
|
145 |
json.pop('eventdetails', None) |
|
146 |
assert_true('eventdetails' in response.json) |
|
147 |
|
|
148 |
assert_equal(json, { |
|
149 |
"idcorrevent": idcorrevent, |
|
150 |
"idcause": idcause, |
|
151 |
"service": None, |
|
152 |
"peak_state": "WARNING", |
|
153 |
"current_state": "WARNING", |
|
154 |
"host": "bar", |
|
155 |
"initial_state": "WARNING" |
|
156 |
}) |
|
157 |
|
|
158 |
|
|
159 |
def test_details_plugin_LLS_when_forbidden(self): |
|
160 |
"""Dialogue des détails avec un LLS et des droits insuffisants.""" |
|
161 |
idcorrevent = insert_deps(True)[1] |
|
162 |
DBSession.flush() |
|
163 |
transaction.commit() |
|
164 |
|
|
165 |
# Le contrôleur renvoie une erreur 404 (HTTPNotFound) |
|
166 |
# lorsque l'utilisateur n'a pas les permissions nécessaires sur |
|
167 |
# les données ou qu'aucun événement correspondant n'est trouvé. |
|
168 |
self.app.post('/get_plugin_value', { |
|
169 |
'idcorrevent': idcorrevent, |
|
170 |
'plugin_name': 'details', |
|
171 |
}, extra_environ={'REMOTE_USER': 'manager'}, |
|
172 |
status=404) |
|
173 |
|
|
174 |
def test_details_plugin_host_when_forbidden(self): |
|
175 |
"""Dialogue des détails avec un hôte et des droits insuffisants.""" |
|
176 |
idcorrevent = insert_deps(False)[1] |
|
177 |
DBSession.flush() |
|
178 |
transaction.commit() |
|
179 |
|
|
180 |
# Le contrôleur renvoie une erreur 404 (HTTPNotFound) |
|
181 |
# lorsque l'utilisateur n'a pas les permissions nécessaires sur |
|
182 |
# les données ou qu'aucun événement correspondant n'est trouvé. |
|
183 |
self.app.post('/get_plugin_value', { |
|
184 |
'idcorrevent': idcorrevent, |
|
185 |
'plugin_name': 'details', |
|
186 |
}, extra_environ={'REMOTE_USER': 'manager'}, |
|
187 |
status=404) |
|
188 |
|
|
189 |
def test_details_plugin_LLS_anonymous(self): |
|
190 |
"""Dialogue des détails avec un LLS et en anonyme.""" |
|
191 |
idcorrevent = insert_deps(True)[1] |
|
192 |
DBSession.flush() |
|
193 |
transaction.commit() |
|
194 |
|
|
195 |
# Le contrôleur renvoie une erreur 401 (HTTPUnauthorized) |
|
196 |
# lorsque l'utilisateur n'est pas authentifié. |
|
197 |
self.app.post('/get_plugin_value', { |
|
198 |
'idcorrevent': idcorrevent, |
|
199 |
'plugin_name': 'details', |
|
200 |
}, status=401) |
|
201 |
|
|
202 |
def test_details_plugin_host_anonymous(self): |
|
203 |
"""Dialogue des détails avec un hôte et en anonyme.""" |
|
204 |
idcorrevent = insert_deps(False)[1] |
|
205 |
DBSession.flush() |
|
206 |
transaction.commit() |
|
207 |
|
|
208 |
# Le contrôleur renvoie une erreur 401 (HTTPUnauthorized) |
|
209 |
# lorsque l'utilisateur n'est pas authentifié. |
|
210 |
self.app.post('/get_plugin_value', { |
|
211 |
'idcorrevent': idcorrevent, |
|
212 |
'plugin_name': 'details', |
|
213 |
}, status=401) |
|
214 |
|
vigiboard/tests/functional/test_event_table.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# vim:set expandtab tabstop=4 shiftwidth=4: |
|
3 |
""" |
|
4 |
Test du tableau d'événements de Vigiboard |
|
5 |
""" |
|
6 |
|
|
7 |
from nose.tools import assert_true, assert_equal |
|
8 |
from datetime import datetime |
|
9 |
import transaction |
|
10 |
|
|
11 |
from vigilo.models.configure import DBSession |
|
12 |
from vigilo.models import Event, CorrEvent, \ |
|
13 |
Permission, StateName, \ |
|
14 |
Host, HostGroup, LowLevelService |
|
15 |
from vigiboard.tests import TestController |
|
16 |
|
|
17 |
def populate_DB(): |
|
18 |
""" Peuple la base de données. """ |
|
19 |
# On ajoute les groupes et leurs dépendances |
|
20 |
hosteditors = HostGroup(name=u'editorsgroup') |
|
21 |
DBSession.add(hosteditors) |
|
22 |
hostmanagers = HostGroup(name=u'managersgroup', parent=hosteditors) |
|
23 |
DBSession.add(hostmanagers) |
|
24 |
DBSession.flush() |
|
25 |
|
|
26 |
manage_perm = Permission.by_permission_name(u'manage') |
|
27 |
edit_perm = Permission.by_permission_name(u'edit') |
|
28 |
|
|
29 |
hostmanagers.permissions.append(manage_perm) |
|
30 |
hosteditors.permissions.append(edit_perm) |
|
31 |
DBSession.flush() |
|
32 |
|
|
33 |
# Création des hôtes de test. |
|
34 |
host_template = { |
|
35 |
'checkhostcmd': u'halt', |
|
36 |
'snmpcommunity': u'public', |
|
37 |
'hosttpl': u'/dev/null', |
|
38 |
'mainip': u'192.168.1.1', |
|
39 |
'snmpport': 42, |
|
40 |
'weight': 42, |
|
41 |
} |
|
42 |
|
|
43 |
managerhost = Host(name=u'managerhost', **host_template) |
|
44 |
editorhost = Host(name=u'editorhost', **host_template) |
|
45 |
DBSession.add(managerhost) |
|
46 |
DBSession.add(editorhost) |
|
47 |
|
|
48 |
# Affectation des hôtes aux groupes. |
|
49 |
hosteditors.hosts.append(editorhost) |
|
50 |
hostmanagers.hosts.append(managerhost) |
|
51 |
DBSession.flush() |
|
52 |
|
|
53 |
# Création des services techniques de test. |
|
54 |
service_template = { |
|
55 |
'command': u'halt', |
|
56 |
'op_dep': u'+', |
|
57 |
'weight': 42, |
|
58 |
} |
|
59 |
|
|
60 |
service1 = LowLevelService( |
|
61 |
host=managerhost, |
|
62 |
servicename=u'managerservice', |
|
63 |
**service_template |
|
64 |
) |
|
65 |
|
|
66 |
service2 = LowLevelService( |
|
67 |
host=editorhost, |
|
68 |
servicename=u'managerservice', |
|
69 |
**service_template |
|
70 |
) |
|
71 |
|
|
72 |
service3 = LowLevelService( |
|
73 |
host=managerhost, |
|
74 |
servicename=u'editorservice', |
|
75 |
**service_template |
|
76 |
) |
|
77 |
|
|
78 |
service4 = LowLevelService( |
|
79 |
host=editorhost, |
|
80 |
servicename=u'editorservice', |
|
81 |
**service_template |
|
82 |
) |
|
83 |
|
|
84 |
DBSession.add(service1) |
|
85 |
DBSession.add(service2) |
|
86 |
DBSession.add(service3) |
|
87 |
DBSession.add(service4) |
|
88 |
DBSession.flush() |
|
89 |
|
|
90 |
# Ajout des événements eux-mêmes |
|
91 |
event_template = { |
|
92 |
'message': u'foo', |
|
93 |
'current_state': StateName.statename_to_value(u'WARNING'), |
|
94 |
} |
|
95 |
|
|
96 |
event1 = Event(supitem=service1, **event_template) |
|
97 |
event2 = Event(supitem=service2, **event_template) |
|
98 |
event3 = Event(supitem=service3, **event_template) |
|
99 |
event4 = Event(supitem=service4, **event_template) |
|
100 |
event5 = Event(supitem=editorhost, **event_template) |
|
101 |
event6 = Event(supitem=managerhost, **event_template) |
|
102 |
|
|
103 |
DBSession.add(event1) |
|
104 |
DBSession.add(event2) |
|
105 |
DBSession.add(event3) |
|
106 |
DBSession.add(event4) |
|
107 |
DBSession.add(event5) |
|
108 |
DBSession.add(event6) |
|
109 |
DBSession.flush() |
|
110 |
|
|
111 |
# Ajout des événements corrélés |
|
112 |
aggregate_template = { |
|
113 |
'timestamp_active': datetime.now(), |
|
114 |
'priority': 1, |
|
115 |
'status': u'None', |
|
116 |
} |
|
117 |
aggregate1 = CorrEvent( |
|
118 |
idcause=event1.idevent, **aggregate_template) |
|
119 |
aggregate2 = CorrEvent( |
|
120 |
idcause=event4.idevent, **aggregate_template) |
|
121 |
aggregate3 = CorrEvent( |
|
122 |
idcause=event5.idevent, **aggregate_template) |
|
123 |
aggregate4 = CorrEvent( |
|
124 |
idcause=event6.idevent, **aggregate_template) |
|
125 |
|
|
126 |
aggregate1.events.append(event1) |
|
127 |
aggregate1.events.append(event3) |
|
128 |
aggregate2.events.append(event4) |
|
129 |
aggregate2.events.append(event2) |
|
130 |
aggregate3.events.append(event5) |
|
131 |
aggregate4.events.append(event6) |
|
132 |
DBSession.add(aggregate1) |
|
133 |
DBSession.add(aggregate2) |
|
134 |
DBSession.add(aggregate3) |
|
135 |
DBSession.add(aggregate4) |
|
136 |
DBSession.flush() |
|
137 |
transaction.commit() |
|
138 |
|
|
139 |
class TestEventTable(TestController): |
|
140 |
""" |
|
141 |
Test du tableau d'événements de Vigiboard |
|
142 |
""" |
|
143 |
|
|
144 |
def test_event_table(self): |
|
145 |
""" |
|
146 |
Test du tableau d'évènements de la page d'accueil de Vigiboard. |
|
147 |
""" |
|
148 |
|
|
149 |
populate_DB() |
|
150 |
|
|
151 |
### 1er cas : L'utilisateur utilisé pour |
|
152 |
# se connecter à Vigiboard est 'editor'. |
|
153 |
environ = {'REMOTE_USER': 'editor'} |
|
154 |
response = self.app.get('/', extra_environ=environ) |
|
155 |
|
|
156 |
# Il doit y avoir 2 lignes de résultats. |
|
157 |
rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr') |
|
158 |
print "There are %d rows in the result set" % len(rows) |
|
159 |
assert_equal(len(rows), 2) |
|
160 |
|
|
161 |
# Il doit y avoir plusieurs colonnes dans la ligne de résultats. |
|
162 |
cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td') |
|
163 |
print "There are %d columns in the result set" % len(cols) |
|
164 |
assert_true(len(cols) > 1) |
|
165 |
|
|
166 |
### 2nd cas : L'utilisateur utilisé pour |
|
167 |
# se connecter à Vigiboard est 'manager'. |
|
168 |
environ = {'REMOTE_USER': 'manager'} |
|
169 |
response = self.app.get('/', extra_environ=environ) |
|
170 |
|
|
171 |
# Il doit y avoir 4 lignes de résultats. |
|
172 |
rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr') |
|
173 |
print "There are %d rows in the result set" % len(rows) |
|
174 |
assert_equal(len(rows), 4) |
|
175 |
|
|
176 |
# Il doit y avoir plusieurs colonnes dans la ligne de résultats. |
|
177 |
cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td') |
|
178 |
print "There are %d columns in the result set" % len(cols) |
|
179 |
assert_true(len(cols) > 1) |
|
180 |
|
vigiboard/tests/functional/test_history_form.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
""" |
|
3 |
Teste le formulaire donnant les liens vers les outils extérieurs |
|
4 |
et les données de l'historique. |
|
5 |
""" |
|
6 |
from nose.tools import assert_true, assert_equal |
|
7 |
from datetime import datetime |
|
8 |
import transaction |
|
9 |
|
|
10 |
from vigiboard.tests import TestController |
|
11 |
from vigilo.models.configure import DBSession |
|
12 |
from vigilo.models import ServiceGroup, HostGroup, \ |
|
13 |
Host, Permission, StateName, \ |
|
14 |
LowLevelService, Event, CorrEvent |
|
15 |
|
|
16 |
def insert_deps(return_service): |
|
17 |
"""Insère les dépendances nécessaires aux tests.""" |
|
18 |
timestamp = datetime.now() |
|
19 |
|
|
20 |
hostgroup = HostGroup( |
|
21 |
name=u'foo', |
|
22 |
) |
|
23 |
DBSession.add(hostgroup) |
|
24 |
|
|
25 |
host = Host( |
|
26 |
name=u'bar', |
|
27 |
checkhostcmd=u'', |
|
28 |
description=u'', |
|
29 |
hosttpl=u'', |
|
30 |
mainip=u'127.0.0.1', |
|
31 |
snmpport=42, |
|
32 |
snmpcommunity=u'public', |
|
33 |
snmpversion=u'3', |
|
34 |
weight=42, |
|
35 |
) |
|
36 |
DBSession.add(host) |
|
37 |
DBSession.flush() |
|
38 |
|
|
39 |
hostgroup.hosts.append(host) |
|
40 |
DBSession.flush() |
|
41 |
|
|
42 |
servicegroup = ServiceGroup( |
|
43 |
name=u'foo', |
|
44 |
) |
|
45 |
DBSession.add(servicegroup) |
|
46 |
|
|
47 |
service = LowLevelService( |
|
48 |
host=host, |
|
49 |
command=u'', |
|
50 |
weight=42, |
|
51 |
servicename=u'baz', |
|
52 |
op_dep=u'&', |
|
53 |
) |
|
54 |
DBSession.add(service) |
|
55 |
DBSession.flush() |
|
56 |
|
|
57 |
servicegroup.services.append(service) |
|
58 |
DBSession.flush() |
|
59 |
|
|
60 |
event = Event( |
|
61 |
timestamp=timestamp, |
|
62 |
current_state=StateName.statename_to_value(u'WARNING'), |
|
63 |
message=u'Hello world', |
|
64 |
) |
|
65 |
if return_service: |
|
66 |
event.supitem = service |
|
67 |
else: |
|
68 |
event.supitem = host |
|
69 |
DBSession.add(event) |
|
70 |
DBSession.flush() |
|
71 |
|
|
72 |
correvent = CorrEvent( |
|
73 |
impact=42, |
|
74 |
priority=42, |
|
75 |
trouble_ticket=None, |
|
76 |
status=u'None', |
|
77 |
occurrence=42, |
|
78 |
timestamp_active=timestamp, |
|
79 |
cause=event, |
|
80 |
) |
|
81 |
correvent.events.append(event) |
|
82 |
DBSession.add(correvent) |
|
83 |
DBSession.flush() |
|
84 |
|
|
85 |
return (hostgroup, correvent.idcorrevent) |
|
86 |
|
|
87 |
class TestHistoryForm(TestController): |
|
88 |
"""Teste le dialogue pour l'accès aux historiques.""" |
|
89 |
|
|
90 |
def test_history_form_LLS_alert_when_allowed(self): |
|
91 |
"""Teste le formulaire d'historique avec un LLS (alerte visible).""" |
|
92 |
hostgroup, idcorrevent = insert_deps(True) |
|
93 |
manage = Permission.by_permission_name(u'manage') |
|
94 |
manage.hostgroups.append(hostgroup) |
|
95 |
DBSession.flush() |
|
96 |
transaction.commit() |
|
97 |
|
|
98 |
response = self.app.post('/get_plugin_value', { |
|
99 |
'idcorrevent': idcorrevent, |
|
100 |
'plugin_name': 'history', |
|
101 |
}, extra_environ={'REMOTE_USER': 'manager'}) |
|
102 |
json = response.json |
|
103 |
|
|
104 |
# Le contenu de "eventdetails" varie facilement. |
|
105 |
# On le teste séparément. |
|
106 |
json.pop('eventdetails', None) |
|
107 |
assert_true('eventdetails' in response.json) |
|
108 |
|
|
109 |
assert_equal(json, { |
|
110 |
"idcorrevent": idcorrevent, |
|
111 |
"service": "baz", |
|
112 |
"peak_state": "WARNING", |
|
113 |
"current_state": "WARNING", |
|
114 |
"host": "bar", |
|
115 |
"initial_state": "WARNING" |
|
116 |
}) |
|
117 |
|
|
118 |
def test_history_form_host_alert_when_allowed(self): |
|
119 |
"""Teste le formulaire d'historique avec un hôte (alerte visible).""" |
|
120 |
hostgroup, idcorrevent = insert_deps(False) |
|
121 |
manage = Permission.by_permission_name(u'manage') |
|
122 |
manage.hostgroups.append(hostgroup) |
|
123 |
DBSession.flush() |
|
124 |
transaction.commit() |
|
125 |
|
|
126 |
response = self.app.post('/get_plugin_value', { |
|
127 |
'idcorrevent': idcorrevent, |
|
128 |
'plugin_name': 'history', |
|
129 |
}, extra_environ={'REMOTE_USER': 'manager'}) |
|
130 |
json = response.json |
|
131 |
|
|
132 |
# Le contenu de "eventdetails" varie facilement. |
|
133 |
# On le teste séparément. |
|
134 |
json.pop('eventdetails', None) |
|
135 |
assert_true('eventdetails' in response.json) |
|
136 |
|
|
137 |
assert_equal(json, { |
|
138 |
"idcorrevent": idcorrevent, |
|
139 |
"service": None, |
|
140 |
"peak_state": "WARNING", |
|
141 |
"current_state": "WARNING", |
|
142 |
"host": "bar", |
|
143 |
"initial_state": "WARNING" |
|
144 |
}) |
|
145 |
|
|
146 |
|
|
147 |
def test_history_form_LLS_when_forbidden(self): |
|
148 |
"""Teste le formulaire d'historique avec un LLS (alerte invisible).""" |
|
149 |
idcorrevent = insert_deps(True)[1] |
|
150 |
DBSession.flush() |
|
151 |
transaction.commit() |
|
152 |
|
|
153 |
self.app.post('/get_plugin_value', { |
|
154 |
'idcorrevent': idcorrevent, |
|
155 |
'plugin_name': 'history', |
|
156 |
}, extra_environ={'REMOTE_USER': 'manager'}, |
|
157 |
status=302) |
|
158 |
|
|
159 |
def test_history_form_host_when_forbidden(self): |
|
160 |
"""Teste le formulaire d'historique avec un hôte (alerte invisible).""" |
|
161 |
idcorrevent = insert_deps(False)[1] |
|
162 |
DBSession.flush() |
|
163 |
transaction.commit() |
|
164 |
|
|
165 |
self.app.post('/get_plugin_value', { |
|
166 |
'idcorrevent': idcorrevent, |
|
167 |
'plugin_name': 'history', |
|
168 |
}, extra_environ={'REMOTE_USER': 'manager'}, |
|
169 |
status=302) |
|
170 |
|
vigiboard/tests/functional/test_history_table.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# vim:set expandtab tabstop=4 shiftwidth=4: |
|
3 |
""" |
|
4 |
Vérifie que la page qui affiche l'historique des actions sur un événement |
|
5 |
brut fonctionne correctement. |
|
6 |
""" |
|
7 |
|
|
8 |
from nose.tools import assert_true, assert_equal |
|
9 |
from datetime import datetime |
|
10 |
import transaction |
|
11 |
|
|
12 |
from vigilo.models.configure import DBSession |
|
13 |
from vigilo.models import Event, EventHistory, CorrEvent, \ |
|
14 |
Permission, StateName, \ |
|
15 |
Host, HostGroup, LowLevelService, ServiceGroup |
|
16 |
from vigiboard.tests import TestController |
|
17 |
|
|
18 |
def populate_DB(): |
|
19 |
""" Peuple la base de données. """ |
|
20 |
# On ajoute un groupe d'hôtes et un groupe de services. |
|
21 |
hostmanagers = HostGroup(name = u'managersgroup') |
|
22 |
DBSession.add(hostmanagers) |
|
23 |
servicemanagers = ServiceGroup(name = u'managersgroup') |
|
24 |
DBSession.add(servicemanagers) |
|
25 |
DBSession.flush() |
|
26 |
|
|
27 |
# On ajoute la permission 'manage' à ces deux groupes. |
|
28 |
manage_perm = Permission.by_permission_name(u'manage') |
|
29 |
hostmanagers.permissions.append(manage_perm) |
|
30 |
servicemanagers.permissions.append(manage_perm) |
|
31 |
DBSession.flush() |
|
32 |
|
|
33 |
# On crée un hôte de test, et on l'ajoute au groupe d'hôtes. |
|
34 |
managerhost = Host( |
|
35 |
name = u'managerhost', |
|
36 |
checkhostcmd = u'halt', |
|
37 |
snmpcommunity = u'public', |
|
38 |
hosttpl = u'/dev/null', |
|
39 |
mainip = u'192.168.1.1', |
|
40 |
snmpport = 42, |
|
41 |
weight = 42, |
|
42 |
) |
|
43 |
DBSession.add(managerhost) |
|
44 |
hostmanagers.hosts.append(managerhost) |
|
45 |
DBSession.flush() |
|
46 |
|
|
47 |
# On crée un services de bas niveau, et on l'ajoute au groupe de services. |
|
48 |
managerservice = LowLevelService( |
|
49 |
host = managerhost, |
|
50 |
servicename = u'managerservice', |
|
51 |
command = u'halt', |
|
52 |
op_dep = u'+', |
|
53 |
weight = 42, |
|
54 |
) |
|
55 |
DBSession.add(managerservice) |
|
56 |
servicemanagers.services.append(managerservice) |
|
57 |
DBSession.flush() |
|
58 |
|
|
59 |
return (managerhost, managerservice) |
|
60 |
|
|
61 |
def add_correvent_caused_by(supitem): |
|
62 |
""" |
|
63 |
Ajoute dans la base de données un évènement corrélé causé |
|
64 |
par un incident survenu sur l'item passé en paramètre. |
|
65 |
Génère un historique pour les tests. |
|
66 |
""" |
|
67 |
|
|
68 |
# Ajout d'un événement |
|
69 |
event = Event( |
|
70 |
supitem = supitem, |
|
71 |
message = u'foo', |
|
72 |
current_state = StateName.statename_to_value(u"WARNING"), |
|
73 |
) |
|
74 |
DBSession.add(event) |
|
75 |
DBSession.flush() |
|
76 |
|
|
77 |
# Ajout des historiques |
|
78 |
DBSession.add(EventHistory( |
|
79 |
type_action=u'Nagios update state', |
|
80 |
idevent=event.idevent, |
|
81 |
timestamp=datetime.now())) |
|
82 |
DBSession.add(EventHistory( |
|
83 |
type_action=u'Acknowlegement change state', |
|
84 |
idevent=event.idevent, |
|
85 |
timestamp=datetime.now())) |
|
86 |
DBSession.flush() |
|
87 |
|
|
88 |
# Ajout d'un événement corrélé |
|
89 |
aggregate = CorrEvent( |
|
90 |
idcause = event.idevent, |
|
91 |
timestamp_active = datetime.now(), |
|
92 |
priority = 1, |
|
93 |
status = u"None") |
|
94 |
aggregate.events.append(event) |
|
95 |
DBSession.add(aggregate) |
|
96 |
DBSession.flush() |
|
97 |
|
|
98 |
return event.idevent |
|
99 |
|
|
100 |
|
|
101 |
class TestHistoryTable(TestController): |
|
102 |
""" |
|
103 |
Teste . |
|
104 |
""" |
|
105 |
|
|
106 |
def setUp(self): |
|
107 |
super(TestEventTable, self).setUp() |
|
108 |
|
|
109 |
def test_cause_host_history(self): |
|
110 |
"""Historique de la cause d'un événement corrélé sur un hôte.""" |
|
111 |
|
|
112 |
# On peuple la BDD avec un hôte, un service de bas niveau, |
|
113 |
# et un groupe d'hôtes et de services associés à ces items. |
|
114 |
(managerhost, managerservice) = populate_DB() |
|
115 |
|
|
116 |
# On ajoute un évènement corrélé causé par l'hôte |
|
117 |
idevent = add_correvent_caused_by(managerhost) |
|
118 |
transaction.commit() |
|
119 |
|
|
120 |
# L'utilisateur n'est pas authentifié. |
|
121 |
# On s'attend à ce qu'une erreur 401 soit renvoyée, |
|
122 |
# demandant à l'utilisateur de s'authentifier. |
|
123 |
response = self.app.get( |
|
124 |
'/event/%d' % idevent, |
|
125 |
status = 401) |
|
126 |
|
|
127 |
# L'utilisateur N'A PAS les bonnes permissions. |
|
128 |
environ = {'REMOTE_USER': 'editor'} |
|
129 |
|
|
130 |
# On s'attend à ce qu'une erreur 302 soit renvoyée, et à |
|
131 |
# ce qu'un message d'erreur précise à l'utilisateur qu'il |
|
132 |
# n'a pas accès aux informations concernant cet évènement. |
|
133 |
response = self.app.get( |
|
134 |
'/event/%d' % idevent, |
|
135 |
status = 302, |
|
136 |
extra_environ = environ) |
|
137 |
|
|
138 |
# On suit la redirection. |
|
139 |
response = response.follow(status = 200, extra_environ = environ) |
|
140 |
assert_true(len(response.lxml.xpath( |
|
141 |
'//div[@id="flash"]/div[@class="error"]'))) |
|
142 |
|
|
143 |
# L'utilisateur a les bonnes permissions. |
|
144 |
environ = {'REMOTE_USER': 'manager'} |
|
145 |
|
|
146 |
# On s'attend à ce que le statut de la requête soit 200. |
|
147 |
response = self.app.get( |
|
148 |
'/event/%d' % idevent, |
|
149 |
status = 200, |
|
150 |
extra_environ = environ) |
|
151 |
|
|
152 |
# Il doit y avoir 2 lignes de résultats. |
|
153 |
rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr') |
|
154 |
assert_equal(len(rows), 2) |
|
155 |
|
|
156 |
def test_cause_service_history(self): |
|
157 |
"""Historique de la cause d'un événement corrélé sur un LLS.""" |
|
158 |
|
|
159 |
# On peuple la BDD avec un hôte, un service de bas niveau, |
|
160 |
# et un groupe d'hôtes et de services associés à ces items. |
|
161 |
(managerhost, managerservice) = populate_DB() |
|
162 |
|
|
163 |
# On ajoute un évènement corrélé causé par le service |
|
164 |
idevent = add_correvent_caused_by(managerservice) |
|
165 |
|
|
166 |
transaction.commit() |
|
167 |
|
|
168 |
# L'utilisateur n'est pas authentifié. |
|
169 |
# On s'attend à ce qu'une erreur 401 soit renvoyée, |
|
170 |
# demandant à l'utilisateur de s'authentifier. |
|
171 |
response = self.app.get( |
|
172 |
'/event/%d' % idevent, |
|
173 |
status = 401) |
|
174 |
|
|
175 |
# L'utilisateur N'A PAS les bonnes permissions. |
|
176 |
environ = {'REMOTE_USER': 'editor'} |
|
177 |
|
|
178 |
# On s'attend à ce qu'une erreur 302 soit renvoyée, et à |
|
179 |
# ce qu'un message d'erreur précise à l'utilisateur qu'il |
|
180 |
# n'a pas accès aux informations concernant cet évènement. |
|
181 |
response = self.app.get( |
|
182 |
'/event/%d' % idevent, |
|
183 |
status = 302, |
|
184 |
extra_environ = environ) |
|
185 |
|
|
186 |
# On suit la redirection. |
|
187 |
response = response.follow(status = 200, extra_environ = environ) |
|
188 |
assert_true(len(response.lxml.xpath( |
|
189 |
'//div[@id="flash"]/div[@class="error"]'))) |
|
190 |
|
|
191 |
# L'utilisateur a les bonnes permissions. |
|
192 |
environ = {'REMOTE_USER': 'manager'} |
|
193 |
|
|
194 |
# On s'attend à ce que le statut de la requête soit 200. |
|
195 |
response = self.app.get( |
|
196 |
'/event/%d' % idevent, |
|
197 |
status = 200, |
|
198 |
extra_environ = environ) |
|
199 |
|
|
200 |
# Il doit y avoir 2 lignes de résultats. |
|
201 |
rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr') |
|
202 |
assert_equal(len(rows), 2) |
|
203 |
|
vigiboard/tests/functional/test_history_tables.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# vim:set expandtab tabstop=4 shiftwidth=4: |
|
3 |
""" |
|
4 |
Test du tableau d'événements de Vigiboard |
|
5 |
""" |
|
6 |
|
|
7 |
from nose.tools import assert_true, assert_equal |
|
8 |
from datetime import datetime |
|
9 |
import transaction |
|
10 |
|
|
11 |
from vigilo.models.configure import DBSession |
|
12 |
from vigilo.models import Event, EventHistory, CorrEvent, \ |
|
13 |
Permission, StateName, \ |
|
14 |
Host, HostGroup, LowLevelService, ServiceGroup |
|
15 |
from vigiboard.tests import TestController |
|
16 |
|
|
17 |
def populate_DB(): |
|
18 |
""" Peuple la base de données. """ |
|
19 |
# On ajoute un groupe d'hôtes et un groupe de services. |
|
20 |
hostmanagers = HostGroup(name = u'managersgroup') |
|
21 |
DBSession.add(hostmanagers) |
|
22 |
servicemanagers = ServiceGroup(name = u'managersgroup') |
|
23 |
DBSession.add(servicemanagers) |
|
24 |
DBSession.flush() |
|
25 |
|
|
26 |
# On ajoute la permission 'manage' à ces deux groupes. |
|
27 |
manage_perm = Permission.by_permission_name(u'manage') |
|
28 |
hostmanagers.permissions.append(manage_perm) |
|
29 |
servicemanagers.permissions.append(manage_perm) |
|
30 |
DBSession.flush() |
|
31 |
|
|
32 |
# On crée un hôte de test, et on l'ajoute au groupe d'hôtes. |
|
33 |
managerhost = Host( |
|
34 |
name = u'managerhost', |
|
35 |
checkhostcmd = u'halt', |
|
36 |
snmpcommunity = u'public', |
|
37 |
hosttpl = u'/dev/null', |
|
38 |
mainip = u'192.168.1.1', |
|
39 |
snmpport = 42, |
|
40 |
weight = 42, |
|
41 |
) |
|
42 |
DBSession.add(managerhost) |
|
43 |
hostmanagers.hosts.append(managerhost) |
|
44 |
DBSession.flush() |
|
45 |
|
|
46 |
# On crée un services de bas niveau, et on l'ajoute au groupe de services. |
|
47 |
managerservice = LowLevelService( |
|
48 |
host = managerhost, |
|
49 |
servicename = u'managerservice', |
|
50 |
command = u'halt', |
|
51 |
op_dep = u'+', |
|
52 |
weight = 42, |
|
53 |
) |
|
54 |
DBSession.add(managerservice) |
|
55 |
servicemanagers.services.append(managerservice) |
|
56 |
DBSession.flush() |
|
57 |
|
|
58 |
return (managerhost, managerservice) |
|
59 |
|
|
60 |
def add_correvent_caused_by(supitem, |
|
61 |
correvent_status=u"None", event_status=u"WARNING"): |
|
62 |
""" |
|
63 |
Ajoute dans la base de données un évènement corrélé causé |
|
64 |
par un incident survenu sur l'item passé en paramètre. |
|
65 |
Génère un historique pour les tests. |
|
66 |
""" |
|
67 |
|
|
68 |
# Ajout d'un événement |
|
69 |
event = Event( |
|
70 |
supitem = supitem, |
|
71 |
message = u'foo', |
|
72 |
current_state = StateName.statename_to_value(event_status), |
|
73 |
) |
|
74 |
DBSession.add(event) |
|
75 |
DBSession.flush() |
|
76 |
|
|
77 |
# Ajout des historiques |
|
78 |
DBSession.add(EventHistory( |
|
79 |
type_action=u'Nagios update state', |
|
80 |
idevent=event.idevent, |
|
81 |
timestamp=datetime.now())) |
|
82 |
DBSession.add(EventHistory( |
|
83 |
type_action=u'Acknowlegement change state', |
|
84 |
idevent=event.idevent, |
|
85 |
timestamp=datetime.now())) |
|
86 |
DBSession.flush() |
|
87 |
|
|
88 |
# Ajout d'un événement corrélé |
|
89 |
aggregate = CorrEvent( |
|
90 |
idcause = event.idevent, |
|
91 |
timestamp_active = datetime.now(), |
|
92 |
priority = 1, |
|
93 |
status = correvent_status) |
|
94 |
aggregate.events.append(event) |
|
95 |
DBSession.add(aggregate) |
|
96 |
DBSession.flush() |
|
97 |
|
|
98 |
return aggregate.idcorrevent |
|
99 |
|
|
100 |
|
|
101 |
class TestEventTable(TestController): |
|
102 |
""" |
|
103 |
Test des historiques de Vigiboard. |
|
104 |
""" |
|
105 |
|
|
106 |
def setUp(self): |
|
107 |
super(TestEventTable, self).setUp() |
|
108 |
|
|
109 |
def test_host_event_history(self): |
|
110 |
"""Affichage de l'historique d'un évènement corrélé pour un hôte.""" |
|
111 |
|
|
112 |
# On peuple la BDD avec un hôte, un service de bas niveau, |
|
113 |
# et un groupe d'hôtes et de services associés à ces items. |
|
114 |
(managerhost, managerservice) = populate_DB() |
|
115 |
|
|
116 |
# On ajoute un évènement corrélé causé par l'hôte |
|
117 |
aggregate_id = add_correvent_caused_by(managerhost) |
|
118 |
|
|
119 |
transaction.commit() |
|
120 |
|
|
121 |
### 1er cas : L'utilisateur utilisé pour |
|
122 |
# se connecter à Vigiboard est 'editor'. |
|
123 |
environ = {'REMOTE_USER': 'editor'} |
|
124 |
|
|
125 |
# On s'attend à ce qu'une erreur 302 soit renvoyée, et à |
|
126 |
# ce qu'un message d'erreur précise à l'utilisateur qu'il |
|
127 |
# n'a pas accès aux informations concernant cet évènement. |
|
128 |
response = self.app.get( |
|
129 |
'/event?idcorrevent=' + str(aggregate_id), |
|
130 |
status = 302, |
|
131 |
extra_environ = environ) |
|
132 |
|
|
133 |
response = self.app.get( |
|
134 |
'/', |
|
135 |
status = 200, |
|
136 |
extra_environ = environ) |
|
137 |
assert_true(response.lxml.xpath( |
|
138 |
'//div[@id="flash"]/div[@class="error"]')) |
|
139 |
|
|
140 |
### 2nd cas : L'utilisateur utilisé pour |
|
141 |
# se connecter à Vigiboard est 'manager'. |
|
142 |
environ = {'REMOTE_USER': 'manager'} |
|
143 |
|
|
144 |
# On s'attend à ce que le statut de la requête soit 200. |
|
145 |
response = self.app.get( |
|
146 |
'/event?idcorrevent=' + str(aggregate_id), |
|
147 |
status = 200, |
|
148 |
extra_environ = environ) |
|
149 |
|
|
150 |
# Il doit y avoir 2 lignes de résultats. |
|
151 |
rows = response.lxml.xpath('//table[@class="history_table"]/tbody/tr') |
|
152 |
assert_equal(len(rows), 2) |
|
153 |
|
|
154 |
def test_service_event_history(self): |
|
155 |
"""Affichage de l'historique d'un évènement corrélé pour un SBN.""" |
|
156 |
|
|
157 |
# On peuple la BDD avec un hôte, un service de bas niveau, |
|
158 |
# et un groupe d'hôtes et de services associés à ces items. |
|
159 |
(managerhost, managerservice) = populate_DB() |
|
160 |
|
|
161 |
# On ajoute un évènement corrélé causé par le service |
|
162 |
aggregate_id = add_correvent_caused_by(managerservice) |
|
163 |
|
|
164 |
transaction.commit() |
|
165 |
|
|
166 |
### 1er cas : L'utilisateur utilisé pour |
|
167 |
# se connecter à Vigiboard est 'editor'. |
|
168 |
environ = {'REMOTE_USER': 'editor'} |
|
169 |
|
|
170 |
# On s'attend à ce qu'une erreur 302 soit renvoyée, et à |
|
171 |
# ce qu'un message d'erreur précise à l'utilisateur qu'il |
|
172 |
# n'a pas accès aux informations concernant cet évènement. |
|
173 |
response = self.app.get( |
|
174 |
'/event?idcorrevent=' + str(aggregate_id), |
|
175 |
status = 302, |
|
176 |
extra_environ = environ) |
|
177 |
|
|
178 |
response = self.app.get( |
|
179 |
'/', |
|
180 |
status = 200, |
|
181 |
extra_environ = environ) |
|
182 |
assert_true(response.lxml.xpath( |
|
183 |
'//div[@id="flash"]/div[@class="error"]')) |
|
184 |
|
|
185 |
### 2nd cas : L'utilisateur utilisé pour |
|
186 |
# se connecter à Vigiboard est 'manager'. |
|
187 |
environ = {'REMOTE_USER': 'manager'} |
|
188 |
|
|
189 |
# On s'attend à ce que le statut de la requête soit 200. |
|
190 |
response = self.app.get( |
|
191 |
'/event?idcorrevent=' + str(aggregate_id), |
|
192 |
status = 200, |
|
193 |
extra_environ = environ) |
|
194 |
|
|
195 |
# Il doit y avoir 2 lignes de résultats. |
|
196 |
rows = response.lxml.xpath('//table[@class="history_table"]/tbody/tr') |
|
197 |
assert_equal(len(rows), 2) |
|
198 |
|
|
199 |
def test_host_history(self): |
|
200 |
"""Affichage de l'historique d'un hôte.""" |
|
201 |
|
|
202 |
# On peuple la BDD avec un hôte, un service de bas niveau, |
|
203 |
# et un groupe d'hôtes et de services associés à ces items. |
|
204 |
(managerhost, managerservice) = populate_DB() |
|
205 |
|
|
206 |
# On ajoute deux évènements corrélés causés par l'hôte : |
|
207 |
# le premier encore ouvert, le second clos par un utilisateur. |
|
208 |
add_correvent_caused_by(managerhost) |
|
209 |
add_correvent_caused_by(managerhost, u"AAClosed", u"OK") |
|
210 |
|
|
211 |
transaction.commit() |
|
212 |
DBSession.add(managerhost) |
|
213 |
|
|
214 |
### 1er cas : L'utilisateur utilisé pour |
|
215 |
# se connecter à Vigiboard est 'editor'. |
|
216 |
environ = {'REMOTE_USER': 'editor'} |
|
217 |
|
|
218 |
# On s'attend à ce qu'une erreur 302 soit renvoyée, et à |
|
219 |
# ce qu'un message d'erreur précise à l'utilisateur qu'il |
|
220 |
# n'a pas accès aux informations concernant cet évènement. |
|
221 |
response = self.app.get( |
|
222 |
'/host_service/' + managerhost.name, |
|
223 |
status = 302, |
|
224 |
extra_environ = environ) |
|
225 |
|
|
226 |
response = self.app.get( |
|
227 |
'/', |
|
228 |
status = 200, |
|
229 |
extra_environ = environ) |
|
230 |
assert_true(response.lxml.xpath( |
|
231 |
'//div[@id="flash"]/div[@class="error"]')) |
|
232 |
|
|
233 |
### 2nd cas : L'utilisateur utilisé pour |
|
234 |
# se connecter à Vigiboard est 'manager'. |
|
235 |
environ = {'REMOTE_USER': 'manager'} |
|
236 |
|
|
237 |
# On s'attend à ce que le statut de la requête soit 200. |
|
238 |
response = self.app.get( |
|
239 |
'/host_service/' + managerhost.name, |
|
240 |
status = 200, |
|
241 |
extra_environ = environ) |
|
242 |
|
|
243 |
# Il doit y avoir 2 lignes d'évènements |
|
244 |
# + 2 lignes contenant les tableaux d'historiques. |
|
245 |
rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr') |
|
246 |
assert_equal(len(rows), 2 + 2) |
|
247 |
# Et 4 lignes d'historiques dans les tableaux d'historiques. |
|
248 |
rows = response.lxml.xpath('//table[@class="history_table"]/tbody/tr') |
|
249 |
assert_equal(len(rows), 4) |
|
250 |
|
|
251 |
def test_service_history(self): |
|
252 |
"""Affichage de l'historique d'un service de bas niveau.""" |
|
253 |
|
|
254 |
# On peuple la BDD avec un hôte, un service de bas niveau, |
|
255 |
# et un groupe d'hôtes et de services associés à ces items. |
|
256 |
(managerhost, managerservice) = populate_DB() |
|
257 |
|
|
258 |
# On ajoute deux évènements corrélés causés par le service : |
|
259 |
# le premier encore ouvert, le second clos par un utilisateur. |
|
260 |
add_correvent_caused_by(managerservice) |
|
261 |
add_correvent_caused_by(managerservice, u"AAClosed", u"OK") |
|
262 |
|
|
263 |
transaction.commit() |
|
264 |
DBSession.add(managerhost) |
|
265 |
DBSession.add(managerservice) |
|
266 |
|
|
267 |
### 1er cas : L'utilisateur utilisé pour |
|
268 |
# se connecter à Vigiboard est 'editor'. |
|
269 |
environ = {'REMOTE_USER': 'editor'} |
|
270 |
|
|
271 |
# On s'attend à ce qu'une erreur 302 soit renvoyée, et à |
|
272 |
# ce qu'un message d'erreur précise à l'utilisateur qu'il |
|
273 |
# n'a pas accès aux informations concernant cet évènement. |
|
274 |
response = self.app.get( |
|
275 |
'/host_service/' + managerhost.name |
|
276 |
+ '/' + managerservice.servicename, |
|
277 |
status = 302, |
|
278 |
extra_environ = environ) |
|
279 |
|
|
280 |
response = self.app.get( |
|
281 |
'/', |
|
282 |
status = 200, |
|
283 |
extra_environ = environ) |
|
284 |
assert_true(response.lxml.xpath( |
|
285 |
'//div[@id="flash"]/div[@class="error"]')) |
|
286 |
|
|
287 |
### 2nd cas : L'utilisateur utilisé pour |
|
288 |
# se connecter à Vigiboard est 'manager'. |
|
289 |
environ = {'REMOTE_USER': 'manager'} |
|
290 |
|
|
291 |
# On s'attend à ce que le statut de la requête soit 200. |
|
292 |
response = self.app.get( |
|
293 |
'/host_service/' + managerhost.name |
|
294 |
+ '/' + managerservice.servicename, |
|
295 |
status = 200, |
|
296 |
extra_environ = environ) |
|
297 |
|
|
298 |
# Il doit y avoir 2 lignes d'évènements |
|
299 |
# + 2 lignes contenant les tableaux d'historiques. |
|
300 |
rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr') |
|
301 |
assert_equal(len(rows), 2 + 2) |
|
302 |
# Et 4 lignes d'historiques dans les tableaux d'historiques. |
|
303 |
rows = response.lxml.xpath('//table[@class="history_table"]/tbody/tr') |
|
304 |
assert_equal(len(rows), 4) |
|
305 |
|
vigiboard/tests/functional/test_host_vigiboardrequest.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# vim:set expandtab tabstop=4 shiftwidth=4: |
|
3 |
""" |
|
4 |
Test de la classe Vigiboard Request pour des requêtes concernant les hôtes |
|
5 |
""" |
|
6 |
|
|
7 |
from nose.tools import assert_true |
|
8 |
from datetime import datetime |
|
9 |
import tg |
|
10 |
import transaction |
|
11 |
|
|
12 |
from vigilo.models.configure import DBSession |
|
13 |
from vigilo.models import Event, EventHistory, CorrEvent, \ |
|
14 |
Permission, User, StateName, \ |
|
15 |
Host, HostGroup |
|
16 |
from vigiboard.tests import TestController |
|
17 |
from vigiboard.controllers.vigiboardrequest import VigiboardRequest |
|
18 |
|
|
19 |
class TestHostVigiboardRequest(TestController): |
|
20 |
""" Préparation de la base de données en vue des tests. """ |
|
21 |
|
|
22 |
def setUp(self): |
|
23 |
super(TestHostVigiboardRequest, self).setUp() |
|
24 |
|
|
25 |
# On peuple la base de données. |
|
26 |
|
|
27 |
# On ajoute les groupes et leurs dépendances |
|
28 |
self.hosteditors = HostGroup(name=u'editorsgroup') |
|
29 |
DBSession.add(self.hosteditors) |
|
30 |
self.hostmanagers = HostGroup(name=u'managersgroup', |
|
31 |
parent=self.hosteditors) |
|
32 |
DBSession.add(self.hostmanagers) |
|
33 |
DBSession.flush() |
|
34 |
|
|
35 |
manage_perm = Permission.by_permission_name(u'manage') |
|
36 |
edit_perm = Permission.by_permission_name(u'edit') |
|
37 |
|
|
38 |
self.hostmanagers.permissions.append(manage_perm) |
|
39 |
self.hosteditors.permissions.append(edit_perm) |
|
40 |
DBSession.flush() |
|
41 |
|
|
42 |
# Création des hôtes de test. |
|
43 |
host_template = { |
|
44 |
'checkhostcmd': u'halt', |
|
45 |
'snmpcommunity': u'public', |
|
46 |
'hosttpl': u'/dev/null', |
|
47 |
'mainip': u'192.168.1.1', |
|
48 |
'snmpport': 42, |
|
49 |
'weight': 42, |
|
50 |
} |
|
51 |
|
|
52 |
managerhost = Host(name=u'managerhost', **host_template) |
|
53 |
editorhost = Host(name=u'editorhost', **host_template) |
|
54 |
DBSession.add(managerhost) |
|
55 |
DBSession.add(editorhost) |
|
56 |
|
|
57 |
# Affectation des hôtes aux groupes. |
|
58 |
self.hosteditors.hosts.append(editorhost) |
|
59 |
self.hostmanagers.hosts.append(managerhost) |
|
60 |
DBSession.flush() |
|
61 |
|
|
62 |
# Ajout des événements eux-mêmes |
|
63 |
event_template = { |
|
64 |
'message': u'foo', |
|
65 |
'current_state': StateName.statename_to_value(u'WARNING'), |
|
66 |
} |
|
67 |
event1 = Event(supitem=managerhost, **event_template) |
|
68 |
event2 = Event(supitem=editorhost, **event_template) |
|
69 |
DBSession.add(event1) |
|
70 |
DBSession.add(event2) |
|
71 |
DBSession.flush() |
|
72 |
|
|
73 |
# Ajout des historiques |
|
74 |
DBSession.add(EventHistory(type_action=u'Nagios update state', |
|
75 |
idevent=event1.idevent, timestamp=datetime.now())) |
|
76 |
DBSession.add(EventHistory(type_action=u'Acknowledgement change state', |
|
77 |
idevent=event1.idevent, timestamp=datetime.now())) |
|
78 |
DBSession.add(EventHistory(type_action=u'Nagios update state', |
|
79 |
idevent=event2.idevent, timestamp=datetime.now())) |
|
80 |
DBSession.add(EventHistory(type_action=u'Acknowledgement change state', |
|
81 |
idevent=event2.idevent, timestamp=datetime.now())) |
|
82 |
DBSession.flush() |
|
83 |
|
|
84 |
# Ajout des événements corrélés |
|
85 |
aggregate_template = { |
|
86 |
'timestamp_active': datetime.now(), |
|
87 |
'priority': 1, |
|
88 |
'status': u'None', |
|
89 |
} |
|
90 |
self.aggregate1 = CorrEvent( |
|
91 |
idcause=event1.idevent, **aggregate_template) |
|
92 |
self.aggregate2 = CorrEvent( |
|
93 |
idcause=event2.idevent, **aggregate_template) |
|
94 |
|
|
95 |
self.aggregate1.events.append(event1) |
|
96 |
self.aggregate2.events.append(event2) |
|
97 |
DBSession.add(self.aggregate1) |
|
98 |
DBSession.add(self.aggregate2) |
|
99 |
DBSession.flush() |
|
100 |
transaction.commit() |
|
101 |
|
|
102 |
def tearDown(self): |
|
103 |
""" Nettoyage de la base de données après les tests. """ |
|
104 |
super(TestHostVigiboardRequest, self).tearDown() |
|
105 |
|
|
106 |
|
|
107 |
def test_request_creation(self): |
|
108 |
"""Génération d'une requête avec plugin et permissions.""" |
|
109 |
|
|
110 |
# On indique qui on est et on envoie une requête sur |
|
111 |
# l'index pour obtenir toutes les variables de sessions. |
|
112 |
environ = {'REMOTE_USER': 'editor'} |
|
113 |
response = self.app.get('/', extra_environ=environ) |
|
114 |
tg.request = response.request |
|
115 |
|
|
116 |
# Derrière, VigiboardRequest doit charger le plugin de tests tout seul |
|
117 |
vigi_req = VigiboardRequest(User.by_user_name(u'editor')) |
|
118 |
vigi_req.add_table( |
|
119 |
CorrEvent, |
|
120 |
vigi_req.items.c.hostname, |
|
121 |
vigi_req.items.c.servicename, |
|
122 |
) |
|
123 |
vigi_req.add_join((Event, CorrEvent.idcause == Event.idevent)) |
|
124 |
vigi_req.add_join((vigi_req.items, |
|
125 |
Event.idsupitem == vigi_req.items.c.idsupitem)) |
|
126 |
|
|
127 |
# On vérifie que le nombre d'événements corrélés |
|
128 |
# trouvés par la requête est bien égal à 1. |
|
129 |
num_rows = vigi_req.num_rows() |
|
130 |
assert_true(num_rows == 1, |
|
131 |
msg = "One history should be available for " + |
|
132 |
"the user 'editor' but there are %d" % num_rows) |
|
133 |
|
|
134 |
vigi_req.format_events(0, 10) |
|
135 |
vigi_req.format_history() |
|
136 |
assert_true(len(vigi_req.events) == 1 + 1, |
|
137 |
msg = "One history should be available for user " + |
|
138 |
"'editor' but there are %d" % (len(vigi_req.events) - 1)) |
|
139 |
|
|
140 |
|
|
141 |
# On recommence les tests précédents avec l'utilisateur |
|
142 |
# manager (qui dispose de plus de droits). |
|
143 |
environ = {'REMOTE_USER': 'manager'} |
|
144 |
response = self.app.get('/', extra_environ=environ) |
|
145 |
tg.request = response.request |
|
146 |
|
|
147 |
vigi_req = VigiboardRequest(User.by_user_name(u'manager')) |
|
148 |
vigi_req.add_plugin(MonPlugin) |
|
149 |
vigi_req.add_table( |
|
150 |
CorrEvent, |
|
151 |
vigi_req.items.c.hostname, |
|
152 |
vigi_req.items.c.servicename, |
|
153 |
) |
|
154 |
vigi_req.add_join((Event, CorrEvent.idcause == Event.idevent)) |
|
155 |
vigi_req.add_join((vigi_req.items, |
|
156 |
Event.idsupitem == vigi_req.items.c.idsupitem)) |
|
157 |
|
|
158 |
# On vérifie que le nombre d'événements corrélés |
|
159 |
# trouvés par la requête est bien égal à 2. |
|
160 |
num_rows = vigi_req.num_rows() |
|
161 |
assert_true(num_rows == 2, |
|
162 |
msg = "2 histories should be available for " + |
|
163 |
"the user 'manager' but there are %d" % num_rows) |
|
164 |
|
|
165 |
vigi_req.format_events(0, 10) |
|
166 |
vigi_req.format_history() |
|
167 |
assert_true(len(vigi_req.events) == 2 + 1, |
|
168 |
msg = "2 histories should be available for user " + |
Also available in: Unified diff