source: LinkExchange/trunk/linkexchange/db_drivers.py @ 254

Revision 254, 13.9 KB checked in by lostclus, 5 months ago (diff)

Fixed import error.

Line 
1# LinkExchange - Universal link exchange service client
2# Copyright (C) 2009 Konstantin Korikov
3#
4# This library is free software; you can redistribute it and/or
5# modify it under the terms of the GNU Lesser General Public
6# License as published by the Free Software Foundation; either
7# version 2.1 of the License, or (at your option) any later version.
8#
9# This library is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12# Lesser General Public License for more details.
13#
14# You should have received a copy of the GNU Lesser General Public
15# License along with this library; if not, write to the Free Software
16# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17#
18# NOTE: In the context of the Python environment, I interpret "dynamic
19# linking" as importing -- thus the LGPL applies to the contents of
20# the modules, but make no requirements on code importing these
21# modules.
22
23import sys
24import shelve
25import shutil
26import anydbm
27import datetime
28import os
29import os.path
30import select
31import threading
32import glob
33
class BaseMultiHashDriver(object):
    """
    Common interface of all multihash drivers.

    A multihash driver works like a dictionary of dictionaries: it keeps
    several dictionaries (hashes), each one addressed by a key string. A
    loaded hash is read only; its content is changed through save(), modify()
    and delete() methods of the driver.

    This base implementation stores nothing, so every method raises KeyError
    for any hash key.
    """

    def load(self, hashkey):
        """
        Returns the hash stored under hashkey.

        @param hashkey: hash key string
        @return: read only dictionary like object
        @raise KeyError: if there is no hash for hashkey
        """
        raise KeyError(hashkey)

    def get_mtime(self, hashkey):
        """
        Returns the time when the hash was last saved.

        @param hashkey: hash key string
        @return: datetime.datetime object of last modification
        @raise KeyError: if there is no hash for hashkey
        """
        raise KeyError(hashkey)

    def save(self, hashkey, newhash, blocking=True):
        """
        Stores newhash under hashkey, replacing any hash already stored
        there. newhash is a dictionary or a sequence of (key, value).

        When another thread/process is updating the hash at the same time, a
        blocking call waits until that update is finished, while a non
        blocking call gives up and returns False.

        @param hashkey: hash key string
        @param newhash: dictionary or sequence of (key, value) with content
                        of the new hash
        @keyword blocking: use blocking or unblocking call
        @return: True if hash was saved, otherwise False
        """
        raise KeyError(hashkey)

    def modify(self, hashkey, otherhash, blocking=True):
        """
        Merges values of otherhash into the hash stored under hashkey.
        otherhash is a dictionary or a sequence of (key, value).

        @param hashkey: hash key string
        @param otherhash: dictionary or sequence of (key, value) with values
                          to put into the hash
        @keyword blocking: use blocking or unblocking call
        @return: True if hash was modified, otherwise False
        """
        raise KeyError(hashkey)

    def delete(self, hashkey, keys, blocking=True):
        """
        Removes given keys from the hash stored under hashkey.

        @param hashkey: hash key string
        @param keys: sequence of keys to remove
        @keyword blocking: use blocking or unblocking call
        @return: True if hash was modified, otherwise False
        """
        raise KeyError(hashkey)
