Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions vanir/code_extractors/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ py_library(
":code_extractor_android",
":code_extractor_base",
":code_extractor_git",
":code_extractor_linux",
"//:vulnerability",
requirement("requests"),
],
Expand Down Expand Up @@ -57,6 +58,17 @@ py_library(
],
)

# Code extractor for Linux ecosystem packages of OSV CVEs. It resolves fix
# URLs through the Gitiles and plain git commit helpers listed in deps.
py_library(
    name = "code_extractor_linux",
    srcs = ["code_extractor_linux.py"],
    deps = [
        ":code_extractor_base",
        ":git_commit",
        ":gitiles_commit",
        "//:vulnerability",
    ],
)

py_library(
name = "gitiles_commit",
srcs = ["gitiles_commit.py"],
Expand Down Expand Up @@ -134,6 +146,21 @@ py_test(
],
)

# Unit tests for the Linux code extractor.
py_test(
    name = "code_extractor_linux_test",
    srcs = ["code_extractor_linux_test.py"],
    data = [
        "//vanir/testdata:test_patch_set",
    ],
    deps = [
        ":code_extractor_base",
        ":code_extractor_linux",
        ":git_commit",
        "//:vulnerability",
        requirement("absl-py"),
    ],
)

py_test(
name = "gitiles_commit_test",
srcs = ["gitiles_commit_test.py"],
Expand Down
1 change: 1 addition & 0 deletions vanir/code_extractors/code_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from vanir.code_extractors import code_extractor_android
from vanir.code_extractors import code_extractor_base
from vanir.code_extractors import code_extractor_git
from vanir.code_extractors import code_extractor_linux
# pylint: enable=unused-import

_P = TypeVar('_P', bound=code_extractor_base.AbstractCodeExtractor)
Expand Down
87 changes: 87 additions & 0 deletions vanir/code_extractors/code_extractor_linux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Copyright 2023 Google LLC
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd

"""Code extractors for Linux ecosystem packages of OSV CVEs.
"""

import functools
import logging
from typing import Collection, FrozenSet, Mapping, Sequence, Tuple

from vanir import vulnerability
from vanir.code_extractors import code_extractor_base
from vanir.code_extractors import git_commit
from vanir.code_extractors import gitiles_commit


@functools.cache
def _generate_commit(url: str, **kwargs) -> code_extractor_base.Commit:
  """Generates a Commit object for the given URL.

  Every known commit class is tried in order and the first one that can be
  constructed from |url| wins. Successful results are memoized by
  functools.cache, so |url| and all keyword argument values must be hashable.

  Args:
    url: a URL pointing a commit of a known source repo.
    **kwargs: additional arguments to pass to the constructor of each Commit.

  Returns:
    A commit object containing all patches and files extracted from |url|.

  Raises:
    CommitDataFetchError: when at least one commit class failed to extract
      valid commit data from |url| and no other class succeeded.
    ValueError: when the given URL is malformatted or not compatible with any
      known source repos.
  """
  known_commit_classes = (
      gitiles_commit.GitilesCommit,
      git_commit.GitCommit,
  )
  last_fetch_error = None
  for commit_class in known_commit_classes:
    try:
      return commit_class(url, **kwargs)
    except (
        code_extractor_base.IncompatibleUrlError,
        code_extractor_base.CommitDataFetchError,
    ) as e:
      # An incompatible URL only means "try the next class". A fetch failure
      # is remembered so that it is not misreported as an unknown URL below.
      if not isinstance(e, code_extractor_base.IncompatibleUrlError):
        last_fetch_error = e
  if last_fetch_error is not None:
    raise last_fetch_error
  raise ValueError('Unknown commit URL: %s' % url)


class LinuxCodeExtractor(code_extractor_base.AbstractCodeExtractor):
  """Extracts fix commits for affected packages of Linux ecosystems."""

  @classmethod
  def is_supported_ecosystem(cls, ecosystem: str) -> bool:
    """Returns True if |ecosystem| contains 'Debian' or 'Linux'.

    This is a substring check, so versioned names such as 'Debian:11' are
    accepted as well.
    """
    return any(keyword in ecosystem for keyword in ('Debian', 'Linux'))

  def extract_commits_for_affected_entry(
      self,
      affected: vulnerability.AffectedEntry,
      **kwargs,
  ) -> Tuple[
      Sequence[code_extractor_base.Commit],
      Sequence[code_extractor_base.FailedCommitUrl],
  ]:
    """Builds a Commit for every fix URL listed in the affected entry.

    Args:
      affected: the affected entry; fix URLs are read from the 'fixes' key of
        its ecosystem_specific mapping (no URLs if the key is absent).
      **kwargs: forwarded unchanged to the commit constructors.

    Returns:
      A tuple of (commits, failures). Each URL that raised ValueError or
      CommitDataFetchError is reported as a FailedCommitUrl carrying the URL
      and the error, instead of aborting the whole extraction.
    """
    extracted = []
    failures = []
    for url in affected.ecosystem_specific.get('fixes', []):
      logging.info('Analyzing fix: %s', url)
      try:
        extracted.append(_generate_commit(url, **kwargs))
      except (ValueError, code_extractor_base.CommitDataFetchError) as error:
        failures.append(code_extractor_base.FailedCommitUrl(url, error))
    return extracted, failures

  def extract_files_at_tip_of_unaffected_versions(
      self,
      package_name: str,
      affected_versions: Collection[str],
      files: Collection[str],
      **kwargs,
  ) -> Tuple[
      Sequence[code_extractor_base.Commit],
      Sequence[code_extractor_base.FailedCommitUrl],
  ]:
    """Returns a pair of empty lists; every argument is ignored."""
    no_commits = []
    no_failures = []
    return no_commits, no_failures
113 changes: 113 additions & 0 deletions vanir/code_extractors/code_extractor_linux_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright 2023 Google LLC
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd

from unittest import mock

from vanir import vulnerability
from vanir.code_extractors import code_extractor_linux
from vanir.code_extractors import code_extractor_base
from vanir.code_extractors import git_commit

from absl.testing import absltest
from absl.testing import parameterized

class CodeExtractorLinuxTest(parameterized.TestCase):
  """Tests for LinuxCodeExtractor commit extraction."""

  def setUp(self):
    super().setUp()
    # Special mock for the git operations done in GitCommit's constructor.
    # The return value must not be empty.
    self.enter_context(
        mock.patch.object(git_commit.GitCommit, '_run_git', autospec=True)
    ).return_value = b'mock-result'

  def test_commit_init_with_unknown_commit_url(self):
    bad_url = 'https://unsupported.kernel.patch.source.com/blah'
    affected = vulnerability.AffectedEntry({
        'package': {'ecosystem': 'Linux', 'name': 'Kernel'},
        'ecosystem_specific': {'fixes': [bad_url]},
    })
    extractor = code_extractor_linux.LinuxCodeExtractor()
    commits, failures = extractor.extract_commits_for_affected_entry(affected)
    self.assertEmpty(commits)
    self.assertLen(failures, 1)
    self.assertEqual(failures[0].url, bad_url)
    self.assertIsInstance(failures[0].error, ValueError)

  def test_different_packages(self):
    packages = (
        {'ecosystem': 'Linux', 'name': 'Kernel'},
        {'ecosystem': 'Debian:11', 'name': 'linux'},
    )
    for package in packages:
      # subTest reports which package failed instead of stopping at the
      # first failing iteration.
      with self.subTest(package=package):
        # Extraction itself never inspects the package, so the ecosystem
        # check is what actually distinguishes the two cases.
        self.assertTrue(
            code_extractor_linux.LinuxCodeExtractor.is_supported_ecosystem(
                package['ecosystem']
            )
        )
        affected = vulnerability.AffectedEntry({
            'package': package,
            'ecosystem_specific': {'fixes': [
                'https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=1234567',
            ]},
        })
        extractor = code_extractor_linux.LinuxCodeExtractor()
        commits, failures = extractor.extract_commits_for_affected_entry(
            affected
        )
        self.assertEmpty(failures)
        self.assertLen(commits, 1)

  def test_extractor_with_multiple_fixes_and_failures(self):
    unsupported_url = 'https://unsupported.kernel.patch.source.com/blah'
    affected = vulnerability.AffectedEntry({
        'package': {'ecosystem': 'Linux', 'name': 'Kernel'},
        'ecosystem_specific': {'fixes': [
            'https://git.kernel.org/linus/1234567',
            'https://git.kernel.org/stable/c/1234567',
            'https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=1234567',
            'https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git@1234567',
            'https://github.com/torvalds/linux/commit/1234567',
            unsupported_url,
        ]},
    })
    # Expected remote of each successfully parsed fix, in input order.
    expected_remotes = [
        'https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git',
        'https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git',
        'https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git',
        'https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git',
        'https://github.com/torvalds/linux',
    ]
    extractor = code_extractor_linux.LinuxCodeExtractor()
    commits, failures = extractor.extract_commits_for_affected_entry(affected)
    self.assertLen(failures, 1)
    self.assertEqual(failures[0].url, unsupported_url)
    self.assertLen(commits, len(expected_remotes))
    for commit, expected_remote in zip(commits, expected_remotes):
      self.assertIsInstance(commit, git_commit.GitCommit)
      self.assertEqual(commit._rev, '1234567')
      self.assertEqual(commit._remote, expected_remote)

  def test_extract_with_empty_patch(self):
    affected = vulnerability.AffectedEntry({
        'package': {'ecosystem': 'Linux', 'name': 'Kernel'},
        'ecosystem_specific': {'fixes': []},
    })

    extractor = code_extractor_linux.LinuxCodeExtractor()
    commits, failures = extractor.extract_commits_for_affected_entry(affected)
    self.assertEmpty(commits)
    self.assertEmpty(failures)

if __name__ == '__main__':
  # Hand control to absl's test runner when this file is executed directly.
  absltest.main()
22 changes: 21 additions & 1 deletion vanir/code_extractors/git_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
_GENERIC_URL_PATTERN = re.compile(
r'(?P<remote>[^:]+://[^/]+/.+)/(?P<rev>[^/]+)'
)
# https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/commit/?id=382c27f4ed28f803b1f1473ac2d8db0afc795a1b
_LINUX_KERNEL_PATTERN = re.compile(
r'(?P<remote>[^:]+://git.kernel.org/pub/scm/linux/kernel/git/[^/]+/[^.]+.git)/commit/\?id=(?P<rev>[^/]+)'
)
# https://git.kernel.org/linus/1eff70a9abd46f175defafd29bc17ad456f398a7
# https://git.kernel.org/stable/c/47f82395f04a976d4fa97de7f2acffa1c1096571
_LINUX_KERNEL_PATTERN_SHORT = re.compile(
r'(?P<remote>[^:]+://git.kernel.org/)(?P<name>[^/]+)/(c/)?(?P<rev>[^/]+)'
)


@functools.cache
Expand All @@ -36,11 +45,22 @@ def _parse_url(url: str) -> Tuple[str, str]:
_NORMALIZED_URL_PATTERN,
_GITILES_URL_PATTERN,
_GITHUB_URL_PATTERN,
_LINUX_KERNEL_PATTERN,
_LINUX_KERNEL_PATTERN_SHORT,
_GENERIC_URL_PATTERN,
):
match = pattern.fullmatch(url)
if match:
return (match.group('remote'), match.group('rev'))
if pattern == _LINUX_KERNEL_PATTERN_SHORT:
# the shortened url does not contain the complete remote
if match.group('name') == 'linus':
remote = 'https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git'
return (remote, match.group('rev'))
elif match.group('name') == 'stable':
remote = 'https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git'
return (remote, match.group('rev'))
else:
return (match.group('remote'), match.group('rev'))
raise code_extractor_base.IncompatibleUrlError(f'Unrecognized git URL: {url}')


