From: Daira Hopwood Date: Thu, 22 Nov 2012 06:43:49 +0000 (+0000) Subject: Move BucketWriter and BucketReader to storage/bucket.py. X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/css/about.html?a=commitdiff_plain;h=8bdee42e9d5c41283caed11d88857820ecdb7861;p=tahoe-lafs%2Ftahoe-lafs.git Move BucketWriter and BucketReader to storage/bucket.py. Signed-off-by: David-Sarah Hopwood --- diff --git a/src/allmydata/storage/backends/disk/immutable.py b/src/allmydata/storage/backends/disk/immutable.py index 780b04a4..a8b0fc08 100644 --- a/src/allmydata/storage/backends/disk/immutable.py +++ b/src/allmydata/storage/backends/disk/immutable.py @@ -1,16 +1,11 @@ -import os, struct, time +import os, struct -from foolscap.api import Referenceable - -from zope.interface import implements -from allmydata.interfaces import RIBucketWriter, RIBucketReader -from allmydata.util import base32, fileutil, log +from allmydata.util import fileutil from allmydata.util.fileutil import get_used_space from allmydata.util.assertutil import precondition from allmydata.storage.common import UnknownImmutableContainerVersionError, \ DataTooLargeError -from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE # Each share file (in storage/shares/$SI/$SHNUM) contains share data that @@ -116,135 +111,3 @@ class ShareFile: f.write(data) finally: f.close() - - -class BucketWriter(Referenceable): - implements(RIBucketWriter) - - def __init__(self, ss, account, storage_index, shnum, - incominghome, finalhome, max_size, canary): - self.ss = ss - self.incominghome = incominghome - self.finalhome = finalhome - self._max_size = max_size # don't allow the client to write more than this - self._account = account - self._storage_index = storage_index - self._shnum = shnum - self._canary = canary - self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected) - self.closed = False - self.throw_out_all_data = False - self._sharefile = ShareFile(incominghome, create=True, max_size=max_size) - self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE) - - def allocated_size(self): - return self._max_size - - def remote_write(self, offset, data): - start = time.time() - precondition(not self.closed) - if self.throw_out_all_data: - return - self._sharefile.write_share_data(offset, data) - self.ss.add_latency("write", time.time() - start) - self.ss.count("write") - - def remote_close(self): - precondition(not self.closed) - start = time.time() - - fileutil.make_dirs(os.path.dirname(self.finalhome)) - fileutil.rename(self.incominghome, self.finalhome) - try: - # self.incominghome is like storage/shares/incoming/ab/abcde/4 . - # We try to delete the parent (.../ab/abcde) to avoid leaving - # these directories lying around forever, but the delete might - # fail if we're working on another share for the same storage - # index (like ab/abcde/5). The alternative approach would be to - # use a hierarchy of objects (PrefixHolder, BucketHolder, - # ShareWriter), each of which is responsible for a single - # directory on disk, and have them use reference counting of - # their children to know when they should do the rmdir. This - # approach is simpler, but relies on os.rmdir refusing to delete - # a non-empty directory. Do *not* use fileutil.rm_dir() here! - os.rmdir(os.path.dirname(self.incominghome)) - # we also delete the grandparent (prefix) directory, .../ab , - # again to avoid leaving directories lying around. This might - # fail if there is another bucket open that shares a prefix (like - # ab/abfff). - os.rmdir(os.path.dirname(os.path.dirname(self.incominghome))) - # we leave the great-grandparent (incoming/) directory in place. - except EnvironmentError: - # ignore the "can't rmdir because the directory is not empty" - # exceptions, those are normal consequences of the - # above-mentioned conditions. - pass - self._sharefile = None - self.closed = True - self._canary.dontNotifyOnDisconnect(self._disconnect_marker) - - filelen = get_used_space(self.finalhome) - self.ss.bucket_writer_closed(self, filelen) - self._account.add_or_renew_default_lease(self._storage_index, self._shnum) - self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen) - self.ss.add_latency("close", time.time() - start) - self.ss.count("close") - - def _disconnected(self): - if not self.closed: - self._abort() - - def remote_abort(self): - log.msg("storage: aborting sharefile %s" % self.incominghome, - facility="tahoe.storage", level=log.UNUSUAL) - if not self.closed: - self._canary.dontNotifyOnDisconnect(self._disconnect_marker) - self._abort() - self.ss.count("abort") - - def _abort(self): - if self.closed: - return - - os.remove(self.incominghome) - # if we were the last share to be moved, remove the incoming/ - # directory that was our parent - parentdir = os.path.split(self.incominghome)[0] - if not os.listdir(parentdir): - os.rmdir(parentdir) - self._sharefile = None - self._account.remove_share_and_leases(self._storage_index, self._shnum) - - # We are now considered closed for further writing. We must tell - # the storage server about this so that it stops expecting us to - # use the space it allocated for us earlier. - self.closed = True - self.ss.bucket_writer_closed(self, 0) - - -class BucketReader(Referenceable): - implements(RIBucketReader) - - def __init__(self, ss, sharefname, storage_index=None, shnum=None): - self.ss = ss - self._share_file = ShareFile(sharefname) - self.storage_index = storage_index - self.shnum = shnum - - def __repr__(self): - return "<%s %s %s>" % (self.__class__.__name__, - base32.b2a_l(self.storage_index[:8], 60), - self.shnum) - - def remote_read(self, offset, length): - start = time.time() - data = self._share_file.read_share_data(offset, length) - self.ss.add_latency("read", time.time() - start) - self.ss.count("read") - return data - - def remote_advise_corrupt_share(self, reason): - return self.ss.client_advise_corrupt_share("immutable", - self.storage_index, - self.shnum, - reason) diff --git a/src/allmydata/storage/bucket.py b/src/allmydata/storage/bucket.py new file mode 100644 index 00000000..1cf99cbf --- /dev/null +++ b/src/allmydata/storage/bucket.py @@ -0,0 +1,146 @@ + +import os, time + +from foolscap.api import Referenceable + +from zope.interface import implements +from allmydata.interfaces import RIBucketWriter, RIBucketReader + +from allmydata.util import base32, fileutil, log +from allmydata.util.fileutil import get_used_space +from allmydata.util.assertutil import precondition +from allmydata.storage.backends.disk.immutable import ShareFile +from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE + + + +class BucketWriter(Referenceable): + implements(RIBucketWriter) + + def __init__(self, ss, account, storage_index, shnum, + incominghome, finalhome, max_size, canary): + self.ss = ss + self.incominghome = incominghome + self.finalhome = finalhome + self._max_size = max_size # don't allow the client to write more than this + self._account = account + self._storage_index = storage_index + self._shnum = shnum + self._canary = canary + self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected) + self.closed = False + self.throw_out_all_data = False + self._sharefile = ShareFile(incominghome, create=True, max_size=max_size) + self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE) + + def allocated_size(self): + return self._max_size + + def remote_write(self, offset, data): + start = time.time() + precondition(not self.closed) + if self.throw_out_all_data: + return + self._sharefile.write_share_data(offset, data) + self.ss.add_latency("write", time.time() - start) + self.ss.count("write") + + def remote_close(self): + precondition(not self.closed) + start = time.time() + + fileutil.make_dirs(os.path.dirname(self.finalhome)) + fileutil.rename(self.incominghome, self.finalhome) + try: + # self.incominghome is like storage/shares/incoming/ab/abcde/4 . + # We try to delete the parent (.../ab/abcde) to avoid leaving + # these directories lying around forever, but the delete might + # fail if we're working on another share for the same storage + # index (like ab/abcde/5). The alternative approach would be to + # use a hierarchy of objects (PrefixHolder, BucketHolder, + # ShareWriter), each of which is responsible for a single + # directory on disk, and have them use reference counting of + # their children to know when they should do the rmdir. This + # approach is simpler, but relies on os.rmdir refusing to delete + # a non-empty directory. Do *not* use fileutil.rm_dir() here! + os.rmdir(os.path.dirname(self.incominghome)) + # we also delete the grandparent (prefix) directory, .../ab , + # again to avoid leaving directories lying around. This might + # fail if there is another bucket open that shares a prefix (like + # ab/abfff). + os.rmdir(os.path.dirname(os.path.dirname(self.incominghome))) + # we leave the great-grandparent (incoming/) directory in place. + except EnvironmentError: + # ignore the "can't rmdir because the directory is not empty" + # exceptions, those are normal consequences of the + # above-mentioned conditions. + pass + self._sharefile = None + self.closed = True + self._canary.dontNotifyOnDisconnect(self._disconnect_marker) + + filelen = get_used_space(self.finalhome) + self.ss.bucket_writer_closed(self, filelen) + self._account.add_or_renew_default_lease(self._storage_index, self._shnum) + self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen) + self.ss.add_latency("close", time.time() - start) + self.ss.count("close") + + def _disconnected(self): + if not self.closed: + self._abort() + + def remote_abort(self): + log.msg("storage: aborting sharefile %s" % self.incominghome, + facility="tahoe.storage", level=log.UNUSUAL) + if not self.closed: + self._canary.dontNotifyOnDisconnect(self._disconnect_marker) + self._abort() + self.ss.count("abort") + + def _abort(self): + if self.closed: + return + + os.remove(self.incominghome) + # if we were the last share to be moved, remove the incoming/ + # directory that was our parent + parentdir = os.path.split(self.incominghome)[0] + if not os.listdir(parentdir): + os.rmdir(parentdir) + self._sharefile = None + self._account.remove_share_and_leases(self._storage_index, self._shnum) + + # We are now considered closed for further writing. We must tell + # the storage server about this so that it stops expecting us to + # use the space it allocated for us earlier. + self.closed = True + self.ss.bucket_writer_closed(self, 0) + + +class BucketReader(Referenceable): + implements(RIBucketReader) + + def __init__(self, ss, sharefname, storage_index=None, shnum=None): + self.ss = ss + self._share_file = ShareFile(sharefname) + self.storage_index = storage_index + self.shnum = shnum + + def __repr__(self): + return "<%s %s %s>" % (self.__class__.__name__, + base32.b2a_l(self.storage_index[:8], 60), + self.shnum) + + def remote_read(self, offset, length): + start = time.time() + data = self._share_file.read_share_data(offset, length) + self.ss.add_latency("read", time.time() - start) + self.ss.count("read") + return data + + def remote_advise_corrupt_share(self, reason): + return self.ss.client_advise_corrupt_share("immutable", + self.storage_index, + self.shnum, + reason) diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 2403e11b..91e5347c 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -13,7 +13,8 @@ _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported from allmydata.storage.backends.disk.mutable import MutableShareFile, EmptyShare, \ create_mutable_sharefile from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE -from allmydata.storage.backends.disk.immutable import ShareFile, BucketWriter, BucketReader +from allmydata.storage.backends.disk.immutable import ShareFile +from allmydata.storage.bucket import BucketWriter, BucketReader from allmydata.storage.crawler import BucketCountingCrawler from allmydata.storage.accountant import Accountant from allmydata.storage.expiration import ExpirationPolicy diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index ab515fc2..74122936 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -13,7 +13,8 @@ from allmydata import interfaces from allmydata.util import fileutil, hashutil, base32, time_format from allmydata.storage.server import StorageServer from allmydata.storage.backends.disk.mutable import MutableShareFile -from allmydata.storage.backends.disk.immutable import BucketWriter, BucketReader, ShareFile +from allmydata.storage.backends.disk.immutable import ShareFile +from allmydata.storage.bucket import BucketWriter, BucketReader from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \ UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE, SHARETYPE_MUTABLE