103
class MultiHashInFilesMixin:
    """
    Mixin class for multihash drivers that use files to store hash objects.

    It maps hash keys to file names and implements a lock file based
    protocol (see save_with_locking) used to serialize writers and to replace
    hash files only after the new data was written.
    """
    def __init__(self, filename, max_lock_time=None, no_excl=False,
            suffix_list=None):
        """
        @param filename: file name to use for hash files (string or callable)
        @param max_lock_time: maximum lock time in seconds or as
                              datetime.timedelta object, 600 seconds when
                              not specified
        @param no_excl: disable usage of O_EXCL flag when locking
        @param suffix_list: suffixes that are appended to the hash file name
                            to get names of all files of one hash; when empty
                            the hash is stored in the single file
        """
        self.filename = filename
        if max_lock_time is None:
            max_lock_time = datetime.timedelta(seconds = 600)
        elif isinstance(max_lock_time, (int, long, float)):
            max_lock_time = datetime.timedelta(seconds = max_lock_time)
        self.max_lock_time = max_lock_time
        self.no_excl = no_excl
        self.suffix_list = suffix_list

    def get_filename(self, hashkey):
        """
        Returns base file name for hashkey. A callable filename is called
        with hashkey; in a string filename every 'XXX' is replaced by hashkey
        or, when there is no 'XXX', hashkey is appended to it.

        @param hashkey: hash key string
        @return: file name string
        """
        filename = self.filename
        if callable(filename):
            filename = filename(hashkey)
        elif 'XXX' in filename:
            filename = filename.replace('XXX', hashkey)
        else:
            filename += hashkey
        return filename

    def get_all_files(self, filename):
        """
        Returns list of all files that belong to the hash with given base
        file name: the name itself or, if suffix_list is set, the name
        combined with each suffix.
        """
        if not self.suffix_list:
            return [filename]
        return [filename + s for s in self.suffix_list]

    def get_new_filename(self, real_filename):
        """
        Returns name of the temporary file for new data: '.new' is inserted
        before the extension of real_filename ('a.db' gives 'a.new.db').
        """
        basename, ext = os.path.splitext(real_filename)
        return basename + '.new' + ext

    def get_lock_filename(self, real_filename):
        """
        Returns name of the lock file for real_filename.
        """
        return real_filename + '.lock'

    def get_file_mtime(self, hashkey):
        """
        Returns modification time of the first file of the hash.

        @param hashkey: hash key string
        @return: datetime.datetime object
        @raise KeyError: if the file can not be examined (OSError from
                         os.stat)
        """
        files = self.get_all_files(self.get_filename(hashkey))
        try:
            return datetime.datetime.fromtimestamp(
                    os.stat(files[0]).st_mtime)
        except OSError:
            raise KeyError(hashkey)

    def save_with_locking(self, hashkey, newhash, blocking, do_save):
        """
        Acquires the lock file of the hash, calls do_save to write new data
        into temporary file(s), moves them over the real file(s) and releases
        the lock.

        A lock file older than max_lock_time is treated as abandoned and is
        removed.

        @param hashkey: hash key string
        @param newhash: data passed through to do_save
        @param blocking: if True wait while other writer holds the lock,
                         otherwise return False in such case
        @param do_save: callable(real_filename, new_filename, newhash) that
                        writes new data to new_filename
        @return: True if data was saved, False if the lock is held by other
                 writer and blocking is False
        @raise OSError: if the lock file can be neither created nor opened
                        three times in a row, or if other lock file operation
                        fails
        """
        # imported here because the module does not import errno
        import errno

        real_filename = self.get_filename(hashkey)
        lock_filename = self.get_lock_filename(real_filename)
        new_filename = self.get_new_filename(real_filename)
        create_error = None
        # number of consecutive iterations in which the lock file could be
        # neither created nor opened; iterations spent waiting for a valid
        # lock must not be counted, otherwise a long waiting writer fails as
        # soon as the lock disappears between the two open() calls
        failed_attempts = 0

        while True:
            if self.no_excl:
                # if usage of O_EXCL flag is disabled: check and creation
                # of the lock file are two separate steps
                try:
                    fd = os.open(lock_filename, os.O_RDONLY)
                except OSError:
                    fd = os.open(lock_filename, os.O_CREAT)
                    break
            else:
                # locking using O_EXCL flag
                try:
                    fd = os.open(lock_filename, os.O_CREAT | os.O_EXCL)
                except OSError:
                    # sys.exc_info() is used instead of the "except E, e"
                    # form so the code is parsed by any Python version
                    create_error = sys.exc_info()[1]
                else:
                    break
                try:
                    fd = os.open(lock_filename, os.O_RDONLY)
                except OSError:
                    failed_attempts += 1
                    if failed_attempts >= 3:
                        raise create_error
                    continue
                failed_attempts = 0
            try:
                # check lock time
                lock_time = datetime.datetime.fromtimestamp(
                        os.fstat(fd).st_ctime)
                if lock_time + self.max_lock_time <= datetime.datetime.now():
                    # the lock is too old, remove it and try again; other
                    # waiter may have removed it already, that is not an
                    # error
                    try:
                        os.unlink(lock_filename)
                    except OSError:
                        if sys.exc_info()[1].errno != errno.ENOENT:
                            raise
                    continue
                if not blocking:
                    return False
                # wait up to 5 seconds, then examine the lock file again
                select.select((), (), (fd,), 5)
            finally:
                os.close(fd)
        try:
            # save data to newly created file, then just move it to the real
            # file, so that other processes/threads can read old data while
            # new data is not completely written
            do_save(real_filename, new_filename, newhash)
            to_move = zip(self.get_all_files(new_filename),
                    self.get_all_files(real_filename))
            for src, dest in to_move:
                shutil.move(src, dest)
        finally:
            # the lock file must be removed even if closing of the
            # descriptor fails
            try:
                os.close(fd)
            finally:
                os.unlink(lock_filename)
        return True
