1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3#*************************************************************************** 4# _ _ ____ _ 5# Project ___| | | | _ \| | 6# / __| | | | |_) | | 7# | (__| |_| | _ <| |___ 8# \___|\___/|_| \_\_____| 9# 10# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. 11# 12# This software is licensed as described in the file COPYING, which 13# you should have received as part of this distribution. The terms 14# are also available at https://curl.se/docs/copyright.html. 15# 16# You may opt to use, copy, modify, merge, publish, distribute and/or sell 17# copies of the Software, and permit persons to whom the Software is 18# furnished to do so, under the terms of the COPYING file. 19# 20# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 21# KIND, either express or implied. 22# 23# SPDX-License-Identifier: curl 24# 25########################################################################### 26# 27import logging 28import os 29import re 30import shutil 31import socket 32import subprocess 33import sys 34from configparser import ConfigParser, ExtendedInterpolation 35from datetime import timedelta 36from typing import Optional 37 38import pytest 39 40from .certs import CertificateSpec, TestCA, Credentials 41from .ports import alloc_ports 42 43 44log = logging.getLogger(__name__) 45 46 47def init_config_from(conf_path): 48 if os.path.isfile(conf_path): 49 config = ConfigParser(interpolation=ExtendedInterpolation()) 50 config.read(conf_path) 51 return config 52 return None 53 54 55TESTS_HTTPD_PATH = os.path.dirname(os.path.dirname(__file__)) 56DEF_CONFIG = init_config_from(os.path.join(TESTS_HTTPD_PATH, 'config.ini')) 57 58TOP_PATH = os.path.dirname(os.path.dirname(TESTS_HTTPD_PATH)) 59CURL = os.path.join(TOP_PATH, 'src/curl') 60 61 62class EnvConfig: 63 64 def __init__(self): 65 self.tests_dir = TESTS_HTTPD_PATH 66 self.gen_dir = os.path.join(self.tests_dir, 'gen') 67 self.project_dir = os.path.dirname(os.path.dirname(self.tests_dir)) 68 self.config = DEF_CONFIG 69 # check cur and its features 70 self.curl = CURL 71 if 'CURL' in os.environ: 72 self.curl = os.environ['CURL'] 73 self.curl_props = { 74 'version': None, 75 'os': None, 76 'fullname': None, 77 'features': [], 78 'protocols': [], 79 'libs': [], 80 'lib_versions': [], 81 } 82 self.curl_is_debug = False 83 self.curl_protos = [] 84 p = subprocess.run(args=[self.curl, '-V'], 85 capture_output=True, text=True) 86 if p.returncode != 0: 87 assert False, f'{self.curl} -V failed with exit code: {p.returncode}' 88 if p.stderr.startswith('WARNING:'): 89 self.curl_is_debug = True 90 for l in p.stdout.splitlines(keepends=False): 91 if l.startswith('curl '): 92 m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', l) 93 if m: 94 self.curl_props['fullname'] = m.group(0) 95 self.curl_props['version'] = m.group('version') 96 self.curl_props['os'] = m.group('os') 97 self.curl_props['lib_versions'] = [ 98 lib.lower() for lib in m.group('libs').split(' ') 99 ] 100 self.curl_props['libs'] = [ 101 re.sub(r'/.*', '', lib) for lib in self.curl_props['lib_versions'] 102 ] 103 if l.startswith('Features: '): 104 self.curl_props['features'] = [ 105 feat.lower() for feat in l[10:].split(' ') 106 ] 107 if l.startswith('Protocols: '): 108 self.curl_props['protocols'] = [ 109 prot.lower() for prot in l[11:].split(' ') 110 ] 111 112 self.ports = alloc_ports(port_specs={ 113 'ftp': socket.SOCK_STREAM, 114 'ftps': socket.SOCK_STREAM, 115 'http': socket.SOCK_STREAM, 116 'https': socket.SOCK_STREAM, 117 'proxy': socket.SOCK_STREAM, 118 'proxys': socket.SOCK_STREAM, 119 'h2proxys': socket.SOCK_STREAM, 120 'caddy': socket.SOCK_STREAM, 121 'caddys': socket.SOCK_STREAM, 122 'ws': socket.SOCK_STREAM, 123 }) 124 self.httpd = self.config['httpd']['httpd'] 125 self.apachectl = self.config['httpd']['apachectl'] 126 self.apxs = self.config['httpd']['apxs'] 127 if len(self.apxs) == 0: 128 self.apxs = None 129 self._httpd_version = None 130 131 self.examples_pem = { 132 'key': 'xxx', 133 'cert': 'xxx', 134 } 135 self.htdocs_dir = os.path.join(self.gen_dir, 'htdocs') 136 self.tld = 'http.curl.se' 137 self.domain1 = f"one.{self.tld}" 138 self.domain1brotli = f"brotli.one.{self.tld}" 139 self.domain2 = f"two.{self.tld}" 140 self.ftp_domain = f"ftp.{self.tld}" 141 self.proxy_domain = f"proxy.{self.tld}" 142 self.cert_specs = [ 143 CertificateSpec(domains=[self.domain1, self.domain1brotli, 'localhost', '127.0.0.1'], key_type='rsa2048'), 144 CertificateSpec(domains=[self.domain2], key_type='rsa2048'), 145 CertificateSpec(domains=[self.ftp_domain], key_type='rsa2048'), 146 CertificateSpec(domains=[self.proxy_domain, '127.0.0.1'], key_type='rsa2048'), 147 CertificateSpec(name="clientsX", sub_specs=[ 148 CertificateSpec(name="user1", client=True), 149 ]), 150 ] 151 152 self.nghttpx = self.config['nghttpx']['nghttpx'] 153 if len(self.nghttpx.strip()) == 0: 154 self.nghttpx = None 155 self._nghttpx_version = None 156 self.nghttpx_with_h3 = False 157 if self.nghttpx is not None: 158 p = subprocess.run(args=[self.nghttpx, '-v'], 159 capture_output=True, text=True) 160 if p.returncode != 0: 161 # not a working nghttpx 162 self.nghttpx = None 163 else: 164 self._nghttpx_version = re.sub(r'^nghttpx\s*', '', p.stdout.strip()) 165 self.nghttpx_with_h3 = re.match(r'.* nghttp3/.*', p.stdout.strip()) is not None 166 log.debug(f'nghttpx -v: {p.stdout}') 167 168 self.caddy = self.config['caddy']['caddy'] 169 self._caddy_version = None 170 if len(self.caddy.strip()) == 0: 171 self.caddy = None 172 if self.caddy is not None: 173 try: 174 p = subprocess.run(args=[self.caddy, 'version'], 175 capture_output=True, text=True) 176 if p.returncode != 0: 177 # not a working caddy 178 self.caddy = None 179 m = re.match(r'v?(\d+\.\d+\.\d+).*', p.stdout) 180 if m: 181 self._caddy_version = m.group(1) 182 else: 183 raise f'Unable to determine cadd version from: {p.stdout}' 184 except: 185 self.caddy = None 186 187 self.vsftpd = self.config['vsftpd']['vsftpd'] 188 self._vsftpd_version = None 189 if self.vsftpd is not None: 190 try: 191 p = subprocess.run(args=[self.vsftpd, '-v'], 192 capture_output=True, text=True) 193 if p.returncode != 0: 194 # not a working vsftpd 195 self.vsftpd = None 196 m = re.match(r'vsftpd: version (\d+\.\d+\.\d+)', p.stderr) 197 if m: 198 self._vsftpd_version = m.group(1) 199 elif len(p.stderr) == 0: 200 # vsftp does not use stdout or stderr for printing its version... -.- 201 self._vsftpd_version = 'unknown' 202 else: 203 raise Exception(f'Unable to determine VsFTPD version from: {p.stderr}') 204 except Exception as e: 205 self.vsftpd = None 206 207 self._tcpdump = shutil.which('tcpdump') 208 209 @property 210 def httpd_version(self): 211 if self._httpd_version is None and self.apxs is not None: 212 try: 213 p = subprocess.run(args=[self.apxs, '-q', 'HTTPD_VERSION'], 214 capture_output=True, text=True) 215 if p.returncode != 0: 216 log.error(f'{self.apxs} failed to query HTTPD_VERSION: {p}') 217 else: 218 self._httpd_version = p.stdout.strip() 219 except Exception as e: 220 log.error(f'{self.apxs} failed to run: {e}') 221 return self._httpd_version 222 223 def versiontuple(self, v): 224 v = re.sub(r'(\d+\.\d+(\.\d+)?)(-\S+)?', r'\1', v) 225 return tuple(map(int, v.split('.'))) 226 227 def httpd_is_at_least(self, minv): 228 if self.httpd_version is None: 229 return False 230 hv = self.versiontuple(self.httpd_version) 231 return hv >= self.versiontuple(minv) 232 233 def caddy_is_at_least(self, minv): 234 if self.caddy_version is None: 235 return False 236 hv = self.versiontuple(self.caddy_version) 237 return hv >= self.versiontuple(minv) 238 239 def is_complete(self) -> bool: 240 return os.path.isfile(self.httpd) and \ 241 os.path.isfile(self.apachectl) and \ 242 self.apxs is not None and \ 243 os.path.isfile(self.apxs) 244 245 def get_incomplete_reason(self) -> Optional[str]: 246 if self.httpd is None or len(self.httpd.strip()) == 0: 247 return f'httpd not configured, see `--with-test-httpd=<path>`' 248 if not os.path.isfile(self.httpd): 249 return f'httpd ({self.httpd}) not found' 250 if not os.path.isfile(self.apachectl): 251 return f'apachectl ({self.apachectl}) not found' 252 if self.apxs is None: 253 return f"command apxs not found (commonly provided in apache2-dev)" 254 if not os.path.isfile(self.apxs): 255 return f"apxs ({self.apxs}) not found" 256 return None 257 258 @property 259 def nghttpx_version(self): 260 return self._nghttpx_version 261 262 @property 263 def caddy_version(self): 264 return self._caddy_version 265 266 @property 267 def vsftpd_version(self): 268 return self._vsftpd_version 269 270 @property 271 def tcpdmp(self) -> Optional[str]: 272 return self._tcpdump 273 274 275class Env: 276 277 CONFIG = EnvConfig() 278 279 @staticmethod 280 def setup_incomplete() -> bool: 281 return not Env.CONFIG.is_complete() 282 283 @staticmethod 284 def incomplete_reason() -> Optional[str]: 285 return Env.CONFIG.get_incomplete_reason() 286 287 @staticmethod 288 def have_nghttpx() -> bool: 289 return Env.CONFIG.nghttpx is not None 290 291 @staticmethod 292 def have_h3_server() -> bool: 293 return Env.CONFIG.nghttpx_with_h3 294 295 @staticmethod 296 def have_ssl_curl() -> bool: 297 return 'ssl' in Env.CONFIG.curl_props['features'] 298 299 @staticmethod 300 def have_h2_curl() -> bool: 301 return 'http2' in Env.CONFIG.curl_props['features'] 302 303 @staticmethod 304 def have_h3_curl() -> bool: 305 return 'http3' in Env.CONFIG.curl_props['features'] 306 307 @staticmethod 308 def curl_uses_lib(libname: str) -> bool: 309 return libname.lower() in Env.CONFIG.curl_props['libs'] 310 311 @staticmethod 312 def curl_uses_ossl_quic() -> bool: 313 if Env.have_h3_curl(): 314 return not Env.curl_uses_lib('ngtcp2') and Env.curl_uses_lib('nghttp3') 315 return False 316 317 @staticmethod 318 def curl_has_feature(feature: str) -> bool: 319 return feature.lower() in Env.CONFIG.curl_props['features'] 320 321 @staticmethod 322 def curl_has_protocol(protocol: str) -> bool: 323 return protocol.lower() in Env.CONFIG.curl_props['protocols'] 324 325 @staticmethod 326 def curl_lib_version(libname: str) -> str: 327 prefix = f'{libname.lower()}/' 328 for lversion in Env.CONFIG.curl_props['lib_versions']: 329 if lversion.startswith(prefix): 330 return lversion[len(prefix):] 331 return 'unknown' 332 333 @staticmethod 334 def curl_lib_version_at_least(libname: str, min_version) -> str: 335 lversion = Env.curl_lib_version(libname) 336 if lversion != 'unknown': 337 return Env.CONFIG.versiontuple(min_version) <= \ 338 Env.CONFIG.versiontuple(lversion) 339 return False 340 341 @staticmethod 342 def curl_os() -> str: 343 return Env.CONFIG.curl_props['os'] 344 345 @staticmethod 346 def curl_fullname() -> str: 347 return Env.CONFIG.curl_props['fullname'] 348 349 @staticmethod 350 def curl_version() -> str: 351 return Env.CONFIG.curl_props['version'] 352 353 @staticmethod 354 def curl_is_debug() -> bool: 355 return Env.CONFIG.curl_is_debug 356 357 @staticmethod 358 def have_h3() -> bool: 359 return Env.have_h3_curl() and Env.have_h3_server() 360 361 @staticmethod 362 def httpd_version() -> str: 363 return Env.CONFIG.httpd_version 364 365 @staticmethod 366 def nghttpx_version() -> str: 367 return Env.CONFIG.nghttpx_version 368 369 @staticmethod 370 def caddy_version() -> str: 371 return Env.CONFIG.caddy_version 372 373 @staticmethod 374 def caddy_is_at_least(minv) -> bool: 375 return Env.CONFIG.caddy_is_at_least(minv) 376 377 @staticmethod 378 def httpd_is_at_least(minv) -> bool: 379 return Env.CONFIG.httpd_is_at_least(minv) 380 381 @staticmethod 382 def has_caddy() -> bool: 383 return Env.CONFIG.caddy is not None 384 385 @staticmethod 386 def has_vsftpd() -> bool: 387 return Env.CONFIG.vsftpd is not None 388 389 @staticmethod 390 def vsftpd_version() -> str: 391 return Env.CONFIG.vsftpd_version 392 393 @staticmethod 394 def tcpdump() -> Optional[str]: 395 return Env.CONFIG.tcpdmp 396 397 def __init__(self, pytestconfig=None): 398 self._verbose = pytestconfig.option.verbose \ 399 if pytestconfig is not None else 0 400 self._ca = None 401 self._test_timeout = 300.0 if self._verbose > 1 else 60.0 # seconds 402 403 def issue_certs(self): 404 if self._ca is None: 405 ca_dir = os.path.join(self.CONFIG.gen_dir, 'ca') 406 self._ca = TestCA.create_root(name=self.CONFIG.tld, 407 store_dir=ca_dir, 408 key_type="rsa2048") 409 self._ca.issue_certs(self.CONFIG.cert_specs) 410 411 def setup(self): 412 os.makedirs(self.gen_dir, exist_ok=True) 413 os.makedirs(self.htdocs_dir, exist_ok=True) 414 self.issue_certs() 415 416 def get_credentials(self, domain) -> Optional[Credentials]: 417 creds = self.ca.get_credentials_for_name(domain) 418 if len(creds) > 0: 419 return creds[0] 420 return None 421 422 @property 423 def verbose(self) -> int: 424 return self._verbose 425 426 @property 427 def test_timeout(self) -> Optional[float]: 428 return self._test_timeout 429 430 @test_timeout.setter 431 def test_timeout(self, val: Optional[float]): 432 self._test_timeout = val 433 434 @property 435 def gen_dir(self) -> str: 436 return self.CONFIG.gen_dir 437 438 @property 439 def project_dir(self) -> str: 440 return self.CONFIG.project_dir 441 442 @property 443 def ca(self): 444 return self._ca 445 446 @property 447 def htdocs_dir(self) -> str: 448 return self.CONFIG.htdocs_dir 449 450 @property 451 def domain1(self) -> str: 452 return self.CONFIG.domain1 453 454 @property 455 def domain1brotli(self) -> str: 456 return self.CONFIG.domain1brotli 457 458 @property 459 def domain2(self) -> str: 460 return self.CONFIG.domain2 461 462 @property 463 def ftp_domain(self) -> str: 464 return self.CONFIG.ftp_domain 465 466 @property 467 def proxy_domain(self) -> str: 468 return self.CONFIG.proxy_domain 469 470 @property 471 def http_port(self) -> int: 472 return self.CONFIG.ports['http'] 473 474 @property 475 def https_port(self) -> int: 476 return self.CONFIG.ports['https'] 477 478 @property 479 def h3_port(self) -> int: 480 return self.https_port 481 482 @property 483 def proxy_port(self) -> int: 484 return self.CONFIG.ports['proxy'] 485 486 @property 487 def proxys_port(self) -> int: 488 return self.CONFIG.ports['proxys'] 489 490 @property 491 def ftp_port(self) -> int: 492 return self.CONFIG.ports['ftp'] 493 494 @property 495 def ftps_port(self) -> int: 496 return self.CONFIG.ports['ftps'] 497 498 @property 499 def h2proxys_port(self) -> int: 500 return self.CONFIG.ports['h2proxys'] 501 502 def pts_port(self, proto: str = 'http/1.1') -> int: 503 # proxy tunnel port 504 return self.CONFIG.ports['h2proxys' if proto == 'h2' else 'proxys'] 505 506 @property 507 def caddy(self) -> str: 508 return self.CONFIG.caddy 509 510 @property 511 def caddy_https_port(self) -> int: 512 return self.CONFIG.ports['caddys'] 513 514 @property 515 def caddy_http_port(self) -> int: 516 return self.CONFIG.ports['caddy'] 517 518 @property 519 def vsftpd(self) -> str: 520 return self.CONFIG.vsftpd 521 522 @property 523 def ws_port(self) -> int: 524 return self.CONFIG.ports['ws'] 525 526 @property 527 def curl(self) -> str: 528 return self.CONFIG.curl 529 530 @property 531 def httpd(self) -> str: 532 return self.CONFIG.httpd 533 534 @property 535 def apachectl(self) -> str: 536 return self.CONFIG.apachectl 537 538 @property 539 def apxs(self) -> str: 540 return self.CONFIG.apxs 541 542 @property 543 def nghttpx(self) -> Optional[str]: 544 return self.CONFIG.nghttpx 545 546 @property 547 def slow_network(self) -> bool: 548 return "CURL_DBG_SOCK_WBLOCK" in os.environ or \ 549 "CURL_DBG_SOCK_WPARTIAL" in os.environ 550 551 @property 552 def ci_run(self) -> bool: 553 return "CURL_CI" in os.environ 554 555 def port_for(self, alpn_proto: Optional[str] = None): 556 if alpn_proto is None or \ 557 alpn_proto in ['h2', 'http/1.1', 'http/1.0', 'http/0.9']: 558 return self.https_port 559 if alpn_proto in ['h3']: 560 return self.h3_port 561 return self.http_port 562 563 def authority_for(self, domain: str, alpn_proto: Optional[str] = None): 564 return f'{domain}:{self.port_for(alpn_proto=alpn_proto)}' 565 566 def make_data_file(self, indir: str, fname: str, fsize: int, 567 line_length: int = 1024) -> str: 568 if line_length < 11: 569 raise 'line_length less than 11 not supported' 570 fpath = os.path.join(indir, fname) 571 s10 = "0123456789" 572 s = round((line_length / 10) + 1) * s10 573 s = s[0:line_length-11] 574 with open(fpath, 'w') as fd: 575 for i in range(int(fsize / line_length)): 576 fd.write(f"{i:09d}-{s}\n") 577 remain = int(fsize % line_length) 578 if remain != 0: 579 i = int(fsize / line_length) + 1 580 fd.write(f"{i:09d}-{s}"[0:remain-1] + "\n") 581 return fpath 582