Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_host_vigiboardrequest.py @ 94f31908

History | View | Annotate | Download (6.36 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test de la classe Vigiboard Request pour des requêtes concernant les hôtes
5
"""
6

    
7
from nose.tools import assert_true
8
from datetime import datetime
9
import tg
10
import transaction
11

    
12
from vigilo.models.configure import DBSession
13
from vigilo.models import Event, EventHistory, CorrEvent, \
14
                            Permission, User, StateName, \
15
                            Host, HostGroup
16
from vigiboard.tests import TestController
17
from vigiboard.controllers.vigiboardrequest import VigiboardRequest
18

    
19
class TestHostVigiboardRequest(TestController):
20
    """ Préparation de la base de données en vue des tests. """
21

    
22
    def setUp(self):
23
        super(TestHostVigiboardRequest, self).setUp()
24

    
25
        # On peuple la base de données.
26

    
27
        # On ajoute les groupes et leurs dépendances
28
        self.hosteditors = HostGroup(name=u'editorsgroup')
29
        DBSession.add(self.hosteditors)
30
        self.hostmanagers = HostGroup(name=u'managersgroup', 
31
                                        parent=self.hosteditors)
32
        DBSession.add(self.hostmanagers)
33
        DBSession.flush()
34

    
35
        manage_perm = Permission.by_permission_name(u'manage')
36
        edit_perm = Permission.by_permission_name(u'edit')
37

    
38
        self.hostmanagers.permissions.append(manage_perm)
39
        self.hosteditors.permissions.append(edit_perm)
40
        DBSession.flush()
41

    
42
        # Création des hôtes de test.
43
        host_template = {
44
            'checkhostcmd': u'halt',
45
            'snmpcommunity': u'public',
46
            'hosttpl': u'/dev/null',
47
            'mainip': u'192.168.1.1',
48
            'snmpport': 42,
49
            'weight': 42,
50
        }
51

    
52
        managerhost = Host(name=u'managerhost', **host_template)
53
        editorhost = Host(name=u'editorhost', **host_template)
54
        DBSession.add(managerhost)
55
        DBSession.add(editorhost)
56

    
57
        # Affectation des hôtes aux groupes.
58
        self.hosteditors.hosts.append(editorhost)
59
        self.hostmanagers.hosts.append(managerhost)
60
        DBSession.flush()
61

    
62
        # Ajout des événements eux-mêmes
63
        event_template = {
64
            'message': u'foo',
65
            'current_state': StateName.statename_to_value(u'WARNING'),
66
        }
67
        event1 = Event(supitem=managerhost, **event_template)
68
        event2 = Event(supitem=editorhost, **event_template)
69
        DBSession.add(event1)
70
        DBSession.add(event2)
71
        DBSession.flush()
72

    
73
        # Ajout des historiques
74
        DBSession.add(EventHistory(type_action=u'Nagios update state',
75
            idevent=event1.idevent, timestamp=datetime.now()))
76
        DBSession.add(EventHistory(type_action=u'Acknowledgement change state',
77
            idevent=event1.idevent, timestamp=datetime.now()))
78
        DBSession.add(EventHistory(type_action=u'Nagios update state',
79
            idevent=event2.idevent, timestamp=datetime.now()))
80
        DBSession.add(EventHistory(type_action=u'Acknowledgement change state',
81
            idevent=event2.idevent, timestamp=datetime.now()))
82
        DBSession.flush()
83

    
84
        # Ajout des événements corrélés
85
        aggregate_template = {
86
            'timestamp_active': datetime.now(),
87
            'priority': 1,
88
            'status': u'None',
89
        }
90
        self.aggregate1 = CorrEvent(
91
            idcause=event1.idevent, **aggregate_template)
92
        self.aggregate2 = CorrEvent(
93
            idcause=event2.idevent, **aggregate_template)
94

    
95
        self.aggregate1.events.append(event1)
96
        self.aggregate2.events.append(event2)
97
        DBSession.add(self.aggregate1)
98
        DBSession.add(self.aggregate2)
99
        DBSession.flush()
100
        transaction.commit()
101

    
102
    def tearDown(self):
103
        """ Nettoyage de la base de données après les tests. """
104
        super(TestHostVigiboardRequest, self).tearDown()
105

    
106

    
107
    def test_request_creation(self):
108
        """Génération d'une requête avec plugin et permissions."""
109

    
110
        # On indique qui on est et on envoie une requête sur 
111
        # l'index pour obtenir toutes les variables de sessions.
112
        environ = {'REMOTE_USER': 'editor'}
113
        response = self.app.get('/', extra_environ=environ)
114
        tg.request = response.request
115

    
116
        # Derrière, VigiboardRequest doit charger le plugin de tests tout seul
117
        vigi_req = VigiboardRequest(User.by_user_name(u'editor'))
118
        vigi_req.add_table(
119
            CorrEvent,
120
            vigi_req.items.c.hostname,
121
            vigi_req.items.c.servicename,
122
        )
123
        vigi_req.add_join((Event, CorrEvent.idcause == Event.idevent))
124
        vigi_req.add_join((vigi_req.items, 
125
            Event.idsupitem == vigi_req.items.c.idsupitem))
126

    
127
        # On vérifie que le nombre d'événements corrélés 
128
        # trouvés par la requête est bien égal à 1.
129
        num_rows = vigi_req.num_rows()
130
        assert_true(num_rows == 1, 
131
            msg = "One history should be available for " +
132
            "the user 'editor' but there are %d" % num_rows)
133

    
134
        vigi_req.format_events(0, 10)
135
        vigi_req.format_history()
136
        assert_true(len(vigi_req.events) == 1 + 1,
137
            msg = "One history should be available for user " +
138
            "'editor' but there are %d" % (len(vigi_req.events) - 1))
139

    
140

    
141
        # On recommence les tests précédents avec l'utilisateur
142
        # manager (qui dispose de plus de droits).
143
        environ = {'REMOTE_USER': 'manager'}
144
        response = self.app.get('/', extra_environ=environ)
145
        tg.request = response.request
146

    
147
        vigi_req = VigiboardRequest(User.by_user_name(u'manager'))
148
        vigi_req.add_plugin(MonPlugin)
149
        vigi_req.add_table(
150
            CorrEvent,
151
            vigi_req.items.c.hostname,
152
            vigi_req.items.c.servicename,
153
        )
154
        vigi_req.add_join((Event, CorrEvent.idcause == Event.idevent))
155
        vigi_req.add_join((vigi_req.items, 
156
            Event.idsupitem == vigi_req.items.c.idsupitem))
157

    
158
        # On vérifie que le nombre d'événements corrélés 
159
        # trouvés par la requête est bien égal à 2.
160
        num_rows = vigi_req.num_rows()
161
        assert_true(num_rows == 2,  
162
            msg = "2 histories should be available for " +
163
            "the user 'manager' but there are %d" % num_rows)
164

    
165
        vigi_req.format_events(0, 10)
166
        vigi_req.format_history()
167
        assert_true(len(vigi_req.events) == 2 + 1,
168
            msg = "2 histories should be available for user " +
169
            "'manager' but there are %d" % (len(vigi_req.events) - 1))
170