210
class MemMultiHashDriver(BaseMultiHashDriver):
    """
    Multihash driver that keeps all hashes in memory of the process.

    >>> drv = MemMultiHashDriver()
    >>> drv.save('foo', dict(bar = 3))
    True
    >>> drv.load('foo')['bar']
    3
    """
    def __init__(self):
        # hash key -> dictionary with content of the hash
        self.db = {}
        # hash key -> datetime.datetime of the last change of the hash
        self.db_mtime = {}
        # held by save(), modify() and delete() while they change the data
        self.db_lock = threading.Lock()

    def load(self, hashkey):
        """
        Returns dictionary stored under hashkey, raises KeyError if there is
        no such hash.
        """
        return self.db[hashkey]

    def get_mtime(self, hashkey):
        """
        Returns datetime.datetime of the last change of the hash, raises
        KeyError if there is no such hash.
        """
        return self.db_mtime[hashkey]

    def save(self, hashkey, newhash, blocking=True):
        """
        Stores a copy of newhash (dictionary or sequence of (key, value))
        under hashkey. Returns False if the lock could not be acquired by a
        non blocking call, otherwise True.
        """
        if not self.db_lock.acquire(blocking):
            return False
        try:
            # the stored dictionary must not be shared with the caller
            if isinstance(newhash, dict):
                stored = newhash.copy()
            else:
                stored = dict(newhash)
            self.db[hashkey] = stored
            self.db_mtime[hashkey] = datetime.datetime.now()
        finally:
            self.db_lock.release()
        return True

    def modify(self, hashkey, otherhash, blocking=True):
        """
        Updates the hash stored under hashkey with values of otherhash
        (dictionary or sequence of (key, value)); a missing hash is created
        empty first. Returns False if the lock could not be acquired by a
        non blocking call, otherwise True.
        """
        if not self.db_lock.acquire(blocking):
            return False
        try:
            updates = otherhash
            if not isinstance(updates, dict):
                updates = dict(updates)
            self.db.setdefault(hashkey, {}).update(updates)
            self.db_mtime[hashkey] = datetime.datetime.now()
        finally:
            self.db_lock.release()
        return True

    def delete(self, hashkey, keys, blocking=True):
        """
        Removes keys from the hash stored under hashkey, keys that are not
        in the hash are ignored; a missing hash is created empty. Returns
        False if the lock could not be acquired by a non blocking call,
        otherwise True.
        """
        if not self.db_lock.acquire(blocking):
            return False
        try:
            target = self.db.setdefault(hashkey, {})
            for key in keys:
                target.pop(key, None)
            self.db_mtime[hashkey] = datetime.datetime.now()
        finally:
            self.db_lock.release()
        return True
