xref: /curl/tests/http/testenv/curl.py (revision ea6f5c9f)
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#***************************************************************************
4#                                  _   _ ____  _
5#  Project                     ___| | | |  _ \| |
6#                             / __| | | | |_) | |
7#                            | (__| |_| |  _ <| |___
8#                             \___|\___/|_| \_\_____|
9#
10# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
11#
12# This software is licensed as described in the file COPYING, which
13# you should have received as part of this distribution. The terms
14# are also available at https://curl.se/docs/copyright.html.
15#
16# You may opt to use, copy, modify, merge, publish, distribute and/or sell
17# copies of the Software, and permit persons to whom the Software is
18# furnished to do so, under the terms of the COPYING file.
19#
20# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
21# KIND, either express or implied.
22#
23# SPDX-License-Identifier: curl
24#
25###########################################################################
26#
27import json
28import logging
29import os
30import sys
31import time
32from threading import Thread
33
34import psutil
35import re
36import shutil
37import subprocess
38from statistics import mean, fmean
39from datetime import timedelta, datetime
40from typing import List, Optional, Dict, Union, Any
41from urllib.parse import urlparse
42
43from .env import Env
44
45
46log = logging.getLogger(__name__)
47
48
class RunProfile:
    """Resource usage profile of one process, built from periodic samples.

    Call `sample()` repeatedly while the process runs and `finish()` once
    it has ended. Afterwards `stats` holds one aggregated value for each
    of `STAT_KEYS`, or None when no sample could be taken.
    """

    # keys present in every sample and in the aggregated stats
    STAT_KEYS = ['cpu', 'rss', 'vsz']

    @classmethod
    def AverageStats(cls, profiles: List['RunProfile']):
        """Return a dict with the mean of each STAT_KEYS value over `profiles`.

        Profiles without stats (the process ended before a sample was
        taken) are skipped. A key for which no value exists averages
        to 0.0.
        """
        stats = [p.stats for p in profiles if p.stats is not None]
        avg = {}
        for key in cls.STAT_KEYS:
            vals = [s[key] for s in stats]
            avg[key] = mean(vals) if vals else 0.0
        return avg

    def __init__(self, pid: int, started_at: datetime, run_dir):
        """Prepare profiling of process `pid` that was started at `started_at`."""
        self._pid = pid
        self._started_at = started_at
        self._duration = timedelta(seconds=0)
        self._run_dir = run_dir
        self._samples = []
        # psutil handle, created lazily by the first sample()
        self._psu = None
        self._stats = None

    @property
    def duration(self) -> timedelta:
        """Time between `started_at` and the call to `finish()`."""
        return self._duration

    @property
    def stats(self) -> Optional[Dict[str,Any]]:
        """Aggregated samples, available after `finish()`; None without samples."""
        return self._stats

    def sample(self):
        """Record cpu percentage and memory sizes of the process right now.

        Does nothing when the process no longer exists.
        """
        elapsed = datetime.now() - self._started_at
        try:
            if self._psu is None:
                self._psu = psutil.Process(pid=self._pid)
            mem = self._psu.memory_info()
            self._samples.append({
                'time': elapsed,
                'cpu': self._psu.cpu_percent(),
                'vsz': mem.vms,
                'rss': mem.rss,
            })
        except psutil.NoSuchProcess:
            pass

    def finish(self):
        """Stop profiling: fix the duration and aggregate the samples."""
        self._duration = datetime.now() - self._started_at
        if len(self._samples) > 0:
            # each sample is weighted by the seconds elapsed when it was
            # taken, so later samples count more than early ones
            weights = [s['time'].total_seconds() for s in self._samples]
            self._stats = {}
            for key in self.STAT_KEYS:
                self._stats[key] = fmean([s[key] for s in self._samples], weights)
        else:
            self._stats = None
        self._psu = None

    def __repr__(self):
        return f'RunProfile[pid={self._pid}, '\
               f'duration={self.duration.total_seconds():.3f}s, '\
               f'stats={self.stats}]'
