-
Notifications
You must be signed in to change notification settings - Fork 96
Expand file tree
/
Copy pathcommit_coverage.py
More file actions
159 lines (130 loc) · 5.42 KB
/
Copy pathcommit_coverage.py
File metadata and controls
159 lines (130 loc) · 5.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import concurrent.futures
import io
import json
import os
import threading
import time
import hglib
import structlog
import zstandard
from tqdm import tqdm
from code_coverage_bot import hgmo
from code_coverage_bot.phabricator import PhabricatorUploader
from code_coverage_bot.secrets import secrets
from code_coverage_bot.utils import ThreadPoolExecutorResult
from code_coverage_bot.gcp import DEFAULT_FILTER
from code_coverage_bot.gcp import download_report
from code_coverage_bot.gcp import get_bucket
from code_coverage_bot.gcp import get_name
from code_coverage_bot.gcp import list_reports
logger = structlog.get_logger(__name__)

# hglib clients opened by the worker-thread initializer; generate() closes
# them once the analysis is finished.
hg_servers = []
# Serializes appends to ``hg_servers`` from concurrent worker threads.
hg_servers_lock = threading.Lock()
# Thread-local storage holding each worker thread's own hglib client (``.hg``).
thread_local = threading.local()
def _init_thread(repo_dir: str) -> None:
    """Thread-pool initializer: give the current worker its own hglib client.

    The client opened on ``repo_dir`` is stored on ``thread_local.hg`` for the
    tasks that run on this thread, and is also registered in the shared
    ``hg_servers`` list (guarded by ``hg_servers_lock``) so every client can be
    closed once all work is done.
    """
    client = hglib.open(repo_dir)
    thread_local.hg = client

    # Several workers start concurrently; serialize the shared-list update.
    with hg_servers_lock:
        hg_servers.append(client)
