-
Notifications
You must be signed in to change notification settings - Fork 76
Expand file tree
/
Copy pathcodeql.py
More file actions
156 lines (124 loc) · 6.58 KB
/
codeql.py
File metadata and controls
156 lines (124 loc) · 6.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import subprocess
import json
from json.decoder import JSONDecodeError
import tempfile
from pathlib import Path
import csv
import yaml
from typing import *
class CodeQLError(Exception):
def __init__(self, reason, stdout=None, stderr=None, returncode=None):
self.reason = reason
self.stdout = stdout
self.stderr = stderr
self.returncode = returncode
def __str__(self):
return repr(self.reason)
class CodeQL():
def __init__(self) -> None:
codeql_result = subprocess.run(
["codeql", "version", "--format=json"], capture_output=True)
if not codeql_result.returncode == 0:
raise CodeQLError('CodeQL is not available on PATH!',
returncode=codeql_result.returncode)
codeql_output = codeql_result.stdout.decode('utf-8')
try:
codeql_version_info = json.loads(codeql_output)
self.version = codeql_version_info['version']
except JSONDecodeError as e:
raise CodeQLError(
f"Failed to retrieve codeql version information with error: {e.msg}")
def __build_command_options(self, **options: str) -> List[str]:
command_options = []
for key, value in options.items():
command_options.append(f"--{key.replace('_', '-')}")
if not isinstance(value, bool):
command_options.append(str(value))
return command_options
def cleanup(self, database_path: Path, mode: str = "normal") -> None:
if not database_path.exists():
raise CodeQLError(f"Database '{database_path}' not found!")
supported_modes = ["brutal", "normal", "light"]
if not mode in supported_modes:
raise CodeQLError(
f"Invalid cleanup mode {mode}, expecting one of {' '.join(supported_modes)}")
result = subprocess.run(
["codeql", "database", "cleanup", "--mode", mode, str(database_path)], capture_output=True)
if not result.returncode == 0:
raise CodeQLError(
f"Unable to cleanup database {database_path}", stdout=result.stdout, stderr=result.stderr, returncode=result.returncode)
def run_queries(self, database_path: Path, *queries: Path, **options: str) -> None:
database_path = database_path.resolve()
command_options = self.__build_command_options(**options)
command = ["codeql", "database", "run-queries"] + command_options
command.append(str(database_path))
command.extend(map(str, queries))
for query in queries:
if not query.exists():
raise CodeQLError(f"{query} not found!")
codeql_result = subprocess.run(command, capture_output=True)
if not codeql_result.returncode == 0:
raise CodeQLError(
f"Unable to run queries: {','.join((map(str, queries)))}!", stdout=codeql_result.stdout, stderr=codeql_result.stderr, returncode=codeql_result.returncode)
def resolve_qlpack_path(self, query: Path) -> Path:
for parent in query.parents:
qlpack = parent / 'qlpack.yml'
if qlpack.exists():
return qlpack
raise CodeQLError(f"Unable to find QL pack for {query}")
def get_qlpack(self, qlpack_path: Path) -> Any:
if qlpack_path.name != 'qlpack.yml':
raise CodeQLError(
f"Invalid QLPack path: {qlpack_path}, it is not ending with 'qlpack.yml'!")
with qlpack_path.open() as f:
return yaml.safe_load(f)
def decode_results(self, database_path: Path, query_path: Path, **options: str) -> List:
qlpack_path = self.resolve_qlpack_path(query_path)
qlpack = self.get_qlpack(qlpack_path)
relative_query_path = query_path.relative_to(qlpack_path.parent)
bqrs_path = (database_path / "results" /
qlpack['name'] / relative_query_path).with_suffix(".bqrs")
command = ["codeql", "bqrs", "decode"]
options['format'] = 'csv'
with tempfile.TemporaryDirectory() as temp_name:
temp_file = Path(temp_name) / \
Path(query_path).with_suffix('.tmp').name
options['output'] = str(temp_file)
command_options = self.__build_command_options(**options)
command.extend(command_options)
command.append(str(bqrs_path))
result = subprocess.run(command, capture_output=True)
if not result.returncode == 0:
raise CodeQLError(
f"Could not read the output of the query {query_path}", stdout=result.stdout, stderr=result.stderr, returncode=result.returncode)
with open(temp_file) as tmp:
return list(csv.reader(tmp))
def generate_query_help(self, query_help_path: Path, output: Path, format : str = "markdown", **options: str) -> None:
command = ['codeql', 'generate', 'query-help']
options['output'] = str(output)
options['format'] = format
command_options = self.__build_command_options(**options)
command.extend(command_options)
command.append(str(query_help_path))
result = subprocess.run(command, capture_output=True)
if not result.returncode == 0:
raise CodeQLError(
f"Failed to generate query help file {query_help_path}", stdout=result.stdout, stderr=result.stderr, returncode=result.returncode)
def format(self, path: Path) -> None:
command = ['codeql', 'query', 'format', '--in-place', str(path)]
result = subprocess.run(command, capture_output=True)
if not result.returncode == 0:
raise CodeQLError(
f"Failed to format file {path}", stdout=result.stdout, stderr=result.stderr, returncode=result.returncode)
def create_database(self, src_root: Path, language: str, database: Path, *build_commands : str, **options: str) -> None:
command = ['codeql', 'database', 'create']
options['source-root'] = str(src_root)
options['language'] = language
command_options = self.__build_command_options(**options)
command.extend(command_options)
command.extend([f'--command={build_command}' for build_command in build_commands])
command.append(str(database))
result = subprocess.run(command, capture_output=True)
if not result.returncode == 0:
raise CodeQLError(
f"Failed to build database {database} from {src_root} with language {language} and commands [{','.join(build_commands)}]", stdout=result.stdout, stderr=result.stderr, returncode=result.returncode)