diff --git a/src/ssh_audit/auditconf.py b/src/ssh_audit/auditconf.py index 2d126a43..0b19cd5e 100644 --- a/src/ssh_audit/auditconf.py +++ b/src/ssh_audit/auditconf.py @@ -67,6 +67,7 @@ def __init__(self, host: str = '', port: int = 22) -> None: self.conn_rate_test_enabled = False self.conn_rate_test_threads = 0 self.conn_rate_test_target_rate = 0 + self.socks_proxy: Optional[str] = None # SOCKS5 proxy in "host:port" format def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[int]]) -> None: @@ -95,7 +96,7 @@ def __setattr__(self, name: str, value: Union[str, int, float, bool, Sequence[in if value == -1.0: raise ValueError('invalid timeout: {}'.format(value)) valid = True - elif name in ['ip_version_preference', 'lookup', 'policy_file', 'policy', 'target_file', 'target_list', 'gex_test']: + elif name in ['ip_version_preference', 'lookup', 'policy_file', 'policy', 'target_file', 'target_list', 'gex_test', 'socks_proxy']: valid = True elif name == "threads": valid, num_threads = True, Utils.parse_int(value) diff --git a/src/ssh_audit/ssh_audit.py b/src/ssh_audit/ssh_audit.py index 2624c465..6992d1fb 100755 --- a/src/ssh_audit/ssh_audit.py +++ b/src/ssh_audit/ssh_audit.py @@ -794,6 +794,7 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p parser.add_argument("--list-hardening-guides", action="store_true", dest="list_hardening_guides", default=False, help="list all official, built-in hardening guides for common systems. Their full names can then be passed to --get-hardening-guide. Add -v to this option to view hardening guide change logs and prior versions.") parser.add_argument("--lookup", action="store", dest="lookup", metavar="alg1[,alg2,...]", type=str, default=None, help="looks up an algorithm(s) without connecting to a server.") parser.add_argument("--skip-rate-test", action="store_true", dest="skip_rate_test", default=False, help="skip the connection rate test during standard audits (used to safely infer whether the DHEat attack is viable)") + parser.add_argument("--socks", action="store", dest="socks_proxy", metavar="host:port", type=str, default=None, help="connect via a SOCKS5 proxy (e.g. 127.0.0.1:1080)") parser.add_argument("--threads", action="store", dest="threads", metavar="N", type=int, default=32, help="number of threads to use when scanning multiple targets (-T/--targets) (default: %(default)s)") @@ -819,6 +820,18 @@ def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # p aconf.skip_rate_test = argument.skip_rate_test oport = argument.oport + if argument.socks_proxy is not None: + # Validate format: must be "host:port" + socks_parts = argument.socks_proxy.rsplit(':', 1) + if len(socks_parts) != 2 or not socks_parts[1].isdigit(): + out.fail("--socks must be in host:port format (e.g. 127.0.0.1:1080)", write_now=True) + sys.exit(exitcodes.UNKNOWN_ERROR) + socks_port = int(socks_parts[1]) + if socks_port < 1 or socks_port > 65535: + out.fail("SOCKS proxy port must be between 1 and 65535", write_now=True) + sys.exit(exitcodes.UNKNOWN_ERROR) + aconf.socks_proxy = argument.socks_proxy + if argument.batch is True: aconf.batch = True @@ -1144,7 +1157,7 @@ def audit(out: OutputBuffer, aconf: AuditConf, print_target: bool = False) -> in out.debug = aconf.debug out.level = aconf.level out.use_colors = aconf.colors - s = SSH_Socket(out, aconf.host, aconf.port, aconf.ip_version_preference, aconf.timeout, aconf.timeout_set) + s = SSH_Socket(out, aconf.host, aconf.port, aconf.ip_version_preference, aconf.timeout, aconf.timeout_set, aconf.socks_proxy) if aconf.client_audit: out.v("Listening for client connection on port %d..." % aconf.port, write_now=True) diff --git a/src/ssh_audit/ssh_socket.py b/src/ssh_audit/ssh_socket.py index 97a0e942..3874fde5 100644 --- a/src/ssh_audit/ssh_socket.py +++ b/src/ssh_audit/ssh_socket.py @@ -51,7 +51,7 @@ class InsufficientReadException(Exception): SM_BANNER_SENT = 1 - def __init__(self, outputbuffer: 'OutputBuffer', host: Optional[str], port: int, ip_version_preference: List[int] = [], timeout: Union[int, float] = 5, timeout_set: bool = False) -> None: # pylint: disable=dangerous-default-value + def __init__(self, outputbuffer: 'OutputBuffer', host: Optional[str], port: int, ip_version_preference: List[int] = [], timeout: Union[int, float] = 5, timeout_set: bool = False, socks_proxy: Optional[str] = None) -> None: # pylint: disable=dangerous-default-value super(SSH_Socket, self).__init__() self.__outputbuffer = outputbuffer self.__sock: Optional[socket.socket] = None @@ -72,6 +72,7 @@ def __init__(self, outputbuffer: 'OutputBuffer', host: Optional[str], port: int, self.__timeout_set = timeout_set self.client_host: Optional[str] = None self.client_port = None + self.__socks_proxy = socks_proxy # SOCKS5 proxy in "host:port" format, or None def _resolve(self) -> Iterable[Tuple[int, Tuple[Any, ...]]]: """Resolves a hostname into a list of IPs @@ -154,6 +155,10 @@ def connect(self) -> Optional[str]: '''Returns None on success, or an error string.''' err = None s = None + + if self.__socks_proxy is not None: + return self._connect_via_socks5() + try: for af, addr in self._resolve(): s = socket.socket(af, socket.SOCK_STREAM) @@ -172,6 +177,88 @@ def connect(self) -> Optional[str]: errm = 'cannot connect to {} port {}: {}'.format(*errt) return '[exception] {}'.format(errm) + def _connect_via_socks5(self) -> Optional[str]: + '''Connect to the target host:port via a SOCKS5 proxy. Returns None on success, or an error string.''' + assert self.__socks_proxy is not None + proxy_parts = self.__socks_proxy.rsplit(':', 1) + proxy_host = proxy_parts[0] + proxy_port = int(proxy_parts[1]) + + s = None + try: + self.__outputbuffer.d("Connecting to SOCKS5 proxy %s:%d..." % (proxy_host, proxy_port), write_now=True) + s = socket.create_connection((proxy_host, proxy_port), timeout=self.__timeout) + + # SOCKS5 greeting: version=5, nmethods=1, method=0 (no auth) + s.sendall(b'\x05\x01\x00') + resp = self._socks5_recv_exact(s, 2) + if resp is None: + raise socket.error("no response from SOCKS5 proxy during handshake") + if resp[0] != 5: + raise socket.error("SOCKS5 proxy returned unexpected version: {}".format(resp[0])) + if resp[1] == 0xff: + raise socket.error("SOCKS5 proxy rejected all authentication methods") + if resp[1] != 0: + raise socket.error("SOCKS5 proxy requires authentication (method {:d}), but only no-auth is supported".format(resp[1])) + + # SOCKS5 connect request: version=5, cmd=1 (connect), rsv=0, atyp=3 (domain name) + host_bytes = self.__host.encode('idna') + request = struct.pack('!BBBB', 5, 1, 0, 3) + struct.pack('!B', len(host_bytes)) + host_bytes + struct.pack('!H', self.__port) + self.__outputbuffer.d("Requesting SOCKS5 proxy to connect to %s:%d..." % (self.__host, self.__port), write_now=True) + s.sendall(request) + + # Read the fixed part of the response (4 bytes: ver, rep, rsv, atyp) + hdr = self._socks5_recv_exact(s, 4) + if hdr is None: + raise socket.error("no response from SOCKS5 proxy during connect") + if hdr[0] != 5: + raise socket.error("SOCKS5 proxy returned unexpected version in connect response: {}".format(hdr[0])) + if hdr[1] != 0: + socks5_errors = { + 1: "general SOCKS server failure", + 2: "connection not allowed by ruleset", + 3: "network unreachable", + 4: "host unreachable", + 5: "connection refused", + 6: "TTL expired", + 7: "command not supported", + 8: "address type not supported", + } + msg = socks5_errors.get(hdr[1], "unknown error {:d}".format(hdr[1])) + raise socket.error("SOCKS5 proxy connect failed: {}".format(msg)) + + # Read and discard the bound address from the response + atyp = hdr[3] + if atyp == 1: # IPv4 + self._socks5_recv_exact(s, 4 + 2) + elif atyp == 4: # IPv6 + self._socks5_recv_exact(s, 16 + 2) + elif atyp == 3: # domain name + alen_data = self._socks5_recv_exact(s, 1) + if alen_data is None: + raise socket.error("truncated SOCKS5 response") + self._socks5_recv_exact(s, alen_data[0] + 2) + else: + raise socket.error("SOCKS5 proxy returned unknown address type: {}".format(atyp)) + + self.__sock = s + return None + + except socket.error as e: + self._close_socket(s) + return '[exception] cannot connect via SOCKS5 proxy to {} port {}: {}'.format(self.__host, self.__port, e) + + @staticmethod + def _socks5_recv_exact(s: socket.socket, n: int) -> Optional[bytes]: + '''Read exactly n bytes from socket s, returning None on EOF.''' + buf = b'' + while len(buf) < n: + chunk = s.recv(n - len(buf)) + if not chunk: + return None + buf += chunk + return buf + def get_banner(self) -> Tuple[Optional['Banner'], List[str], Optional[str]]: self.__outputbuffer.d('Getting banner...', write_now=True)