def generate(server_address: str, repo_dir: str, out_dir: str = ".") -> None:
    """Compute per-commit coverage summaries for mozilla-central and publish them.

    Loads the existing ``commit_coverage.json.zst`` mapping from the Google
    Cloud Storage bucket (if present). It then analyzes every mozilla-central
    changeset that has an overall (default platform / default suite) report
    but no entry in the mapping yet.

    For each automation-relevant changeset returned by hg.mozilla.org, it
    records the total number of added, covered and unknown lines. The entry is
    ``None`` when the Phabricator uploader produced no coverage for that
    changeset.

    The updated mapping is uploaded back to the bucket at most every 600
    seconds while the analysis runs, once more at the end, and is also written
    to ``<out_dir>/commit_coverage.json.zst``.

    :param server_address: Server address passed to ``hgmo.HGMO`` to query
        automation-relevant changesets.
    :param repo_dir: Path of the local Mercurial repository; each worker
        thread opens its own hglib client on it.
    :param out_dir: Directory where reports are downloaded and where the final
        compressed mapping is written.
    """
    start_time = time.monotonic()
    commit_coverage_path = os.path.join(out_dir, "commit_coverage.json.zst")

    assert (
        secrets[secrets.GOOGLE_CLOUD_STORAGE] is not None
    ), "Missing GOOGLE_CLOUD_STORAGE secret"
    bucket = get_bucket(secrets[secrets.GOOGLE_CLOUD_STORAGE])

    # Resume from the previously published mapping, if any.
    blob = bucket.blob("commit_coverage.json.zst")
    if blob.exists():
        dctx = zstandard.ZstdDecompressor()
        commit_coverage = json.loads(
            dctx.decompress(blob.download_as_bytes(raw_download=True))
        )
    else:
        commit_coverage = {}

    cctx = zstandard.ZstdCompressor(threads=-1)

    def _upload():
        """Upload the current mapping to the bucket as zstd-compressed JSON."""
        # NOTE(review): this runs on the main thread while worker threads may
        # still be inserting into ``commit_coverage``; it relies on json.dumps
        # taking a consistent view of the dict -- confirm for the runtime used.
        blob = bucket.blob("commit_coverage.json.zst")
        blob.upload_from_string(
            cctx.compress(json.dumps(commit_coverage).encode("ascii"))
        )
        blob.content_type = "application/json"
        blob.content_encoding = "zstd"
        blob.patch()

    # We are only interested in "overall" coverage, not platform or suite
    # specific, and we skip changesets that were already analyzed.
    changesets_to_analyze = [
        changeset
        for changeset, platform, suite in list_reports(bucket, "mozilla-central")
        if platform == DEFAULT_FILTER
        and suite == DEFAULT_FILTER
        and changeset not in commit_coverage
    ]

    # Use the local server to generate the coverage mapping, as it is faster and
    # correct.
    def analyze_changeset(changeset_to_analyze: str) -> None:
        """Analyze one mozilla-central changeset and record its commits' coverage.

        Runs on a worker thread and uses that thread's hglib client
        (``thread_local.hg``, set up by ``_init_thread``).

        :raises RuntimeError: if the coverage report cannot be downloaded.
        """
        report_name = get_name(
            "mozilla-central", changeset_to_analyze, DEFAULT_FILTER, DEFAULT_FILTER
        )
        reports_dir = os.path.join(out_dir, "ccov-reports")

        # Deliberately not an ``assert``: the download is a side effect and
        # must still happen when Python runs with -O (which strips asserts).
        if not download_report(reports_dir, bucket, report_name):
            raise RuntimeError(f"Failed to download report {report_name}")
        with open(os.path.join(reports_dir, f"{report_name}.json"), "r") as f:
            report = json.load(f)

        phabricator_uploader = PhabricatorUploader(
            repo_dir, changeset_to_analyze, warnings_enabled=False
        )

        # Use the hg.mozilla.org server to get the automation relevant
        # changesets, since this information is broken in our local repo
        # (which is mozilla-unified).
        with hgmo.HGMO(server_address=server_address) as hgmo_remote_server:
            changesets = hgmo_remote_server.get_automation_relevance_changesets(
                changeset_to_analyze
            )

        results = phabricator_uploader.generate(thread_local.hg, report, changesets)

        for changeset in changesets:
            node = changeset["node"]
            # Lookup changeset coverage from phabricator uploader
            coverage = results.get(node)
            if coverage is None:
                logger.info("No coverage found", changeset=changeset)
                commit_coverage[node] = None
                continue

            # A dict values view can be iterated several times.
            paths = coverage["paths"].values()
            commit_coverage[node] = {
                "added": sum(c["lines_added"] for c in paths),
                "covered": sum(c["lines_covered"] for c in paths),
                "unknown": sum(c["lines_unknown"] for c in paths),
            }

    max_workers = min(32, (os.cpu_count() or 1) + 4)
    logger.info(f"Analyzing {len(changesets_to_analyze)} with {max_workers} workers")

    try:
        with ThreadPoolExecutorResult(
            max_workers=max_workers, initializer=_init_thread, initargs=(repo_dir,)
        ) as executor:
            # Key each future by its changeset: as_completed() yields futures
            # in completion order, so they cannot be zipped with the input list.
            future_to_changeset = {
                executor.submit(analyze_changeset, changeset): changeset
                for changeset in changesets_to_analyze
            }
            for future in tqdm(
                concurrent.futures.as_completed(future_to_changeset),
                total=len(future_to_changeset),
            ):
                exc = future.exception()
                if exc is not None:
                    changeset = future_to_changeset[future]
                    logger.error(f"Exception {exc} while analyzing {changeset}")

                # Checkpoint the mapping to the bucket at most every 600 s
                # while workers are still running.
                if time.monotonic() - start_time >= 600:
                    _upload()
                    start_time = time.monotonic()
    finally:
        # Close the per-thread hglib clients even if the analysis raised.
        while hg_servers:
            hg_servers.pop().close()

    _upload()

    with open(commit_coverage_path, "wb") as zf:
        with cctx.stream_writer(zf) as compressor:
            with io.TextIOWrapper(compressor, encoding="ascii") as f:
                json.dump(commit_coverage, f)