xref: /curl/tests/http/testenv/vsftpd.py (revision 0f7ba5c5)
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#***************************************************************************
4#                                  _   _ ____  _
5#  Project                     ___| | | |  _ \| |
6#                             / __| | | | |_) | |
7#                            | (__| |_| |  _ <| |___
8#                             \___|\___/|_| \_\_____|
9#
10# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
11#
12# This software is licensed as described in the file COPYING, which
13# you should have received as part of this distribution. The terms
14# are also available at https://curl.se/docs/copyright.html.
15#
16# You may opt to use, copy, modify, merge, publish, distribute and/or sell
17# copies of the Software, and permit persons to whom the Software is
18# furnished to do so, under the terms of the COPYING file.
19#
20# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
21# KIND, either express or implied.
22#
23# SPDX-License-Identifier: curl
24#
25###########################################################################
26#
27import logging
28import os
29import subprocess
30import time
31
32from datetime import datetime, timedelta
33
34from .curl import CurlClient
35from .env import Env
36
37
38log = logging.getLogger(__name__)
39
40
class VsFTPD:
    """Controls a vsftpd server process used by the test environment.

    The instance writes its own configuration file below
    ``<env.gen_dir>/vsftpd`` (or ``.../vsftpds`` when SSL is enabled),
    launches the server as a child process and uses curl to probe whether
    the server answers on its port.
    """

    def __init__(self, env: 'Env', with_ssl=False):
        """Set up paths and port for the server; does not start it.

        :param env: test environment providing the vsftpd binary path,
            the ports, the generated-files directory and credentials.
        :param with_ssl: when true, use the FTPS port, a separate working
            directory and add the SSL settings to the configuration.
        """
        self.env = env
        self._cmd = env.vsftpd
        self._scheme = 'ftp'
        self._with_ssl = with_ssl
        if self._with_ssl:
            self._port = self.env.ftps_port
            name = 'vsftpds'
        else:
            self._port = self.env.ftp_port
            name = 'vsftpd'
        self._vsftpd_dir = os.path.join(env.gen_dir, name)
        self._run_dir = os.path.join(self._vsftpd_dir, 'run')
        self._docs_dir = os.path.join(self._vsftpd_dir, 'docs')
        self._tmp_dir = os.path.join(self._vsftpd_dir, 'tmp')
        self._conf_file = os.path.join(self._vsftpd_dir, 'test.conf')
        self._pid_file = os.path.join(self._vsftpd_dir, 'vsftpd.pid')
        self._error_log = os.path.join(self._vsftpd_dir, 'vsftpd.log')
        self._process = None

        self.clear_logs()

    @property
    def domain(self):
        """The FTP domain name taken from the environment."""
        return self.env.ftp_domain

    @property
    def docs_dir(self):
        """Directory configured as the anonymous root of the server."""
        return self._docs_dir

    @property
    def port(self) -> int:
        """TCP port the server is configured to listen on."""
        return self._port

    def clear_logs(self) -> None:
        """Remove the server log file, if there is one."""
        self._rmf(self._error_log)

    def exists(self) -> bool:
        """Return whether the configured vsftpd binary path exists."""
        return os.path.exists(self._cmd)

    def is_running(self) -> bool:
        """Return True while a child process was started and has not exited."""
        if self._process:
            return self._process.poll() is None
        return False

    def start_if_needed(self):
        """Start the server unless it is already running."""
        if not self.is_running():
            return self.start()
        return True

    def stop_if_running(self):
        """Stop the server if it is running; True when nothing was left to do."""
        if self.is_running():
            return self.stop()
        return True

    def stop(self, wait_dead=True):
        """Terminate the server process.

        :param wait_dead: when true, additionally poll with curl until the
            server no longer answers (up to 5 seconds).
        :return: True when there was no process, or it was stopped and (if
            requested) stopped responding in time.
        """
        # wait_dead() runs curl inside the tmp dir, so it has to exist.
        self._mkpath(self._tmp_dir)
        if self._process:
            self._process.terminate()
            try:
                self._process.wait(timeout=2)
            except subprocess.TimeoutExpired:
                # The server ignored the terminate request: force it down
                # and reap it instead of leaving a stray process behind.
                log.warning('vsftpd did not terminate in time, killing it')
                self._process.kill()
                self._process.wait()
            self._process = None
            return not wait_dead or self.wait_dead(timeout=timedelta(seconds=5))
        return True

    def restart(self):
        """Stop the server (if started) and start it again."""
        self.stop()
        return self.start()

    def start(self, wait_live=True):
        """Write the configuration and launch the server.

        A previously started process is stopped first. The server's stderr
        is appended to the log file.

        :param wait_live: when true, poll with curl until the server answers
            (up to 5 seconds).
        :return: False when the process has already exited or did not become
            responsive in time, True otherwise.
        """
        self._mkpath(self._tmp_dir)
        if self._process:
            self.stop()
        self._write_config()
        args = [
            self._cmd,
            self._conf_file,
        ]
        # The child receives its own duplicate of the descriptor, so the
        # parent's handle can be closed right after spawning instead of
        # leaking one open file per (re)start.
        with open(self._error_log, 'a') as procerr:
            self._process = subprocess.Popen(args=args, stderr=procerr)
        # returncode is only updated by poll()/wait(); poll explicitly so an
        # immediately failing server is actually detected here.
        if self._process.poll() is not None:
            log.error(f'vsftpd exited right after start with code '
                      f'{self._process.returncode}')
            return False
        return not wait_live or self.wait_live(timeout=timedelta(seconds=5))

    def wait_dead(self, timeout: timedelta) -> bool:
        """Poll with curl until the server stops answering.

        :param timeout: maximum time to keep trying.
        :return: True as soon as a curl request fails, False if the server
            still answered when the timeout ran out.
        """
        curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
        check_url = f'{self._scheme}://{self.domain}:{self.port}/'
        try_until = datetime.now() + timeout
        while datetime.now() < try_until:
            r = curl.ftp_get(urls=[check_url], extra_args=['-v'])
            if r.exit_code != 0:
                return True
            log.debug(f'waiting for vsftpd to stop responding: {r}')
            time.sleep(.1)
        log.debug(f"Server still responding after {timeout}")
        return False

    def wait_live(self, timeout: timedelta) -> bool:
        """Poll with curl until the server answers.

        :param timeout: maximum time to keep trying.
        :return: True as soon as a curl request succeeds, False if none did
            before the timeout ran out.
        """
        curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
        check_url = f'{self._scheme}://{self.domain}:{self.port}/'
        try_until = datetime.now() + timeout
        while datetime.now() < try_until:
            r = curl.ftp_get(urls=[check_url], extra_args=[
                '--trace', 'curl-start.trace', '--trace-time'
            ])
            if r.exit_code == 0:
                return True
            log.debug(f'waiting for vsftpd to become responsive: {r}')
            time.sleep(.1)
        log.error(f"Server still not responding after {timeout}")
        return False

    def _rmf(self, path) -> None:
        """Remove the file at *path*; a missing file is not an error."""
        # Attempt the removal directly rather than checking first: no race
        # with concurrent removal, and dangling symlinks are removed too.
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    def _mkpath(self, path) -> None:
        """Create directory *path* including parents; no-op if it exists."""
        os.makedirs(path, exist_ok=True)

    def _write_config(self) -> None:
        """Create the docs/tmp directories and write the vsftpd config file."""
        self._mkpath(self._docs_dir)
        self._mkpath(self._tmp_dir)
        conf = [  # base server config
            'listen=YES',
            'run_as_launching_user=YES',
            '#listen_address=127.0.0.1',
            f'listen_port={self.port}',
            'local_enable=NO',
            'anonymous_enable=YES',
            f'anon_root={self._docs_dir}',
            'dirmessage_enable=YES',
            'write_enable=YES',
            'anon_upload_enable=YES',
            'log_ftp_protocol=YES',
            'xferlog_enable=YES',
            'xferlog_std_format=NO',
            f'vsftpd_log_file={self._error_log}',
            '\n',
        ]
        if self._with_ssl:
            creds = self.env.get_credentials(self.domain)
            assert creds  # convince pytype this isn't None
            conf.extend([
                'ssl_enable=YES',
                'debug_ssl=YES',
                'allow_anon_ssl=YES',
                f'rsa_cert_file={creds.cert_file}',
                f'rsa_private_key_file={creds.pkey_file}',
                # require_ssl_reuse=YES means ctrl and data connection need to use the same session
                'require_ssl_reuse=NO',
            ])

        with open(self._conf_file, 'w') as fd:
            fd.write("\n".join(conf))
198