Expand Down
12 changes: 12 additions & 0 deletions vanir/sign_generator_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@
' stored in affected[].database_specific.vanir_signatures.',
)

# Controls how vulnerabilities with missing fields are handled. Adjacent
# string literals are concatenated verbatim, so each fragment of the help
# text must carry its own trailing space.
_FAIL_ON_MISSING_FIELD = flags.DEFINE_bool(
    'fail_on_missing_field',
    True,
    'if True, the program will abort the execution '
    'when one or more fields are missing for a vulnerability. If False, '
    'malformed vulnerabilities are ignored.',
)

_FLAG_ERROR_MESSAGE = (
'Not enough configuration flags has been passed. '
'Please provide one of the following sets of flags:\n'
Expand Down Expand Up @@ -176,25 +184,29 @@ def main(argv: Sequence[str]) -> None:
session.mount('https://', requests.adapters.HTTPAdapter(max_retries=retries))

legacy_signatures = _STORE_SIGNATURES_IN_LEGACY_LOCATION.value
fail_on_missing_field = _FAIL_ON_MISSING_FIELD.value
if _VULNERABILITY_FILE_NAME.value:
vuln_file_path = os.path.abspath(_VULNERABILITY_FILE_NAME.value)
vuln_manager = vulnerability_manager.generate_from_file(
vuln_file_path,
store_signatures_in_legacy_location=legacy_signatures,
fail_on_missing_field=fail_on_missing_field,
)
elif _USE_OSV_ANDROID_KERNEL.value:
vuln_manager = vulnerability_manager.generate_from_osv(
ecosystem='Android',
packages=vulnerability.MetaPackage.ANDROID_KERNEL,
session=session,
store_signatures_in_legacy_location=legacy_signatures,
fail_on_missing_field=fail_on_missing_field,
)
elif _OSV_ECOSYSTEM.value:
vuln_manager = vulnerability_manager.generate_from_osv(
ecosystem=_OSV_ECOSYSTEM.value,
packages=_OSV_PACKAGES.value if _OSV_PACKAGES.value else None,
session=session,
store_signatures_in_legacy_location=legacy_signatures,
fail_on_missing_field=fail_on_missing_field,
)
else:
raise ValueError(_FLAG_ERROR_MESSAGE)
Expand Down
Loading