Skip to content

Commit c21f851

Browse files
n-thumanngreenbonebot
authored andcommitted
Add: SourceApi class
1 parent b0374c5 commit c21f851

File tree

1 file changed

+144
-0
lines changed

1 file changed

+144
-0
lines changed

pontos/nvd/source/api.py

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# SPDX-FileCopyrightText: 2022-2023 Greenbone AG
2+
#
3+
# SPDX-License-Identifier: GPL-3.0-or-later
4+
#
5+
6+
from datetime import datetime
7+
from typing import (
8+
Iterable,
9+
Iterator,
10+
Optional,
11+
)
12+
13+
from httpx import Timeout
14+
15+
from pontos.nvd.api import (
16+
DEFAULT_TIMEOUT_CONFIG,
17+
JSON,
18+
NVDApi,
19+
NVDResults,
20+
Params,
21+
format_date,
22+
now,
23+
)
24+
from pontos.nvd.models.source import Source
25+
26+
__all__ = ("SourceApi",)
27+
28+
DEFAULT_NIST_NVD_CVES_URL = "https://services.nvd.nist.gov/rest/json/source/2.0"
29+
MAX_SOURCES_PER_PAGE = 1000
30+
31+
32+
def _result_iterator(data: JSON) -> Iterator[Source]:
33+
sources: Iterable = data.get("sources", []) # type: ignore
34+
return (Source.from_dict(source) for source in sources)
35+
36+
37+
class SourceApi(NVDApi):
38+
"""
39+
API for querying the NIST NVD CVE Change History information.
40+
41+
Should be used as an async context manager.
42+
43+
Example:
44+
.. code-block:: python
45+
46+
from pontos.nvd.source import SourceApi
47+
48+
async with SourceApi() as api:
49+
async for source in api.source():
50+
print(source)
51+
"""
52+
53+
def __init__(
54+
self,
55+
*,
56+
token: Optional[str] = None,
57+
timeout: Optional[Timeout] = DEFAULT_TIMEOUT_CONFIG,
58+
rate_limit: bool = True,
59+
request_attempts: int = 1,
60+
) -> None:
61+
"""
62+
Create a new instance of the CVE API.
63+
64+
Args:
65+
token: The API key to use. Using an API key allows to run more
66+
requests at the same time.
67+
timeout: Timeout settings for the HTTP requests
68+
rate_limit: Set to False to ignore rate limits. The public rate
69+
limit (without an API key) is 5 requests in a rolling 30 second
70+
window. The rate limit with an API key is 50 requests in a
71+
rolling 30 second window.
72+
See https://nvd.nist.gov/developers/start-here#divRateLimits
73+
Default: True.
74+
request_attempts: The number of attempts per HTTP request. Defaults to 1.
75+
"""
76+
super().__init__(
77+
DEFAULT_NIST_NVD_CVES_URL,
78+
token=token,
79+
timeout=timeout,
80+
rate_limit=rate_limit,
81+
request_attempts=request_attempts,
82+
)
83+
84+
def sources(
85+
self,
86+
*,
87+
last_modified_start_date: Optional[datetime] = None,
88+
last_modified_end_date: Optional[datetime] = None,
89+
source_identifier: Optional[str] = None,
90+
request_results: Optional[int] = None,
91+
start_index: int = 0,
92+
results_per_page: Optional[int] = None,
93+
) -> NVDResults[Source]:
94+
"""
95+
Get all sources for the provided arguments
96+
97+
https://nvd.nist.gov/developers/data-sources#divGetSource
98+
99+
Args:
100+
last_modified_start_date: Return all CVEs modified after this date.
101+
last_modified_end_date: Return all CVEs modified before this date.
102+
If last_modified_start_date is set but no
103+
last_modified_end_date is passed it is set to now.
104+
source_identifier: Return all source records where the source identifier matches
105+
start_index: Index of the first CVE to be returned. Useful only for
106+
paginated requests that should not start at the first page.
107+
results_per_page: Number of results in a single requests. Mostly
108+
useful for paginated requests.
109+
110+
Returns:
111+
A NVDResponse for sources
112+
113+
Examples:
114+
.. code-block:: python
115+
116+
from pontos.nvd.source import SourceApi
117+
118+
async with SourceApi() as api:
119+
async for source in api.source(source_identifier="[email protected]"):
120+
print(source)
121+
"""
122+
params: Params = {}
123+
if last_modified_start_date:
124+
params["lastModStartDate"] = format_date(last_modified_start_date)
125+
if not last_modified_end_date:
126+
params["lastModEndDate"] = format_date(now())
127+
if last_modified_end_date:
128+
params["lastModEndDate"] = format_date(last_modified_end_date)
129+
130+
if source_identifier:
131+
params["sourceIdentifier"] = source_identifier
132+
133+
results_per_page = min(
134+
results_per_page or MAX_SOURCES_PER_PAGE,
135+
request_results or MAX_SOURCES_PER_PAGE,
136+
)
137+
return NVDResults(
138+
self,
139+
params,
140+
_result_iterator,
141+
request_results=request_results,
142+
results_per_page=results_per_page,
143+
start_index=start_index,
144+
)

0 commit comments

Comments
 (0)