xref: /curl/tests/http/testenv/env.py (revision d82f9f96)
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#***************************************************************************
4#                                  _   _ ____  _
5#  Project                     ___| | | |  _ \| |
6#                             / __| | | | |_) | |
7#                            | (__| |_| |  _ <| |___
8#                             \___|\___/|_| \_\_____|
9#
10# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
11#
12# This software is licensed as described in the file COPYING, which
13# you should have received as part of this distribution. The terms
14# are also available at https://curl.se/docs/copyright.html.
15#
16# You may opt to use, copy, modify, merge, publish, distribute and/or sell
17# copies of the Software, and permit persons to whom the Software is
18# furnished to do so, under the terms of the COPYING file.
19#
20# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
21# KIND, either express or implied.
22#
23# SPDX-License-Identifier: curl
24#
25###########################################################################
26#
27import logging
28import os
29import re
30import shutil
31import socket
32import subprocess
33import sys
34from configparser import ConfigParser, ExtendedInterpolation
35from datetime import timedelta
36from typing import Optional
37
38import pytest
39
40from .certs import CertificateSpec, TestCA, Credentials
41from .ports import alloc_ports
42
43
44log = logging.getLogger(__name__)
45
46
def init_config_from(conf_path):
    """Load the INI file at ``conf_path`` into a ``ConfigParser``.

    The parser uses ``ExtendedInterpolation``, so values may refer to
    other options with the ``${option}`` / ``${section:option}`` syntax.

    Returns the populated parser, or ``None`` when ``conf_path`` is not
    an existing regular file.
    """
    if not os.path.isfile(conf_path):
        return None
    parser = ConfigParser(interpolation=ExtendedInterpolation())
    parser.read(conf_path)
    return parser
53
54
# Directory two levels above this module file; it is expected to hold
# the `config.ini` that describes the locally available test servers.
TESTS_HTTPD_PATH = os.path.dirname(
    os.path.dirname(__file__)
)
# Parsed `config.ini`, or None when that file does not exist.
DEF_CONFIG = init_config_from(
    os.path.join(TESTS_HTTPD_PATH, 'config.ini')
)

