From: David-Sarah Hopwood <david-sarah@jacaranda.org>
Date: Thu, 22 Nov 2012 06:43:49 +0000 (+0000)
Subject: Move BucketWriter and BucketReader to storage/bucket.py.
X-Git-Url: https://git.rkrishnan.org/%5B/frontends/%22file:/?a=commitdiff_plain;h=e9655c6d356a0cf5cf1d0e8040e0e1d468d741e2;p=tahoe-lafs%2Ftahoe-lafs.git

Move BucketWriter and BucketReader to storage/bucket.py.

Signed-off-by: David-Sarah Hopwood <david-sarah@jacaranda.org>
---

diff --git a/src/allmydata/storage/backends/disk/immutable.py b/src/allmydata/storage/backends/disk/immutable.py
index 780b04a4..a8b0fc08 100644
--- a/src/allmydata/storage/backends/disk/immutable.py
+++ b/src/allmydata/storage/backends/disk/immutable.py
@@ -1,16 +1,11 @@
 
-import os, struct, time
+import os, struct
 
-from foolscap.api import Referenceable
-
-from zope.interface import implements
-from allmydata.interfaces import RIBucketWriter, RIBucketReader
-from allmydata.util import base32, fileutil, log
+from allmydata.util import fileutil
 from allmydata.util.fileutil import get_used_space
 from allmydata.util.assertutil import precondition
 from allmydata.storage.common import UnknownImmutableContainerVersionError, \
      DataTooLargeError
-from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE
 
 
 # Each share file (in storage/shares/$SI/$SHNUM) contains share data that
@@ -116,135 +111,3 @@ class ShareFile:
             f.write(data)
         finally:
             f.close()
