#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#***************************************************************************
#                                  _   _ ____  _
#  Project                     ___| | | |  _ \| |
#                             / __| | | | |_) | |
#                            | (__| |_| |  _ <| |___
#                             \___|\___/|_| \_\_____|
#
# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://curl.se/docs/copyright.html.
#
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
# copies of the Software, and permit persons to whom the Software is
# furnished to do so, under the terms of the COPYING file.
#
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
# KIND, either express or implied.
#
# SPDX-License-Identifier: curl
#
###########################################################################
#
import json
import logging
import os
import sys
import time
from threading import Thread

import psutil
import re
import shutil
import subprocess
from statistics import mean, fmean
from datetime import timedelta, datetime
from typing import List, Optional, Dict, Union, Any
from urllib.parse import urlparse

from .env import Env


log = logging.getLogger(__name__)


class RunProfile:
    """CPU and memory usage samples of one child process.

    `sample()` is called repeatedly while the process runs; `finish()`
    folds the collected samples into the averages exposed by `stats`.
    """

    STAT_KEYS = ['cpu', 'rss', 'vsz']

    @classmethod
    def AverageStats(cls, profiles: List['RunProfile']):
        """Return a dict with the mean of every STAT_KEYS entry over `profiles`.

        Profiles whose `stats` is None (no sample was ever taken) are
        skipped. A key without any value averages to 0.0.
        """
        avg = {}
        stats = [p.stats for p in profiles if p.stats is not None]
        for key in cls.STAT_KEYS:
            vals = [s[key] for s in stats]
            avg[key] = mean(vals) if vals else 0.0
        return avg

    def __init__(self, pid: int, started_at: datetime, run_dir):
        self._pid = pid
        self._started_at = started_at
        self._duration = timedelta(seconds=0)
        self._run_dir = run_dir
        self._samples = []
        self._psu = None
        self._stats = None

    @property
    def duration(self) -> timedelta:
        """Time between `started_at` and the call to `finish()`."""
        return self._duration

    @property
    def stats(self) -> Optional[Dict[str, Any]]:
        """Averaged samples keyed by STAT_KEYS, or None without samples."""
        return self._stats

    def sample(self):
        """Record cpu percentage and memory sizes of the process right now.

        Does nothing when the process no longer exists.
        """
        elapsed = datetime.now() - self._started_at
        try:
            if self._psu is None:
                self._psu = psutil.Process(pid=self._pid)
            mem = self._psu.memory_info()
            self._samples.append({
                'time': elapsed,
                'cpu': self._psu.cpu_percent(),
                'vsz': mem.vms,
                'rss': mem.rss,
            })
        except psutil.NoSuchProcess:
            pass

    def finish(self):
        """Stop profiling: set the duration and compute the averaged stats."""
        self._duration = datetime.now() - self._started_at
        if len(self._samples) > 0:
            # each sample is weighted by the time elapsed since start
            # when it was taken
            weights = [s['time'].total_seconds() for s in self._samples]
            self._stats = {}
            for key in self.STAT_KEYS:
                self._stats[key] = fmean([s[key] for s in self._samples], weights)
        else:
            self._stats = None
        self._psu = None

    def __repr__(self):
        return f'RunProfile[pid={self._pid}, '\
               f'duration={self.duration.total_seconds():.3f}s, '\
               f'stats={self.stats}]'


