source: LinkExchange/trunk/linkexchange/tests.py @ 222

Revision 222, 9.1 KB checked in by lostclus, 13 months ago (diff)

New parameter no_query_string was added to Sape like clients.

Line 
1# LinkExchange - Universal link exchange service client
2# Copyright (C) 2009-2011 Konstantin Korikov
3#
4# This library is free software; you can redistribute it and/or
5# modify it under the terms of the GNU Lesser General Public
6# License as published by the Free Software Foundation; either
7# version 2.1 of the License, or (at your option) any later version.
8#
9# This library is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12# Lesser General Public License for more details.
13#
14# You should have received a copy of the GNU Lesser General Public
15# License along with this library; if not, write to the Free Software
16# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17#
18# NOTE: In the context of the Python environment, I interpret "dynamic
19# linking" as importing -- thus the LGPL applies to the contents of
20# the modules, but make no requirements on code importing these
21# modules.
22
23import sys
24import os
25import tempfile
26import urllib
27import urlparse
28import datetime
29import re
30import signal
31import socket
32import time
33try:
34    import subprocess
35except ImportError:
36    subprocess = None
37
38from linkexchange.clients import PageRequest
39from linkexchange.utils import find_links
40
41class MultiHashDriverTestMixin:
42    """
43    Implements tests for multihash driver API methods.
44    """
45
46    with_blocking = True
47
48    def test_load(self):
49        self.assertRaises(KeyError, self.db.load, 'notexists')
50        self.db.save('testkey', [('k1', 'v1'), ('k2', 'v2')])
51        hash = self.db.load('testkey')
52        self.assertEqual(hash['k1'], 'v1')
53        self.assertEqual(hash['k2'], 'v2')
54
55    def test_get_mtime(self):
56        def round_dt(dt):
57            return dt.replace(microsecond=0)
58        self.assertRaises(KeyError, self.db.get_mtime, 'notexists')
59        t1 = round_dt(datetime.datetime.now())
60        self.db.save('testkey',
61                [('k%d' % i, 'v%d' % i) for i in range(100)])
62        t2 = round_dt(datetime.datetime.now())
63        mt = round_dt(self.db.get_mtime('testkey'))
64        self.assertEqual(t1 <= mt <= t2, True)
65
66    def test_modify(self):
67        self.db.save('testkey', [('k1', 'v1'), ('k2', 'v2')])
68        self.db.modify('testkey', [('k2', 'v2x')])
69        hash = self.db.load('testkey')
70        self.assertEqual(hash['k1'], 'v1')
71        self.assertEqual(hash['k2'], 'v2x')
72
73    def test_delete(self):
74        self.db.save('testkey', [('k1', 'v1'), ('k2', 'v2')])
75        self.db.delete('testkey', ['k2'])
76        hash = self.db.load('testkey')
77        self.assertEqual(len(hash), 1)
78
79    def test_blocking(self):
80        def test_generator():
81          for i in range(100):
82            if i == 5:
83              result = self.db.save('testkey', dict(bar=3),
84                      blocking=False)
85              if self.with_blocking:
86                  self.assertEqual(result, False)
87              else:
88                  self.assertEqual(result, True)
89            yield ('bar%d' % i, i)
90        result = self.db.save('testkey', test_generator())
91        self.assertEqual(result, True)
92        hash = self.db.load('testkey')
93        self.assertEqual(hash['bar55'], 55)
94
95class SimpleFileTestServer(object):
96    """
97    Simple test server that stores data in file on file system and sets url
98    attribute that points to it.
99    """
100
101    filename = None
102    raw_data = ''
103
104    def __init__(self, filename=None, raw_data=None):
105        if filename:
106            self.filename = filename
107        if self.filename:
108            fo = open(self.filename, 'w')
109        else:
110            fd, self.filename = tempfile.mkstemp()
111            fo = os.fdopen(fd, 'w')
112        if raw_data is not None:
113            self.raw_data = raw_data
114        fo.write(self.raw_data)
115        path = urllib.pathname2url(os.path.realpath(self.filename))
116        self.url = urlparse.urlunsplit(('file', '', path, '', ''))
117        self._unlink = os.unlink
118
119    def __del__(self):
120        try:
121            self._unlink(self.filename)
122        except OSError:
123            pass
124
125class ClientBaseTestMixin:
126    host = 'example.com'
127    bot_ip = '123.45.67.89'
128    cookies = None
129
130    def create_servers(cls):
131        raise NotImplementedError()
132    create_servers = classmethod(create_servers)
133
134    def setUpClass(cls):
135        cls.servers = cls.create_servers()
136    setUpClass = classmethod(setUpClass)
137
138    def tearDownClass(cls):
139        del cls.servers
140    tearDownClass = classmethod(tearDownClass)
141
142    def new_client(self, **kw):
143        raise NotImplementedError()
144
145    def new_request(self, **kw):
146        kw.setdefault('host', self.host)
147        kw.setdefault('remote_addr', self.bot_ip)
148        kw.setdefault('cookies', self.cookies)
149        return PageRequest(**kw)
150
151class ClientLinksTestMixin(ClientBaseTestMixin):
152    page_link_map = {
153            '/':[
154                ('http://example1.com/', 'example text 1'),
155                ]}
156    check_code = ''
157    html_links_link_pattern = (r'<a[^>]+?href="(?P<href>[^"]+)"[^>]*>'
158            '(?P<anchor>[^<>]+?)</a>')
159    html_links_delim_pattern = None
160
161    def test_links_get_raw_links(self):
162        client = self.new_client()
163        for test_uri, test_links in self.page_link_map.items():
164            request = self.new_request(uri=test_uri)
165            raw_links = client.get_raw_links(request)
166            self.assertEqual(len(raw_links), len(test_links))
167            for i in range(len(test_links)):
168                test_href, test_anchor = test_links[i]
169                attrs, anchor = find_links(raw_links[i])[0]
170                self.assertEqual(attrs.get('href', ''), test_href)
171                self.assertEqual(anchor.strip(), test_anchor.strip())
172
173    def test_links_get_html_links(self):
174        client = self.new_client()
175        link_re = re.compile(self.html_links_link_pattern)
176        if self.html_links_delim_pattern:
177            delim_re = re.compile(self.html_links_delim_pattern)
178        else:
179            delim_re = None
180
181        for test_uri, test_links in self.page_link_map.items():
182            request = self.new_request(uri=test_uri)
183            html = client.get_html_links(request)
184            links = list(link_re.finditer(html))
185            self.assertEqual(len(links), len(test_links))
186            for i in range(len(links)):
187                test_href, test_anchor = test_links[i]
188                self.assertEqual(links[i].group('href'), test_href)
189                self.assertEqual(links[i].group('anchor'), test_anchor)
190                if i > 0 and delim_re:
191                    delim_found = delim_re.search(html,
192                            links[i-1].end(), links[i].start()) is not None
193                    self.assertEqual(delim_found, True)
194
195    def test_links_check_code(self):
196        client = self.new_client()
197        request = self.new_request(uri='/not_exists')
198        self.assertEqual(
199                self.check_code in client.get_raw_links(request)[0], True)
200
201    def test_links_broken_server(self):
202        client = self.new_client(broken_server=True)
203        for test_uri, test_links in self.page_link_map.items():
204            request = self.new_request(uri=test_uri)
205            raw_links = client.get_raw_links(request)
206            self.assertEqual(raw_links, [])
207
208class ClientContentFilterTestMixin(ClientBaseTestMixin):
209    page_content_map = {
210            '/': [
211                ('Some text content.', 'Some text content filtered.'),
212                ],
213            }
214
215    def test_content_filter(self):
216        client = self.new_client()
217        for test_uri, test_content_list in self.page_content_map.items():
218            request = self.new_request(uri=test_uri)
219            for test_content in test_content_list:
220                filtered = client.content_filter(
221                        request, test_content[0])
222                self.assertEqual(filtered, test_content[1])
223
224def _wait_socket(addr, timeout=10):
225    t = time.time()
226    while True:
227        try:
228            s = socket.socket()
229            s.connect(addr)
230        except socket.error:
231            pass
232        else:
233            s.close()
234            return True
235        if time.time() - t > timeout:
236            return False
237        time.sleep(1)
238
239class _WebAppSubProcess(object):
240    def __init__(self, args, addr):
241        kw = {}
242        if sys.platform != 'win32':
243            kw['preexec_fn'] = os.setpgrp
244        self._proc = subprocess.Popen(args, **kw)
245        _wait_socket(addr)
246
247    def terminate(self):
248        if sys.platform != 'win32':
249            os.killpg(self._proc.pid, signal.SIGTERM)
250        else:
251            os.system("taskkill /F /T /PID %d" % self._proc.pid)
252
253    def wait(self):
254        return self._proc.wait()
255
256class _WebAppUnixProcess(object):
257    def __init__(self, args, addr):
258        self.pid = os.fork()
259        if not self.pid:
260            os.setpgrp()
261            os.execvp(args[0], args)
262            sys.exit(1)
263        _wait_socket(addr)
264
265    def terminate(self):
266        os.killpg(self.pid, signal.SIGTERM)
267
268    def wait(self):
269        pid, status = os.waitpid(self.pid, 0)
270        return status << 8
271
272if subprocess is not None:
273    WebAppProcess = _WebAppSubProcess
274else:
275    WebAppProcess = _WebAppUnixProcess
Note: See TracBrowser for help on using the repository browser.