# Two more levels up from TESTS_HTTPD_PATH; the default curl binary
# under test is looked up below it.
TOP_PATH = os.path.dirname(
    os.path.dirname(TESTS_HTTPD_PATH)
)
CURL = os.path.join(TOP_PATH, 'src/curl')
60
61
class EnvConfig:
    """Configuration of the test environment, probed once at construction.

    Gathers what the tests need to know about the local setup:

    * the curl binary under test and what ``curl -V`` reports about it
      (version, os, libraries, features, protocols),
    * the ports handed out by ``alloc_ports`` for the named services,
    * the executables named in ``config.ini`` (httpd, apachectl, apxs,
      nghttpx, caddy, vsftpd); optional ones that are not configured or
      do not run are recorded as ``None``,
    * the domain names and certificate specs used for the test CA.
    """

    def __init__(self):
        """Probe the environment.

        Raises:
            AssertionError: when ``curl -V`` exits with a non-zero code.
        """
        self.tests_dir = TESTS_HTTPD_PATH
        self.gen_dir = os.path.join(self.tests_dir, 'gen')
        self.project_dir = os.path.dirname(os.path.dirname(self.tests_dir))
        self.config = DEF_CONFIG
        # check curl and its features; the CURL environment variable
        # overrides the default binary location
        self.curl = os.environ.get('CURL', CURL)
        self.curl_props = {
            'version': None,
            'os': None,
            'fullname': None,
            'features': [],
            'protocols': [],
            'libs': [],
            'lib_versions': [],
        }
        self.curl_is_debug = False
        self.curl_protos = []
        self._probe_curl()

        self.ports = alloc_ports(port_specs={
            'ftp': socket.SOCK_STREAM,
            'ftps': socket.SOCK_STREAM,
            'http': socket.SOCK_STREAM,
            'https': socket.SOCK_STREAM,
            'proxy': socket.SOCK_STREAM,
            'proxys': socket.SOCK_STREAM,
            'h2proxys': socket.SOCK_STREAM,
            'caddy': socket.SOCK_STREAM,
            'caddys': socket.SOCK_STREAM,
            'ws': socket.SOCK_STREAM,
        })
        self.httpd = self.config['httpd']['httpd']
        self.apachectl = self.config['httpd']['apachectl']
        self.apxs = self.config['httpd']['apxs']
        if len(self.apxs) == 0:
            self.apxs = None
        # queried lazily via apxs, see the `httpd_version` property
        self._httpd_version = None

        self.examples_pem = {
            'key': 'xxx',
            'cert': 'xxx',
        }
        self.htdocs_dir = os.path.join(self.gen_dir, 'htdocs')
        self.tld = 'http.curl.se'
        self.domain1 = f"one.{self.tld}"
        self.domain1brotli = f"brotli.one.{self.tld}"
        self.domain2 = f"two.{self.tld}"
        self.ftp_domain = f"ftp.{self.tld}"
        self.proxy_domain = f"proxy.{self.tld}"
        self.cert_specs = [
            CertificateSpec(domains=[self.domain1, self.domain1brotli, 'localhost', '127.0.0.1'], key_type='rsa2048'),
            CertificateSpec(domains=[self.domain2], key_type='rsa2048'),
            CertificateSpec(domains=[self.ftp_domain], key_type='rsa2048'),
            CertificateSpec(domains=[self.proxy_domain, '127.0.0.1'], key_type='rsa2048'),
            CertificateSpec(name="clientsX", sub_specs=[
               CertificateSpec(name="user1", client=True),
            ]),
        ]

        self.nghttpx = self.config['nghttpx']['nghttpx']
        self._nghttpx_version = None
        self.nghttpx_with_h3 = False
        self._probe_nghttpx()

        self.caddy = self.config['caddy']['caddy']
        self._caddy_version = None
        self._probe_caddy()

        self.vsftpd = self.config['vsftpd']['vsftpd']
        self._vsftpd_version = None
        self._probe_vsftpd()

        self._tcpdump = shutil.which('tcpdump')

    def _probe_curl(self):
        """Run ``curl -V`` and record what it reports in ``curl_props``."""
        p = subprocess.run(args=[self.curl, '-V'],
                           capture_output=True, text=True)
        if p.returncode != 0:
            # raised explicitly (not via `assert`) so the check also
            # runs when Python is started with -O
            raise AssertionError(
                f'{self.curl} -V failed with exit code: {p.returncode}')
        # a 'WARNING:' on stderr is taken as the mark of a debug build
        if p.stderr.startswith('WARNING:'):
            self.curl_is_debug = True
        self._parse_curl_version_output(p.stdout)

    def _parse_curl_version_output(self, stdout):
        """Fill ``curl_props`` from the text printed by ``curl -V``."""
        for line in stdout.splitlines(keepends=False):
            if line.startswith('curl '):
                m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', line)
                if m:
                    self.curl_props['fullname'] = m.group(0)
                    self.curl_props['version'] = m.group('version')
                    self.curl_props['os'] = m.group('os')
                    # 'name/version' entries, lower-cased
                    self.curl_props['lib_versions'] = [
                        lib.lower() for lib in m.group('libs').split(' ')
                    ]
                    # the same entries with the '/version' part removed
                    self.curl_props['libs'] = [
                        re.sub(r'/.*', '', lib) for lib in self.curl_props['lib_versions']
                    ]
            if line.startswith('Features: '):
                self.curl_props['features'] = [
                    feat.lower() for feat in line[10:].split(' ')
                ]
            if line.startswith('Protocols: '):
                self.curl_props['protocols'] = [
                    prot.lower() for prot in line[11:].split(' ')
                ]

    def _probe_nghttpx(self):
        """Check the configured nghttpx; set it to None when unusable."""
        if self.nghttpx is None or len(self.nghttpx.strip()) == 0:
            self.nghttpx = None
            return
        try:
            p = subprocess.run(args=[self.nghttpx, '-v'],
                               capture_output=True, text=True)
        except OSError:
            # configured path cannot be executed: treat it like the
            # other optional servers and mark it as not available
            self.nghttpx = None
            return
        if p.returncode != 0:
            # not a working nghttpx
            self.nghttpx = None
            return
        output = p.stdout.strip()
        self._nghttpx_version = re.sub(r'^nghttpx\s*', '', output)
        self.nghttpx_with_h3 = re.match(r'.* nghttp3/.*', output) is not None
        log.debug(f'nghttpx -v: {p.stdout}')

    def _probe_caddy(self):
        """Check the configured caddy; set it to None when unusable."""
        if self.caddy is None or len(self.caddy.strip()) == 0:
            self.caddy = None
            return
        try:
            p = subprocess.run(args=[self.caddy, 'version'],
                               capture_output=True, text=True)
        except Exception:
            # best effort: a caddy that cannot be started is "not there"
            self.caddy = None
            return
        m = None
        if p.returncode == 0:
            m = re.match(r'v?(\d+\.\d+\.\d+).*', p.stdout)
        if m is None:
            # not a working caddy, or its version is not recognizable
            self.caddy = None
            return
        self._caddy_version = m.group(1)

    def _probe_vsftpd(self):
        """Check the configured vsftpd; set it to None when unusable."""
        if self.vsftpd is None or len(self.vsftpd.strip()) == 0:
            self.vsftpd = None
            return
        try:
            p = subprocess.run(args=[self.vsftpd, '-v'],
                               capture_output=True, text=True)
        except Exception:
            # best effort: a vsftpd that cannot be started is "not there"
            self.vsftpd = None
            return
        if p.returncode != 0:
            # not a working vsftpd
            self.vsftpd = None
            return
        m = re.match(r'vsftpd: version (\d+\.\d+\.\d+)', p.stderr)
        if m:
            self._vsftpd_version = m.group(1)
        elif len(p.stderr) == 0:
            # vsftp does not use stdout or stderr for printing its version... -.-
            self._vsftpd_version = 'unknown'
        else:
            # stderr holds something that is not a version line
            self.vsftpd = None

    @property
    def httpd_version(self):
        """The httpd version reported by ``apxs -q HTTPD_VERSION``.

        Queried on first access and cached once a value was obtained.
        Returns None when apxs is not configured or the query fails
        (failures are logged).
        """
        if self._httpd_version is None and self.apxs is not None:
            try:
                p = subprocess.run(args=[self.apxs, '-q', 'HTTPD_VERSION'],
                                   capture_output=True, text=True)
                if p.returncode != 0:
                    log.error(f'{self.apxs} failed to query HTTPD_VERSION: {p}')
                else:
                    self._httpd_version = p.stdout.strip()
            except Exception as e:
                log.error(f'{self.apxs} failed to run: {e}')
        return self._httpd_version

    def versiontuple(self, v):
        """Turn a dotted version string into a tuple of ints.

        A ``-suffix`` following ``N.N`` or ``N.N.N`` is dropped first,
        e.g. ``'1.2.3-dev'`` gives ``(1, 2, 3)``.

        Raises:
            ValueError: when a dot-separated part is not an integer.
        """
        v = re.sub(r'(\d+\.\d+(\.\d+)?)(-\S+)?', r'\1', v)
        return tuple(map(int, v.split('.')))

    def httpd_is_at_least(self, minv):
        """True when the httpd version is known and not older than ``minv``."""
        if self.httpd_version is None:
            return False
        hv = self.versiontuple(self.httpd_version)
        return hv >= self.versiontuple(minv)

    def caddy_is_at_least(self, minv):
        """True when the caddy version is known and not older than ``minv``."""
        if self.caddy_version is None:
            return False
        hv = self.versiontuple(self.caddy_version)
        return hv >= self.versiontuple(minv)

    def is_complete(self) -> bool:
        """True when httpd, apachectl and apxs all exist as files."""
        return os.path.isfile(self.httpd) and \
               os.path.isfile(self.apachectl) and \
               self.apxs is not None and \
               os.path.isfile(self.apxs)

    def get_incomplete_reason(self) -> Optional[str]:
        """Describe the first missing piece of the httpd setup.

        Returns None when nothing is found missing.
        """
        if self.httpd is None or len(self.httpd.strip()) == 0:
            return 'httpd not configured, see `--with-test-httpd=<path>`'
        if not os.path.isfile(self.httpd):
            return f'httpd ({self.httpd}) not found'
        if not os.path.isfile(self.apachectl):
            return f'apachectl ({self.apachectl}) not found'
        if self.apxs is None:
            return "command apxs not found (commonly provided in apache2-dev)"
        if not os.path.isfile(self.apxs):
            return f"apxs ({self.apxs}) not found"
        return None

    @property
    def nghttpx_version(self):
        """Version text from ``nghttpx -v``, or None when not available."""
        return self._nghttpx_version

    @property
    def caddy_version(self):
        """``N.N.N`` version of caddy, or None when not available."""
        return self._caddy_version

    @property
    def vsftpd_version(self):
        """vsftpd version, ``'unknown'`` or None when not available."""
        return self._vsftpd_version

    @property
    def tcpdmp(self) -> Optional[str]:
        """Path of ``tcpdump`` as found by ``shutil.which``, or None."""
        return self._tcpdump
