1
1
#
2
- #
3
2
# Copyright (c) nexB Inc. and others. All rights reserved.
4
3
# VulnerableCode is a trademark of nexB Inc.
5
4
# SPDX-License-Identifier: Apache-2.0
21
20
22
21
from vulnerabilities .importer import AdvisoryData
23
22
from vulnerabilities .importer import AffectedPackage
24
- from vulnerabilities .importer import Importer
23
+ from vulnerabilities .pipelines import VulnerableCodeBaseImporterPipeline
25
24
from vulnerabilities .references import WireSharkReference
26
25
from vulnerabilities .references import XsaReference
27
26
from vulnerabilities .references import ZbxReference
28
27
from vulnerabilities .utils import fetch_response
29
28
from vulnerabilities .utils import is_cve
30
29
31
- LOGGER = logging .getLogger (__name__ )
32
- BASE_URL = "https://secdb.alpinelinux.org/"
33
30
31
+ class AlpineLinuxImporterPipeline (VulnerableCodeBaseImporterPipeline ):
32
+ """Collect Alpine Linux advisories."""
33
+
34
+ pipeline_id = "alpine_linux_importer"
34
35
35
- class AlpineImporter (Importer ):
36
36
spdx_license_expression = "CC-BY-SA-4.0"
37
37
license_url = "https://secdb.alpinelinux.org/license.txt"
38
+ url = "https://secdb.alpinelinux.org/"
38
39
importer_name = "Alpine Linux Importer"
39
40
40
- def advisory_data (self ) -> Iterable [AdvisoryData ]:
41
- page_response_content = fetch_response (BASE_URL ).content
42
- advisory_directory_links = fetch_advisory_directory_links (page_response_content )
41
+ @classmethod
42
+ def steps (cls ):
43
+ return (
44
+ cls .collect_and_store_advisories ,
45
+ cls .import_new_advisories ,
46
+ )
47
+
48
+ def advisories_count (self ) -> int :
49
+ return 0
50
+
51
+ def collect_advisories (self ) -> Iterable [AdvisoryData ]:
52
+ page_response_content = fetch_response (self .url ).content
53
+ advisory_directory_links = fetch_advisory_directory_links (
54
+ page_response_content , self .url , self .log
55
+ )
43
56
advisory_links = []
44
57
for advisory_directory_link in advisory_directory_links :
45
58
advisory_directory_page = fetch_response (advisory_directory_link ).content
46
59
advisory_links .extend (
47
- fetch_advisory_links (advisory_directory_page , advisory_directory_link )
60
+ fetch_advisory_links (advisory_directory_page , advisory_directory_link , self . log )
48
61
)
49
62
for link in advisory_links :
50
63
record = fetch_response (link ).json ()
51
64
if not record ["packages" ]:
52
- LOGGER .error (f'"packages" not found in { link !r} ' )
65
+ self .log (
66
+ f'"packages" not found in { link !r} ' ,
67
+ level = logging .DEBUG ,
68
+ )
53
69
continue
54
- yield from process_record (record = record , url = link )
70
+ yield from process_record (record = record , url = link , logger = self . log )
55
71
56
72
57
- def fetch_advisory_directory_links (page_response_content : str ) -> List [str ]:
73
+ def fetch_advisory_directory_links (
74
+ page_response_content : str ,
75
+ base_url : str ,
76
+ logger : callable = None ,
77
+ ) -> List [str ]:
58
78
"""
59
79
Return a list of advisory directory links present in `page_response_content` html string
60
80
"""
@@ -66,60 +86,83 @@ def fetch_advisory_directory_links(page_response_content: str) -> List[str]:
66
86
]
67
87
68
88
if not alpine_versions :
69
- LOGGER .error (f"No versions found in { BASE_URL !r} " )
89
+ if logger :
90
+ logger (
91
+ f"No versions found in { base_url !r} " ,
92
+ level = logging .DEBUG ,
93
+ )
70
94
return []
71
95
72
- advisory_directory_links = [urljoin (BASE_URL , version ) for version in alpine_versions ]
96
+ advisory_directory_links = [urljoin (base_url , version ) for version in alpine_versions ]
73
97
74
98
return advisory_directory_links
75
99
76
100
77
101
def fetch_advisory_links (
78
- advisory_directory_page : str , advisory_directory_link : str
102
+ advisory_directory_page : str ,
103
+ advisory_directory_link : str ,
104
+ logger : callable = None ,
79
105
) -> Iterable [str ]:
80
106
"""
81
107
Yield json file urls present in `advisory_directory_page`
82
108
"""
83
109
advisory_directory_page = BeautifulSoup (advisory_directory_page , features = "lxml" )
84
110
anchor_tags = advisory_directory_page .find_all ("a" )
85
111
if not anchor_tags :
86
- LOGGER .error (f"No anchor tags found in { advisory_directory_link !r} " )
112
+ if logger :
113
+ logger (
114
+ f"No anchor tags found in { advisory_directory_link !r} " ,
115
+ level = logging .DEBUG ,
116
+ )
87
117
return iter ([])
88
118
for anchor_tag in anchor_tags :
89
119
if anchor_tag .text .endswith ("json" ):
90
120
yield urljoin (advisory_directory_link , anchor_tag .text )
91
121
92
122
93
- def check_for_attributes (record ) -> bool :
123
+ def check_for_attributes (record , logger ) -> bool :
94
124
attributes = ["distroversion" , "reponame" , "archs" ]
95
125
for attribute in attributes :
96
126
if attribute not in record :
97
- LOGGER .error (f'"{ attribute !r} " not found in { record !r} ' )
127
+ if logger :
128
+ logger (
129
+ f'"{ attribute !r} " not found in { record !r} ' ,
130
+ level = logging .DEBUG ,
131
+ )
98
132
return False
99
133
return True
100
134
101
135
102
- def process_record (record : dict , url : str ) -> Iterable [AdvisoryData ]:
136
+ def process_record (record : dict , url : str , logger : callable = None ) -> Iterable [AdvisoryData ]:
103
137
"""
104
138
Return a list of AdvisoryData objects by processing data
105
139
present in that `record`
106
140
"""
107
141
if not record .get ("packages" ):
108
- LOGGER .error (f'"packages" not found in this record { record !r} ' )
142
+ if logger :
143
+ logger (
144
+ f'"packages" not found in this record { record !r} ' ,
145
+ level = logging .DEBUG ,
146
+ )
109
147
return []
110
148
111
149
for package in record ["packages" ]:
112
150
if not package ["pkg" ]:
113
- LOGGER .error (f'"pkg" not found in this package { package !r} ' )
151
+ if logger :
152
+ logger (
153
+ f'"pkg" not found in this package { package !r} ' ,
154
+ level = logging .DEBUG ,
155
+ )
114
156
continue
115
- if not check_for_attributes (record ):
157
+ if not check_for_attributes (record , logger ):
116
158
continue
117
159
yield from load_advisories (
118
160
pkg_infos = package ["pkg" ],
119
161
distroversion = record ["distroversion" ],
120
162
reponame = record ["reponame" ],
121
163
archs = record ["archs" ],
122
164
url = url ,
165
+ logger = logger ,
123
166
)
124
167
125
168
@@ -129,24 +172,37 @@ def load_advisories(
129
172
reponame : str ,
130
173
archs : List [str ],
131
174
url : str ,
175
+ logger : callable = None ,
132
176
) -> Iterable [AdvisoryData ]:
133
177
"""
134
178
Yield AdvisoryData by mapping data from `pkg_infos`
135
179
and form PURL for AffectedPackages by using
136
180
`distroversion`, `reponame`, `archs`
137
181
"""
138
182
if not pkg_infos .get ("name" ):
139
- LOGGER .error (f'"name" is not available in package { pkg_infos !r} ' )
183
+ if logger :
184
+ logger (
185
+ f'"name" is not available in package { pkg_infos !r} ' ,
186
+ level = logging .DEBUG ,
187
+ )
140
188
return []
141
189
142
190
for version , fixed_vulns in pkg_infos ["secfixes" ].items ():
143
191
if not fixed_vulns :
144
- LOGGER .error (f"No fixed vulnerabilities in version { version !r} " )
192
+ if logger :
193
+ logger (
194
+ f"No fixed vulnerabilities in version { version !r} " ,
195
+ level = logging .DEBUG ,
196
+ )
145
197
continue
146
198
147
199
for vuln_ids in fixed_vulns :
148
200
if not isinstance (vuln_ids , str ):
149
- LOGGER .error (f"{ vuln_ids !r} is not of `str` instance" )
201
+ if logger :
202
+ logger (
203
+ f"{ vuln_ids !r} is not of `str` instance" ,
204
+ level = logging .DEBUG ,
205
+ )
150
206
continue
151
207
vuln_ids = vuln_ids .split ()
152
208
aliases = []
@@ -179,10 +235,18 @@ def load_advisories(
179
235
try :
180
236
fixed_version = AlpineLinuxVersion (version )
181
237
except Exception as e :
182
- LOGGER .error (f"{ version !r} is not a valid AlpineVersion { e !r} " )
238
+ if logger :
239
+ logger (
240
+ f"{ version !r} is not a valid AlpineVersion { e !r} " ,
241
+ level = logging .DEBUG ,
242
+ )
183
243
continue
184
244
if not isinstance (archs , List ):
185
- LOGGER .error (f"{ archs !r} is not of `List` instance" )
245
+ if logger :
246
+ logger (
247
+ f"{ archs !r} is not of `List` instance" ,
248
+ level = logging .DEBUG ,
249
+ )
186
250
continue
187
251
if archs :
188
252
for arch in archs :
0 commit comments