class RunTcpDump:
    """Runs tcpdump on the loopback interface to record TCP RST packets.

    Output goes to 'tcpdump.out' and 'tcpdump.err' in the run directory.
    """

    def __init__(self, env, run_dir):
        self._env = env
        self._run_dir = run_dir
        self._proc = None
        self._stdoutfile = os.path.join(self._run_dir, 'tcpdump.out')
        self._stderrfile = os.path.join(self._run_dir, 'tcpdump.err')

    @property
    def stats(self) -> Optional[List[str]]:
        """The recorded lines describing packets between 127.0.0.1 ports.

        Raises an Exception while tcpdump is still running.
        """
        if self._proc:
            raise Exception('tcpdump still running')
        with open(self._stdoutfile) as fd:
            return [line
                    for line in fd
                    if re.match(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*', line)]

    def stats_excluding(self, src_port) -> Optional[List[str]]:
        """Like `stats`, without the packets sent from 127.0.0.1:`src_port`."""
        if self._proc:
            raise Exception('tcpdump still running')
        return [line
                for line in self.stats
                if not re.match(r'.* IP 127\.0\.0\.1\.' + str(src_port) + ' >.*', line)]

    @property
    def stderr(self) -> List[str]:
        """The lines tcpdump wrote to stderr.

        Raises an Exception while tcpdump is still running.
        """
        if self._proc:
            raise Exception('tcpdump still running')
        with open(self._stderrfile) as fd:
            return fd.readlines()

    def sample(self):
        """Start tcpdump and block until it exits or `finish()` was called.

        Any failure is logged and not raised.
        """
        # not sure how to make that detection reliable for all platforms
        local_if = 'lo0' if sys.platform.startswith('darwin') else 'lo'
        try:
            tcpdump = self._env.tcpdump()
            if tcpdump is None:
                raise Exception('tcpdump not available')
            # look with tcpdump for TCP RST packets which indicate
            # we did not shut down connections cleanly
            args = []
            # at least on Linux, we need root permissions to run tcpdump
            if sys.platform.startswith('linux'):
                args.append('sudo')
            args.extend([
                tcpdump, '-i', local_if, '-n', 'tcp[tcpflags] & (tcp-rst)!=0'
            ])
            with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
                # keep a local reference: finish() resets self._proc from
                # another thread while we are waiting here
                proc = subprocess.Popen(args, stdout=cout, stderr=cerr,
                                        text=True, cwd=self._run_dir,
                                        shell=False)
                self._proc = proc
                assert proc
                assert proc.returncode is None
                while self._proc is not None:
                    try:
                        proc.wait(timeout=1)
                        # the process is gone (terminated by finish() or
                        # exited on its own), do not spin on it
                        break
                    except subprocess.TimeoutExpired:
                        pass
        except Exception:
            log.exception('Tcpdump')

    def start(self):
        """Run `sample()` in a new thread."""
        def do_sample():
            self.sample()
        t = Thread(target=do_sample)
        t.start()

    def finish(self):
        """Terminate a running tcpdump, after waiting one second."""
        if self._proc:
            time.sleep(1)
            self._proc.terminate()
            self._proc = None


class ExecResult:
    """The outcome of one curl invocation.

    Holds exit code, output lines and, where collected, the parsed
    responses, the `-w %{json}` stats, the run profile and the tcpdump.
    """

    def __init__(self, args: List[str], exit_code: int,
                 stdout: List[str], stderr: List[str],
                 duration: Optional[timedelta] = None,
                 with_stats: bool = False,
                 exception: Optional[str] = None,
                 profile: Optional[RunProfile] = None,
                 tcpdump: Optional[RunTcpDump] = None):
        self._args = args
        self._exit_code = exit_code
        self._exception = exception
        self._stdout = stdout
        self._stderr = stderr
        self._profile = profile
        self._tcpdump = tcpdump
        self._duration = duration if duration is not None else timedelta()
        self._response = None
        self._responses = []
        self._results = {}
        self._assets = []
        self._stats = []
        self._json_out = None
        self._with_stats = with_stats
        if with_stats:
            self._parse_stats()
        else:
            # best effort: stdout may or may not be a JSON document
            try:
                out = ''.join(self._stdout)
                self._json_out = json.loads(out)
            except Exception:
                pass

    def __repr__(self):
        return f"ExecResult[code={self.exit_code}, exception={self._exception}, "\
               f"args={self._args}, stdout={self._stdout}, stderr={self._stderr}]"

    def _parse_stats(self):
        """Parse each stdout line as JSON stat, stopping at the first failure."""
        self._stats = []
        for line in self._stdout:
            try:
                self._stats.append(json.loads(line))
            except Exception:
                log.exception(f'not a JSON stat: {line}')
                break

    @property
    def exit_code(self) -> int:
        return self._exit_code

    @property
    def args(self) -> List[str]:
        return self._args

    @property
    def outraw(self) -> bytes:
        """All of stdout, encoded to bytes."""
        return ''.join(self._stdout).encode()

    @property
    def stdout(self) -> str:
        """All of stdout as one string."""
        return ''.join(self._stdout)

    @property
    def json(self) -> Optional[Dict]:
        """Output as JSON dictionary or None if not parseable."""
        return self._json_out

    @property
    def stderr(self) -> str:
        """All of stderr as one string."""
        return ''.join(self._stderr)

    @property
    def trace_lines(self) -> List[str]:
        """The stderr output as list of lines."""
        return self._stderr

    @property
    def duration(self) -> timedelta:
        return self._duration

    @property
    def profile(self) -> Optional[RunProfile]:
        return self._profile

    @property
    def tcpdump(self) -> Optional[RunTcpDump]:
        return self._tcpdump

    @property
    def response(self) -> Optional[Dict]:
        """The response added last, if any."""
        return self._response

    @property
    def responses(self) -> List[Dict]:
        return self._responses

    @property
    def results(self) -> Dict:
        return self._results

    @property
    def assets(self) -> List:
        return self._assets

    @property
    def with_stats(self) -> bool:
        return self._with_stats

    @property
    def stats(self) -> List:
        return self._stats

    @property
    def total_connects(self) -> Optional[int]:
        """Sum of 'num_connects' over all stats, None without stats."""
        if not self.stats:
            return None
        return sum(stat['num_connects'] for stat in self.stats)

    def add_response(self, resp: Dict):
        """Append `resp` and make it the current `response`."""
        self._response = resp
        self._responses.append(resp)

    def add_results(self, results: Dict):
        """Merge `results`; a contained 'response' is added as response."""
        self._results.update(results)
        if 'response' in results:
            self.add_response(results['response'])

    def add_assets(self, assets: List):
        self._assets.extend(assets)

    def check_exit_code(self, code: Union[int, bool]):
        """Assert on the exit code.

        `True` expects 0, `False` expects anything but 0 and an int
        expects exactly that value.
        """
        if code is True:
            assert self.exit_code == 0, f'expected exit code {code}, '\
                                        f'got {self.exit_code}\n{self.dump_logs()}'
        elif code is False:
            assert self.exit_code != 0, f'expected exit code {code}, '\
                                        f'got {self.exit_code}\n{self.dump_logs()}'
        else:
            assert self.exit_code == code, f'expected exit code {code}, '\
                                           f'got {self.exit_code}\n{self.dump_logs()}'

    def check_response(self, http_status: Optional[int] = 200,
                       count: Optional[int] = 1,
                       protocol: Optional[str] = None,
                       exitcode: Optional[int] = 0,
                       connect_count: Optional[int] = None):
        """Assert number, status and protocol of the responses (or stats).

        Stats are inspected when the result was made `with_stats`, the
        parsed responses otherwise. The checks on `http_status`,
        `protocol` and `connect_count` are skipped when passed as None.
        Exit codes are only checked for a truthy (non-zero) `exitcode`.
        """
        if exitcode:
            self.check_exit_code(exitcode)
            if self.with_stats and isinstance(exitcode, int):
                for idx, x in enumerate(self.stats):
                    if 'exitcode' in x:
                        assert int(x['exitcode']) == exitcode, \
                            f'response #{idx} exitcode: expected {exitcode}, '\
                            f'got {x["exitcode"]}\n{self.dump_logs()}'

        if self.with_stats:
            assert len(self.stats) == count, \
                f'response count: expected {count}, ' \
                f'got {len(self.stats)}\n{self.dump_logs()}'
        else:
            assert len(self.responses) == count, \
                f'response count: expected {count}, ' \
                f'got {len(self.responses)}\n{self.dump_logs()}'
        if http_status is not None:
            if self.with_stats:
                for idx, x in enumerate(self.stats):
                    assert 'http_code' in x, \
                        f'response #{idx} reports no http_code\n{self.dump_stat(x)}'
                    assert x['http_code'] == http_status, \
                        f'response #{idx} http_code: expected {http_status}, '\
                        f'got {x["http_code"]}\n{self.dump_stat(x)}'
            else:
                for idx, x in enumerate(self.responses):
                    assert x['status'] == http_status, \
                        f'response #{idx} status: expected {http_status},'\
                        f'got {x["status"]}\n{self.dump_stat(x)}'
        if protocol is not None:
            if self.with_stats:
                http_version = None
                if protocol == 'HTTP/1.1':
                    http_version = '1.1'
                elif protocol == 'HTTP/2':
                    http_version = '2'
                elif protocol == 'HTTP/3':
                    http_version = '3'
                if http_version is not None:
                    for idx, x in enumerate(self.stats):
                        assert x['http_version'] == http_version, \
                            f'response #{idx} protocol: expected http/{http_version},' \
                            f'got version {x["http_version"]}\n{self.dump_stat(x)}'
            else:
                for idx, x in enumerate(self.responses):
                    assert x['protocol'] == protocol, \
                        f'response #{idx} protocol: expected {protocol},'\
                        f'got {x["protocol"]}\n{self.dump_logs()}'
        if connect_count is not None:
            assert self.total_connects == connect_count, \
                f'expected {connect_count}, but {self.total_connects} '\
                f'were made\n{self.dump_logs()}'

    def check_stats(self, count: int, http_status: Optional[int] = None,
                    exitcode: Optional[int] = None,
                    remote_port: Optional[int] = None,
                    remote_ip: Optional[str] = None):
        """Assert the number of stats and, where given, their values.

        Without `exitcode`, the process exit code must be 0. With it,
        every stat carrying an 'exitcode' must match.
        """
        if exitcode is None:
            self.check_exit_code(0)
        assert len(self.stats) == count, \
            f'stats count: expected {count}, got {len(self.stats)}\n{self.dump_logs()}'
        if http_status is not None:
            for idx, x in enumerate(self.stats):
                assert 'http_code' in x, \
                    f'status #{idx} reports no http_code\n{self.dump_stat(x)}'
                assert x['http_code'] == http_status, \
                    f'status #{idx} http_code: expected {http_status}, '\
                    f'got {x["http_code"]}\n{self.dump_stat(x)}'
        if exitcode is not None:
            for idx, x in enumerate(self.stats):
                if 'exitcode' in x:
                    assert x['exitcode'] == exitcode, \
                        f'status #{idx} exitcode: expected {exitcode}, '\
                        f'got {x["exitcode"]}\n{self.dump_stat(x)}'
        if remote_port is not None:
            for idx, x in enumerate(self.stats):
                assert 'remote_port' in x, f'remote_port missing\n{self.dump_stat(x)}'
                assert x['remote_port'] == remote_port, \
                    f'status #{idx} remote_port: expected {remote_port}, '\
                    f'got {x["remote_port"]}\n{self.dump_stat(x)}'
        if remote_ip is not None:
            for idx, x in enumerate(self.stats):
                assert 'remote_ip' in x, f'remote_ip missing\n{self.dump_stat(x)}'
                assert x['remote_ip'] == remote_ip, \
                    f'status #{idx} remote_ip: expected {remote_ip}, '\
                    f'got {x["remote_ip"]}\n{self.dump_stat(x)}'

    def dump_logs(self):
        """Return stdout and stderr in one string, for assertion messages."""
        lines = ['>>--stdout ----------------------------------------------\n']
        lines.extend(self._stdout)
        lines.append('>>--stderr ----------------------------------------------\n')
        lines.extend(self._stderr)
        lines.append('<<-------------------------------------------------------\n')
        return ''.join(lines)

    def dump_stat(self, x):
        """Return `x` as JSON together with the matching trace lines.

        With an 'xfer_id' in `x`, only the trace of that transfer is
        added, the complete stderr otherwise.
        """
        def printable(obj):
            # check_response() also passes response dicts here, whose
            # 'body' is bytes and not JSON serializable by default
            if isinstance(obj, (bytes, bytearray)):
                return obj.decode('utf-8', errors='replace')
            return repr(obj)

        lines = [
            'json stat from curl:',
            json.JSONEncoder(indent=2, default=printable).encode(x),
        ]
        if 'xfer_id' in x:
            xfer_id = x['xfer_id']
            lines.append(f'>>--xfer {xfer_id} trace:\n')
            lines.extend(self.xfer_trace_for(xfer_id))
        else:
            lines.append('>>--full trace-------------------------------------------\n')
            lines.extend(self._stderr)
            lines.append('<<-------------------------------------------------------\n')
        return ''.join(lines)

    def xfer_trace_for(self, xfer_id) -> List[str]:
        """Return the stderr lines tagged with `[<xfer_id>-`."""
        pat = re.compile(f'^[^[]* \\[{xfer_id}-.*$')
        return [line for line in self._stderr if pat.match(line)]


class CurlClient:
    """Runs the curl command line tool inside its own run directory."""

    ALPN_ARG = {
        'http/0.9': '--http0.9',
        'http/1.0': '--http1.0',
        'http/1.1': '--http1.1',
        'h2': '--http2',
        'h2c': '--http2',
        'h3': '--http3-only',
    }

    def __init__(self, env: Env,
                 run_dir: Optional[str] = None,
                 timeout: Optional[float] = None,
                 silent: bool = False,
                 run_env: Optional[Dict[str, str]] = None,
                 server_addr: Optional[str] = None):
        """Set up the client; an existing `run_dir` is wiped and recreated.

        The curl binary is taken from the environment variable CURL
        when set, from `env.curl` otherwise.
        """
        self.env = env
        self._timeout = timeout if timeout else env.test_timeout
        self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl
        self._run_dir = run_dir if run_dir else os.path.join(env.gen_dir, 'curl')
        self._stdoutfile = f'{self._run_dir}/curl.stdout'
        self._stderrfile = f'{self._run_dir}/curl.stderr'
        self._headerfile = f'{self._run_dir}/curl.headers'
        self._log_path = f'{self._run_dir}/curl.log'
        self._silent = silent
        self._run_env = run_env
        self._server_addr = server_addr if server_addr else '127.0.0.1'
        self._rmrf(self._run_dir)
        self._mkpath(self._run_dir)

    @property
    def run_dir(self) -> str:
        return self._run_dir

    def download_file(self, i: int) -> str:
        """Path of the file the i-th download is saved to."""
        return os.path.join(self.run_dir, f'download_{i}.data')

    def _rmf(self, path):
        """Remove the file `path` if it exists."""
        if os.path.exists(path):
            return os.remove(path)

    def _rmrf(self, path):
        """Remove the directory tree `path` if it exists."""
        if os.path.exists(path):
            return shutil.rmtree(path)

    def _mkpath(self, path):
        """Create the directory `path`, with parents, unless it exists."""
        if not os.path.exists(path):
            return os.makedirs(path)

    def get_proxy_args(self, proto: str = 'http/1.1',
                       proxys: bool = True, tunnel: bool = False,
                       use_ip: bool = False):
        """Return the curl arguments for going through the test proxy.

        `proxys` selects the https: proxy over the http: one, `tunnel`
        adds '--proxytunnel' and `use_ip` addresses the proxy by the
        server address instead of the proxy domain name.
        """
        proxy_name = self._server_addr if use_ip else self.env.proxy_domain
        if proxys:
            pport = self.env.pts_port(proto) if tunnel else self.env.proxys_port
            xargs = [
                '--proxy', f'https://{proxy_name}:{pport}/',
                '--resolve', f'{proxy_name}:{pport}:{self._server_addr}',
                '--proxy-cacert', self.env.ca.cert_file,
            ]
            if proto == 'h2':
                xargs.append('--proxy-http2')
        else:
            xargs = [
                '--proxy', f'http://{proxy_name}:{self.env.proxy_port}/',
                '--resolve', f'{proxy_name}:{self.env.proxy_port}:{self._server_addr}',
            ]
        if tunnel:
            xargs.append('--proxytunnel')
        return xargs

    def http_get(self, url: str, extra_args: Optional[List[str]] = None,
                 alpn_proto: Optional[str] = None,
                 def_tracing: bool = True,
                 with_stats: bool = False,
                 with_profile: bool = False,
                 with_tcpdump: bool = False):
        """Run curl on `url` with `extra_args` and return the ExecResult."""
        return self._raw(url, options=extra_args,
                         with_stats=with_stats,
                         alpn_proto=alpn_proto,
                         def_tracing=def_tracing,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)

    def http_download(self, urls: List[str],
                      alpn_proto: Optional[str] = None,
                      with_stats: bool = True,
                      with_headers: bool = False,
                      with_profile: bool = False,
                      with_tcpdump: bool = False,
                      no_save: bool = False,
                      extra_args: Optional[List[str]] = None):
        """Download `urls` into 'download_#1.data' files of the run dir.

        With `no_save`, the data is written to /dev/null instead.
        """
        # work on a copy, the list of the caller is not ours to modify
        extra_args = list(extra_args) if extra_args is not None else []
        if no_save:
            extra_args.extend([
                '-o', '/dev/null',
            ])
        else:
            extra_args.extend([
                '-o', 'download_#1.data',
            ])
            # remove any existing ones
            for i in range(100):
                self._rmf(self.download_file(i))
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
                         with_stats=with_stats,
                         with_headers=with_headers,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)

    def http_upload(self, urls: List[str], data: str,
                    alpn_proto: Optional[str] = None,
                    with_stats: bool = True,
                    with_headers: bool = False,
                    with_profile: bool = False,
                    with_tcpdump: bool = False,
                    extra_args: Optional[List[str]] = None):
        """Send `data` via '--data-binary' to `urls`, saving the responses."""
        # work on a copy, the list of the caller is not ours to modify
        extra_args = list(extra_args) if extra_args is not None else []
        extra_args.extend([
            '--data-binary', data, '-o', 'download_#1.data',
        ])
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
                         with_stats=with_stats,
                         with_headers=with_headers,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)

    def http_delete(self, urls: List[str],
                    alpn_proto: Optional[str] = None,
                    with_stats: bool = True,
                    with_profile: bool = False,
                    extra_args: Optional[List[str]] = None):
        """Send DELETE requests to `urls`, discarding the response bodies."""
        # work on a copy, the list of the caller is not ours to modify
        extra_args = list(extra_args) if extra_args is not None else []
        extra_args.extend([
            '-X', 'DELETE', '-o', '/dev/null',
        ])
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
                         with_stats=with_stats,
                         with_headers=False,
                         with_profile=with_profile)

    def http_put(self, urls: List[str], data=None, fdata=None,
                 alpn_proto: Optional[str] = None,
                 with_stats: bool = True,
                 with_headers: bool = False,
                 with_profile: bool = False,
                 extra_args: Optional[List[str]] = None):
        """Upload via '-T' to `urls`, saving the responses.

        The file `fdata` is uploaded when given, otherwise `data` is
        passed on as input text with '-T -'.
        """
        # work on a copy, the list of the caller is not ours to modify
        extra_args = list(extra_args) if extra_args is not None else []
        if fdata is not None:
            extra_args.extend(['-T', fdata])
        elif data is not None:
            extra_args.extend(['-T', '-'])
        extra_args.extend([
            '-o', 'download_#1.data',
        ])
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, intext=data,
                         alpn_proto=alpn_proto, options=extra_args,
                         with_stats=with_stats,
                         with_headers=with_headers,
                         with_profile=with_profile)

    def http_form(self, urls: List[str], form: Dict[str, str],
                  alpn_proto: Optional[str] = None,
                  with_stats: bool = True,
                  with_headers: bool = False,
                  extra_args: Optional[List[str]] = None):
        """Send the `form` fields via '-F' to `urls`, saving the responses."""
        # work on a copy, the list of the caller is not ours to modify
        extra_args = list(extra_args) if extra_args is not None else []
        for key, val in form.items():
            extra_args.extend(['-F', f'{key}={val}'])
        extra_args.extend([
            '-o', 'download_#1.data',
        ])
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
                         with_stats=with_stats,
                         with_headers=with_headers)

    def ftp_get(self, urls: List[str],
                with_stats: bool = True,
                with_profile: bool = False,
                with_tcpdump: bool = False,
                no_save: bool = False,
                extra_args: Optional[List[str]] = None):
        """Download `urls` into 'download_#1.data' files of the run dir.

        With `no_save`, the data is written to /dev/null instead. No
        header file is written or parsed.
        """
        # work on a copy, the list of the caller is not ours to modify
        extra_args = list(extra_args) if extra_args is not None else []
        if no_save:
            extra_args.extend([
                '-o', '/dev/null',
            ])
        else:
            extra_args.extend([
                '-o', 'download_#1.data',
            ])
            # remove any existing ones
            for i in range(100):
                self._rmf(self.download_file(i))
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, options=extra_args,
                         with_stats=with_stats,
                         with_headers=False,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)

    def ftp_ssl_get(self, urls: List[str],
                    with_stats: bool = True,
                    with_profile: bool = False,
                    with_tcpdump: bool = False,
                    no_save: bool = False,
                    extra_args: Optional[List[str]] = None):
        """Same as `ftp_get()`, with '--ssl-reqd' added."""
        # work on a copy, the list of the caller is not ours to modify
        extra_args = list(extra_args) if extra_args is not None else []
        extra_args.extend([
            '--ssl-reqd',
        ])
        return self.ftp_get(urls=urls, with_stats=with_stats,
                            with_profile=with_profile, no_save=no_save,
                            with_tcpdump=with_tcpdump,
                            extra_args=extra_args)

    def ftp_upload(self, urls: List[str],
                   fupload: Optional[Any] = None,
                   updata: Optional[str] = None,
                   with_stats: bool = True,
                   with_profile: bool = False,
                   with_tcpdump: bool = False,
                   extra_args: Optional[List[str]] = None):
        """Upload the file `fupload` or the text `updata` to `urls`.

        Raises an Exception when neither of the two is given.
        """
        # work on a copy, the list of the caller is not ours to modify
        extra_args = list(extra_args) if extra_args is not None else []
        if fupload is not None:
            extra_args.extend([
                '--upload-file', fupload
            ])
        elif updata is not None:
            extra_args.extend([
                '--upload-file', '-'
            ])
        else:
            raise Exception('need either file or data to upload')
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, options=extra_args,
                         intext=updata,
                         with_stats=with_stats,
                         with_headers=False,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)

    def ftp_ssl_upload(self, urls: List[str],
                       fupload: Optional[Any] = None,
                       updata: Optional[str] = None,
                       with_stats: bool = True,
                       with_profile: bool = False,
                       with_tcpdump: bool = False,
                       extra_args: Optional[List[str]] = None):
        """Same as `ftp_upload()`, with '--ssl-reqd' added."""
        # work on a copy, the list of the caller is not ours to modify
        extra_args = list(extra_args) if extra_args is not None else []
        extra_args.extend([
            '--ssl-reqd',
        ])
        return self.ftp_upload(urls=urls, fupload=fupload, updata=updata,
                               with_stats=with_stats, with_profile=with_profile,
                               with_tcpdump=with_tcpdump,
                               extra_args=extra_args)

    def response_file(self, idx: int):
        """Path of the file the response with index `idx` is saved to."""
        return os.path.join(self._run_dir, f'download_{idx}.data')

    def run_direct(self, args, with_stats: bool = False, with_profile: bool = False):
        """Run curl with exactly `args`, saving output to 'download.data'."""
        my_args = [self._curl]
        if with_stats:
            my_args.extend([
                '-w', '%{json}\\n'
            ])
        my_args.extend([
            '-o', 'download.data',
        ])
        my_args.extend(args)
        return self._run(args=my_args, with_stats=with_stats, with_profile=with_profile)

    def _run(self, args, intext='', with_stats: bool = False,
             with_profile: bool = True, with_tcpdump: bool = False):
        """Execute `args` in the run directory and return the ExecResult.

        A run exceeding the configured timeout is killed and reported
        with exit code -1 and exception 'TimeoutExpired'.
        """
        self._rmf(self._stdoutfile)
        self._rmf(self._stderrfile)
        self._rmf(self._headerfile)
        exception = None
        profile = None
        tcpdump = None
        started_at = datetime.now()
        if with_tcpdump:
            tcpdump = RunTcpDump(self.env, self._run_dir)
            tcpdump.start()
        try:
            with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
                if with_profile:
                    end_at = started_at + timedelta(seconds=self._timeout) \
                        if self._timeout else None
                    log.info(f'starting: {args}')
                    p = subprocess.Popen(args, stderr=cerr, stdout=cout,
                                         cwd=self._run_dir, shell=False,
                                         env=self._run_env)
                    profile = RunProfile(p.pid, started_at, self._run_dir)
                    # NOTE(review): deliberately disabled by 'and False',
                    # `intext` is never fed to a profiled run - confirm
                    if intext is not None and False:
                        p.communicate(input=intext.encode(), timeout=1)
                    # poll for the exit in short intervals and take a
                    # profile sample in between
                    ptimeout = 0.0
                    while True:
                        try:
                            p.wait(timeout=ptimeout)
                            break
                        except subprocess.TimeoutExpired:
                            if end_at and datetime.now() >= end_at:
                                p.kill()
                                # reap the killed child
                                p.wait()
                                raise subprocess.TimeoutExpired(cmd=args, timeout=self._timeout)
                            profile.sample()
                            ptimeout = 0.01
                    exitcode = p.returncode
                    profile.finish()
                    log.info(f'done: exit={exitcode}, profile={profile}')
                else:
                    p = subprocess.run(args, stderr=cerr, stdout=cout,
                                       cwd=self._run_dir, shell=False,
                                       input=intext.encode() if intext else None,
                                       timeout=self._timeout,
                                       env=self._run_env)
                    exitcode = p.returncode
        except subprocess.TimeoutExpired:
            now = datetime.now()
            duration = now - started_at
            log.warning(f'Timeout at {now} after {duration.total_seconds()}s '
                        f'(configured {self._timeout}s): {args}')
            exitcode = -1
            exception = 'TimeoutExpired'
        finally:
            # also stop tcpdump when an unexpected exception passes through
            if tcpdump:
                tcpdump.finish()
        with open(self._stdoutfile) as fd:
            coutput = fd.readlines()
        with open(self._stderrfile) as fd:
            cerrput = fd.readlines()
        return ExecResult(args=args, exit_code=exitcode, exception=exception,
                          stdout=coutput, stderr=cerrput,
                          duration=datetime.now() - started_at,
                          with_stats=with_stats,
                          profile=profile, tcpdump=tcpdump)

    def _raw(self, urls, intext='', timeout=None, options=None, insecure=False,
             alpn_proto: Optional[str] = None,
             force_resolve=True,
             with_stats=False,
             with_headers=True,
             def_tracing=True,
             with_profile=False,
             with_tcpdump=False):
        """Build the curl command line for `urls`, run it, parse headers.

        The header file is only parsed when curl exited with 0 and
        `with_headers` is set.
        """
        args = self._complete_args(
            urls=urls, timeout=timeout, options=options, insecure=insecure,
            alpn_proto=alpn_proto, force_resolve=force_resolve,
            with_headers=with_headers, def_tracing=def_tracing)
        r = self._run(args, intext=intext, with_stats=with_stats,
                      with_profile=with_profile, with_tcpdump=with_tcpdump)
        if r.exit_code == 0 and with_headers:
            self._parse_headerfile(self._headerfile, r=r)
        return r

    def _complete_args(self, urls, timeout=None, options=None,
                       insecure=False, force_resolve=True,
                       alpn_proto: Optional[str] = None,
                       with_headers: bool = True,
                       def_tracing: bool = True):
        """Return the complete curl command line for `urls`.

        Each URL is preceded by `options`, the ALPN argument, the
        certificate and '--resolve' arguments derived from that URL and
        a '--connect-timeout' for a positive `timeout`.

        Raises an Exception for an `alpn_proto` not in ALPN_ARG.
        """
        if not isinstance(urls, list):
            urls = [urls]

        args = [self._curl, "-s", "--path-as-is"]
        if 'CURL_TEST_EVENT' in os.environ:
            args.append('--test-event')

        if with_headers:
            args.extend(["-D", self._headerfile])
        if def_tracing is not False and not self._silent:
            args.extend(['-v', '--trace-ids', '--trace-time'])
            if self.env.verbose > 1:
                args.extend(['--trace-config', 'http/2,http/3,h2-proxy,h1-proxy'])

        # only the options after the last '--next' apply to our URLs
        active_options = options
        if options is not None and '--next' in options:
            active_options = options[options.index('--next') + 1:]

        for url in urls:
            # scheme, host and port of this very URL decide on the
            # arguments added for it
            u = urlparse(url)
            if options:
                args.extend(options)
            if alpn_proto is not None:
                if alpn_proto not in self.ALPN_ARG:
                    raise Exception(f'unknown ALPN protocol: "{alpn_proto}"')
                args.append(self.ALPN_ARG[alpn_proto])

            if u.scheme == 'http':
                pass
            elif insecure:
                args.append('--insecure')
            elif active_options and "--cacert" in active_options:
                pass
            elif u.hostname:
                args.extend(["--cacert", self.env.ca.cert_file])

            # host names, but not 'localhost' or IP addresses, are
            # resolved to the server address
            if force_resolve and u.hostname and u.hostname != 'localhost' \
                    and not re.match(r'^(\d+|\[|:).*', u.hostname):
                port = u.port if u.port else 443
                args.extend([
                    '--resolve', f'{u.hostname}:{port}:{self._server_addr}',
                ])
            if timeout is not None and int(timeout) > 0:
                args.extend(["--connect-timeout", str(int(timeout))])
            args.append(url)
        return args

    def _parse_headerfile(self, headerfile: str, r: Optional[ExecResult] = None) -> ExecResult:
        """Parse the responses in `headerfile` and add them to `r`.

        Without `r`, a new and empty ExecResult is used. Every response
        becomes a dict with 'protocol', 'status', 'description',
        'header', 'trailer' and 'body', the latter being all of the
        stdout of `r`.
        """
        with open(headerfile) as fd:
            lines = fd.readlines()
        if r is None:
            r = ExecResult(args=[], exit_code=0, stdout=[], stderr=[])

        response = None

        def fin_response(resp):
            if resp:
                r.add_response(resp)

        # the kinds of lines acceptable next
        expected = ['status']
        for line in lines:
            line = line.strip()
            if re.match(r'^$', line):
                if 'trailer' in expected:
                    # end of trailers
                    fin_response(response)
                    response = None
                    expected = ['status']
                elif 'header' in expected:
                    # end of header, another status or trailers might follow
                    expected = ['status', 'trailer']
                else:
                    assert False, f"unexpected line: '{line}'"
                continue
            if 'status' in expected:
                # log.debug("reading 1st response line: %s", line)
                m = re.match(r'^(\S+) (\d+)( .*)?$', line)
                if m:
                    fin_response(response)
                    response = {
                        "protocol": m.group(1),
                        "status": int(m.group(2)),
                        "description": m.group(3),
                        "header": {},
                        "trailer": {},
                        "body": r.outraw
                    }
                    expected = ['header']
                    continue
            if 'trailer' in expected:
                m = re.match(r'^([^:]+):\s*(.*)$', line)
                if m:
                    response['trailer'][m.group(1).lower()] = m.group(2)
                    continue
            if 'header' in expected:
                m = re.match(r'^([^:]+):\s*(.*)$', line)
                if m:
                    response['header'][m.group(1).lower()] = m.group(2)
                    continue
            assert False, f"unexpected line: '{line}, expected: {expected}'"

        fin_response(response)
        return r