mirror of https://git.sr.ht/~garritfra/taurus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
721 lines
23 KiB
721 lines
23 KiB
#!/usr/bin/env python3 |
|
""" |
|
A diagnostic tool for gemini servers. |
|
|
|
This program will barrage your server with a series of requests in |
|
an attempt to uncover unexpected behavior. Not all of these checks |
|
adhere strictly to the gemini specification. Some of them are |
|
general best practices, and some trigger undefined behavior. Results |
|
should be taken with a grain of salt and analyzed on their own merit. |
|
""" |
|
import argparse |
|
import contextlib |
|
import datetime |
|
import ipaddress |
|
import socket |
|
import ssl |
|
import sys |
|
import time |
|
import typing |
|
|
|
if sys.version_info < (3, 7): |
|
sys.exit("Fatal Error: script requires Python 3.7+") |
|
|
|
socket.setdefaulttimeout(5) |
|
|
|
# ANSI color codes |
|
A_BOLD = 1 |
|
FG_BLACK = 30 |
|
FG_RED = 31 |
|
FG_GREEN = 32 |
|
FG_YELLOW = 33 |
|
FG_BLUE = 34 |
|
FG_MAGENTA = 35 |
|
FG_CYAN = 36 |
|
FG_WHITE = 37 |
|
|
|
|
|
def colorize(text: str, color: int) -> str: |
|
""" |
|
Colorize text using ANSI escape codes. |
|
""" |
|
if sys.stdout.isatty(): |
|
return f"\033[{color}m{text}\033[0m" |
|
else: |
|
return text |
|
|
|
|
|
def log(text: str, style: str = "normal") -> None: |
|
""" |
|
Print formatted text to stdout with optional styling. |
|
""" |
|
if style == "title": |
|
text = colorize(text, A_BOLD) |
|
if style == "warning": |
|
text = colorize(f" {text}", FG_YELLOW) |
|
elif style == "info": |
|
text = colorize(f" {text}", FG_CYAN) |
|
elif style == "success": |
|
text = colorize(f" ✓ {text}", FG_GREEN) |
|
elif style == "failure": |
|
text = colorize(f" x {text}", FG_RED) |
|
print(text) |
|
|
|
|
|
def log_error(err: Exception) -> None: |
|
""" |
|
Helper method for formatting exceptions as error messages. |
|
""" |
|
if isinstance(err, Warning): |
|
log(str(err), style="warning") |
|
else: |
|
log(str(err), style="failure") |
|
|
|
|
|
class GeminiResponse: |
|
def __init__(self, header): |
|
self.charset: str = "utf-8" |
|
self.header: str = header |
|
self.body: str = "" |
|
self.meta: typing.Optional[str] = None |
|
self.status: typing.Optional[str] = None |
|
self.mime: typing.Optional[str] = None |
|
|
|
|
|
class BaseCheck: |
|
""" |
|
Abstract base class for implementing server checks. |
|
""" |
|
|
|
description: str = "" |
|
|
|
def __init__(self, args: argparse.Namespace): |
|
self.args = args |
|
|
|
def run(self) -> None: |
|
""" |
|
Run the check and log any unhandled exceptions. |
|
""" |
|
log(f"[{self.__class__.__name__}] {self.__doc__}", style="title") |
|
try: |
|
self.check() |
|
except Exception as e: |
|
log_error(e) |
|
log("") |
|
|
|
def check(self) -> None: |
|
raise NotImplemented |
|
|
|
@property |
|
def netloc(self): |
|
if self.args.port == 1965: |
|
return self.args.host |
|
else: |
|
return f"{self.args.host}:{self.args.port}" |
|
|
|
def resolve_host(self, family: socket.AddressFamily) -> tuple: |
|
""" |
|
Retrieve the IP address and connection information for the host. |
|
""" |
|
host = self.args.host |
|
port = self.args.port |
|
type_ = socket.SOCK_STREAM |
|
proto = socket.IPPROTO_TCP |
|
addr_info = socket.getaddrinfo(host, port, family, type_, proto) |
|
if not addr_info: |
|
raise UserWarning(f"No {family} address found for host") |
|
# Gemini IPv6 |
|
return addr_info[0][4] |
|
|
|
@contextlib.contextmanager |
|
def connection( |
|
self, context: typing.Optional[ssl.SSLContext] = None |
|
) -> ssl.SSLSocket: |
|
""" |
|
Setup an unverified TLS socket connection with the host. |
|
""" |
|
if context is None: |
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS) |
|
context.check_hostname = False |
|
context.verify_mode = ssl.CERT_NONE |
|
with socket.create_connection( |
|
(self.args.host, self.args.port), timeout=5 |
|
) as sock: |
|
with context.wrap_socket(sock, server_hostname = self.netloc) as ssock: |
|
yield ssock |
|
|
|
def make_request(self, url: str) -> GeminiResponse: |
|
""" |
|
Send the request verbatim to the server and parse the response bytes. |
|
""" |
|
log("Requesting URL") |
|
log(repr(url), style="info") |
|
with self.connection() as sock: |
|
sock.sendall(url.encode(errors="surrogateescape")) |
|
fp = sock.makefile("rb") |
|
header = fp.readline().decode() |
|
|
|
log("Response header") |
|
log(repr(header), style="info") |
|
|
|
response = GeminiResponse(header) |
|
try: |
|
response.status, response.meta = header.strip().split(maxsplit=1) |
|
except ValueError: |
|
return response |
|
|
|
if response.status.startswith("2"): |
|
meta_parts = [part.strip() for part in response.meta.split(";")] |
|
response.mime = meta_parts[0] |
|
for part in meta_parts[1:]: |
|
if part.lower().startswith("charset="): |
|
response.charset = part[8:] |
|
|
|
response.body = fp.read().decode(response.charset) |
|
return response |
|
|
|
def assert_success(self, response: GeminiResponse) -> None: |
|
""" |
|
Helper method to check if a response was successful. |
|
""" |
|
log("Status should return a success code (20 SUCCESS)") |
|
style = "success" if response.status == "20" else "failure" |
|
log(f"Received status of {response.status!r}", style) |
|
|
|
def assert_permanent_failure(self, response: GeminiResponse) -> None: |
|
""" |
|
Helper method to assert that a response returned a permanent. |
|
""" |
|
log("Status should return a failure code (5X PERMANENT FAILURE)") |
|
style = "success" if response.status.startswith("5") else "failure" |
|
log(f"Received status of {response.status!r}", style) |
|
|
|
def assert_proxy_refused(self, response: GeminiResponse) -> None: |
|
""" |
|
Helper method to assert that a response returned a permanent. |
|
""" |
|
log("Status should return a failure code (53 PROXY REQUEST REFUSED)") |
|
style = "success" if response.status == "53" else "failure" |
|
log(f"Received status of {response.status!r}", style) |
|
|
|
def assert_bad_request(self, response: GeminiResponse) -> None: |
|
""" |
|
Helper method to assert that a response returned a permanent. |
|
""" |
|
log("Status should return a failure code (59 BAD REQUEST)") |
|
style = "success" if response.status == "59" else "failure" |
|
log(f"Received status of {response.status!r}", style) |
|
|
|
|
|
class IPv4Address(BaseCheck): |
|
"""Establish a connection over an IPv4 address""" |
|
|
|
def check(self): |
|
log(f"Looking up IPv4 address for {self.args.host!r}") |
|
addr = self.resolve_host(socket.AF_INET) |
|
log(f"{addr[0]!r}", style="success") |
|
log(f"Attempting to connect to {addr[0]}:{addr[1]}") |
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: |
|
sock.connect(addr) |
|
sock.close() |
|
log(f"Successfully established connection", style="success") |
|
|
|
|
|
class IPv6Address(BaseCheck): |
|
"""Establish a connection over an IPv6 address""" |
|
|
|
def check(self) -> None: |
|
log(f"Looking up IPv6 address for {self.args.host!r}") |
|
addr = self.resolve_host(socket.AF_INET6) |
|
if ipaddress.ip_address(addr[0]).ipv4_mapped: |
|
raise UserWarning("Found IPv4-mapped address, skipping check") |
|
log(f"{addr[0]!r}", style="success") |
|
log(f"Attempting to connect to [{addr[0]}]:{addr[1]}") |
|
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock: |
|
sock.connect(addr) |
|
sock.close() |
|
log(f"Successfully established connection", style="success") |
|
|
|
|
|
class TLSVersion(BaseCheck): |
|
"""Server must negotiate at least TLS v1.2, ideally TLS v1.3""" |
|
|
|
def check(self) -> None: |
|
log(f"Checking client library") |
|
log(f"{ssl.OPENSSL_VERSION!r}", style="info") |
|
log("Determining highest supported TLS version") |
|
with self.connection() as sock: |
|
version = sock.version() |
|
if version in ("SSLv2", "SSLv3", "TLSv1", "TLSv1.1"): |
|
log(f"Negotiated {version}", style="failure") |
|
elif version == "TLSv1.2": |
|
log(f"Negotiated {version}", style="warning") |
|
else: |
|
log(f"Negotiated {version}", style="success") |
|
|
|
|
|
class TLSClaims(BaseCheck): |
|
"""Certificate claims must be valid""" |
|
|
|
def check(self) -> None: |
|
try: |
|
# $ pip install cryptography |
|
import cryptography |
|
from cryptography.hazmat.backends import default_backend |
|
from cryptography.x509.oid import NameOID, ExtensionOID |
|
except ImportError: |
|
raise UserWarning("cryptography library not installed, skipping check") |
|
|
|
with self.connection() as sock: |
|
# Python refuses to parse a certificate unless the issuer is validated. |
|
# Because many gemini servers use self-signed certs, we need to use |
|
# a third-party library to parse the certs from their binary form. |
|
der_x509 = sock.getpeercert(binary_form=True) |
|
cert = default_backend().load_der_x509_certificate(der_x509) |
|
now = datetime.datetime.utcnow() |
|
|
|
log('Checking "Not Valid Before" timestamp') |
|
style = "success" if cert.not_valid_before <= now else "failure" |
|
log(f"{cert.not_valid_before} UTC", style) |
|
|
|
log('Checking "Not Valid After" timestamp') |
|
style = "success" if cert.not_valid_after >= now else "failure" |
|
log(f"{cert.not_valid_after} UTC", style) |
|
|
|
log("Checking subject claim matches server hostname") |
|
subject = [] |
|
for cn in cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME): |
|
subject.append(("commonName", cn.value)) |
|
|
|
subject_alt_name = [] |
|
try: |
|
ext = cert.extensions.get_extension_for_oid( |
|
ExtensionOID.SUBJECT_ALTERNATIVE_NAME |
|
) |
|
except cryptography.x509.ExtensionNotFound: |
|
pass |
|
else: |
|
for dns in ext.value.get_values_for_type(cryptography.x509.DNSName): |
|
subject_alt_name.append(("DNS", dns)) |
|
for ip_address in ext.value.get_values_for_type( |
|
cryptography.x509.IPAddress |
|
): |
|
subject_alt_name.append(("IP Address", ip_address)) |
|
|
|
cert_dict = { |
|
"subject": (tuple(subject),), |
|
"subjectAltName": tuple(subject_alt_name), |
|
} |
|
log(f"{cert_dict!r}", style="info") |
|
ssl.match_hostname(cert_dict, self.args.host) |
|
log(f"Hostname {self.args.host!r} matches claim", style="success") |
|
|
|
|
|
class TLSVerified(BaseCheck): |
|
"""Certificate should be self-signed or have a trusted issuer""" |
|
|
|
def check(self) -> None: |
|
log("Connecting over verified SSL socket") |
|
context = ssl.create_default_context() |
|
try: |
|
with socket.create_connection((self.args.host, self.args.port)) as sock: |
|
with context.wrap_socket(sock, server_hostname=self.args.host) as ssock: |
|
ssock.sendall(f"gemini://{self.netloc}\r\n".encode()) |
|
except Exception as e: |
|
if getattr(e, "verify_code", None) == 18: |
|
log("Self-signed TLS certificate detected", style="warning") |
|
else: |
|
raise |
|
else: |
|
log("Established trusted TLS connection", style="success") |
|
|
|
|
|
class TLSRequired(BaseCheck): |
|
"""Non-TLS requests should be refused""" |
|
|
|
def check(self) -> None: |
|
log("Sending non-TLS request") |
|
try: |
|
with socket.create_connection((self.args.host, self.args.port)) as sock: |
|
sock.sendall(f"gemini://{self.netloc}\r\n".encode()) |
|
fp = sock.makefile("rb") |
|
header = fp.readline().decode() |
|
if header: |
|
log(f"Received unexpected response {header!r}", style="failure") |
|
else: |
|
log(f"Connection closed by server", style="success") |
|
except Exception as e: |
|
# A connection error is a valid response |
|
log(f"{e!r}", style="success") |
|
|
|
|
|
class ConcurrentConnections(BaseCheck): |
|
"""Server should support concurrent connections""" |
|
|
|
def check(self) -> None: |
|
url = f"gemini://{self.netloc}/\r\n" |
|
|
|
log(f"Attempting to establish two connections") |
|
with self.connection() as sock: |
|
log("Opening socket 1", style="info") |
|
sock.send(url[0].encode()) |
|
with self.connection() as sock2: |
|
log("Opening socket 2", style="info") |
|
sock2.sendall(url.encode()) |
|
log("Closing socket 2", style="info") |
|
sock.sendall(url[1:].encode()) |
|
log("Closing socket 1", style="info") |
|
|
|
log(f"Concurrent connections supported", style="success") |
|
|
|
|
|
class Homepage(BaseCheck): |
|
"""Request the gemini homepage""" |
|
|
|
def check(self) -> None: |
|
url = f"gemini://{self.netloc}/\r\n" |
|
response = self.make_request(url) |
|
|
|
self.assert_success(response) |
|
|
|
log('Mime type should be "text/gemini"') |
|
style = "success" if response.mime == "text/gemini" else "failure" |
|
log(f"{response.mime!r}", style) |
|
|
|
log('Header should end with "\\r\\n"') |
|
style = "success" if response.header.endswith("\r\n") else "failure" |
|
log(f"{response.header[-2:]!r}", style) |
|
|
|
log("Body should be non-empty") |
|
style = "success" if response.body else "failure" |
|
log(f"{response.body[:50]!r}", style) |
|
|
|
log("Body should terminate with a newline") |
|
style = "success" if response.body.endswith("\n") else "failure" |
|
log(f"{response.body[-1:]!r}", style) |
|
|
|
log('Body should use "\\r\\n" line endings') |
|
bad_line = None |
|
for line in response.body.splitlines(True): |
|
if not line.endswith("\r\n"): |
|
bad_line = line |
|
break |
|
if bad_line is None: |
|
log("All lines end with '\\r\\n'", style="success") |
|
else: |
|
log(f"Invalid line ending {bad_line!r}", style="failure") |
|
|
|
|
|
class HomepageRedirect(BaseCheck): |
|
"""A URL with no trailing slash should redirect to the canonical resource""" |
|
|
|
def check(self) -> None: |
|
url = f"gemini://{self.netloc}\r\n" |
|
response = self.make_request(url) |
|
|
|
log("Status should return code 31 (REDIRECT PERMANENT)") |
|
style = "success" if response.status == "31" else "failure" |
|
log(f"{response.status!r}", style) |
|
|
|
log('Meta should redirect to location "gemini://[hostname]/"') |
|
style = "success" if response.meta == f"gemini://{self.netloc}/" else "failure" |
|
log(f"{response.meta!r}", style) |
|
|
|
log('Header should end with "\\r\\n"') |
|
style = "success" if response.header.endswith("\r\n") else "failure" |
|
log(f"{response.header[-2:]!r}", style) |
|
|
|
log("Body should be empty") |
|
style = "success" if response.body == "" else "failure" |
|
log(f"{response.body[:50]!r}", style) |
|
|
|
|
|
class PageNotFound(BaseCheck): |
|
"""Request a gemini URL that does not exist""" |
|
|
|
def check(self) -> None: |
|
url = f"gemini://{self.netloc}/09pdsakjo73hjn12id78\r\n" |
|
response = self.make_request(url) |
|
|
|
log("Status should return code 51 (NOT FOUND)") |
|
style = "success" if response.status == "51" else "failure" |
|
log(f"{response.status!r}", style) |
|
|
|
log('Header should end with "\\r\\n"') |
|
style = "success" if response.header.endswith("\r\n") else "failure" |
|
log(f"{response.header[-2:]!r}", style) |
|
|
|
log("Body should be empty") |
|
style = "success" if response.body == "" else "failure" |
|
log(f"{response.body[:50]!r}", style) |
|
|
|
|
|
class RequestMissingCR(BaseCheck): |
|
"""A request without a <CR> should timeout""" |
|
|
|
def check(self) -> None: |
|
url = f"gemini://{self.netloc}/\n" |
|
try: |
|
response = self.make_request(url) |
|
except Exception as e: |
|
log("No response should be received") |
|
log(f"{e}", style="success") |
|
else: |
|
log("No response should be received") |
|
log(f"{response.status!r}", style="failure") |
|
|
|
|
|
class URLIncludePort(BaseCheck): |
|
"""Send the URL with the port explicitly defined""" |
|
|
|
def check(self) -> None: |
|
url = f"gemini://{self.args.host}:{self.args.port}/\r\n" |
|
response = self.make_request(url) |
|
self.assert_success(response) |
|
|
|
|
|
class URLSchemeMissing(BaseCheck): |
|
"""A URL without a scheme should be inferred as gemini""" |
|
|
|
def check(self) -> None: |
|
url = f"//{self.netloc}/\r\n" |
|
response = self.make_request(url) |
|
self.assert_success(response) |
|
|
|
|
|
class URLByIPAddress(BaseCheck): |
|
"""Send the URL using the IPv4 address""" |
|
|
|
def check(self) -> None: |
|
addr = self.resolve_host(socket.AF_INET) |
|
url = f"gemini://{addr[0]}:{addr[1]}/\r\n" |
|
response = self.make_request(url) |
|
|
|
log("Verify that the status matches your desired behavior") |
|
log(f"{response.status!r}", style="info") |
|
|
|
|
|
class URLInvalidUTF8Byte(BaseCheck): |
|
"""Send a URL containing a non-UTF8 byte sequence""" |
|
|
|
def check(self) -> None: |
|
non_utf8_character = "\udcdc" # Surrogate-escaped byte sequence |
|
url = f"gemini://{self.netloc}/{non_utf8_character}\r\n" |
|
|
|
try: |
|
response = self.make_request(url) |
|
except Exception: |
|
response = None |
|
|
|
log("Connection should either drop, or return 59 (BAD REQUEST)") |
|
if response is None: |
|
log("Connection closed without response", style="success") |
|
else: |
|
style = "success" if response.status == "59" else "failure" |
|
log(f"{response.status!r}", style) |
|
|
|
|
|
class URLMaxSize(BaseCheck): |
|
"""Send a 1024 byte URL, the maximum allowed size""" |
|
|
|
def check(self) -> None: |
|
# Per the spec, the <CR><LF> are not included in the total size |
|
base_url = f"gemini://{self.netloc}/" |
|
buffer = "0" * (1024 - len(base_url.encode("utf-8"))) |
|
url = base_url + buffer + "\r\n" |
|
|
|
response = self.make_request(url) |
|
log("Status should return code 51 (NOT FOUND)") |
|
style = "success" if response.status == "51" else "failure" |
|
log(f"{response.status!r}", style) |
|
|
|
|
|
class URLAboveMaxSize(BaseCheck): |
|
"""Send a 1025 byte URL, above the maximum allowed size""" |
|
|
|
def check(self) -> None: |
|
# Per the spec, the <CR><LF> are not included in the total size |
|
base_url = f"gemini://{self.netloc}/" |
|
buffer = "0" * (1025 - len(base_url.encode("utf-8"))) |
|
url = base_url + buffer + "\r\n" |
|
|
|
try: |
|
response = self.make_request(url) |
|
except Exception: |
|
response = None |
|
|
|
log("Connection should either drop, or return 59 (BAD REQUEST)") |
|
if response is None: |
|
log("Connection closed without response", style="success") |
|
else: |
|
style = "success" if response.status == "59" else "failure" |
|
log(f"{response.status!r}", style) |
|
|
|
|
|
class URLWrongPort(BaseCheck): |
|
"""A URL with an incorrect port number should be rejected""" |
|
|
|
def check(self) -> None: |
|
url = f"gemini://{self.args.host}:443/\r\n" |
|
response = self.make_request(url) |
|
self.assert_proxy_refused(response) |
|
|
|
|
|
class URLWrongHost(BaseCheck): |
|
"""A URL with a foreign hostname should be rejected""" |
|
|
|
def check(self) -> None: |
|
url = f"gemini://wikipedia.org/\r\n" |
|
response = self.make_request(url) |
|
self.assert_proxy_refused(response) |
|
|
|
|
|
class URLSchemeHTTP(BaseCheck): |
|
"""Send a URL with an HTTP scheme""" |
|
|
|
def check(self) -> None: |
|
url = f"http://{self.netloc}/\r\n" |
|
response = self.make_request(url) |
|
self.assert_proxy_refused(response) |
|
|
|
|
|
class URLSchemeHTTPS(BaseCheck): |
|
"""Send a URL with an HTTPS scheme""" |
|
|
|
def check(self) -> None: |
|
url = f"https://{self.netloc}/\r\n" |
|
response = self.make_request(url) |
|
self.assert_proxy_refused(response) |
|
|
|
|
|
class URLSchemeGopher(BaseCheck): |
|
"""Send a URL with a Gopher scheme""" |
|
|
|
def check(self) -> None: |
|
url = f"gopher://{self.netloc}/\r\n" |
|
response = self.make_request(url) |
|
self.assert_proxy_refused(response) |
|
|
|
|
|
class URLEmpty(BaseCheck): |
|
"""Empty URLs should not be accepted by the server""" |
|
|
|
def check(self) -> None: |
|
url = f"\r\n" |
|
response = self.make_request(url) |
|
self.assert_bad_request(response) |
|
|
|
|
|
class URLRelative(BaseCheck): |
|
"""Relative URLs should not be accepted by the server""" |
|
|
|
def check(self) -> None: |
|
url = f"/\r\n" |
|
response = self.make_request(url) |
|
self.assert_bad_request(response) |
|
|
|
|
|
class URLInvalid(BaseCheck): |
|
"""Random text should not be accepted by the server""" |
|
|
|
def check(self) -> None: |
|
url = f"Hello Gemini!\r\n" |
|
response = self.make_request(url) |
|
self.assert_bad_request(response) |
|
|
|
|
|
class URLDotEscape(BaseCheck): |
|
"""A URL should not be able to escape the root using dot notation""" |
|
|
|
def check(self) -> None: |
|
url = f"gemini://{self.netloc}/../../\r\n" |
|
response = self.make_request(url) |
|
self.assert_permanent_failure(response) |
|
|
|
|
|
# TODO: Test sending a transient client certificate |
|
# TODO: Test with client pinned to TLS v1.1 |
|
CHECKS = [ |
|
IPv4Address, |
|
IPv6Address, |
|
TLSVersion, |
|
TLSClaims, |
|
TLSVerified, |
|
TLSRequired, |
|
ConcurrentConnections, |
|
Homepage, |
|
HomepageRedirect, |
|
PageNotFound, |
|
RequestMissingCR, |
|
URLIncludePort, |
|
URLSchemeMissing, |
|
URLByIPAddress, |
|
URLInvalidUTF8Byte, |
|
URLMaxSize, |
|
URLAboveMaxSize, |
|
URLWrongPort, |
|
URLWrongHost, |
|
URLSchemeHTTP, |
|
URLSchemeHTTPS, |
|
URLSchemeGopher, |
|
URLEmpty, |
|
URLRelative, |
|
URLInvalid, |
|
URLDotEscape, |
|
] |
|
|
|
|
|
def build_epilog(): |
|
epilog = ["list of checks:"] |
|
for check in CHECKS: |
|
epilog.append(colorize(f" [{check.__name__}]", A_BOLD)) |
|
epilog.append(f" {check.__doc__}") |
|
return "\n".join(epilog) |
|
|
|
|
|
parser = argparse.ArgumentParser( |
|
usage="%(prog)s host [port] [--help]", |
|
description=__doc__, |
|
epilog=build_epilog(), |
|
formatter_class=argparse.RawDescriptionHelpFormatter, |
|
) |
|
parser.add_argument("host", help="server hostname to connect to") |
|
parser.add_argument( |
|
"port", |
|
nargs="?", |
|
type=int, |
|
default=1965, |
|
help="server port to connect to (default: 1965)", |
|
) |
|
parser.add_argument("--checks", help="comma separated list of checks to apply") |
|
parser.add_argument( |
|
"--delay", |
|
type=float, |
|
default=2, |
|
help="seconds to sleep between checks (default: 2)", |
|
) |
|
|
|
|
|
def run(): |
|
args = parser.parse_args() |
|
if args.checks: |
|
check_names = {cls.__name__: cls for cls in CHECKS} |
|
check_list = [] |
|
for name in args.checks.split(","): |
|
name = name.strip() |
|
if name not in check_names: |
|
raise ValueError(f"unknown check {name!r}") |
|
check_list.append(check_names[name]) |
|
else: |
|
check_list = CHECKS |
|
|
|
log(f"Running gemini server diagnostics check against {args.host}:{args.port}") |
|
log("...\n") |
|
for check in check_list: |
|
time.sleep(args.delay) |
|
check(args).run() |
|
log("Done!") |
|
|
|
|
|
if __name__ == "__main__": |
|
run()
|
|
|