273
274
class Env:
    """Access to the test environment for test cases.

    ``CONFIG`` is a single ``EnvConfig`` shared by all instances; it is
    created when this class is defined. The static methods answer
    questions about that shared configuration and need no instance.
    An instance adds the verbosity, the test timeout and the test CA.
    """

    CONFIG = EnvConfig()

    @staticmethod
    def setup_incomplete() -> bool:
        """True when httpd, apachectl or apxs is missing."""
        return not Env.CONFIG.is_complete()

    @staticmethod
    def incomplete_reason() -> Optional[str]:
        """Text describing what is missing, or None when complete."""
        return Env.CONFIG.get_incomplete_reason()

    @staticmethod
    def have_nghttpx() -> bool:
        """True when a working nghttpx is configured."""
        return Env.CONFIG.nghttpx is not None

    @staticmethod
    def have_h3_server() -> bool:
        """True when nghttpx reports nghttp3 in its version output."""
        return Env.CONFIG.nghttpx_with_h3

    @staticmethod
    def have_ssl_curl() -> bool:
        """True when curl lists the 'ssl' feature."""
        return 'ssl' in Env.CONFIG.curl_props['features']

    @staticmethod
    def have_h2_curl() -> bool:
        """True when curl lists the 'http2' feature."""
        return 'http2' in Env.CONFIG.curl_props['features']

    @staticmethod
    def have_h3_curl() -> bool:
        """True when curl lists the 'http3' feature."""
        return 'http3' in Env.CONFIG.curl_props['features']

    @staticmethod
    def curl_uses_lib(libname: str) -> bool:
        """True when ``libname`` (case-insensitive) is among curl's libs."""
        return libname.lower() in Env.CONFIG.curl_props['libs']

    @staticmethod
    def curl_uses_ossl_quic() -> bool:
        """True when curl has http3 with nghttp3 but without ngtcp2."""
        if Env.have_h3_curl():
            return not Env.curl_uses_lib('ngtcp2') and Env.curl_uses_lib('nghttp3')
        return False

    @staticmethod
    def curl_has_feature(feature: str) -> bool:
        """True when ``feature`` (case-insensitive) is among curl's features."""
        return feature.lower() in Env.CONFIG.curl_props['features']

    @staticmethod
    def curl_has_protocol(protocol: str) -> bool:
        """True when ``protocol`` (case-insensitive) is among curl's protocols."""
        return protocol.lower() in Env.CONFIG.curl_props['protocols']

    @staticmethod
    def curl_lib_version(libname: str) -> str:
        """Version part of curl's ``libname/version`` entry.

        Returns ``'unknown'`` when curl lists no such library.
        """
        prefix = f'{libname.lower()}/'
        for lversion in Env.CONFIG.curl_props['lib_versions']:
            if lversion.startswith(prefix):
                return lversion[len(prefix):]
        return 'unknown'

    @staticmethod
    def curl_lib_version_at_least(libname: str, min_version) -> bool:
        """True when curl uses ``libname`` in at least ``min_version``.

        False when the library's version is unknown.
        """
        lversion = Env.curl_lib_version(libname)
        if lversion != 'unknown':
            return Env.CONFIG.versiontuple(min_version) <= \
                   Env.CONFIG.versiontuple(lversion)
        return False

    @staticmethod
    def curl_os() -> str:
        """The os field of the first line of ``curl -V``."""
        return Env.CONFIG.curl_props['os']

    @staticmethod
    def curl_fullname() -> str:
        """The complete first line of ``curl -V``."""
        return Env.CONFIG.curl_props['fullname']

    @staticmethod
    def curl_version() -> str:
        """The version field of the first line of ``curl -V``."""
        return Env.CONFIG.curl_props['version']

    @staticmethod
    def curl_is_debug() -> bool:
        """True when ``curl -V`` printed a WARNING on stderr."""
        return Env.CONFIG.curl_is_debug

    @staticmethod
    def have_h3() -> bool:
        """True when both curl and the nghttpx server support HTTP/3."""
        return Env.have_h3_curl() and Env.have_h3_server()

    @staticmethod
    def httpd_version() -> Optional[str]:
        """The httpd version, or None when it cannot be determined."""
        return Env.CONFIG.httpd_version

    @staticmethod
    def nghttpx_version() -> Optional[str]:
        """The nghttpx version text, or None when not available."""
        return Env.CONFIG.nghttpx_version

    @staticmethod
    def caddy_version() -> Optional[str]:
        """The caddy version, or None when not available."""
        return Env.CONFIG.caddy_version

    @staticmethod
    def caddy_is_at_least(minv) -> bool:
        """True when caddy is available in at least version ``minv``."""
        return Env.CONFIG.caddy_is_at_least(minv)

    @staticmethod
    def httpd_is_at_least(minv) -> bool:
        """True when httpd is known to be at least version ``minv``."""
        return Env.CONFIG.httpd_is_at_least(minv)

    @staticmethod
    def has_caddy() -> bool:
        """True when a working caddy is configured."""
        return Env.CONFIG.caddy is not None

    @staticmethod
    def has_vsftpd() -> bool:
        """True when a working vsftpd is configured."""
        return Env.CONFIG.vsftpd is not None

    @staticmethod
    def vsftpd_version() -> Optional[str]:
        """The vsftpd version, 'unknown', or None when not available."""
        return Env.CONFIG.vsftpd_version

    @staticmethod
    def tcpdump() -> Optional[str]:
        """Path of the tcpdump executable, or None when not found."""
        return Env.CONFIG.tcpdmp

    def __init__(self, pytestconfig=None):
        """Create an environment.

        Args:
            pytestconfig: object whose ``option.verbose`` gives the
                verbosity; when None the verbosity is 0.
        """
        self._verbose = pytestconfig.option.verbose \
            if pytestconfig is not None else 0
        self._ca = None
        # a verbosity above 1 gets a longer timeout
        self._test_timeout = 300.0 if self._verbose > 1 else 60.0  # seconds

    def issue_certs(self):
        """Create the test CA on first use and issue the configured certs."""
        if self._ca is None:
            ca_dir = os.path.join(self.CONFIG.gen_dir, 'ca')
            self._ca = TestCA.create_root(name=self.CONFIG.tld,
                                          store_dir=ca_dir,
                                          key_type="rsa2048")
        self._ca.issue_certs(self.CONFIG.cert_specs)

    def setup(self):
        """Create the gen and htdocs directories and issue certificates."""
        os.makedirs(self.gen_dir, exist_ok=True)
        os.makedirs(self.htdocs_dir, exist_ok=True)
        self.issue_certs()

    def get_credentials(self, domain) -> Optional[Credentials]:
        """First credentials the CA holds for ``domain``, or None."""
        creds = self.ca.get_credentials_for_name(domain)
        if len(creds) > 0:
            return creds[0]
        return None

    @property
    def verbose(self) -> int:
        """Verbosity level taken from the pytest configuration."""
        return self._verbose

    @property
    def test_timeout(self) -> Optional[float]:
        """Timeout for a test, in seconds."""
        return self._test_timeout

    @test_timeout.setter
    def test_timeout(self, val: Optional[float]):
        self._test_timeout = val

    @property
    def gen_dir(self) -> str:
        """Directory for generated files."""
        return self.CONFIG.gen_dir

    @property
    def project_dir(self) -> str:
        """The project directory recorded in the shared configuration."""
        return self.CONFIG.project_dir

    @property
    def ca(self):
        """The test CA; None until `issue_certs` has been called."""
        return self._ca

    @property
    def htdocs_dir(self) -> str:
        """The htdocs directory below the gen directory."""
        return self.CONFIG.htdocs_dir

    @property
    def domain1(self) -> str:
        return self.CONFIG.domain1

    @property
    def domain1brotli(self) -> str:
        return self.CONFIG.domain1brotli

    @property
    def domain2(self) -> str:
        return self.CONFIG.domain2

    @property
    def ftp_domain(self) -> str:
        return self.CONFIG.ftp_domain

    @property
    def proxy_domain(self) -> str:
        return self.CONFIG.proxy_domain

    @property
    def http_port(self) -> int:
        return self.CONFIG.ports['http']

    @property
    def https_port(self) -> int:
        return self.CONFIG.ports['https']

    @property
    def h3_port(self) -> int:
        """Port for HTTP/3; the same number as the https port."""
        return self.https_port

    @property
    def proxy_port(self) -> int:
        return self.CONFIG.ports['proxy']

    @property
    def proxys_port(self) -> int:
        return self.CONFIG.ports['proxys']

    @property
    def ftp_port(self) -> int:
        return self.CONFIG.ports['ftp']

    @property
    def ftps_port(self) -> int:
        return self.CONFIG.ports['ftps']

    @property
    def h2proxys_port(self) -> int:
        return self.CONFIG.ports['h2proxys']

    def pts_port(self, proto: str = 'http/1.1') -> int:
        """Proxy tunnel port: 'h2proxys' for ``'h2'``, otherwise 'proxys'."""
        # proxy tunnel port
        return self.CONFIG.ports['h2proxys' if proto == 'h2' else 'proxys']

    @property
    def caddy(self) -> Optional[str]:
        """Path of the caddy executable, or None when not available."""
        return self.CONFIG.caddy

    @property
    def caddy_https_port(self) -> int:
        return self.CONFIG.ports['caddys']

    @property
    def caddy_http_port(self) -> int:
        return self.CONFIG.ports['caddy']

    @property
    def vsftpd(self) -> Optional[str]:
        """Path of the vsftpd executable, or None when not available."""
        return self.CONFIG.vsftpd

    @property
    def ws_port(self) -> int:
        return self.CONFIG.ports['ws']

    @property
    def curl(self) -> str:
        """Path of the curl executable under test."""
        return self.CONFIG.curl

    @property
    def httpd(self) -> str:
        return self.CONFIG.httpd

    @property
    def apachectl(self) -> str:
        return self.CONFIG.apachectl

    @property
    def apxs(self) -> Optional[str]:
        """Path of apxs, or None when the configured value is empty."""
        return self.CONFIG.apxs

    @property
    def nghttpx(self) -> Optional[str]:
        """Path of the nghttpx executable, or None when not available."""
        return self.CONFIG.nghttpx

    @property
    def slow_network(self) -> bool:
        """True when CURL_DBG_SOCK_WBLOCK or CURL_DBG_SOCK_WPARTIAL is set."""
        return "CURL_DBG_SOCK_WBLOCK" in os.environ or \
               "CURL_DBG_SOCK_WPARTIAL" in os.environ

    @property
    def ci_run(self) -> bool:
        """True when CURL_CI is set in the environment."""
        return "CURL_CI" in os.environ

    def port_for(self, alpn_proto: Optional[str] = None):
        """Port to use for the given ALPN protocol.

        None, 'h2' and the 'http/x' protocols give the https port, 'h3'
        the h3 port; anything else gives the plain http port.
        """
        if alpn_proto is None or \
                alpn_proto in ['h2', 'http/1.1', 'http/1.0', 'http/0.9']:
            return self.https_port
        if alpn_proto in ['h3']:
            return self.h3_port
        return self.http_port

    def authority_for(self, domain: str, alpn_proto: Optional[str] = None):
        """``domain:port`` with the port chosen by `port_for`."""
        return f'{domain}:{self.port_for(alpn_proto=alpn_proto)}'

    def make_data_file(self, indir: str, fname: str, fsize: int,
                       line_length: int = 1024) -> str:
        """Write a text file of ``fsize`` characters and return its path.

        Every full line is ``line_length`` characters long including the
        newline: a 9-digit zero-padded line counter, a '-' and a run of
        repeating digits. A shorter last line fills up the remainder.

        Args:
            indir: directory to create the file in.
            fname: name of the file.
            fsize: total size of the file.
            line_length: length of a full line, at least 11.

        Raises:
            ValueError: when ``line_length`` is less than 11.
        """
        if line_length < 11:
            raise ValueError('line_length less than 11 not supported')
        fpath = os.path.join(indir, fname)
        s10 = "0123456789"
        s = round((line_length / 10) + 1) * s10
        # leave room for the 10 character counter prefix and the newline
        s = s[0:line_length-11]
        with open(fpath, 'w') as fd:
            for i in range(int(fsize / line_length)):
                fd.write(f"{i:09d}-{s}\n")
            remain = int(fsize % line_length)
            if remain != 0:
                i = int(fsize / line_length) + 1
                # cut the line so that, with the newline, exactly
                # `remain` characters are written
                fd.write(f"{i:09d}-{s}"[0:remain-1] + "\n")
        return fpath
581        return fpath
582