-
-
-class BucketWriter(Referenceable):
-    implements(RIBucketWriter)
-
-    def __init__(self, ss, account, storage_index, shnum,
-                 incominghome, finalhome, max_size, canary):
-        self.ss = ss
-        self.incominghome = incominghome
-        self.finalhome = finalhome
-        self._max_size = max_size # don't allow the client to write more than this
-        self._account = account
-        self._storage_index = storage_index
-        self._shnum = shnum
-        self._canary = canary
-        self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
-        self.closed = False
-        self.throw_out_all_data = False
-        self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
-        self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE)
-
-    def allocated_size(self):
-        return self._max_size
-
-    def remote_write(self, offset, data):
-        start = time.time()
-        precondition(not self.closed)
-        if self.throw_out_all_data:
-            return
-        self._sharefile.write_share_data(offset, data)
-        self.ss.add_latency("write", time.time() - start)
-        self.ss.count("write")
-
-    def remote_close(self):
-        precondition(not self.closed)
-        start = time.time()
-
-        fileutil.make_dirs(os.path.dirname(self.finalhome))
-        fileutil.rename(self.incominghome, self.finalhome)
-        try:
-            # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
-            # We try to delete the parent (.../ab/abcde) to avoid leaving
-            # these directories lying around forever, but the delete might
-            # fail if we're working on another share for the same storage
-            # index (like ab/abcde/5). The alternative approach would be to
-            # use a hierarchy of objects (PrefixHolder, BucketHolder,
-            # ShareWriter), each of which is responsible for a single
-            # directory on disk, and have them use reference counting of
-            # their children to know when they should do the rmdir. This
-            # approach is simpler, but relies on os.rmdir refusing to delete
-            # a non-empty directory. Do *not* use fileutil.rm_dir() here!
-            os.rmdir(os.path.dirname(self.incominghome))
-            # we also delete the grandparent (prefix) directory, .../ab ,
-            # again to avoid leaving directories lying around. This might
-            # fail if there is another bucket open that shares a prefix (like
-            # ab/abfff).
-            os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
-            # we leave the great-grandparent (incoming/) directory in place.
-        except EnvironmentError:
-            # ignore the "can't rmdir because the directory is not empty"
-            # exceptions, those are normal consequences of the
-            # above-mentioned conditions.
-            pass
-        self._sharefile = None
-        self.closed = True
-        self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
-
-        filelen = get_used_space(self.finalhome)
-        self.ss.bucket_writer_closed(self, filelen)
-        self._account.add_or_renew_default_lease(self._storage_index, self._shnum)
-        self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen)
-        self.ss.add_latency("close", time.time() - start)
-        self.ss.count("close")
-
-    def _disconnected(self):
-        if not self.closed:
-            self._abort()
-
-    def remote_abort(self):
-        log.msg("storage: aborting sharefile %s" % self.incominghome,
-                facility="tahoe.storage", level=log.UNUSUAL)
-        if not self.closed:
-            self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
-        self._abort()
-        self.ss.count("abort")
-
-    def _abort(self):
-        if self.closed:
-            return
-
-        os.remove(self.incominghome)
-        # if we were the last share to be moved, remove the incoming/
-        # directory that was our parent
-        parentdir = os.path.split(self.incominghome)[0]
-        if not os.listdir(parentdir):
-            os.rmdir(parentdir)
-        self._sharefile = None
-        self._account.remove_share_and_leases(self._storage_index, self._shnum)
-
-        # We are now considered closed for further writing. We must tell
-        # the storage server about this so that it stops expecting us to
-        # use the space it allocated for us earlier.
-        self.closed = True
-        self.ss.bucket_writer_closed(self, 0)
-
-
-class BucketReader(Referenceable):
-    implements(RIBucketReader)
-
-    def __init__(self, ss, sharefname, storage_index=None, shnum=None):
-        self.ss = ss
-        self._share_file = ShareFile(sharefname)
-        self.storage_index = storage_index
-        self.shnum = shnum
-
-    def __repr__(self):
-        return "<%s %s %s>" % (self.__class__.__name__,
-                               base32.b2a_l(self.storage_index[:8], 60),
-                               self.shnum)
-
-    def remote_read(self, offset, length):
-        start = time.time()
-        data = self._share_file.read_share_data(offset, length)
-        self.ss.add_latency("read", time.time() - start)
-        self.ss.count("read")
-        return data
-
-    def remote_advise_corrupt_share(self, reason):
-        return self.ss.client_advise_corrupt_share("immutable",
-                                                   self.storage_index,
-                                                   self.shnum,
-                                                   reason)
diff --git a/src/allmydata/storage/bucket.py b/src/allmydata/storage/bucket.py
new file mode 100644
index 00000000..1cf99cbf
--- /dev/null
+++ b/src/allmydata/storage/bucket.py
@@ -0,0 +1,146 @@
+
+import os, time
+
+from foolscap.api import Referenceable
+
+from zope.interface import implements
+from allmydata.interfaces import RIBucketWriter, RIBucketReader
+
+from allmydata.util import base32, fileutil, log
+from allmydata.util.fileutil import get_used_space
+from allmydata.util.assertutil import precondition
+from allmydata.storage.backends.disk.immutable import ShareFile
+from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE
+
+
+
+class BucketWriter(Referenceable):
+    implements(RIBucketWriter)
+
+    def __init__(self, ss, account, storage_index, shnum,
+                 incominghome, finalhome, max_size, canary):
+        self.ss = ss
+        self.incominghome = incominghome
+        self.finalhome = finalhome
+        self._max_size = max_size # don't allow the client to write more than this
+        self._account = account
+        self._storage_index = storage_index
+        self._shnum = shnum
+        self._canary = canary
+        self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
+        self.closed = False
+        self.throw_out_all_data = False
+        self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
+        self._account.add_share(self._storage_index, self._shnum, max_size, SHARETYPE_IMMUTABLE)
+
+    def allocated_size(self):
+        return self._max_size
+
+    def remote_write(self, offset, data):
+        start = time.time()
+        precondition(not self.closed)
+        if self.throw_out_all_data:
+            return
+        self._sharefile.write_share_data(offset, data)
+        self.ss.add_latency("write", time.time() - start)
+        self.ss.count("write")
+
+    def remote_close(self):
+        precondition(not self.closed)
+        start = time.time()
+
+        fileutil.make_dirs(os.path.dirname(self.finalhome))
+        fileutil.rename(self.incominghome, self.finalhome)
+        try:
+            # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
+            # We try to delete the parent (.../ab/abcde) to avoid leaving
+            # these directories lying around forever, but the delete might
+            # fail if we're working on another share for the same storage
+            # index (like ab/abcde/5). The alternative approach would be to
+            # use a hierarchy of objects (PrefixHolder, BucketHolder,
+            # ShareWriter), each of which is responsible for a single
+            # directory on disk, and have them use reference counting of
+            # their children to know when they should do the rmdir. This
+            # approach is simpler, but relies on os.rmdir refusing to delete
+            # a non-empty directory. Do *not* use fileutil.rm_dir() here!
+            os.rmdir(os.path.dirname(self.incominghome))
+            # we also delete the grandparent (prefix) directory, .../ab ,
+            # again to avoid leaving directories lying around. This might
+            # fail if there is another bucket open that shares a prefix (like
+            # ab/abfff).
+            os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
+            # we leave the great-grandparent (incoming/) directory in place.
+        except EnvironmentError:
+            # ignore the "can't rmdir because the directory is not empty"
+            # exceptions, those are normal consequences of the
+            # above-mentioned conditions.
+            pass
+        self._sharefile = None
+        self.closed = True
+        self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
+
+        filelen = get_used_space(self.finalhome)
+        self.ss.bucket_writer_closed(self, filelen)
+        self._account.add_or_renew_default_lease(self._storage_index, self._shnum)
+        self._account.mark_share_as_stable(self._storage_index, self._shnum, filelen)
+        self.ss.add_latency("close", time.time() - start)
+        self.ss.count("close")
+
+    def _disconnected(self):
+        if not self.closed:
+            self._abort()
+
+    def remote_abort(self):
+        log.msg("storage: aborting sharefile %s" % self.incominghome,
+                facility="tahoe.storage", level=log.UNUSUAL)
+        if not self.closed:
+            self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
+        self._abort()
+        self.ss.count("abort")
+
+    def _abort(self):
+        if self.closed:
+            return
+
+        os.remove(self.incominghome)
+        # if we were the last share to be moved, remove the incoming/
+        # directory that was our parent
+        parentdir = os.path.split(self.incominghome)[0]
+        if not os.listdir(parentdir):
+            os.rmdir(parentdir)
+        self._sharefile = None
+        self._account.remove_share_and_leases(self._storage_index, self._shnum)
+
+        # We are now considered closed for further writing. We must tell
+        # the storage server about this so that it stops expecting us to
+        # use the space it allocated for us earlier.
+        self.closed = True
+        self.ss.bucket_writer_closed(self, 0)
+
+
+class BucketReader(Referenceable):
+    implements(RIBucketReader)
+
+    def __init__(self, ss, sharefname, storage_index=None, shnum=None):
+        self.ss = ss
+        self._share_file = ShareFile(sharefname)
+        self.storage_index = storage_index
+        self.shnum = shnum
+
+    def __repr__(self):
+        return "<%s %s %s>" % (self.__class__.__name__,
+                               base32.b2a_l(self.storage_index[:8], 60),
+                               self.shnum)
+
+    def remote_read(self, offset, length):
+        start = time.time()
+        data = self._share_file.read_share_data(offset, length)
+        self.ss.add_latency("read", time.time() - start)
+        self.ss.count("read")
+        return data
+
+    def remote_advise_corrupt_share(self, reason):
+        return self.ss.client_advise_corrupt_share("immutable",
+                                                   self.storage_index,
+                                                   self.shnum,
+                                                   reason)
diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py
index 2403e11b..91e5347c 100644
--- a/src/allmydata/storage/server.py
+++ b/src/allmydata/storage/server.py
@@ -13,7 +13,8 @@ _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
 from allmydata.storage.backends.disk.mutable import MutableShareFile, EmptyShare, \
      create_mutable_sharefile
 from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
-from allmydata.storage.backends.disk.immutable import ShareFile, BucketWriter, BucketReader
+from allmydata.storage.backends.disk.immutable import ShareFile
+from allmydata.storage.bucket import BucketWriter, BucketReader
 from allmydata.storage.crawler import BucketCountingCrawler
 from allmydata.storage.accountant import Accountant
 from allmydata.storage.expiration import ExpirationPolicy
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index b7a6fcdd..bbb6472a 100644
--- a/src/allmydata/test/test_storage.py
+++ b/src/allmydata/test/test_storage.py
@@ -13,7 +13,8 @@ from allmydata import interfaces
 from allmydata.util import fileutil, hashutil, base32, time_format
 from allmydata.storage.server import StorageServer
 from allmydata.storage.backends.disk.mutable import MutableShareFile
-from allmydata.storage.backends.disk.immutable import BucketWriter, BucketReader, ShareFile
+from allmydata.storage.backends.disk.immutable import ShareFile
+from allmydata.storage.bucket import BucketWriter, BucketReader
 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
      UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
 from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE, SHARETYPE_MUTABLE