273
class ShelveMultiHashDriver(MultiHashInFilesMixin, BaseMultiHashDriver):
    """
    Multihash driver that uses shelve module to store hashes.

    >>> import os
    >>> import tempfile
    >>> import glob
    >>> tmpdir = tempfile.mkdtemp()
    >>> filename = os.path.join(tmpdir, 'test_XXX')
    >>> drv = ShelveMultiHashDriver(filename, db_module='dumbdbm')
    >>> all_files = drv.get_all_files(drv.get_filename('foo'))
    >>> map(os.path.exists, all_files) == [False] * len(all_files)
    True
    >>> drv.save('foo', dict(bar=3))
    True
    >>> map(os.path.exists, all_files) == [True] * len(all_files)
    True
    >>> drv.load('foo')['bar']
    3
    >>> for fn in glob.glob(os.path.join(tmpdir, '*')): os.unlink(fn)
    >>> os.rmdir(tmpdir)
    """

    def __init__(self, filename, max_lock_time=None, no_excl=False,
            db_module=None, suffix_list=None):
        """
        @param filename: file name to use for hash files (string or callable)
        @keyword max_lock_time: maximum lock time in seconds or as
                                datetime.timedelta object
        @keyword no_excl: disable usage of O_EXCL flag when locking
        @keyword db_module: DBM module to use (module object or module
                            name); when not specified the first importable
                            of gdbm, dbm and dumbdbm is used
        @keyword suffix_list: suffixes of files created by the DBM module for
                              one database; when not specified it is derived
                              from the DBM module for dbm and dumbdbm
        """
        MultiHashInFilesMixin.__init__(self, filename, max_lock_time, no_excl,
                suffix_list)
        if isinstance(db_module, basestring):
            db_module = __import__(db_module)
        elif not db_module:
            for mn in ('gdbm', 'dbm', 'dumbdbm'):
                try:
                    db_module = __import__(mn)
                except ImportError:
                    pass
                else:
                    break
        self.db_module = db_module
        if self.db_module and not self.suffix_list:
            if self.db_module.__name__ == 'dbm':
                if self.db_module.library == 'Berkeley DB':
                    self.suffix_list = ['.db']
                elif self.db_module.library == 'GNU gdbm':
                    if 'freebsd' in sys.platform:
                        self.suffix_list = ['.db']
                    else:
                        self.suffix_list = ['.dir', '.pag']
            elif self.db_module.__name__ == 'dumbdbm':
                self.suffix_list = ['.dat', '.dir']

    def load(self, hashkey):
        """
        Opens the hash for reading.

        @param hashkey: hash key string
        @return: shelf object opened with flag 'r'
        @raise KeyError: if the database can not be opened (anydbm.error)
        """
        try:
            return shelve.open(self.get_filename(hashkey), 'r')
        except anydbm.error:
            raise KeyError(hashkey)

    def get_mtime(self, hashkey):
        """
        Returns modification time of the hash file, see get_file_mtime().
        """
        return self.get_file_mtime(hashkey)

    def _db_init(self, filename):
        """
        Creates new empty database and returns it as a shelf opened for
        writing. The caller is responsible for closing the shelf.
        """
        if not self.db_module:
            return shelve.open(filename, 'n')
        # if appropriate db module is specified, use it to create empty
        # database
        raw = self.db_module.open(filename, 'n')
        try:
            if hasattr(raw, 'sync'):
                raw.sync()
        finally:
            # close the database explicitly instead of waiting for the
            # garbage collector, it is reopened through shelve just below
            if hasattr(raw, 'close'):
                raw.close()
        del raw
        return shelve.open(filename, 'w')

    def _db_copy(self, real_filename, new_filename):
        """
        Copies files of the existing hash to the new file name(s) and
        returns the copy as a shelf opened for writing. If files can not be
        copied (IOError), new empty database is created instead. The caller
        is responsible for closing the shelf.
        """
        try:
            to_copy = zip(self.get_all_files(real_filename),
                    self.get_all_files(new_filename))
            for src, dest in to_copy:
                shutil.copy(src, dest)
        except IOError:
            return self._db_init(new_filename)
        return shelve.open(new_filename, 'w')

    def save(self, hashkey, newhash, blocking=True):
        """
        Writes newhash (dictionary or sequence of (key, value)) to a new
        database that replaces the stored hash. See save_with_locking() for
        locking and the return value.
        """
        def do_save(real_filename, new_filename, newhash):
            if isinstance(newhash, dict):
                newhash = newhash.items()
            db = self._db_init(new_filename)
            try:
                for k, v in newhash:
                    db[k] = v
            finally:
                # close() also syncs; files must not stay opened when they
                # are moved over the real ones
                db.close()
        return self.save_with_locking(hashkey, newhash, blocking, do_save)

    def modify(self, hashkey, otherhash, blocking=True):
        """
        Puts values of otherhash (dictionary or sequence of (key, value))
        into a copy of the stored hash that then replaces it. See
        save_with_locking() for locking and the return value.
        """
        def do_modify(real_filename, new_filename, newhash):
            if isinstance(newhash, dict):
                newhash = newhash.items()
            db = self._db_copy(real_filename, new_filename)
            try:
                for k, v in newhash:
                    db[k] = v
            finally:
                db.close()
        return self.save_with_locking(hashkey, otherhash, blocking, do_modify)

    def delete(self, hashkey, keys, blocking=True):
        """
        Removes keys from a copy of the stored hash that then replaces it,
        keys that are not in the hash are ignored. See save_with_locking()
        for locking and the return value.
        """
        def do_delete(real_filename, new_filename, keys):
            db = self._db_copy(real_filename, new_filename)
            try:
                for k in keys:
                    db.pop(k, None)
            finally:
                db.close()
        return self.save_with_locking(hashkey, keys, blocking, do_delete)
396
# Run the doctests embedded in this module when it is executed as a script.
if __name__ == "__main__":
    from doctest import testmod
    testmod()
Note: See TracBrowser for help on using the repository browser.