xref: /curl/tests/http/testenv/curl.py (revision 0e0c8cdf)
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#***************************************************************************
4#                                  _   _ ____  _
5#  Project                     ___| | | |  _ \| |
6#                             / __| | | | |_) | |
7#                            | (__| |_| |  _ <| |___
8#                             \___|\___/|_| \_\_____|
9#
10# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
11#
12# This software is licensed as described in the file COPYING, which
13# you should have received as part of this distribution. The terms
14# are also available at https://curl.se/docs/copyright.html.
15#
16# You may opt to use, copy, modify, merge, publish, distribute and/or sell
17# copies of the Software, and permit persons to whom the Software is
18# furnished to do so, under the terms of the COPYING file.
19#
20# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
21# KIND, either express or implied.
22#
23# SPDX-License-Identifier: curl
24#
25###########################################################################
26#
27import json
28import logging
29import os
30import sys
31import time
32from threading import Thread
33
34import psutil
35import re
36import shutil
37import subprocess
38from statistics import mean, fmean
39from datetime import timedelta, datetime
40from typing import List, Optional, Dict, Union, Any
41from urllib.parse import urlparse
42
43from .env import Env
44
45
# Logger shared by all classes in this module, named after the module.
log = logging.getLogger(__name__)
47
48
class RunProfile:
    """Resource usage profile of one child process run.

    `sample()` records CPU percentage, virtual and resident memory size of
    the process while it runs, `finish()` condenses the samples into one
    averaged value per key in `STAT_KEYS`.
    """

    STAT_KEYS = ['cpu', 'rss', 'vsz']

    @classmethod
    def AverageStats(cls, profiles: List['RunProfile']):
        """Return the mean of every `STAT_KEYS` value over `profiles`.

        Profiles without stats (no sample could be taken, e.g. because the
        process was gone before the first sample) are skipped. When no
        profile carries stats, every key averages to 0.0.
        """
        stats = [p.stats for p in profiles if p.stats is not None]
        return {
            key: mean([s[key] for s in stats]) if stats else 0.0
            for key in cls.STAT_KEYS
        }

    def __init__(self, pid: int, started_at: datetime, run_dir):
        """Prepare profiling of process `pid` that was started at `started_at`."""
        self._pid = pid
        self._started_at = started_at
        self._duration = timedelta(seconds=0)
        self._run_dir = run_dir
        self._samples = []
        # psutil handle, created lazily on the first sample
        self._psu = None
        self._stats = None

    @property
    def duration(self) -> timedelta:
        """Time between start and `finish()`, zero before `finish()`."""
        return self._duration

    @property
    def stats(self) -> Optional[Dict[str,Any]]:
        """Averaged samples keyed by `STAT_KEYS`, None without samples."""
        return self._stats

    def sample(self):
        """Record one sample; silently does nothing if the process is gone."""
        elapsed = datetime.now() - self._started_at
        try:
            if self._psu is None:
                self._psu = psutil.Process(pid=self._pid)
            mem = self._psu.memory_info()
            self._samples.append({
                'time': elapsed,
                'cpu': self._psu.cpu_percent(),
                'vsz': mem.vms,
                'rss': mem.rss,
            })
        except psutil.NoSuchProcess:
            pass

    def finish(self):
        """Stop profiling: fix the duration and average the samples.

        Each sample is weighted by the elapsed time at which it was taken.
        The weighted form of `fmean()` needs Python 3.11 or newer.
        """
        self._duration = datetime.now() - self._started_at
        if self._samples:
            weights = [s['time'].total_seconds() for s in self._samples]
            # fmean() refuses weights that sum up to zero, use the plain
            # mean for that case instead of failing the whole run
            weighted = sum(weights) > 0
            self._stats = {}
            for key in self.STAT_KEYS:
                values = [s[key] for s in self._samples]
                self._stats[key] = fmean(values, weights) if weighted \
                    else fmean(values)
        else:
            self._stats = None
        self._psu = None

    def __repr__(self):
        return f'RunProfile[pid={self._pid}, '\
               f'duration={self.duration.total_seconds():.3f}s, '\
               f'stats={self.stats}]'