108
109
class RunTcpDump:
    """Runs tcpdump on the loopback interface to record TCP RST packets.

    `start()` launches tcpdump in a background thread, `finish()`
    terminates it. The output is written to `tcpdump.out` and
    `tcpdump.err` in the run directory and can be inspected via `stats`,
    `stats_excluding()` and `stderr` once tcpdump is no longer running.
    """

    def __init__(self, env, run_dir):
        """Remember `env` (provides the tcpdump path) and the output location."""
        self._env = env
        self._run_dir = run_dir
        # the tcpdump process while it is considered running, else None
        self._proc = None
        self._stdoutfile = os.path.join(self._run_dir, 'tcpdump.out')
        self._stderrfile = os.path.join(self._run_dir, 'tcpdump.err')

    @property
    def stats(self) -> Optional[List[str]]:
        """The recorded lines about packets between two 127.0.0.1 endpoints.

        Raises an Exception while tcpdump is still running.
        """
        if self._proc:
            raise Exception('tcpdump still running')
        pat = re.compile(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*')
        with open(self._stdoutfile) as fd:
            return [line for line in fd if pat.match(line)]

    def stats_excluding(self, src_port) -> Optional[List[str]]:
        """Like `stats`, without the packets sent from local port `src_port`.

        Raises an Exception while tcpdump is still running.
        """
        if self._proc:
            raise Exception('tcpdump still running')
        pat = re.compile(r'.* IP 127\.0\.0\.1\.' + str(src_port) + ' >.*')
        return [line for line in self.stats if not pat.match(line)]

    @property
    def stderr(self) -> List[str]:
        """All lines tcpdump wrote to stderr.

        Raises an Exception while tcpdump is still running.
        """
        if self._proc:
            raise Exception('tcpdump still running')
        with open(self._stderrfile) as fd:
            return fd.readlines()

    def sample(self):
        """Run tcpdump and block until it exits or `finish()` was called.

        Any failure, including tcpdump not being available, is logged
        and not raised.
        """
        # not sure how to make that detection reliable for all platforms
        local_if = 'lo0' if sys.platform.startswith('darwin') else 'lo'
        try:
            tcpdump = self._env.tcpdump()
            if tcpdump is None:
                raise Exception('tcpdump not available')
            # look with tcpdump for TCP RST packets which indicate
            # we did not shut down connections cleanly
            args = []
            # at least on Linux, we need root permissions to run tcpdump
            if sys.platform.startswith('linux'):
                args.append('sudo')
            args.extend([
                tcpdump, '-i', local_if, '-n', 'tcp[tcpflags] & (tcp-rst)!=0'
            ])
            with open(self._stdoutfile, 'w') as cout:
                with open(self._stderrfile, 'w') as cerr:
                    # keep a local reference: finish() resets self._proc
                    # from another thread at any time
                    proc = subprocess.Popen(args, stdout=cout, stderr=cerr,
                                            text=True, cwd=self._run_dir,
                                            shell=False)
                    self._proc = proc
                    assert proc.returncode is None
                    while self._proc is proc:
                        try:
                            proc.wait(timeout=1)
                            # process is gone, waiting again would only
                            # spin without ever blocking
                            break
                        except subprocess.TimeoutExpired:
                            pass
        except Exception as e:
            log.error(f'Tcpdump: {e}')

    def start(self):
        """Start `sample()` in a new thread and return immediately."""
        def do_sample():
            self.sample()
        t = Thread(target=do_sample)
        t.start()

    def finish(self):
        """Terminate a running tcpdump, after giving it a second to catch up."""
        proc = self._proc
        if proc:
            time.sleep(1)
            proc.terminate()
            self._proc = None
187
188
class ExecResult:
    """Outcome of one executed command.

    Holds exit code, the captured stdout/stderr lines and what was parsed
    from them: either the JSON document on stdout or, `with_stats`, one
    JSON stat per stdout line. HTTP responses, results and assets are
    added afterwards by whoever evaluates the run.
    """

    def __init__(self, args: List[str], exit_code: int,
                 stdout: List[str], stderr: List[str],
                 duration: Optional[timedelta] = None,
                 with_stats: bool = False,
                 exception: Optional[str] = None,
                 profile: Optional['RunProfile'] = None,
                 tcpdump: Optional['RunTcpDump'] = None):
        """
        :param args: the command line that was executed
        :param exit_code: exit code of the command
        :param stdout: lines written to stdout
        :param stderr: lines written to stderr
        :param duration: run time, a zero timedelta if not given
        :param with_stats: parse every stdout line as a JSON stat instead
            of parsing all of stdout as one JSON document
        :param exception: name of the exception that ended the run, if any
        :param profile: resource profile of the run, if one was taken
        :param tcpdump: tcpdump recording of the run, if one was made
        """
        self._args = args
        self._exit_code = exit_code
        self._exception = exception
        self._stdout = stdout
        self._stderr = stderr
        self._profile = profile
        self._tcpdump = tcpdump
        self._duration = duration if duration is not None else timedelta()
        self._response = None
        self._responses = []
        self._results = {}
        self._assets = []
        self._stats = []
        self._json_out = None
        self._with_stats = with_stats
        if with_stats:
            self._parse_stats()
        else:
            # best effort only, stdout is not required to be JSON
            # noinspection PyBroadException
            try:
                out = ''.join(self._stdout)
                self._json_out = json.loads(out)
            except Exception:
                pass

    def __repr__(self):
        return f"ExecResult[code={self.exit_code}, exception={self._exception}, "\
               f"args={self._args}, stdout={self._stdout}, stderr={self._stderr}]"

    def _parse_stats(self):
        """Parse stdout lines as JSON stats, stopping at the first bad line."""
        self._stats = []
        for l in self._stdout:
            try:
                self._stats.append(json.loads(l))
            except Exception:
                log.error(f'not a JSON stat: {l}')
                break

    @property
    def exit_code(self) -> int:
        return self._exit_code

    @property
    def args(self) -> List[str]:
        return self._args

    @property
    def outraw(self) -> bytes:
        """The complete stdout, encoded to bytes."""
        return ''.join(self._stdout).encode()

    @property
    def stdout(self) -> str:
        """The complete stdout as one string."""
        return ''.join(self._stdout)

    @property
    def json(self) -> Optional[Dict]:
        """Output as JSON dictionary or None if not parseable."""
        return self._json_out

    @property
    def stderr(self) -> str:
        """The complete stderr as one string."""
        return ''.join(self._stderr)

    @property
    def trace_lines(self) -> List[str]:
        """The stderr output as list of lines."""
        return self._stderr

    @property
    def duration(self) -> timedelta:
        return self._duration

    @property
    def profile(self) -> Optional['RunProfile']:
        return self._profile

    @property
    def tcpdump(self) -> Optional['RunTcpDump']:
        return self._tcpdump

    @property
    def response(self) -> Optional[Dict]:
        """The response added last, None if there is none."""
        return self._response

    @property
    def responses(self) -> List[Dict]:
        return self._responses

    @property
    def results(self) -> Dict:
        return self._results

    @property
    def assets(self) -> List:
        return self._assets

    @property
    def with_stats(self) -> bool:
        return self._with_stats

    @property
    def stats(self) -> List:
        return self._stats

    @property
    def total_connects(self) -> Optional[int]:
        """Sum of 'num_connects' over all stats, None if there are no stats."""
        if self.stats:
            return sum(stat['num_connects'] for stat in self.stats)
        return None

    def add_response(self, resp: Dict):
        """Append `resp` and make it the current `response`."""
        self._response = resp
        self._responses.append(resp)

    def add_results(self, results: Dict):
        """Merge `results`; a contained 'response' is added as response too."""
        self._results.update(results)
        if 'response' in results:
            self.add_response(results['response'])

    def add_assets(self, assets: List):
        self._assets.extend(assets)

    def check_exit_code(self, code: Union[int, bool]):
        """Assert the exit code.

        :param code: True expects exit code 0, False expects any other
            exit code, an int expects exactly that exit code
        """
        if code is True:
            assert self.exit_code == 0, f'expected exit code {code}, '\
                                        f'got {self.exit_code}\n{self.dump_logs()}'
        elif code is False:
            assert self.exit_code != 0, f'expected exit code {code}, '\
                                                f'got {self.exit_code}\n{self.dump_logs()}'
        else:
            assert self.exit_code == code, f'expected exit code {code}, '\
                                           f'got {self.exit_code}\n{self.dump_logs()}'

    def check_response(self, http_status: Optional[int] = 200,
                       count: Optional[int] = 1,
                       protocol: Optional[str] = None,
                       exitcode: Optional[int] = 0,
                       connect_count: Optional[int] = None):
        """Assert properties of the responses, or of the stats if `with_stats`.

        :param http_status: expected status of every response, None to skip
        :param count: expected number of responses/stats
        :param protocol: expected protocol, e.g. 'HTTP/2', None to skip
        :param exitcode: expected exit code. Only checked when truthy,
            which means the default of 0 is not verified here.
        :param connect_count: expected `total_connects`, None to skip
        """
        if exitcode:
            self.check_exit_code(exitcode)
            if self.with_stats and isinstance(exitcode, int):
                for idx, x in enumerate(self.stats):
                    if 'exitcode' in x:
                        assert int(x['exitcode']) == exitcode, \
                            f'response #{idx} exitcode: expected {exitcode}, '\
                            f'got {x["exitcode"]}\n{self.dump_logs()}'

        if self.with_stats:
            assert len(self.stats) == count, \
                f'response count: expected {count}, ' \
                f'got {len(self.stats)}\n{self.dump_logs()}'
        else:
            assert len(self.responses) == count, \
                f'response count: expected {count}, ' \
                f'got {len(self.responses)}\n{self.dump_logs()}'
        if http_status is not None:
            if self.with_stats:
                for idx, x in enumerate(self.stats):
                    assert 'http_code' in x, \
                        f'response #{idx} reports no http_code\n{self.dump_stat(x)}'
                    assert x['http_code'] == http_status, \
                        f'response #{idx} http_code: expected {http_status}, '\
                        f'got {x["http_code"]}\n{self.dump_stat(x)}'
            else:
                for idx, x in enumerate(self.responses):
                    assert x['status'] == http_status, \
                        f'response #{idx} status: expected {http_status}, '\
                        f'got {x["status"]}\n{self.dump_stat(x)}'
        if protocol is not None:
            if self.with_stats:
                # stats carry the bare version number; protocols other
                # than the three below are not checked
                http_version = None
                if protocol == 'HTTP/1.1':
                    http_version = '1.1'
                elif protocol == 'HTTP/2':
                    http_version = '2'
                elif protocol == 'HTTP/3':
                    http_version = '3'
                if http_version is not None:
                    for idx, x in enumerate(self.stats):
                        assert x['http_version'] == http_version, \
                            f'response #{idx} protocol: expected http/{http_version}, ' \
                            f'got version {x["http_version"]}\n{self.dump_stat(x)}'
            else:
                for idx, x in enumerate(self.responses):
                    assert x['protocol'] == protocol, \
                        f'response #{idx} protocol: expected {protocol}, '\
                        f'got {x["protocol"]}\n{self.dump_logs()}'
        if connect_count is not None:
            assert self.total_connects == connect_count, \
                f'expected {connect_count}, but {self.total_connects} '\
                f'were made\n{self.dump_logs()}'

    def check_stats(self, count: int, http_status: Optional[int] = None,
                    exitcode: Optional[int] = None,
                    remote_port: Optional[int] = None,
                    remote_ip: Optional[str] = None):
        """Assert properties of the stats.

        :param count: expected number of stats
        :param http_status: expected 'http_code' of every stat, None to skip
        :param exitcode: expected 'exitcode' of every stat that has one.
            When None, the overall exit code must be 0 instead.
        :param remote_port: expected 'remote_port', None to skip
        :param remote_ip: expected 'remote_ip', None to skip
        """
        if exitcode is None:
            self.check_exit_code(0)
        assert len(self.stats) == count, \
            f'stats count: expected {count}, got {len(self.stats)}\n{self.dump_logs()}'
        if http_status is not None:
            for idx, x in enumerate(self.stats):
                assert 'http_code' in x, \
                    f'status #{idx} reports no http_code\n{self.dump_stat(x)}'
                assert x['http_code'] == http_status, \
                    f'status #{idx} http_code: expected {http_status}, '\
                    f'got {x["http_code"]}\n{self.dump_stat(x)}'
        if exitcode is not None:
            for idx, x in enumerate(self.stats):
                if 'exitcode' in x:
                    assert x['exitcode'] == exitcode, \
                        f'status #{idx} exitcode: expected {exitcode}, '\
                        f'got {x["exitcode"]}\n{self.dump_stat(x)}'
        if remote_port is not None:
            for idx, x in enumerate(self.stats):
                assert 'remote_port' in x, f'remote_port missing\n{self.dump_stat(x)}'
                assert x['remote_port'] == remote_port, \
                        f'status #{idx} remote_port: expected {remote_port}, '\
                        f'got {x["remote_port"]}\n{self.dump_stat(x)}'
        if remote_ip is not None:
            for idx, x in enumerate(self.stats):
                assert 'remote_ip' in x, f'remote_ip missing\n{self.dump_stat(x)}'
                assert x['remote_ip'] == remote_ip, \
                        f'status #{idx} remote_ip: expected {remote_ip}, '\
                        f'got {x["remote_ip"]}\n{self.dump_stat(x)}'

    def dump_logs(self):
        """Return stdout and stderr as one text for assertion messages."""
        lines = ['>>--stdout ----------------------------------------------\n']
        lines.extend(self._stdout)
        lines.append('>>--stderr ----------------------------------------------\n')
        lines.extend(self._stderr)
        lines.append('<<-------------------------------------------------------\n')
        return ''.join(lines)

    def dump_stat(self, x):
        """Return `x` as indented JSON followed by the matching trace lines.

        With an 'xfer_id' in `x` only the trace of that transfer is
        added, otherwise all of stderr.
        """
        lines = [
            'json stat from curl:\n',
            json.JSONEncoder(indent=2).encode(x),
            '\n',
        ]
        if 'xfer_id' in x:
            xfer_id = x['xfer_id']
            lines.append(f'>>--xfer {xfer_id} trace:\n')
            lines.extend(self.xfer_trace_for(xfer_id))
        else:
            lines.append('>>--full trace-------------------------------------------\n')
            lines.extend(self._stderr)
            lines.append('<<-------------------------------------------------------\n')
        return ''.join(lines)

    def xfer_trace_for(self, xfer_id) -> List[str]:
        """Return the stderr lines tagged with '[<xfer_id>-'."""
        pat = re.compile(f'^[^[]* \\[{xfer_id}-.*$')
        return [line for line in self._stderr if pat.match(line)]
452
453
class CurlClient:
    """Runs the curl executable with prepared arguments and returns the
    outcome as ExecResult.
    """

    # ALPN protocol name -> curl command line option selecting that HTTP
    # version. NOTE(review): presumably looked up with the `alpn_proto`
    # argument when the command line is completed; that code is not part
    # of this excerpt.
    ALPN_ARG = {
        'http/0.9': '--http0.9',
        'http/1.0': '--http1.0',
        'http/1.1': '--http1.1',
        'h2': '--http2',
        'h2c': '--http2',
        'h3': '--http3-only',
    }
464
465    def __init__(self, env: Env,
466                 run_dir: Optional[str] = None,
467                 timeout: Optional[float] = None,
468                 silent: bool = False,
469                 run_env: Optional[Dict[str, str]] = None):
470        self.env = env
471        self._timeout = timeout if timeout else env.test_timeout
472        self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl
473        self._run_dir = run_dir if run_dir else os.path.join(env.gen_dir, 'curl')
474        self._stdoutfile = f'{self._run_dir}/curl.stdout'
475        self._stderrfile = f'{self._run_dir}/curl.stderr'
476        self._headerfile = f'{self._run_dir}/curl.headers'
477        self._log_path = f'{self._run_dir}/curl.log'
478        self._silent = silent
479        self._run_env = run_env
480        self._rmrf(self._run_dir)
481        self._mkpath(self._run_dir)
482
483    @property
484    def run_dir(self) -> str:
485        return self._run_dir
486
487    def download_file(self, i: int) -> str:
488        return os.path.join(self.run_dir, f'download_{i}.data')
489
490    def _rmf(self, path):
491        if os.path.exists(path):
492            return os.remove(path)
493
494    def _rmrf(self, path):
495        if os.path.exists(path):
496            return shutil.rmtree(path)
497
498    def _mkpath(self, path):
499        if not os.path.exists(path):
500            return os.makedirs(path)
501
502    def get_proxy_args(self, proto: str = 'http/1.1',
503                       proxys: bool = True, tunnel: bool = False,
504                       use_ip: bool = False):
505        proxy_name = '127.0.0.1' if use_ip else self.env.proxy_domain
506        if proxys:
507            pport = self.env.pts_port(proto) if tunnel else self.env.proxys_port
508            xargs = [
509                '--proxy', f'https://{proxy_name}:{pport}/',
510                '--resolve', f'{proxy_name}:{pport}:127.0.0.1',
511                '--proxy-cacert', self.env.ca.cert_file,
512            ]
513            if proto == 'h2':
514                xargs.append('--proxy-http2')
515        else:
516            xargs = [
517                '--proxy', f'http://{proxy_name}:{self.env.proxy_port}/',
518                '--resolve', f'{proxy_name}:{self.env.proxy_port}:127.0.0.1',
519            ]
520        if tunnel:
521            xargs.append('--proxytunnel')
522        return xargs
523
524    def http_get(self, url: str, extra_args: Optional[List[str]] = None,
525                 alpn_proto: Optional[str] = None,
526                 def_tracing: bool = True,
527                 with_stats: bool = False,
528                 with_profile: bool = False,
529                 with_tcpdump: bool = False):
530        return self._raw(url, options=extra_args,
531                         with_stats=with_stats,
532                         alpn_proto=alpn_proto,
533                         def_tracing=def_tracing,
534                         with_profile=with_profile,
535                         with_tcpdump=with_tcpdump)
536
537    def http_download(self, urls: List[str],
538                      alpn_proto: Optional[str] = None,
539                      with_stats: bool = True,
540                      with_headers: bool = False,
541                      with_profile: bool = False,
542                      with_tcpdump: bool = False,
543                      no_save: bool = False,
544                      extra_args: List[str] = None):
545        if extra_args is None:
546            extra_args = []
547        if no_save:
548            extra_args.extend([
549                '-o', '/dev/null',
550            ])
551        else:
552            extra_args.extend([
553                '-o', 'download_#1.data',
554            ])
555        # remove any existing ones
556        for i in range(100):
557            self._rmf(self.download_file(i))
558        if with_stats:
559            extra_args.extend([
560                '-w', '%{json}\\n'
561            ])
562        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
563                         with_stats=with_stats,
564                         with_headers=with_headers,
565                         with_profile=with_profile,
566                         with_tcpdump=with_tcpdump)
567
568    def http_upload(self, urls: List[str], data: str,
569                    alpn_proto: Optional[str] = None,
570                    with_stats: bool = True,
571                    with_headers: bool = False,
572                    with_profile: bool = False,
573                    with_tcpdump: bool = False,
574                    extra_args: Optional[List[str]] = None):
575        if extra_args is None:
576            extra_args = []
577        extra_args.extend([
578            '--data-binary', data, '-o', 'download_#1.data',
579        ])
580        if with_stats:
581            extra_args.extend([
582                '-w', '%{json}\\n'
583            ])
584        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
585                         with_stats=with_stats,
586                         with_headers=with_headers,
587                         with_profile=with_profile,
588                         with_tcpdump=with_tcpdump)
589
590    def http_delete(self, urls: List[str],
591                    alpn_proto: Optional[str] = None,
592                    with_stats: bool = True,
593                    with_profile: bool = False,
594                    extra_args: Optional[List[str]] = None):
595        if extra_args is None:
596            extra_args = []
597        extra_args.extend([
598            '-X', 'DELETE', '-o', '/dev/null',
599        ])
600        if with_stats:
601            extra_args.extend([
602                '-w', '%{json}\\n'
603            ])
604        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
605                         with_stats=with_stats,
606                         with_headers=False,
607                         with_profile=with_profile)
608
609    def http_put(self, urls: List[str], data=None, fdata=None,
610                 alpn_proto: Optional[str] = None,
611                 with_stats: bool = True,
612                 with_headers: bool = False,
613                 with_profile: bool = False,
614                 extra_args: Optional[List[str]] = None):
615        if extra_args is None:
616            extra_args = []
617        if fdata is not None:
618            extra_args.extend(['-T', fdata])
619        elif data is not None:
620            extra_args.extend(['-T', '-'])
621        extra_args.extend([
622            '-o', 'download_#1.data',
623        ])
624        if with_stats:
625            extra_args.extend([
626                '-w', '%{json}\\n'
627            ])
628        return self._raw(urls, intext=data,
629                         alpn_proto=alpn_proto, options=extra_args,
630                         with_stats=with_stats,
631                         with_headers=with_headers,
632                         with_profile=with_profile)
633
634    def http_form(self, urls: List[str], form: Dict[str, str],
635                  alpn_proto: Optional[str] = None,
636                  with_stats: bool = True,
637                  with_headers: bool = False,
638                  extra_args: Optional[List[str]] = None):
639        if extra_args is None:
640            extra_args = []
641        for key, val in form.items():
642            extra_args.extend(['-F', f'{key}={val}'])
643        extra_args.extend([
644            '-o', 'download_#1.data',
645        ])
646        if with_stats:
647            extra_args.extend([
648                '-w', '%{json}\\n'
649            ])
650        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
651                         with_stats=with_stats,
652                         with_headers=with_headers)
653
654    def ftp_get(self, urls: List[str],
655                      with_stats: bool = True,
656                      with_profile: bool = False,
657                      with_tcpdump: bool = False,
658                      no_save: bool = False,
659                      extra_args: List[str] = None):
660        if extra_args is None:
661            extra_args = []
662        if no_save:
663            extra_args.extend([
664                '-o', '/dev/null',
665            ])
666        else:
667            extra_args.extend([
668                '-o', 'download_#1.data',
669            ])
670        # remove any existing ones
671        for i in range(100):
672            self._rmf(self.download_file(i))
673        if with_stats:
674            extra_args.extend([
675                '-w', '%{json}\\n'
676            ])
677        return self._raw(urls, options=extra_args,
678                         with_stats=with_stats,
679                         with_headers=False,
680                         with_profile=with_profile,
681                         with_tcpdump=with_tcpdump)
682
683    def ftp_ssl_get(self, urls: List[str],
684                      with_stats: bool = True,
685                      with_profile: bool = False,
686                      with_tcpdump: bool = False,
687                      no_save: bool = False,
688                      extra_args: List[str] = None):
689        if extra_args is None:
690            extra_args = []
691        extra_args.extend([
692            '--ssl-reqd',
693        ])
694        return self.ftp_get(urls=urls, with_stats=with_stats,
695                            with_profile=with_profile, no_save=no_save,
696                            with_tcpdump=with_tcpdump,
697                            extra_args=extra_args)
698
699    def ftp_upload(self, urls: List[str], fupload,
700                   with_stats: bool = True,
701                   with_profile: bool = False,
702                   with_tcpdump: bool = False,
703                   extra_args: List[str] = None):
704        if extra_args is None:
705            extra_args = []
706        extra_args.extend([
707            '--upload-file', fupload
708        ])
709        if with_stats:
710            extra_args.extend([
711                '-w', '%{json}\\n'
712            ])
713        return self._raw(urls, options=extra_args,
714                         with_stats=with_stats,
715                         with_headers=False,
716                         with_profile=with_profile,
717                         with_tcpdump=with_tcpdump)
718
719    def ftp_ssl_upload(self, urls: List[str], fupload,
720                       with_stats: bool = True,
721                       with_profile: bool = False,
722                       with_tcpdump: bool = False,
723                       extra_args: List[str] = None):
724        if extra_args is None:
725            extra_args = []
726        extra_args.extend([
727            '--ssl-reqd',
728        ])
729        return self.ftp_upload(urls=urls, fupload=fupload,
730                               with_stats=with_stats, with_profile=with_profile,
731                               with_tcpdump=with_tcpdump,
732                               extra_args=extra_args)
733
734    def response_file(self, idx: int):
735        return os.path.join(self._run_dir, f'download_{idx}.data')
736
737    def run_direct(self, args, with_stats: bool = False, with_profile: bool = False):
738        my_args = [self._curl]
739        if with_stats:
740            my_args.extend([
741                '-w', '%{json}\\n'
742            ])
743        my_args.extend([
744            '-o', 'download.data',
745        ])
746        my_args.extend(args)
747        return self._run(args=my_args, with_stats=with_stats, with_profile=with_profile)
748
749    def _run(self, args, intext='', with_stats: bool = False,
750             with_profile: bool = True, with_tcpdump: bool = False):
751        self._rmf(self._stdoutfile)
752        self._rmf(self._stderrfile)
753        self._rmf(self._headerfile)
754        exception = None
755        profile = None
756        tcpdump = None
757        started_at = datetime.now()
758        if with_tcpdump:
759            tcpdump = RunTcpDump(self.env, self._run_dir)
760            tcpdump.start()
761        try:
762            with open(self._stdoutfile, 'w') as cout:
763                with open(self._stderrfile, 'w') as cerr:
764                    if with_profile:
765                        end_at = started_at + timedelta(seconds=self._timeout) \
766                            if self._timeout else None
767                        log.info(f'starting: {args}')
768                        p = subprocess.Popen(args, stderr=cerr, stdout=cout,
769                                             cwd=self._run_dir, shell=False,
770                                             env=self._run_env)
771                        profile = RunProfile(p.pid, started_at, self._run_dir)
772                        if intext is not None and False:
773                            p.communicate(input=intext.encode(), timeout=1)
774                        ptimeout = 0.0
775                        while True:
776                            try:
777                                p.wait(timeout=ptimeout)
778                                break
779                            except subprocess.TimeoutExpired:
780                                if end_at and datetime.now() >= end_at:
781                                    p.kill()
782                                    raise subprocess.TimeoutExpired(cmd=args, timeout=self._timeout)
783                                profile.sample()
784                                ptimeout = 0.01
785                        exitcode = p.returncode
786                        profile.finish()
787                        log.info(f'done: exit={exitcode}, profile={profile}')
788                    else:
789                        p = subprocess.run(args, stderr=cerr, stdout=cout,
790                                           cwd=self._run_dir, shell=False,
791                                           input=intext.encode() if intext else None,
792                                           timeout=self._timeout,
793                                           env=self._run_env)
794                        exitcode = p.returncode
795        except subprocess.TimeoutExpired:
796            now = datetime.now()
797            duration = now - started_at
798            log.warning(f'Timeout at {now} after {duration.total_seconds()}s '
799                        f'(configured {self._timeout}s): {args}')
800            exitcode = -1
801            exception = 'TimeoutExpired'
802        if tcpdump:
803            tcpdump.finish()
804        coutput = open(self._stdoutfile).readlines()
805        cerrput = open(self._stderrfile).readlines()
806        return ExecResult(args=args, exit_code=exitcode, exception=exception,
807                          stdout=coutput, stderr=cerrput,
808                          duration=datetime.now() - started_at,
809                          with_stats=with_stats,
810                          profile=profile, tcpdump=tcpdump)
811
812    def _raw(self, urls, intext='', timeout=None, options=None, insecure=False,
813             alpn_proto: Optional[str] = None,
814             force_resolve=True,
815             with_stats=False,
816             with_headers=True,
817             def_tracing=True,
818             with_profile=False,
819             with_tcpdump=False):
820        args = self._complete_args(
821            urls=urls, timeout=timeout, options=options, insecure=insecure,
822            alpn_proto=alpn_proto, force_resolve=force_resolve,
823            with_headers=with_headers, def_tracing=def_tracing)
824        r = self._run(args, intext=intext, with_stats=with_stats,
825                      with_profile=with_profile, with_tcpdump=with_tcpdump)
826        if r.exit_code == 0 and with_headers:
827            self._parse_headerfile(self._headerfile, r=r)
828            if r.json:
829                r.response["json"] = r.json
830        return r
831
832    def _complete_args(self, urls, timeout=None, options=None,
833                       insecure=False, force_resolve=True,
834                       alpn_proto: Optional[str] = None,
835                       with_headers: bool = True,
836                       def_tracing: bool = True):
837        if not isinstance(urls, list):
838            urls = [urls]
839
840        args = [self._curl, "-s", "--path-as-is"]
841        if 'CURL_TEST_EVENT' in os.environ:
842            args.append('--test-event')
843
844        if with_headers:
845            args.extend(["-D", self._headerfile])
846        if def_tracing is not False and not self._silent:
847            args.extend(['-v', '--trace-ids', '--trace-time'])
848            if self.env.verbose > 1:
849                args.extend(['--trace-config', 'http/2,http/3,h2-proxy,h1-proxy'])
850                pass
851
852        active_options = options
853        if options is not None and '--next' in options:
854            active_options = options[options.index('--next') + 1:]
855
856        for url in urls:
857            u = urlparse(urls[0])
858            if options:
859                args.extend(options)
860            if alpn_proto is not None:
861                if alpn_proto not in self.ALPN_ARG:
862                    raise Exception(f'unknown ALPN protocol: "{alpn_proto}"')
863                args.append(self.ALPN_ARG[alpn_proto])
864
865            if u.scheme == 'http':
866                pass
867            elif insecure:
868                args.append('--insecure')
869            elif active_options and "--cacert" in active_options:
870                pass
871            elif u.hostname:
872                args.extend(["--cacert", self.env.ca.cert_file])
873
874            if force_resolve and u.hostname and u.hostname != 'localhost' \
875                    and not re.match(r'^(\d+|\[|:).*', u.hostname):
876                port = u.port if u.port else 443
877                args.extend(["--resolve", f"{u.hostname}:{port}:127.0.0.1"])
878            if timeout is not None and int(timeout) > 0:
879                args.extend(["--connect-timeout", str(int(timeout))])
880            args.append(url)
881        return args
882
883    def _parse_headerfile(self, headerfile: str, r: ExecResult = None) -> ExecResult:
884        lines = open(headerfile).readlines()
885        if r is None:
886            r = ExecResult(args=[], exit_code=0, stdout=[], stderr=[])
887
888        response = None
889
890        def fin_response(resp):
891            if resp:
892                r.add_response(resp)
893
894        expected = ['status']
895        for line in lines:
896            line = line.strip()
897            if re.match(r'^$', line):
898                if 'trailer' in expected:
899                    # end of trailers
900                    fin_response(response)
901                    response = None
902                    expected = ['status']
903                elif 'header' in expected:
904                    # end of header, another status or trailers might follow
905                    expected = ['status', 'trailer']
906                else:
907                    assert False, f"unexpected line: '{line}'"
908                continue
909            if 'status' in expected:
910                # log.debug("reading 1st response line: %s", line)
911                m = re.match(r'^(\S+) (\d+)( .*)?$', line)
912                if m:
913                    fin_response(response)
914                    response = {
915                        "protocol": m.group(1),
916                        "status": int(m.group(2)),
917                        "description": m.group(3),
918                        "header": {},
919                        "trailer": {},
920                        "body": r.outraw
921                    }
922                    expected = ['header']
923                    continue
924            if 'trailer' in expected:
925                m = re.match(r'^([^:]+):\s*(.*)$', line)
926                if m:
927                    response['trailer'][m.group(1).lower()] = m.group(2)
928                    continue
929            if 'header' in expected:
930                m = re.match(r'^([^:]+):\s*(.*)$', line)
931                if m:
932                    response['header'][m.group(1).lower()] = m.group(2)
933                    continue
934            assert False, f"unexpected line: '{line}, expected: {expected}'"
935
936        fin_response(response)
937        return r
938