-
Notifications
You must be signed in to change notification settings - Fork 176
Expand file tree
/
Copy pathcontroller_impl.go
More file actions
116 lines (99 loc) · 3.39 KB
/
controller_impl.go
File metadata and controls
116 lines (99 loc) · 3.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package scrape
import (
"fmt"
"github.com/pkg/errors"
"github.com/stackrox/rox/central/sensor/service/common"
"github.com/stackrox/rox/generated/internalapi/central"
"github.com/stackrox/rox/generated/internalapi/compliance"
"github.com/stackrox/rox/pkg/concurrency"
"github.com/stackrox/rox/pkg/set"
"github.com/stackrox/rox/pkg/sync"
)
// controllerImpl tracks in-flight compliance scrapes for a single sensor
// connection and relays scrape commands and updates between central and
// the sensor.
type controllerImpl struct {
	// stoppedSig fires when the sensor connection terminates; in-flight
	// scrapes watching it abort with the connection's error.
	stoppedSig concurrency.ReadOnlyErrorSignal

	// scrapes holds the currently registered scrapes keyed by scrape ID.
	// scrapesMutex guards all access to the map.
	scrapes      map[string]*scrapeImpl
	scrapesMutex sync.RWMutex

	// msgInjector sends messages to the sensor over its connection.
	msgInjector common.MessageInjector
}
func (s *controllerImpl) sendStartScrapeMsg(ctx concurrency.Waitable, scrape *scrapeImpl) error {
msg := ¢ral.MsgToSensor{
Msg: ¢ral.MsgToSensor_ScrapeCommand{
ScrapeCommand: ¢ral.ScrapeCommand{
ScrapeId: scrape.scrapeID,
Command: ¢ral.ScrapeCommand_StartScrape{
StartScrape: ¢ral.StartScrape{
Hostnames: scrape.expectedHosts.AsSlice(),
Standards: scrape.GetStandardIDs(),
},
},
},
},
}
return s.msgInjector.InjectMessage(ctx, msg)
}
func (s *controllerImpl) sendKillScrapeMsg(ctx concurrency.Waitable, scrape *scrapeImpl) error {
msg := ¢ral.MsgToSensor{
Msg: ¢ral.MsgToSensor_ScrapeCommand{
ScrapeCommand: ¢ral.ScrapeCommand{
ScrapeId: scrape.scrapeID,
Command: ¢ral.ScrapeCommand_KillScrape{
KillScrape: ¢ral.KillScrape{},
},
},
},
}
return s.msgInjector.InjectMessage(ctx, msg)
}
// RunScrape starts a scrape of the given hosts for the given compliance
// standards and blocks until the scrape finishes, the kill waitable fires, or
// the sensor connection terminates. It returns whatever per-host results were
// collected so far (possibly partial) and a non-nil error when the scrape
// stopped for any reason other than normal completion.
func (s *controllerImpl) RunScrape(expectedHosts set.StringSet, kill concurrency.Waitable, standardIDs []string) (map[string]*compliance.ComplianceReturn, error) {
	// If no hosts need to be scraped, just bounce, otherwise we will be waiting for nothing.
	if expectedHosts.Cardinality() == 0 {
		return make(map[string]*compliance.ComplianceReturn), nil
	}
	// Create the scrape and register it so it can be updated
	// (ProcessScrapeUpdate routes incoming updates by scrape ID).
	scrape := newScrape(expectedHosts, set.NewStringSet(standardIDs...))
	concurrency.WithLock(&s.scrapesMutex, func() {
		s.scrapes[scrape.scrapeID] = scrape
	})
	// Unregister on exit so late updates for this ID are rejected.
	defer concurrency.WithLock(&s.scrapesMutex, func() {
		delete(s.scrapes, scrape.scrapeID)
	})
	if err := s.sendStartScrapeMsg(kill, scrape); err != nil {
		return nil, err
	}
	// Best effort: on exit, tell sensor to stop the scrape. Failure is only
	// logged. (log is presumably a package-level logger declared elsewhere in
	// this package — not visible in this chunk.)
	defer func() {
		if err := s.sendKillScrapeMsg(scrape.Stopped(), scrape); err != nil {
			log.Errorf("tried to kill scrape %s but failed: %v", scrape.scrapeID, err)
		}
	}()
	// Either receive a kill, or wait for the scrape to finish.
	var err error
	select {
	case <-kill.Done():
		err = errors.New("scrape stopped due to received kill command")
	case <-s.stoppedSig.Done():
		err = errors.Wrap(s.stoppedSig.Err(), "scrape stopped as sensor connection was terminated")
	case <-scrape.Stopped().Done():
		// Normal completion path: only an error if the scrape itself failed.
		if scrapeErr := scrape.Stopped().Err(); scrapeErr != nil {
			err = errors.Wrap(scrapeErr, "scrape failed")
		}
	}
	// Results may be partial when err is non-nil.
	return scrape.GetResults(), err
}
// getScrape returns the scrape registered under scrapeID, or nil if none is
// registered. When remove is true, the entry is also deleted from the
// registry.
//
// Bug fix: the original took only the read lock (RLock) yet deleted from the
// map when remove was true — a map write under a read lock is a data race.
// Since this method may mutate the map, it must hold the write lock.
func (s *controllerImpl) getScrape(scrapeID string, remove bool) *scrapeImpl {
	s.scrapesMutex.Lock()
	defer s.scrapesMutex.Unlock()
	scrape := s.scrapes[scrapeID]
	if remove {
		delete(s.scrapes, scrapeID)
	}
	return scrape
}
// ProcessScrapeUpdate forwards the update to the matching registered scrape.
// When the update signals that the scrape was killed, the scrape is also
// unregistered. An error is returned if no scrape matches the update's ID.
func (s *controllerImpl) ProcessScrapeUpdate(update *central.ScrapeUpdate) error {
	removeOnMatch := update.GetScrapeKilled() != nil
	sc := s.getScrape(update.GetScrapeId(), removeOnMatch)
	if sc == nil {
		return fmt.Errorf("received update for invalid scrape ID %q: %+v", update.GetScrapeId(), update)
	}
	sc.AcceptUpdate(update)
	return nil
}