109
110
class RunTcpDump:
    """Runs tcpdump on the loopback interface and records TCP RST packets.

    `start()` launches tcpdump in a background thread, `finish()` terminates
    it. The captured lines are available via `stats` afterwards.
    """

    def __init__(self, env, run_dir):
        """Prepare a capture writing `tcpdump.out`/`tcpdump.err` into `run_dir`."""
        self._env = env
        self._run_dir = run_dir
        self._proc = None
        self._stdoutfile = os.path.join(self._run_dir, 'tcpdump.out')
        self._stderrfile = os.path.join(self._run_dir, 'tcpdump.err')

    @property
    def stats(self) -> Optional[List[str]]:
        """Captured lines for packets between two 127.0.0.1 ports.

        Raises an Exception while tcpdump is still running.
        """
        if self._proc:
            raise Exception('tcpdump still running')
        # close the file right away instead of leaving that to the GC
        with open(self._stdoutfile) as fd:
            return [line
                    for line in fd
                    if re.match(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*', line)]

    def stats_excluding(self, src_port) -> Optional[List[str]]:
        """Like `stats`, without the packets sent from local port `src_port`."""
        if self._proc:
            raise Exception('tcpdump still running')
        port = re.escape(str(src_port))
        return [line
                for line in self.stats
                if not re.match(r'.* IP 127\.0\.0\.1\.' + port + ' >.*', line)]

    @property
    def stderr(self) -> List[str]:
        """Lines tcpdump wrote to stderr.

        Raises an Exception while tcpdump is still running.
        """
        if self._proc:
            raise Exception('tcpdump still running')
        with open(self._stderrfile) as fd:
            return fd.readlines()

    def sample(self):
        """Run tcpdump and block until it exits or `finish()` was called.

        Any failure (tcpdump missing, not startable, ...) is logged and
        not raised.
        """
        # not sure how to make that detection reliable for all platforms
        local_if = 'lo0' if sys.platform.startswith('darwin') else 'lo'
        try:
            tcpdump = self._env.tcpdump()
            if tcpdump is None:
                raise Exception('tcpdump not available')
            # look with tcpdump for TCP RST packets which indicate
            # we did not shut down connections cleanly
            args = []
            # at least on Linux, we need root permissions to run tcpdump
            if sys.platform.startswith('linux'):
                args.append('sudo')
            args.extend([
                tcpdump, '-i', local_if, '-n', 'tcp[tcpflags] & (tcp-rst)!=0'
            ])
            with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
                # keep a local reference: finish() resets self._proc from
                # another thread at any time
                proc = subprocess.Popen(args, stdout=cout, stderr=cerr,
                                        text=True, cwd=self._run_dir,
                                        shell=False)
                self._proc = proc
                while self._proc is not None:
                    try:
                        proc.wait(timeout=1)
                        # tcpdump is gone, waiting again would return at
                        # once and spin this loop
                        break
                    except subprocess.TimeoutExpired:
                        pass
        except Exception:
            log.exception('Tcpdump')

    def start(self):
        """Start capturing in a background thread."""
        def do_sample():
            self.sample()
        t = Thread(target=do_sample)
        t.start()

    def finish(self):
        """Terminate a running tcpdump after a grace period of one second.

        NOTE(review): when called before the background thread has started
        tcpdump, nothing is terminated and the capture keeps running.
        """
        proc = self._proc
        if proc:
            time.sleep(1)
            proc.terminate()
            self._proc = None
182
183
class ExecResult:
    """Outcome of one executed command plus checks on what it reported.

    With `with_stats`, every stdout line is parsed as one JSON stat.
    Without, the complete stdout is parsed as a single JSON document if
    possible.
    """

    def __init__(self, args: List[str], exit_code: int,
                 stdout: List[str], stderr: List[str],
                 duration: Optional[timedelta] = None,
                 with_stats: bool = False,
                 exception: Optional[str] = None,
                 profile: Optional['RunProfile'] = None,
                 tcpdump: Optional['RunTcpDump'] = None):
        """Keep the raw results and parse stdout according to `with_stats`."""
        self._args = args
        self._exit_code = exit_code
        self._exception = exception
        self._stdout = stdout
        self._stderr = stderr
        self._profile = profile
        self._tcpdump = tcpdump
        self._duration = duration if duration is not None else timedelta()
        self._response = None
        self._responses = []
        self._results = {}
        self._assets = []
        self._stats = []
        self._json_out = None
        self._with_stats = with_stats
        if with_stats:
            self._parse_stats()
        else:
            # best effort: output that is no JSON leaves `json` as None.
            # json.JSONDecodeError is a ValueError.
            try:
                out = ''.join(self._stdout)
                self._json_out = json.loads(out)
            except (TypeError, ValueError):
                pass

    def __repr__(self):
        return f"ExecResult[code={self.exit_code}, exception={self._exception}, "\
               f"args={self._args}, stdout={self._stdout}, stderr={self._stderr}]"

    def _parse_stats(self):
        """Parse stdout lines as JSON stats, stopping at the first bad line."""
        self._stats = []
        for line in self._stdout:
            try:
                self._stats.append(json.loads(line))
            except (TypeError, ValueError):
                log.exception(f'not a JSON stat: {line}')
                break

    @property
    def exit_code(self) -> int:
        return self._exit_code

    @property
    def args(self) -> List[str]:
        return self._args

    @property
    def outraw(self) -> bytes:
        """The complete stdout, encoded to bytes."""
        return ''.join(self._stdout).encode()

    @property
    def stdout(self) -> str:
        """The complete stdout as one string."""
        return ''.join(self._stdout)

    @property
    def json(self) -> Optional[Dict]:
        """Output as JSON dictionary or None if not parseable."""
        return self._json_out

    @property
    def stderr(self) -> str:
        """The complete stderr as one string."""
        return ''.join(self._stderr)

    @property
    def trace_lines(self) -> List[str]:
        """The stderr lines as passed in."""
        return self._stderr

    @property
    def duration(self) -> timedelta:
        return self._duration

    @property
    def profile(self) -> Optional['RunProfile']:
        return self._profile

    @property
    def tcpdump(self) -> Optional['RunTcpDump']:
        return self._tcpdump

    @property
    def response(self) -> Optional[Dict]:
        """The response added last, None if there is none."""
        return self._response

    @property
    def responses(self) -> List[Dict]:
        return self._responses

    @property
    def results(self) -> Dict:
        return self._results

    @property
    def assets(self) -> List:
        return self._assets

    @property
    def with_stats(self) -> bool:
        return self._with_stats

    @property
    def stats(self) -> List:
        return self._stats

    @property
    def total_connects(self) -> Optional[int]:
        """Sum of `num_connects` over all stats, None without stats."""
        if not self.stats:
            return None
        return sum(stat['num_connects'] for stat in self.stats)

    def add_response(self, resp: Dict):
        """Append `resp` and make it the current `response`."""
        self._response = resp
        self._responses.append(resp)

    def add_results(self, results: Dict):
        """Merge `results`, registering a contained 'response' as well."""
        self._results.update(results)
        if 'response' in results:
            self.add_response(results['response'])

    def add_assets(self, assets: List):
        self._assets.extend(assets)

    def check_exit_code(self, code: Union[int, bool]):
        """Assert on the exit code.

        `True` expects 0, `False` expects anything but 0, an int expects
        exactly that value.
        """
        if code is True:
            assert self.exit_code == 0, f'expected exit code {code}, '\
                                        f'got {self.exit_code}\n{self.dump_logs()}'
        elif code is False:
            assert self.exit_code != 0, f'expected exit code {code}, '\
                                        f'got {self.exit_code}\n{self.dump_logs()}'
        else:
            assert self.exit_code == code, f'expected exit code {code}, '\
                                           f'got {self.exit_code}\n{self.dump_logs()}'

    def check_response(self, http_status: Optional[int] = 200,
                       count: Optional[int] = 1,
                       protocol: Optional[str] = None,
                       exitcode: Optional[int] = 0,
                       connect_count: Optional[int] = None):
        """Assert on count, status and protocol of the responses.

        Checks the stats when the result was created `with_stats`, the
        added responses otherwise. Checks passed as None are skipped.
        """
        # NOTE(review): a falsy exitcode, including the default 0, skips the
        # exit code checks altogether. Kept as is since callers may rely
        # on it.
        if exitcode:
            self.check_exit_code(exitcode)
            if self.with_stats and isinstance(exitcode, int):
                for idx, x in enumerate(self.stats):
                    if 'exitcode' in x:
                        assert int(x['exitcode']) == exitcode, \
                            f'response #{idx} exitcode: expected {exitcode}, '\
                            f'got {x["exitcode"]}\n{self.dump_logs()}'

        if self.with_stats:
            assert len(self.stats) == count, \
                f'response count: expected {count}, ' \
                f'got {len(self.stats)}\n{self.dump_logs()}'
        else:
            assert len(self.responses) == count, \
                f'response count: expected {count}, ' \
                f'got {len(self.responses)}\n{self.dump_logs()}'
        if http_status is not None:
            if self.with_stats:
                for idx, x in enumerate(self.stats):
                    assert 'http_code' in x, \
                        f'response #{idx} reports no http_code\n{self.dump_stat(x)}'
                    assert x['http_code'] == http_status, \
                        f'response #{idx} http_code: expected {http_status}, '\
                        f'got {x["http_code"]}\n{self.dump_stat(x)}'
            else:
                for idx, x in enumerate(self.responses):
                    assert x['status'] == http_status, \
                        f'response #{idx} status: expected {http_status}, '\
                        f'got {x["status"]}\n{self.dump_stat(x)}'
        if protocol is not None:
            if self.with_stats:
                # stats report the bare version number only, other
                # protocol names are not checked
                http_version = {
                    'HTTP/1.1': '1.1',
                    'HTTP/2': '2',
                    'HTTP/3': '3',
                }.get(protocol)
                if http_version is not None:
                    for idx, x in enumerate(self.stats):
                        assert x['http_version'] == http_version, \
                            f'response #{idx} protocol: expected http/{http_version}, ' \
                            f'got version {x["http_version"]}\n{self.dump_stat(x)}'
            else:
                for idx, x in enumerate(self.responses):
                    assert x['protocol'] == protocol, \
                        f'response #{idx} protocol: expected {protocol}, '\
                        f'got {x["protocol"]}\n{self.dump_logs()}'
        if connect_count is not None:
            assert self.total_connects == connect_count, \
                f'expected {connect_count}, but {self.total_connects} '\
                f'were made\n{self.dump_logs()}'

    def check_stats(self, count: int, http_status: Optional[int] = None,
                    exitcode: Optional[int] = None,
                    remote_port: Optional[int] = None,
                    remote_ip: Optional[str] = None):
        """Assert on the number of stats and on values in each of them.

        Without `exitcode`, the overall exit code must be 0. With it, every
        stat that reports an exitcode must report that value.
        """
        if exitcode is None:
            self.check_exit_code(0)
        assert len(self.stats) == count, \
            f'stats count: expected {count}, got {len(self.stats)}\n{self.dump_logs()}'
        if http_status is not None:
            for idx, x in enumerate(self.stats):
                assert 'http_code' in x, \
                    f'status #{idx} reports no http_code\n{self.dump_stat(x)}'
                assert x['http_code'] == http_status, \
                    f'status #{idx} http_code: expected {http_status}, '\
                    f'got {x["http_code"]}\n{self.dump_stat(x)}'
        if exitcode is not None:
            for idx, x in enumerate(self.stats):
                if 'exitcode' in x:
                    assert x['exitcode'] == exitcode, \
                        f'status #{idx} exitcode: expected {exitcode}, '\
                        f'got {x["exitcode"]}\n{self.dump_stat(x)}'
        if remote_port is not None:
            for idx, x in enumerate(self.stats):
                assert 'remote_port' in x, f'remote_port missing\n{self.dump_stat(x)}'
                assert x['remote_port'] == remote_port, \
                    f'status #{idx} remote_port: expected {remote_port}, '\
                    f'got {x["remote_port"]}\n{self.dump_stat(x)}'
        if remote_ip is not None:
            for idx, x in enumerate(self.stats):
                assert 'remote_ip' in x, f'remote_ip missing\n{self.dump_stat(x)}'
                assert x['remote_ip'] == remote_ip, \
                    f'status #{idx} remote_ip: expected {remote_ip}, '\
                    f'got {x["remote_ip"]}\n{self.dump_stat(x)}'

    def dump_logs(self):
        """Return stdout and stderr as one text for failure messages."""
        lines = ['>>--stdout ----------------------------------------------\n']
        lines.extend(self._stdout)
        lines.append('>>--stderr ----------------------------------------------\n')
        lines.extend(self._stderr)
        lines.append('<<-------------------------------------------------------\n')
        return ''.join(lines)

    def dump_stat(self, x):
        """Return stat `x` as JSON text followed by the matching trace.

        The trace is limited to the lines of the transfer when `x` carries
        an 'xfer_id', the full stderr otherwise.
        """
        # header and JSON each end in a newline so they do not run into
        # the trace that follows
        lines = [
            'json stat from curl:\n',
            json.JSONEncoder(indent=2).encode(x) + '\n',
        ]
        if 'xfer_id' in x:
            xfer_id = x['xfer_id']
            lines.append(f'>>--xfer {xfer_id} trace:\n')
            lines.extend(self.xfer_trace_for(xfer_id))
        else:
            lines.append('>>--full trace-------------------------------------------\n')
            lines.extend(self._stderr)
            lines.append('<<-------------------------------------------------------\n')
        return ''.join(lines)

    def xfer_trace_for(self, xfer_id) -> List[str]:
        """Return the stderr lines whose first '[' starts a `[<xfer_id>-` tag."""
        pat = re.compile(f'^[^[]* \\[{xfer_id}-.*$')
        return [line for line in self._stderr if pat.match(line)]
448
449
450class CurlClient:
451
    # curl command line option that selects the HTTP version for a
    # protocol id. Both 'h2' and 'h2c' map to '--http2'.
    ALPN_ARG = {
        'http/0.9': '--http0.9',
        'http/1.0': '--http1.0',
        'http/1.1': '--http1.1',
        'h2': '--http2',
        'h2c': '--http2',
        'h3': '--http3-only',
    }
460
461    def __init__(self, env: Env,
462                 run_dir: Optional[str] = None,
463                 timeout: Optional[float] = None,
464                 silent: bool = False,
465                 run_env: Optional[Dict[str, str]] = None,
466                 server_addr: Optional[str] = None):
467        self.env = env
468        self._timeout = timeout if timeout else env.test_timeout
469        self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl
470        self._run_dir = run_dir if run_dir else os.path.join(env.gen_dir, 'curl')
471        self._stdoutfile = f'{self._run_dir}/curl.stdout'
472        self._stderrfile = f'{self._run_dir}/curl.stderr'
473        self._headerfile = f'{self._run_dir}/curl.headers'
474        self._log_path = f'{self._run_dir}/curl.log'
475        self._silent = silent
476        self._run_env = run_env
477        self._server_addr = server_addr if server_addr else '127.0.0.1'
478        self._rmrf(self._run_dir)
479        self._mkpath(self._run_dir)
480
481    @property
482    def run_dir(self) -> str:
483        return self._run_dir
484
485    def download_file(self, i: int) -> str:
486        return os.path.join(self.run_dir, f'download_{i}.data')
487
488    def _rmf(self, path):
489        if os.path.exists(path):
490            return os.remove(path)
491
492    def _rmrf(self, path):
493        if os.path.exists(path):
494            return shutil.rmtree(path)
495
496    def _mkpath(self, path):
497        if not os.path.exists(path):
498            return os.makedirs(path)
499
500    def get_proxy_args(self, proto: str = 'http/1.1',
501                       proxys: bool = True, tunnel: bool = False,
502                       use_ip: bool = False):
503        proxy_name = self._server_addr if use_ip else self.env.proxy_domain
504        if proxys:
505            pport = self.env.pts_port(proto) if tunnel else self.env.proxys_port
506            xargs = [
507                '--proxy', f'https://{proxy_name}:{pport}/',
508                '--resolve', f'{proxy_name}:{pport}:{self._server_addr}',
509                '--proxy-cacert', self.env.ca.cert_file,
510            ]
511            if proto == 'h2':
512                xargs.append('--proxy-http2')
513        else:
514            xargs = [
515                '--proxy', f'http://{proxy_name}:{self.env.proxy_port}/',
516                '--resolve', f'{proxy_name}:{self.env.proxy_port}:{self._server_addr}',
517            ]
518        if tunnel:
519            xargs.append('--proxytunnel')
520        return xargs
521
522    def http_get(self, url: str, extra_args: Optional[List[str]] = None,
523                 alpn_proto: Optional[str] = None,
524                 def_tracing: bool = True,
525                 with_stats: bool = False,
526                 with_profile: bool = False,
527                 with_tcpdump: bool = False):
528        return self._raw(url, options=extra_args,
529                         with_stats=with_stats,
530                         alpn_proto=alpn_proto,
531                         def_tracing=def_tracing,
532                         with_profile=with_profile,
533                         with_tcpdump=with_tcpdump)
534
535    def http_download(self, urls: List[str],
536                      alpn_proto: Optional[str] = None,
537                      with_stats: bool = True,
538                      with_headers: bool = False,
539                      with_profile: bool = False,
540                      with_tcpdump: bool = False,
541                      no_save: bool = False,
542                      extra_args: Optional[List[str]] = None):
543        if extra_args is None:
544            extra_args = []
545        if no_save:
546            extra_args.extend([
547                '-o', '/dev/null',
548            ])
549        else:
550            extra_args.extend([
551                '-o', 'download_#1.data',
552            ])
553        # remove any existing ones
554        for i in range(100):
555            self._rmf(self.download_file(i))
556        if with_stats:
557            extra_args.extend([
558                '-w', '%{json}\\n'
559            ])
560        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
561                         with_stats=with_stats,
562                         with_headers=with_headers,
563                         with_profile=with_profile,
564                         with_tcpdump=with_tcpdump)
565
566    def http_upload(self, urls: List[str], data: str,
567                    alpn_proto: Optional[str] = None,
568                    with_stats: bool = True,
569                    with_headers: bool = False,
570                    with_profile: bool = False,
571                    with_tcpdump: bool = False,
572                    extra_args: Optional[List[str]] = None):
573        if extra_args is None:
574            extra_args = []
575        extra_args.extend([
576            '--data-binary', data, '-o', 'download_#1.data',
577        ])
578        if with_stats:
579            extra_args.extend([
580                '-w', '%{json}\\n'
581            ])
582        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
583                         with_stats=with_stats,
584                         with_headers=with_headers,
585                         with_profile=with_profile,
586                         with_tcpdump=with_tcpdump)
587
588    def http_delete(self, urls: List[str],
589                    alpn_proto: Optional[str] = None,
590                    with_stats: bool = True,
591                    with_profile: bool = False,
592                    extra_args: Optional[List[str]] = None):
593        if extra_args is None:
594            extra_args = []
595        extra_args.extend([
596            '-X', 'DELETE', '-o', '/dev/null',
597        ])
598        if with_stats:
599            extra_args.extend([
600                '-w', '%{json}\\n'
601            ])
602        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
603                         with_stats=with_stats,
604                         with_headers=False,
605                         with_profile=with_profile)
606
607    def http_put(self, urls: List[str], data=None, fdata=None,
608                 alpn_proto: Optional[str] = None,
609                 with_stats: bool = True,
610                 with_headers: bool = False,
611                 with_profile: bool = False,
612                 extra_args: Optional[List[str]] = None):
613        if extra_args is None:
614            extra_args = []
615        if fdata is not None:
616            extra_args.extend(['-T', fdata])
617        elif data is not None:
618            extra_args.extend(['-T', '-'])
619        extra_args.extend([
620            '-o', 'download_#1.data',
621        ])
622        if with_stats:
623            extra_args.extend([
624                '-w', '%{json}\\n'
625            ])
626        return self._raw(urls, intext=data,
627                         alpn_proto=alpn_proto, options=extra_args,
628                         with_stats=with_stats,
629                         with_headers=with_headers,
630                         with_profile=with_profile)
631
632    def http_form(self, urls: List[str], form: Dict[str, str],
633                  alpn_proto: Optional[str] = None,
634                  with_stats: bool = True,
635                  with_headers: bool = False,
636                  extra_args: Optional[List[str]] = None):
637        if extra_args is None:
638            extra_args = []
639        for key, val in form.items():
640            extra_args.extend(['-F', f'{key}={val}'])
641        extra_args.extend([
642            '-o', 'download_#1.data',
643        ])
644        if with_stats:
645            extra_args.extend([
646                '-w', '%{json}\\n'
647            ])
648        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
649                         with_stats=with_stats,
650                         with_headers=with_headers)
651
652    def ftp_get(self, urls: List[str],
653                      with_stats: bool = True,
654                      with_profile: bool = False,
655                      with_tcpdump: bool = False,
656                      no_save: bool = False,
657                      extra_args: Optional[List[str]] = None):
658        if extra_args is None:
659            extra_args = []
660        if no_save:
661            extra_args.extend([
662                '-o', '/dev/null',
663            ])
664        else:
665            extra_args.extend([
666                '-o', 'download_#1.data',
667            ])
668        # remove any existing ones
669        for i in range(100):
670            self._rmf(self.download_file(i))
671        if with_stats:
672            extra_args.extend([
673                '-w', '%{json}\\n'
674            ])
675        return self._raw(urls, options=extra_args,
676                         with_stats=with_stats,
677                         with_headers=False,
678                         with_profile=with_profile,
679                         with_tcpdump=with_tcpdump)
680
681    def ftp_ssl_get(self, urls: List[str],
682                      with_stats: bool = True,
683                      with_profile: bool = False,
684                      with_tcpdump: bool = False,
685                      no_save: bool = False,
686                      extra_args: Optional[List[str]] = None):
687        if extra_args is None:
688            extra_args = []
689        extra_args.extend([
690            '--ssl-reqd',
691        ])
692        return self.ftp_get(urls=urls, with_stats=with_stats,
693                            with_profile=with_profile, no_save=no_save,
694                            with_tcpdump=with_tcpdump,
695                            extra_args=extra_args)
696
697    def ftp_upload(self, urls: List[str],
698                   fupload: Optional[Any] = None,
699                   updata: Optional[str] = None,
700                   with_stats: bool = True,
701                   with_profile: bool = False,
702                   with_tcpdump: bool = False,
703                   extra_args: Optional[List[str]] = None):
704        if extra_args is None:
705            extra_args = []
706        if fupload is not None:
707            extra_args.extend([
708                '--upload-file', fupload
709            ])
710        elif updata is not None:
711            extra_args.extend([
712                '--upload-file', '-'
713            ])
714        else:
715            raise Exception('need either file or data to upload')
716        if with_stats:
717            extra_args.extend([
718                '-w', '%{json}\\n'
719            ])
720        return self._raw(urls, options=extra_args,
721                         intext=updata,
722                         with_stats=with_stats,
723                         with_headers=False,
724                         with_profile=with_profile,
725                         with_tcpdump=with_tcpdump)
726
727    def ftp_ssl_upload(self, urls: List[str],
728                       fupload: Optional[Any] = None,
729                       updata: Optional[str] = None,
730                       with_stats: bool = True,
731                       with_profile: bool = False,
732                       with_tcpdump: bool = False,
733                       extra_args: Optional[List[str]] = None):
734        if extra_args is None:
735            extra_args = []
736        extra_args.extend([
737            '--ssl-reqd',
738        ])
739        return self.ftp_upload(urls=urls, fupload=fupload, updata=updata,
740                               with_stats=with_stats, with_profile=with_profile,
741                               with_tcpdump=with_tcpdump,
742                               extra_args=extra_args)
743
744    def response_file(self, idx: int):
745        return os.path.join(self._run_dir, f'download_{idx}.data')
746
747    def run_direct(self, args, with_stats: bool = False, with_profile: bool = False):
748        my_args = [self._curl]
749        if with_stats:
750            my_args.extend([
751                '-w', '%{json}\\n'
752            ])
753        my_args.extend([
754            '-o', 'download.data',
755        ])
756        my_args.extend(args)
757        return self._run(args=my_args, with_stats=with_stats, with_profile=with_profile)
758
759    def _run(self, args, intext='', with_stats: bool = False,
760             with_profile: bool = True, with_tcpdump: bool = False):
761        self._rmf(self._stdoutfile)
762        self._rmf(self._stderrfile)
763        self._rmf(self._headerfile)
764        exception = None
765        profile = None
766        tcpdump = None
767        started_at = datetime.now()
768        if with_tcpdump:
769            tcpdump = RunTcpDump(self.env, self._run_dir)
770            tcpdump.start()
771        try:
772            with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
773                if with_profile:
774                    end_at = started_at + timedelta(seconds=self._timeout) \
775                        if self._timeout else None
776                    log.info(f'starting: {args}')
777                    p = subprocess.Popen(args, stderr=cerr, stdout=cout,
778                                         cwd=self._run_dir, shell=False,
779                                         env=self._run_env)
780                    profile = RunProfile(p.pid, started_at, self._run_dir)
781                    if intext is not None and False:
782                        p.communicate(input=intext.encode(), timeout=1)
783                    ptimeout = 0.0
784                    while True:
785                        try:
786                            p.wait(timeout=ptimeout)
787                            break
788                        except subprocess.TimeoutExpired:
789                            if end_at and datetime.now() >= end_at:
790                                p.kill()
791                                raise subprocess.TimeoutExpired(cmd=args, timeout=self._timeout)
792                            profile.sample()
793                            ptimeout = 0.01
794                    exitcode = p.returncode
795                    profile.finish()
796                    log.info(f'done: exit={exitcode}, profile={profile}')
797                else:
798                    p = subprocess.run(args, stderr=cerr, stdout=cout,
799                                       cwd=self._run_dir, shell=False,
800                                       input=intext.encode() if intext else None,
801                                       timeout=self._timeout,
802                                       env=self._run_env)
803                    exitcode = p.returncode
804        except subprocess.TimeoutExpired:
805            now = datetime.now()
806            duration = now - started_at
807            log.warning(f'Timeout at {now} after {duration.total_seconds()}s '
808                        f'(configured {self._timeout}s): {args}')
809            exitcode = -1
810            exception = 'TimeoutExpired'
811        if tcpdump:
812            tcpdump.finish()
813        coutput = open(self._stdoutfile).readlines()
814        cerrput = open(self._stderrfile).readlines()
815        return ExecResult(args=args, exit_code=exitcode, exception=exception,
816                          stdout=coutput, stderr=cerrput,
817                          duration=datetime.now() - started_at,
818                          with_stats=with_stats,
819                          profile=profile, tcpdump=tcpdump)
820
821    def _raw(self, urls, intext='', timeout=None, options=None, insecure=False,
822             alpn_proto: Optional[str] = None,
823             force_resolve=True,
824             with_stats=False,
825             with_headers=True,
826             def_tracing=True,
827             with_profile=False,
828             with_tcpdump=False):
829        args = self._complete_args(
830            urls=urls, timeout=timeout, options=options, insecure=insecure,
831            alpn_proto=alpn_proto, force_resolve=force_resolve,
832            with_headers=with_headers, def_tracing=def_tracing)
833        r = self._run(args, intext=intext, with_stats=with_stats,
834                      with_profile=with_profile, with_tcpdump=with_tcpdump)
835        if r.exit_code == 0 and with_headers:
836            self._parse_headerfile(self._headerfile, r=r)
837        return r
838
839    def _complete_args(self, urls, timeout=None, options=None,
840                       insecure=False, force_resolve=True,
841                       alpn_proto: Optional[str] = None,
842                       with_headers: bool = True,
843                       def_tracing: bool = True):
844        if not isinstance(urls, list):
845            urls = [urls]
846
847        args = [self._curl, "-s", "--path-as-is"]
848        if 'CURL_TEST_EVENT' in os.environ:
849            args.append('--test-event')
850
851        if with_headers:
852            args.extend(["-D", self._headerfile])
853        if def_tracing is not False and not self._silent:
854            args.extend(['-v', '--trace-ids', '--trace-time'])
855            if self.env.verbose > 1:
856                args.extend(['--trace-config', 'http/2,http/3,h2-proxy,h1-proxy'])
857
858        active_options = options
859        if options is not None and '--next' in options:
860            active_options = options[options.index('--next') + 1:]
861
862        for url in urls:
863            u = urlparse(urls[0])
864            if options:
865                args.extend(options)
866            if alpn_proto is not None:
867                if alpn_proto not in self.ALPN_ARG:
868                    raise Exception(f'unknown ALPN protocol: "{alpn_proto}"')
869                args.append(self.ALPN_ARG[alpn_proto])
870
871            if u.scheme == 'http':
872                pass
873            elif insecure:
874                args.append('--insecure')
875            elif active_options and "--cacert" in active_options:
876                pass
877            elif u.hostname:
878                args.extend(["--cacert", self.env.ca.cert_file])
879
880            if force_resolve and u.hostname and u.hostname != 'localhost' \
881                    and not re.match(r'^(\d+|\[|:).*', u.hostname):
882                port = u.port if u.port else 443
883                args.extend([
884                    '--resolve', f'{u.hostname}:{port}:{self._server_addr}',
885                ])
886            if timeout is not None and int(timeout) > 0:
887                args.extend(["--connect-timeout", str(int(timeout))])
888            args.append(url)
889        return args
890
891    def _parse_headerfile(self, headerfile: str, r: Optional[ExecResult] = None) -> ExecResult:
892        lines = open(headerfile).readlines()
893        if r is None:
894            r = ExecResult(args=[], exit_code=0, stdout=[], stderr=[])
895
896        response = None
897
898        def fin_response(resp):
899            if resp:
900                r.add_response(resp)
901
902        expected = ['status']
903        for line in lines:
904            line = line.strip()
905            if re.match(r'^$', line):
906                if 'trailer' in expected:
907                    # end of trailers
908                    fin_response(response)
909                    response = None
910                    expected = ['status']
911                elif 'header' in expected:
912                    # end of header, another status or trailers might follow
913                    expected = ['status', 'trailer']
914                else:
915                    assert False, f"unexpected line: '{line}'"
916                continue
917            if 'status' in expected:
918                # log.debug("reading 1st response line: %s", line)
919                m = re.match(r'^(\S+) (\d+)( .*)?$', line)
920                if m:
921                    fin_response(response)
922                    response = {
923                        "protocol": m.group(1),
924                        "status": int(m.group(2)),
925                        "description": m.group(3),
926                        "header": {},
927                        "trailer": {},
928                        "body": r.outraw
929                    }
930                    expected = ['header']
931                    continue
932            if 'trailer' in expected:
933                m = re.match(r'^([^:]+):\s*(.*)$', line)
934                if m:
935                    response['trailer'][m.group(1).lower()] = m.group(2)
936                    continue
937            if 'header' in expected:
938                m = re.match(r'^([^:]+):\s*(.*)$', line)
939                if m:
940                    response['header'][m.group(1).lower()] = m.group(2)
941                    continue
942            assert False, f"unexpected line: '{line}, expected: {expected}'"
943
944        fin_response(response)
945        return r
946