1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3#*************************************************************************** 4# _ _ ____ _ 5# Project ___| | | | _ \| | 6# / __| | | | |_) | | 7# | (__| |_| | _ <| |___ 8# \___|\___/|_| \_\_____| 9# 10# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. 11# 12# This software is licensed as described in the file COPYING, which 13# you should have received as part of this distribution. The terms 14# are also available at https://curl.se/docs/copyright.html. 15# 16# You may opt to use, copy, modify, merge, publish, distribute and/or sell 17# copies of the Software, and permit persons to whom the Software is 18# furnished to do so, under the terms of the COPYING file. 19# 20# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 21# KIND, either express or implied. 22# 23# SPDX-License-Identifier: curl 24# 25########################################################################### 26# 27import json 28import logging 29import os 30import sys 31import time 32from threading import Thread 33 34import psutil 35import re 36import shutil 37import subprocess 38from statistics import mean, fmean 39from datetime import timedelta, datetime 40from typing import List, Optional, Dict, Union, Any 41from urllib.parse import urlparse 42 43from .env import Env 44 45 46log = logging.getLogger(__name__) 47 48 49class RunProfile: 50 51 STAT_KEYS = ['cpu', 'rss', 'vsz'] 52 53 @classmethod 54 def AverageStats(cls, profiles: List['RunProfile']): 55 avg = {} 56 stats = [p.stats for p in profiles] 57 for key in cls.STAT_KEYS: 58 avg[key] = mean([s[key] for s in stats]) 59 return avg 60 61 def __init__(self, pid: int, started_at: datetime, run_dir): 62 self._pid = pid 63 self._started_at = started_at 64 self._duration = timedelta(seconds=0) 65 self._run_dir = run_dir 66 self._samples = [] 67 self._psu = None 68 self._stats = None 69 70 @property 71 def duration(self) -> timedelta: 72 return self._duration 73 74 @property 75 def stats(self) -> Optional[Dict[str,Any]]: 76 return self._stats 77 78 def sample(self): 79 elapsed = datetime.now() - self._started_at 80 try: 81 if self._psu is None: 82 self._psu = psutil.Process(pid=self._pid) 83 mem = self._psu.memory_info() 84 self._samples.append({ 85 'time': elapsed, 86 'cpu': self._psu.cpu_percent(), 87 'vsz': mem.vms, 88 'rss': mem.rss, 89 }) 90 except psutil.NoSuchProcess: 91 pass 92 93 def finish(self): 94 self._duration = datetime.now() - self._started_at 95 if len(self._samples) > 0: 96 weights = [s['time'].total_seconds() for s in self._samples] 97 self._stats = {} 98 for key in self.STAT_KEYS: 99 self._stats[key] = fmean([s[key] for s in self._samples], weights) 100 else: 101 self._stats = None 102 self._psu = None 103 104 def __repr__(self): 105 return f'RunProfile[pid={self._pid}, '\ 106 f'duration={self.duration.total_seconds():.3f}s, '\ 107 f'stats={self.stats}]' 108 109 110class RunTcpDump: 111 112 def __init__(self, env, run_dir): 113 self._env = env 114 self._run_dir = run_dir 115 self._proc = None 116 self._stdoutfile = os.path.join(self._run_dir, 'tcpdump.out') 117 self._stderrfile = os.path.join(self._run_dir, 'tcpdump.err') 118 119 @property 120 def stats(self) -> Optional[List[str]]: 121 if self._proc: 122 raise Exception('tcpdump still running') 123 lines = [] 124 for l in open(self._stdoutfile).readlines(): 125 if re.match(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*', l): 126 lines.append(l) 127 return lines 128 129 def stats_excluding(self, src_port) -> Optional[List[str]]: 130 if self._proc: 131 raise Exception('tcpdump still running') 132 lines = [] 133 for l in self.stats: 134 if not re.match(r'.* IP 127\.0\.0\.1\.' + str(src_port) + ' >.*', l): 135 lines.append(l) 136 return lines 137 138 @property 139 def stderr(self) -> List[str]: 140 if self._proc: 141 raise Exception('tcpdump still running') 142 lines = [] 143 return open(self._stderrfile).readlines() 144 145 def sample(self): 146 # not sure how to make that detection reliable for all platforms 147 local_if = 'lo0' if sys.platform.startswith('darwin') else 'lo' 148 try: 149 tcpdump = self._env.tcpdump() 150 if tcpdump is None: 151 raise Exception('tcpdump not available') 152 # look with tcpdump for TCP RST packets which indicate 153 # we did not shut down connections cleanly 154 args = [] 155 # at least on Linux, we need root permissions to run tcpdump 156 if sys.platform.startswith('linux'): 157 args.append('sudo') 158 args.extend([ 159 tcpdump, '-i', local_if, '-n', 'tcp[tcpflags] & (tcp-rst)!=0' 160 ]) 161 with open(self._stdoutfile, 'w') as cout: 162 with open(self._stderrfile, 'w') as cerr: 163 self._proc = subprocess.Popen(args, stdout=cout, stderr=cerr, 164 text=True, cwd=self._run_dir, 165 shell=False) 166 assert self._proc 167 assert self._proc.returncode is None 168 while self._proc: 169 try: 170 self._proc.wait(timeout=1) 171 except subprocess.TimeoutExpired: 172 pass 173 except Exception as e: 174 log.error(f'Tcpdump: {e}') 175 176 def start(self): 177 def do_sample(): 178 self.sample() 179 t = Thread(target=do_sample) 180 t.start() 181 182 def finish(self): 183 if self._proc: 184 time.sleep(1) 185 self._proc.terminate() 186 self._proc = None 187 188 189class ExecResult: 190 191 def __init__(self, args: List[str], exit_code: int, 192 stdout: List[str], stderr: List[str], 193 duration: Optional[timedelta] = None, 194 with_stats: bool = False, 195 exception: Optional[str] = None, 196 profile: Optional[RunProfile] = None, 197 tcpdump: Optional[RunTcpDump] = None): 198 self._args = args 199 self._exit_code = exit_code 200 self._exception = exception 201 self._stdout = stdout 202 self._stderr = stderr 203 self._profile = profile 204 self._tcpdump = tcpdump 205 self._duration = duration if duration is not None else timedelta() 206 self._response = None 207 self._responses = [] 208 self._results = {} 209 self._assets = [] 210 self._stats = [] 211 self._json_out = None 212 self._with_stats = with_stats 213 if with_stats: 214 self._parse_stats() 215 else: 216 # noinspection PyBroadException 217 try: 218 out = ''.join(self._stdout) 219 self._json_out = json.loads(out) 220 except: 221 pass 222 223 def __repr__(self): 224 return f"ExecResult[code={self.exit_code}, exception={self._exception}, "\ 225 f"args={self._args}, stdout={self._stdout}, stderr={self._stderr}]" 226 227 def _parse_stats(self): 228 self._stats = [] 229 for l in self._stdout: 230 try: 231 self._stats.append(json.loads(l)) 232 except: 233 log.error(f'not a JSON stat: {l}') 234 break 235 236 @property 237 def exit_code(self) -> int: 238 return self._exit_code 239 240 @property 241 def args(self) -> List[str]: 242 return self._args 243 244 @property 245 def outraw(self) -> bytes: 246 return ''.join(self._stdout).encode() 247 248 @property 249 def stdout(self) -> str: 250 return ''.join(self._stdout) 251 252 @property 253 def json(self) -> Optional[Dict]: 254 """Output as JSON dictionary or None if not parseable.""" 255 return self._json_out 256 257 @property 258 def stderr(self) -> str: 259 return ''.join(self._stderr) 260 261 @property 262 def trace_lines(self) -> List[str]: 263 return self._stderr 264 265 @property 266 def duration(self) -> timedelta: 267 return self._duration 268 269 @property 270 def profile(self) -> Optional[RunProfile]: 271 return self._profile 272 273 @property 274 def tcpdump(self) -> Optional[RunTcpDump]: 275 return self._tcpdump 276 277 @property 278 def response(self) -> Optional[Dict]: 279 return self._response 280 281 @property 282 def responses(self) -> List[Dict]: 283 return self._responses 284 285 @property 286 def results(self) -> Dict: 287 return self._results 288 289 @property 290 def assets(self) -> List: 291 return self._assets 292 293 @property 294 def with_stats(self) -> bool: 295 return self._with_stats 296 297 @property 298 def stats(self) -> List: 299 return self._stats 300 301 @property 302 def total_connects(self) -> Optional[int]: 303 if len(self.stats): 304 n = 0 305 for stat in self.stats: 306 n += stat['num_connects'] 307 return n 308 return None 309 310 def add_response(self, resp: Dict): 311 self._response = resp 312 self._responses.append(resp) 313 314 def add_results(self, results: Dict): 315 self._results.update(results) 316 if 'response' in results: 317 self.add_response(results['response']) 318 319 def add_assets(self, assets: List): 320 self._assets.extend(assets) 321 322 def check_exit_code(self, code: Union[int, bool]): 323 if code is True: 324 assert self.exit_code == 0, f'expected exit code {code}, '\ 325 f'got {self.exit_code}\n{self.dump_logs()}' 326 elif code is False: 327 assert self.exit_code != 0, f'expected exit code {code}, '\ 328 f'got {self.exit_code}\n{self.dump_logs()}' 329 else: 330 assert self.exit_code == code, f'expected exit code {code}, '\ 331 f'got {self.exit_code}\n{self.dump_logs()}' 332 333 def check_response(self, http_status: Optional[int] = 200, 334 count: Optional[int] = 1, 335 protocol: Optional[str] = None, 336 exitcode: Optional[int] = 0, 337 connect_count: Optional[int] = None): 338 if exitcode: 339 self.check_exit_code(exitcode) 340 if self.with_stats and isinstance(exitcode, int): 341 for idx, x in enumerate(self.stats): 342 if 'exitcode' in x: 343 assert int(x['exitcode']) == exitcode, \ 344 f'response #{idx} exitcode: expected {exitcode}, '\ 345 f'got {x["exitcode"]}\n{self.dump_logs()}' 346 347 if self.with_stats: 348 assert len(self.stats) == count, \ 349 f'response count: expected {count}, ' \ 350 f'got {len(self.stats)}\n{self.dump_logs()}' 351 else: 352 assert len(self.responses) == count, \ 353 f'response count: expected {count}, ' \ 354 f'got {len(self.responses)}\n{self.dump_logs()}' 355 if http_status is not None: 356 if self.with_stats: 357 for idx, x in enumerate(self.stats): 358 assert 'http_code' in x, \ 359 f'response #{idx} reports no http_code\n{self.dump_stat(x)}' 360 assert x['http_code'] == http_status, \ 361 f'response #{idx} http_code: expected {http_status}, '\ 362 f'got {x["http_code"]}\n{self.dump_stat(x)}' 363 else: 364 for idx, x in enumerate(self.responses): 365 assert x['status'] == http_status, \ 366 f'response #{idx} status: expected {http_status},'\ 367 f'got {x["status"]}\n{self.dump_stat(x)}' 368 if protocol is not None: 369 if self.with_stats: 370 http_version = None 371 if protocol == 'HTTP/1.1': 372 http_version = '1.1' 373 elif protocol == 'HTTP/2': 374 http_version = '2' 375 elif protocol == 'HTTP/3': 376 http_version = '3' 377 if http_version is not None: 378 for idx, x in enumerate(self.stats): 379 assert x['http_version'] == http_version, \ 380 f'response #{idx} protocol: expected http/{http_version},' \ 381 f'got version {x["http_version"]}\n{self.dump_stat(x)}' 382 else: 383 for idx, x in enumerate(self.responses): 384 assert x['protocol'] == protocol, \ 385 f'response #{idx} protocol: expected {protocol},'\ 386 f'got {x["protocol"]}\n{self.dump_logs()}' 387 if connect_count is not None: 388 assert self.total_connects == connect_count, \ 389 f'expected {connect_count}, but {self.total_connects} '\ 390 f'were made\n{self.dump_logs()}' 391 392 def check_stats(self, count: int, http_status: Optional[int] = None, 393 exitcode: Optional[int] = None, 394 remote_port: Optional[int] = None, 395 remote_ip: Optional[str] = None): 396 if exitcode is None: 397 self.check_exit_code(0) 398 assert len(self.stats) == count, \ 399 f'stats count: expected {count}, got {len(self.stats)}\n{self.dump_logs()}' 400 if http_status is not None: 401 for idx, x in enumerate(self.stats): 402 assert 'http_code' in x, \ 403 f'status #{idx} reports no http_code\n{self.dump_stat(x)}' 404 assert x['http_code'] == http_status, \ 405 f'status #{idx} http_code: expected {http_status}, '\ 406 f'got {x["http_code"]}\n{self.dump_stat(x)}' 407 if exitcode is not None: 408 for idx, x in enumerate(self.stats): 409 if 'exitcode' in x: 410 assert x['exitcode'] == exitcode, \ 411 f'status #{idx} exitcode: expected {exitcode}, '\ 412 f'got {x["exitcode"]}\n{self.dump_stat(x)}' 413 if remote_port is not None: 414 for idx, x in enumerate(self.stats): 415 assert 'remote_port' in x, f'remote_port missing\n{self.dump_stat(x)}' 416 assert x['remote_port'] == remote_port, \ 417 f'status #{idx} remote_port: expected {remote_port}, '\ 418 f'got {x["remote_port"]}\n{self.dump_stat(x)}' 419 if remote_ip is not None: 420 for idx, x in enumerate(self.stats): 421 assert 'remote_ip' in x, f'remote_ip missing\n{self.dump_stat(x)}' 422 assert x['remote_ip'] == remote_ip, \ 423 f'status #{idx} remote_ip: expected {remote_ip}, '\ 424 f'got {x["remote_ip"]}\n{self.dump_stat(x)}' 425 426 def dump_logs(self): 427 lines = ['>>--stdout ----------------------------------------------\n'] 428 lines.extend(self._stdout) 429 lines.append('>>--stderr ----------------------------------------------\n') 430 lines.extend(self._stderr) 431 lines.append('<<-------------------------------------------------------\n') 432 return ''.join(lines) 433 434 def dump_stat(self, x): 435 lines = [ 436 'json stat from curl:', 437 json.JSONEncoder(indent=2).encode(x), 438 ] 439 if 'xfer_id' in x: 440 xfer_id = x['xfer_id'] 441 lines.append(f'>>--xfer {xfer_id} trace:\n') 442 lines.extend(self.xfer_trace_for(xfer_id)) 443 else: 444 lines.append('>>--full trace-------------------------------------------\n') 445 lines.extend(self._stderr) 446 lines.append('<<-------------------------------------------------------\n') 447 return ''.join(lines) 448 449 def xfer_trace_for(self, xfer_id) -> List[str]: 450 pat = re.compile(f'^[^[]* \\[{xfer_id}-.*$') 451 return [line for line in self._stderr if pat.match(line)] 452 453 454class CurlClient: 455 456 ALPN_ARG = { 457 'http/0.9': '--http0.9', 458 'http/1.0': '--http1.0', 459 'http/1.1': '--http1.1', 460 'h2': '--http2', 461 'h2c': '--http2', 462 'h3': '--http3-only', 463 } 464 465 def __init__(self, env: Env, 466 run_dir: Optional[str] = None, 467 timeout: Optional[float] = None, 468 silent: bool = False, 469 run_env: Optional[Dict[str, str]] = None): 470 self.env = env 471 self._timeout = timeout if timeout else env.test_timeout 472 self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl 473 self._run_dir = run_dir if run_dir else os.path.join(env.gen_dir, 'curl') 474 self._stdoutfile = f'{self._run_dir}/curl.stdout' 475 self._stderrfile = f'{self._run_dir}/curl.stderr' 476 self._headerfile = f'{self._run_dir}/curl.headers' 477 self._log_path = f'{self._run_dir}/curl.log' 478 self._silent = silent 479 self._run_env = run_env 480 self._rmrf(self._run_dir) 481 self._mkpath(self._run_dir) 482 483 @property 484 def run_dir(self) -> str: 485 return self._run_dir 486 487 def download_file(self, i: int) -> str: 488 return os.path.join(self.run_dir, f'download_{i}.data') 489 490 def _rmf(self, path): 491 if os.path.exists(path): 492 return os.remove(path) 493 494 def _rmrf(self, path): 495 if os.path.exists(path): 496 return shutil.rmtree(path) 497 498 def _mkpath(self, path): 499 if not os.path.exists(path): 500 return os.makedirs(path) 501 502 def get_proxy_args(self, proto: str = 'http/1.1', 503 proxys: bool = True, tunnel: bool = False, 504 use_ip: bool = False): 505 proxy_name = '127.0.0.1' if use_ip else self.env.proxy_domain 506 if proxys: 507 pport = self.env.pts_port(proto) if tunnel else self.env.proxys_port 508 xargs = [ 509 '--proxy', f'https://{proxy_name}:{pport}/', 510 '--resolve', f'{proxy_name}:{pport}:127.0.0.1', 511 '--proxy-cacert', self.env.ca.cert_file, 512 ] 513 if proto == 'h2': 514 xargs.append('--proxy-http2') 515 else: 516 xargs = [ 517 '--proxy', f'http://{proxy_name}:{self.env.proxy_port}/', 518 '--resolve', f'{proxy_name}:{self.env.proxy_port}:127.0.0.1', 519 ] 520 if tunnel: 521 xargs.append('--proxytunnel') 522 return xargs 523 524 def http_get(self, url: str, extra_args: Optional[List[str]] = None, 525 alpn_proto: Optional[str] = None, 526 def_tracing: bool = True, 527 with_stats: bool = False, 528 with_profile: bool = False, 529 with_tcpdump: bool = False): 530 return self._raw(url, options=extra_args, 531 with_stats=with_stats, 532 alpn_proto=alpn_proto, 533 def_tracing=def_tracing, 534 with_profile=with_profile, 535 with_tcpdump=with_tcpdump) 536 537 def http_download(self, urls: List[str], 538 alpn_proto: Optional[str] = None, 539 with_stats: bool = True, 540 with_headers: bool = False, 541 with_profile: bool = False, 542 with_tcpdump: bool = False, 543 no_save: bool = False, 544 extra_args: List[str] = None): 545 if extra_args is None: 546 extra_args = [] 547 if no_save: 548 extra_args.extend([ 549 '-o', '/dev/null', 550 ]) 551 else: 552 extra_args.extend([ 553 '-o', 'download_#1.data', 554 ]) 555 # remove any existing ones 556 for i in range(100): 557 self._rmf(self.download_file(i)) 558 if with_stats: 559 extra_args.extend([ 560 '-w', '%{json}\\n' 561 ]) 562 return self._raw(urls, alpn_proto=alpn_proto, options=extra_args, 563 with_stats=with_stats, 564 with_headers=with_headers, 565 with_profile=with_profile, 566 with_tcpdump=with_tcpdump) 567 568 def http_upload(self, urls: List[str], data: str, 569 alpn_proto: Optional[str] = None, 570 with_stats: bool = True, 571 with_headers: bool = False, 572 with_profile: bool = False, 573 with_tcpdump: bool = False, 574 extra_args: Optional[List[str]] = None): 575 if extra_args is None: 576 extra_args = [] 577 extra_args.extend([ 578 '--data-binary', data, '-o', 'download_#1.data', 579 ]) 580 if with_stats: 581 extra_args.extend([ 582 '-w', '%{json}\\n' 583 ]) 584 return self._raw(urls, alpn_proto=alpn_proto, options=extra_args, 585 with_stats=with_stats, 586 with_headers=with_headers, 587 with_profile=with_profile, 588 with_tcpdump=with_tcpdump) 589 590 def http_delete(self, urls: List[str], 591 alpn_proto: Optional[str] = None, 592 with_stats: bool = True, 593 with_profile: bool = False, 594 extra_args: Optional[List[str]] = None): 595 if extra_args is None: 596 extra_args = [] 597 extra_args.extend([ 598 '-X', 'DELETE', '-o', '/dev/null', 599 ]) 600 if with_stats: 601 extra_args.extend([ 602 '-w', '%{json}\\n' 603 ]) 604 return self._raw(urls, alpn_proto=alpn_proto, options=extra_args, 605 with_stats=with_stats, 606 with_headers=False, 607 with_profile=with_profile) 608 609 def http_put(self, urls: List[str], data=None, fdata=None, 610 alpn_proto: Optional[str] = None, 611 with_stats: bool = True, 612 with_headers: bool = False, 613 with_profile: bool = False, 614 extra_args: Optional[List[str]] = None): 615 if extra_args is None: 616 extra_args = [] 617 if fdata is not None: 618 extra_args.extend(['-T', fdata]) 619 elif data is not None: 620 extra_args.extend(['-T', '-']) 621 extra_args.extend([ 622 '-o', 'download_#1.data', 623 ]) 624 if with_stats: 625 extra_args.extend([ 626 '-w', '%{json}\\n' 627 ]) 628 return self._raw(urls, intext=data, 629 alpn_proto=alpn_proto, options=extra_args, 630 with_stats=with_stats, 631 with_headers=with_headers, 632 with_profile=with_profile) 633 634 def http_form(self, urls: List[str], form: Dict[str, str], 635 alpn_proto: Optional[str] = None, 636 with_stats: bool = True, 637 with_headers: bool = False, 638 extra_args: Optional[List[str]] = None): 639 if extra_args is None: 640 extra_args = [] 641 for key, val in form.items(): 642 extra_args.extend(['-F', f'{key}={val}']) 643 extra_args.extend([ 644 '-o', 'download_#1.data', 645 ]) 646 if with_stats: 647 extra_args.extend([ 648 '-w', '%{json}\\n' 649 ]) 650 return self._raw(urls, alpn_proto=alpn_proto, options=extra_args, 651 with_stats=with_stats, 652 with_headers=with_headers) 653 654 def ftp_get(self, urls: List[str], 655 with_stats: bool = True, 656 with_profile: bool = False, 657 with_tcpdump: bool = False, 658 no_save: bool = False, 659 extra_args: List[str] = None): 660 if extra_args is None: 661 extra_args = [] 662 if no_save: 663 extra_args.extend([ 664 '-o', '/dev/null', 665 ]) 666 else: 667 extra_args.extend([ 668 '-o', 'download_#1.data', 669 ]) 670 # remove any existing ones 671 for i in range(100): 672 self._rmf(self.download_file(i)) 673 if with_stats: 674 extra_args.extend([ 675 '-w', '%{json}\\n' 676 ]) 677 return self._raw(urls, options=extra_args, 678 with_stats=with_stats, 679 with_headers=False, 680 with_profile=with_profile, 681 with_tcpdump=with_tcpdump) 682 683 def ftp_ssl_get(self, urls: List[str], 684 with_stats: bool = True, 685 with_profile: bool = False, 686 with_tcpdump: bool = False, 687 no_save: bool = False, 688 extra_args: List[str] = None): 689 if extra_args is None: 690 extra_args = [] 691 extra_args.extend([ 692 '--ssl-reqd', 693 ]) 694 return self.ftp_get(urls=urls, with_stats=with_stats, 695 with_profile=with_profile, no_save=no_save, 696 with_tcpdump=with_tcpdump, 697 extra_args=extra_args) 698 699 def ftp_upload(self, urls: List[str], fupload, 700 with_stats: bool = True, 701 with_profile: bool = False, 702 with_tcpdump: bool = False, 703 extra_args: List[str] = None): 704 if extra_args is None: 705 extra_args = [] 706 extra_args.extend([ 707 '--upload-file', fupload 708 ]) 709 if with_stats: 710 extra_args.extend([ 711 '-w', '%{json}\\n' 712 ]) 713 return self._raw(urls, options=extra_args, 714 with_stats=with_stats, 715 with_headers=False, 716 with_profile=with_profile, 717 with_tcpdump=with_tcpdump) 718 719 def ftp_ssl_upload(self, urls: List[str], fupload, 720 with_stats: bool = True, 721 with_profile: bool = False, 722 with_tcpdump: bool = False, 723 extra_args: List[str] = None): 724 if extra_args is None: 725 extra_args = [] 726 extra_args.extend([ 727 '--ssl-reqd', 728 ]) 729 return self.ftp_upload(urls=urls, fupload=fupload, 730 with_stats=with_stats, with_profile=with_profile, 731 with_tcpdump=with_tcpdump, 732 extra_args=extra_args) 733 734 def response_file(self, idx: int): 735 return os.path.join(self._run_dir, f'download_{idx}.data') 736 737 def run_direct(self, args, with_stats: bool = False, with_profile: bool = False): 738 my_args = [self._curl] 739 if with_stats: 740 my_args.extend([ 741 '-w', '%{json}\\n' 742 ]) 743 my_args.extend([ 744 '-o', 'download.data', 745 ]) 746 my_args.extend(args) 747 return self._run(args=my_args, with_stats=with_stats, with_profile=with_profile) 748 749 def _run(self, args, intext='', with_stats: bool = False, 750 with_profile: bool = True, with_tcpdump: bool = False): 751 self._rmf(self._stdoutfile) 752 self._rmf(self._stderrfile) 753 self._rmf(self._headerfile) 754 exception = None 755 profile = None 756 tcpdump = None 757 started_at = datetime.now() 758 if with_tcpdump: 759 tcpdump = RunTcpDump(self.env, self._run_dir) 760 tcpdump.start() 761 try: 762 with open(self._stdoutfile, 'w') as cout: 763 with open(self._stderrfile, 'w') as cerr: 764 if with_profile: 765 end_at = started_at + timedelta(seconds=self._timeout) \ 766 if self._timeout else None 767 log.info(f'starting: {args}') 768 p = subprocess.Popen(args, stderr=cerr, stdout=cout, 769 cwd=self._run_dir, shell=False, 770 env=self._run_env) 771 profile = RunProfile(p.pid, started_at, self._run_dir) 772 if intext is not None and False: 773 p.communicate(input=intext.encode(), timeout=1) 774 ptimeout = 0.0 775 while True: 776 try: 777 p.wait(timeout=ptimeout) 778 break 779 except subprocess.TimeoutExpired: 780 if end_at and datetime.now() >= end_at: 781 p.kill() 782 raise subprocess.TimeoutExpired(cmd=args, timeout=self._timeout) 783 profile.sample() 784 ptimeout = 0.01 785 exitcode = p.returncode 786 profile.finish() 787 log.info(f'done: exit={exitcode}, profile={profile}') 788 else: 789 p = subprocess.run(args, stderr=cerr, stdout=cout, 790 cwd=self._run_dir, shell=False, 791 input=intext.encode() if intext else None, 792 timeout=self._timeout, 793 env=self._run_env) 794 exitcode = p.returncode 795 except subprocess.TimeoutExpired: 796 now = datetime.now() 797 duration = now - started_at 798 log.warning(f'Timeout at {now} after {duration.total_seconds()}s ' 799 f'(configured {self._timeout}s): {args}') 800 exitcode = -1 801 exception = 'TimeoutExpired' 802 if tcpdump: 803 tcpdump.finish() 804 coutput = open(self._stdoutfile).readlines() 805 cerrput = open(self._stderrfile).readlines() 806 return ExecResult(args=args, exit_code=exitcode, exception=exception, 807 stdout=coutput, stderr=cerrput, 808 duration=datetime.now() - started_at, 809 with_stats=with_stats, 810 profile=profile, tcpdump=tcpdump) 811 812 def _raw(self, urls, intext='', timeout=None, options=None, insecure=False, 813 alpn_proto: Optional[str] = None, 814 force_resolve=True, 815 with_stats=False, 816 with_headers=True, 817 def_tracing=True, 818 with_profile=False, 819 with_tcpdump=False): 820 args = self._complete_args( 821 urls=urls, timeout=timeout, options=options, insecure=insecure, 822 alpn_proto=alpn_proto, force_resolve=force_resolve, 823 with_headers=with_headers, def_tracing=def_tracing) 824 r = self._run(args, intext=intext, with_stats=with_stats, 825 with_profile=with_profile, with_tcpdump=with_tcpdump) 826 if r.exit_code == 0 and with_headers: 827 self._parse_headerfile(self._headerfile, r=r) 828 if r.json: 829 r.response["json"] = r.json 830 return r 831 832 def _complete_args(self, urls, timeout=None, options=None, 833 insecure=False, force_resolve=True, 834 alpn_proto: Optional[str] = None, 835 with_headers: bool = True, 836 def_tracing: bool = True): 837 if not isinstance(urls, list): 838 urls = [urls] 839 840 args = [self._curl, "-s", "--path-as-is"] 841 if 'CURL_TEST_EVENT' in os.environ: 842 args.append('--test-event') 843 844 if with_headers: 845 args.extend(["-D", self._headerfile]) 846 if def_tracing is not False and not self._silent: 847 args.extend(['-v', '--trace-ids', '--trace-time']) 848 if self.env.verbose > 1: 849 args.extend(['--trace-config', 'http/2,http/3,h2-proxy,h1-proxy']) 850 pass 851 852 active_options = options 853 if options is not None and '--next' in options: 854 active_options = options[options.index('--next') + 1:] 855 856 for url in urls: 857 u = urlparse(urls[0]) 858 if options: 859 args.extend(options) 860 if alpn_proto is not None: 861 if alpn_proto not in self.ALPN_ARG: 862 raise Exception(f'unknown ALPN protocol: "{alpn_proto}"') 863 args.append(self.ALPN_ARG[alpn_proto]) 864 865 if u.scheme == 'http': 866 pass 867 elif insecure: 868 args.append('--insecure') 869 elif active_options and "--cacert" in active_options: 870 pass 871 elif u.hostname: 872 args.extend(["--cacert", self.env.ca.cert_file]) 873 874 if force_resolve and u.hostname and u.hostname != 'localhost' \ 875 and not re.match(r'^(\d+|\[|:).*', u.hostname): 876 port = u.port if u.port else 443 877 args.extend(["--resolve", f"{u.hostname}:{port}:127.0.0.1"]) 878 if timeout is not None and int(timeout) > 0: 879 args.extend(["--connect-timeout", str(int(timeout))]) 880 args.append(url) 881 return args 882 883 def _parse_headerfile(self, headerfile: str, r: ExecResult = None) -> ExecResult: 884 lines = open(headerfile).readlines() 885 if r is None: 886 r = ExecResult(args=[], exit_code=0, stdout=[], stderr=[]) 887 888 response = None 889 890 def fin_response(resp): 891 if resp: 892 r.add_response(resp) 893 894 expected = ['status'] 895 for line in lines: 896 line = line.strip() 897 if re.match(r'^$', line): 898 if 'trailer' in expected: 899 # end of trailers 900 fin_response(response) 901 response = None 902 expected = ['status'] 903 elif 'header' in expected: 904 # end of header, another status or trailers might follow 905 expected = ['status', 'trailer'] 906 else: 907 assert False, f"unexpected line: '{line}'" 908 continue 909 if 'status' in expected: 910 # log.debug("reading 1st response line: %s", line) 911 m = re.match(r'^(\S+) (\d+)( .*)?$', line) 912 if m: 913 fin_response(response) 914 response = { 915 "protocol": m.group(1), 916 "status": int(m.group(2)), 917 "description": m.group(3), 918 "header": {}, 919 "trailer": {}, 920 "body": r.outraw 921 } 922 expected = ['header'] 923 continue 924 if 'trailer' in expected: 925 m = re.match(r'^([^:]+):\s*(.*)$', line) 926 if m: 927 response['trailer'][m.group(1).lower()] = m.group(2) 928 continue 929 if 'header' in expected: 930 m = re.match(r'^([^:]+):\s*(.*)$', line) 931 if m: 932 response['header'][m.group(1).lower()] = m.group(2) 933 continue 934 assert False, f"unexpected line: '{line}, expected: {expected}'" 935 936 fin_response(response) 937 return r 938