storage: add add_lease/update_write_enabler to remote API, revamp lease handling
authorBrian Warner <warner@lothar.com>
Thu, 10 Jul 2008 01:06:55 +0000 (18:06 -0700)
committerBrian Warner <warner@lothar.com>
Thu, 10 Jul 2008 01:06:55 +0000 (18:06 -0700)
src/allmydata/interfaces.py
src/allmydata/scripts/debug.py
src/allmydata/storage.py
src/allmydata/test/test_storage.py

index 5ef7c38aa68a56e5466335b438478480b9491906..ff6b6b5b5a654ca0303d809ab4478ff64775431a 100644 (file)
@@ -21,6 +21,7 @@ URIExtensionData = StringConstraint(1000)
 Number = IntegerConstraint(8) # 2**(8*8) == 16EiB ~= 18e18 ~= 18 exabytes
 Offset = Number
 ReadSize = int # the 'int' constraint is 2**31 == 2Gib -- large files are processed in not-so-large increments
+WriteEnablerSecret = Hash # used to protect mutable bucket modifications
 LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
 LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
 
@@ -110,11 +111,21 @@ class RIStorageServer(RemoteInterface):
         return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
                        DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
 
+    def add_lease(storage_index=StorageIndex,
+                  renew_secret=LeaseRenewSecret,
+                  cancel_secret=LeaseCancelSecret):
+        """
+        Add a new lease on the given bucket. If the renew_secret matches an
+        existing lease, that lease will be renewed instead.
+        """
+        return None
+
     def renew_lease(storage_index=StorageIndex, renew_secret=LeaseRenewSecret):
         """
         Renew the lease on a given bucket. Some networks will use this, some
         will not.
         """
+        return None
 
     def cancel_lease(storage_index=StorageIndex,
                      cancel_secret=LeaseCancelSecret):
@@ -122,6 +133,7 @@ class RIStorageServer(RemoteInterface):
         Cancel the lease on a given bucket. If this was the last lease on the
         bucket, the bucket will be deleted.
         """
+        return None
 
     def get_buckets(storage_index=StorageIndex):
         return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
@@ -136,7 +148,9 @@ class RIStorageServer(RemoteInterface):
         return DictOf(int, ReadData) # shnum -> results
 
     def slot_testv_and_readv_and_writev(storage_index=StorageIndex,
-                                        secrets=TupleOf(Hash, Hash, Hash),
+                                        secrets=TupleOf(WriteEnablerSecret,
+                                                        LeaseRenewSecret,
+                                                        LeaseCancelSecret),
                                         tw_vectors=TestAndWriteVectorsForShares,
                                         r_vector=ReadVector,
                                         ):
@@ -200,8 +214,9 @@ class RIStorageServer(RemoteInterface):
         for each element of the read vector.
 
         If the write_enabler is wrong, this will raise BadWriteEnablerError.
-        To enable share migration, the exception will have the nodeid used
-        for the old write enabler embedded in it, in the following string::
+        To enable share migration (using update_write_enabler), the exception
+        will have the nodeid used for the old write enabler embedded in it,
+        in the following string::
 
          The write enabler was recorded by nodeid '%s'.
 
@@ -211,6 +226,24 @@ class RIStorageServer(RemoteInterface):
         """
         return TupleOf(bool, DictOf(int, ReadData))
 
+    def update_write_enabler(storage_index=StorageIndex,
+                             old_write_enabler=WriteEnablerSecret,
+                             new_write_enabler=WriteEnablerSecret):
+        """
+        Replace the write-enabler on a given bucket. This is used when a
+        share has been moved from one server to another, causing the secret
+        (which is scoped to a given server's nodeid) to become invalid. The
+        client discovers this when it gets a BadWriteEnablerError, and the
+        string body of the exception will contain a message that includes the
+        nodeid that was used for the old secret.
+
+        The client should compute the old write-enabler secret, and send it
+        in conjunction with the new one. The server will then update the
+        share to record the new write-enabler instead of the old one. The
+        client can then retry its writev call.
+        """
+        return None
+
 class IStorageBucketWriter(Interface):
     def put_block(segmentnum=int, data=ShareData):
         """@param data: For most segments, this data will be 'blocksize'
index 19edc77734da163371f4bed666e2c2ec6d7484d0..f930ff94fe682ca6adcbf19b73437d97da6dead2 100644 (file)
@@ -88,10 +88,9 @@ def dump_share(config, out=sys.stdout, err=sys.stderr):
     leases = list(f.iter_leases())
     if leases:
         for i,lease in enumerate(leases):
-            (owner_num, renew_secret, cancel_secret, expiration_time) = lease
-            when = format_expiration_time(expiration_time)
-            print >>out, " Lease #%d: owner=%d, expire in %s" % (i, owner_num,
-                                                                 when)
+            when = format_expiration_time(lease.expiration_time)
+            print >>out, " Lease #%d: owner=%d, expire in %s" \
+                  % (i, lease.owner_num, when)
     else:
         print >>out, " No leases."
 
@@ -137,15 +136,15 @@ def dump_mutable_share(config, out, err):
     print >>out, " container_size: %d" % container_size
     print >>out, " data_length: %d" % data_length
     if leases:
-        for (leasenum, (oid,et,rs,cs,anid)) in leases:
+        for (leasenum, lease) in leases:
             print >>out
             print >>out, " Lease #%d:" % leasenum
-            print >>out, "  ownerid: %d" % oid
-            when = format_expiration_time(et)
+            print >>out, "  ownerid: %d" % lease.owner_num
+            when = format_expiration_time(lease.expiration_time)
             print >>out, "  expires in %s" % when
-            print >>out, "  renew_secret: %s" % base32.b2a(rs)
-            print >>out, "  cancel_secret: %s" % base32.b2a(cs)
-            print >>out, "  secrets are for nodeid: %s" % idlib.nodeid_b2a(anid)
+            print >>out, "  renew_secret: %s" % base32.b2a(lease.renew_secret)
+            print >>out, "  cancel_secret: %s" % base32.b2a(lease.cancel_secret)
+            print >>out, "  secrets are for nodeid: %s" % idlib.nodeid_b2a(lease.nodeid)
     else:
         print >>out, "No leases."
     print >>out
@@ -402,10 +401,8 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out, err):
         extra_lease_offset = m._read_extra_lease_offset(f)
         container_size = extra_lease_offset - m.DATA_OFFSET
         leases = list(m._enumerate_leases(f))
-        expiration_time = min( [expiration_time
-                                for (leasenum,
-                                     (ownerid, expiration_time, rs, cs, nodeid))
-                                in leases] )
+        expiration_time = min( [lease[1].expiration_time
+                                for lease in leases] )
         expiration = max(0, expiration_time - now)
 
         share_type = "unknown"
@@ -448,9 +445,8 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out, err):
         length = struct.unpack(">L", sf.read_share_data(seek, 4))[0]
         seek += 4
         UEB_data = sf.read_share_data(seek, length)
-        expiration_time = min( [expiration_time
-                                for (ownerid, rs, cs, expiration_time)
-                                in sf.iter_leases()] )
+        expiration_time = min( [lease.expiration_time
+                                for lease in sf.iter_leases()] )
         expiration = max(0, expiration_time - now)
 
         unpacked = uri.unpack_extension_readable(UEB_data)
index b31195e8d76920852cf720e350318bc184773341..513b486f6b59af0a071a2b31f1148386f0188172 100644 (file)
@@ -56,6 +56,45 @@ def storage_index_to_dir(storageindex):
     sia = si_b2a(storageindex)
     return os.path.join(sia[:2], sia)
 
+class LeaseInfo:
+    def __init__(self, owner_num=None, renew_secret=None, cancel_secret=None,
+                 expiration_time=None, nodeid=None):
+        self.owner_num = owner_num
+        self.renew_secret = renew_secret
+        self.cancel_secret = cancel_secret
+        self.expiration_time = expiration_time
+        if nodeid is not None:
+            assert isinstance(nodeid, str)
+            assert len(nodeid) == 20
+        self.nodeid = nodeid
+
+    def from_immutable_data(self, data):
+        (self.owner_num,
+         self.renew_secret,
+         self.cancel_secret,
+         self.expiration_time) = struct.unpack(">L32s32sL", data)
+        self.nodeid = None
+        return self
+    def to_immutable_data(self):
+        return struct.pack(">L32s32sL",
+                           self.owner_num,
+                           self.renew_secret, self.cancel_secret,
+                           int(self.expiration_time))
+
+    def to_mutable_data(self):
+        return struct.pack(">LL32s32s20s",
+                           self.owner_num,
+                           int(self.expiration_time),
+                           self.renew_secret, self.cancel_secret,
+                           self.nodeid)
+    def from_mutable_data(self, data):
+        (self.owner_num,
+         self.expiration_time,
+         self.renew_secret, self.cancel_secret,
+         self.nodeid) = struct.unpack(">LL32s32s20s", data)
+        return self
+
+
 class ShareFile:
     LEASE_SIZE = struct.calcsize(">L32s32sL")
 
@@ -69,6 +108,9 @@ class ShareFile:
         self._data_offset = 0xc
         self._lease_offset = 0xc + self._size
 
+    def unlink(self):
+        os.unlink(self.home)
+
     def read_share_data(self, offset, length):
         precondition(offset >= 0)
         precondition(offset+length <= self._size)
@@ -88,13 +130,10 @@ class ShareFile:
         f.close()
 
     def _write_lease_record(self, f, lease_number, lease_info):
-        (owner_num, renew_secret, cancel_secret, expiration_time) = lease_info
         offset = self._lease_offset + lease_number * self.LEASE_SIZE
         f.seek(offset)
         assert f.tell() == offset
-        f.write(struct.pack(">L32s32sL",
-                            owner_num, renew_secret, cancel_secret,
-                            int(expiration_time)))
+        f.write(lease_info.to_immutable_data())
 
     def _read_num_leases(self, f):
         f.seek(0x08)
@@ -117,7 +156,7 @@ class ShareFile:
         for i in range(num_leases):
             data = f.read(self.LEASE_SIZE)
             if data:
-                yield struct.unpack(">L32s32sL", data)
+                yield LeaseInfo().from_immutable_data(data)
 
     def add_lease(self, lease_info):
         f = open(self.home, 'rb+')
@@ -127,36 +166,39 @@ class ShareFile:
         f.close()
 
     def renew_lease(self, renew_secret, new_expire_time):
-        for i,(on,rs,cs,et) in enumerate(self.iter_leases()):
-            if rs == renew_secret:
+        for i,lease in enumerate(self.iter_leases()):
+            if lease.renew_secret == renew_secret:
                 # yup. See if we need to update the owner time.
-                if new_expire_time > et:
+                if new_expire_time > lease.expiration_time:
                     # yes
-                    new_lease = (on,rs,cs,new_expire_time)
+                    lease.expiration_time = new_expire_time
                     f = open(self.home, 'rb+')
-                    self._write_lease_record(f, i, new_lease)
+                    self._write_lease_record(f, i, lease)
                     f.close()
                 return
         raise IndexError("unable to renew non-existent lease")
 
     def add_or_renew_lease(self, lease_info):
-        owner_num, renew_secret, cancel_secret, expire_time = lease_info
         try:
-            self.renew_lease(renew_secret, expire_time)
+            self.renew_lease(lease_info.renew_secret,
+                             lease_info.expiration_time)
         except IndexError:
             self.add_lease(lease_info)
 
+
     def cancel_lease(self, cancel_secret):
-        """Remove a lease with the given cancel_secret. Return
-        (num_remaining_leases, space_freed). Raise IndexError if there was no
-        lease with the given cancel_secret."""
+        """Remove a lease with the given cancel_secret. If the last lease is
+        cancelled, the file will be removed. Return the number of bytes that
+        were freed (by truncating the list of leases, and possibly by
+        deleting the file. Raise IndexError if there was no lease with the
+        given cancel_secret.
+        """
 
         leases = list(self.iter_leases())
         num_leases = len(leases)
         num_leases_removed = 0
-        for i,lease_info in enumerate(leases[:]):
-            (on,rs,cs,et) = lease_info
-            if cs == cancel_secret:
+        for i,lease in enumerate(leases[:]):
+            if lease.cancel_secret == cancel_secret:
                 leases[i] = None
                 num_leases_removed += 1
         if not num_leases_removed:
@@ -172,7 +214,11 @@ class ShareFile:
             self._write_num_leases(f, len(leases))
             self._truncate_leases(f, len(leases))
             f.close()
-        return len(leases), self.LEASE_SIZE * num_leases_removed
+        space_freed = self.LEASE_SIZE * num_leases_removed
+        if not len(leases):
+            space_freed += os.stat(self.home)[stat.ST_SIZE]
+            self.unlink()
+        return space_freed
 
 
 class BucketWriter(Referenceable):
@@ -365,6 +411,9 @@ class MutableShareFile:
         # extra leases go here, none at creation
         f.close()
 
+    def unlink(self):
+        os.unlink(self.home)
+
     def _read_data_length(self, f):
         f.seek(self.DATA_LENGTH_OFFSET)
         (data_length,) = struct.unpack(">Q", f.read(8))
@@ -457,8 +506,6 @@ class MutableShareFile:
         return
 
     def _write_lease_record(self, f, lease_number, lease_info):
-        (ownerid, expiration_time,
-         renew_secret, cancel_secret, nodeid) = lease_info
         extra_lease_offset = self._read_extra_lease_offset(f)
         num_extra_leases = self._read_num_extra_leases(f)
         if lease_number < 4:
@@ -475,12 +522,10 @@ class MutableShareFile:
                       + (lease_number-4)*self.LEASE_SIZE)
         f.seek(offset)
         assert f.tell() == offset
-        f.write(struct.pack(">LL32s32s20s",
-                            ownerid, int(expiration_time),
-                            renew_secret, cancel_secret, nodeid))
+        f.write(lease_info.to_mutable_data())
 
     def _read_lease_record(self, f, lease_number):
-        # returns a 5-tuple of lease info, or None
+        # returns a LeaseInfo instance, or None
         extra_lease_offset = self._read_extra_lease_offset(f)
         num_extra_leases = self._read_num_extra_leases(f)
         if lease_number < 4:
@@ -494,10 +539,8 @@ class MutableShareFile:
         f.seek(offset)
         assert f.tell() == offset
         data = f.read(self.LEASE_SIZE)
-        lease_info = struct.unpack(">LL32s32s20s", data)
-        (ownerid, expiration_time,
-         renew_secret, cancel_secret, nodeid) = lease_info
-        if ownerid == 0:
+        lease_info = LeaseInfo().from_mutable_data(data)
+        if lease_info.owner_num == 0:
             return None
         return lease_info
 
@@ -546,16 +589,16 @@ class MutableShareFile:
     def renew_lease(self, renew_secret, new_expire_time):
         accepting_nodeids = set()
         f = open(self.home, 'rb+')
-        for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f):
-            if rs == renew_secret:
+        for (leasenum,lease) in self._enumerate_leases(f):
+            if lease.renew_secret == renew_secret:
                 # yup. See if we need to update the owner time.
-                if new_expire_time > et:
+                if new_expire_time > lease.expiration_time:
                     # yes
-                    new_lease = (oid,new_expire_time,rs,cs,anid)
-                    self._write_lease_record(f, leasenum, new_lease)
+                    lease.expiration_time = new_expire_time
+                    self._write_lease_record(f, leasenum, lease)
                 f.close()
                 return
-            accepting_nodeids.add(anid)
+            accepting_nodeids.add(lease.nodeid)
         f.close()
         # Return the accepting_nodeids set, to give the client a chance to
         # update the leases on a share which has been migrated from its
@@ -568,25 +611,31 @@ class MutableShareFile:
         raise IndexError(msg)
 
     def add_or_renew_lease(self, lease_info):
-        ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info
         try:
-            self.renew_lease(renew_secret, expire_time)
+            self.renew_lease(lease_info.renew_secret,
+                             lease_info.expiration_time)
         except IndexError:
             self.add_lease(lease_info)
 
     def cancel_lease(self, cancel_secret):
-        """Remove any leases with the given cancel_secret. Return
-        (num_remaining_leases, space_freed). Raise IndexError if there was no
-        lease with the given cancel_secret."""
+        """Remove any leases with the given cancel_secret. If the last lease
+        is cancelled, the file will be removed. Return the number of bytes
+        that were freed (by truncating the list of leases, and possibly by
+        deleting the file. Raise IndexError if there was no lease with the
+        given cancel_secret."""
 
         accepting_nodeids = set()
         modified = 0
         remaining = 0
-        blank_lease = (0, 0, "\x00"*32, "\x00"*32, "\x00"*20)
+        blank_lease = LeaseInfo(owner_num=0,
+                                renew_secret="\x00"*32,
+                                cancel_secret="\x00"*32,
+                                expiration_time=0,
+                                nodeid="\x00"*20)
         f = open(self.home, 'rb+')
-        for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f):
-            accepting_nodeids.add(anid)
-            if cs == cancel_secret:
+        for (leasenum,lease) in self._enumerate_leases(f):
+            accepting_nodeids.add(lease.nodeid)
+            if lease.cancel_secret == cancel_secret:
                 self._write_lease_record(f, leasenum, blank_lease)
                 modified += 1
             else:
@@ -594,7 +643,11 @@ class MutableShareFile:
         if modified:
             freed_space = self._pack_leases(f)
             f.close()
-            return (remaining, freed_space)
+            if not remaining:
+                freed_space += os.stat(self.home)[stat.ST_SIZE]
+                self.unlink()
+            return freed_space
+
         msg = ("Unable to cancel non-existent lease. I have leases "
                "accepted by nodeids: ")
         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
@@ -647,6 +700,16 @@ class MutableShareFile:
                   (idlib.nodeid_b2a(write_enabler_nodeid),)
             raise BadWriteEnablerError(msg)
 
+    def update_write_enabler(self, old_write_enabler, new_write_enabler,
+                             my_nodeid, si_s):
+        self.check_write_enabler(old_write_enabler, si_s)
+        f = open(self.home, 'rb+')
+        f.seek(0)
+        header = struct.pack(">32s20s32s",
+                             self.MAGIC, my_nodeid, new_write_enabler)
+        f.write(header)
+        f.close()
+
     def check_testv(self, testv):
         test_good = True
         f = open(self.home, 'rb+')
@@ -839,7 +902,9 @@ class StorageServer(service.MultiService, Referenceable):
         # separate database. Note that the lease should not be added until
         # the BucketWrite has been closed.
         expire_time = time.time() + 31*24*60*60
-        lease_info = (owner_num, renew_secret, cancel_secret, expire_time)
+        lease_info = LeaseInfo(owner_num,
+                               renew_secret, cancel_secret,
+                               expire_time, self.my_nodeid)
 
         space_per_bucket = allocated_size
         no_limits = self.sizelimit is None
@@ -893,13 +958,8 @@ class StorageServer(service.MultiService, Referenceable):
         self.add_latency("allocate", time.time() - start)
         return alreadygot, bucketwriters
 
-    def remote_renew_lease(self, storage_index, renew_secret):
-        start = time.time()
-        self.count("renew")
-        new_expire_time = time.time() + 31*24*60*60
-        found_buckets = False
+    def _iter_share_files(self, storage_index):
         for shnum, filename in self._get_bucket_shares(storage_index):
-            found_buckets = True
             f = open(filename, 'rb')
             header = f.read(32)
             f.close()
@@ -911,7 +971,36 @@ class StorageServer(service.MultiService, Referenceable):
             elif header[:4] == struct.pack(">L", 1):
                 sf = ShareFile(filename)
             else:
-                pass # non-sharefile
+                continue # non-sharefile
+            yield sf
+
+    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
+                         owner_num=0):
+        start = time.time()
+        self.count("add-lease")
+        new_expire_time = time.time() + 31*24*60*60
+        lease_info = LeaseInfo(owner_num,
+                               renew_secret, cancel_secret,
+                               new_expire_time, self.my_nodeid)
+        found_buckets = False
+        for sf in self._iter_share_files(storage_index):
+            found_buckets = True
+            # note: if the share has been migrated, the renew_lease()
+            # call will throw an exception, with information to help the
+            # client update the lease.
+            sf.add_or_renew_lease(lease_info)
+        self.add_latency("add-lease", time.time() - start)
+        if not found_buckets:
+            raise IndexError("no such storage index to do add-lease")
+
+
+    def remote_renew_lease(self, storage_index, renew_secret):
+        start = time.time()
+        self.count("renew")
+        new_expire_time = time.time() + 31*24*60*60
+        found_buckets = False
+        for sf in self._iter_share_files(storage_index):
+            found_buckets = True
             sf.renew_lease(renew_secret, new_expire_time)
         self.add_latency("renew", time.time() - start)
         if not found_buckets:
@@ -920,49 +1009,32 @@ class StorageServer(service.MultiService, Referenceable):
     def remote_cancel_lease(self, storage_index, cancel_secret):
         start = time.time()
         self.count("cancel")
-        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
 
-        remaining_files = 0
         total_space_freed = 0
         found_buckets = False
-        for shnum, filename in self._get_bucket_shares(storage_index):
+        for sf in self._iter_share_files(storage_index):
             # note: if we can't find a lease on one share, we won't bother
             # looking in the others. Unless something broke internally
             # (perhaps we ran out of disk space while adding a lease), the
             # leases on all shares will be identical.
             found_buckets = True
-            f = open(filename, 'rb')
-            header = f.read(32)
-            f.close()
-            if header[:32] == MutableShareFile.MAGIC:
-                sf = MutableShareFile(filename, self)
-                # note: if the share has been migrated, the renew_lease()
-                # call will throw an exception, with information to help the
-                # client update the lease.
-            elif header[:4] == struct.pack(">L", 1):
-                sf = ShareFile(filename)
-            else:
-                pass # non-sharefile
-            # this raises IndexError if the lease wasn't present
-            remaining_leases, space_freed = sf.cancel_lease(cancel_secret)
-            total_space_freed += space_freed
-            if remaining_leases:
-                remaining_files += 1
-            else:
-                # now remove the sharefile. We'll almost certainly be
-                # removing the entire directory soon.
-                filelen = os.stat(filename)[stat.ST_SIZE]
-                os.unlink(filename)
-                total_space_freed += filelen
-        if not remaining_files:
-            fileutil.rm_dir(storagedir)
+            # this raises IndexError if the lease wasn't present XXXX
+            total_space_freed += sf.cancel_lease(cancel_secret)
+
+        if found_buckets:
+            storagedir = os.path.join(self.sharedir,
+                                      storage_index_to_dir(storage_index))
+            if not os.listdir(storagedir):
+                os.rmdir(storagedir)
+
         if self.consumed is not None:
             self.consumed -= total_space_freed
         if self.stats_provider:
-            self.stats_provider.count('storage_server.bytes_freed', total_space_freed)
+            self.stats_provider.count('storage_server.bytes_freed',
+                                      total_space_freed)
         self.add_latency("cancel", time.time() - start)
         if not found_buckets:
-            raise IndexError("no such lease to cancel")
+            raise IndexError("no such storage index")
 
     def bucket_writer_closed(self, bw, consumed_size):
         if self.consumed is not None:
@@ -1077,10 +1149,9 @@ class StorageServer(service.MultiService, Referenceable):
             # and update the leases on all shares
             ownerid = 1 # TODO
             expire_time = time.time() + 31*24*60*60   # one month
-            my_nodeid = self.my_nodeid
-            anid = my_nodeid
-            lease_info = (ownerid, expire_time, renew_secret, cancel_secret,
-                          anid)
+            lease_info = LeaseInfo(ownerid,
+                                   renew_secret, cancel_secret,
+                                   expire_time, self.my_nodeid)
             for share in shares.values():
                 share.add_or_renew_lease(lease_info)
 
@@ -1125,6 +1196,14 @@ class StorageServer(service.MultiService, Referenceable):
         self.add_latency("readv", time.time() - start)
         return datavs
 
+    def remote_update_write_enabler(self, storage_index,
+                                    old_write_enabler, new_write_enabler):
+        si_s = si_b2a(storage_index)
+        for sf in self._iter_share_files(storage_index):
+            if not isinstance(sf, MutableShareFile):
+                continue
+            sf.update_write_enabler(old_write_enabler, new_write_enabler,
+                                    self.my_nodeid, si_s)
 
 
 # the code before here runs on the storage server, not the client
index fca443a655943943578ebd8c72828efef027e965..d660caa75de682a68b742950bac12a7933415a01 100644 (file)
@@ -8,7 +8,7 @@ from allmydata import interfaces
 from allmydata.util import fileutil, hashutil
 from allmydata.storage import BucketWriter, BucketReader, \
      WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
-     storage_index_to_dir, DataTooLargeError
+     storage_index_to_dir, DataTooLargeError, LeaseInfo
 from allmydata.interfaces import BadWriteEnablerError
 from allmydata.test.common import LoggingServiceParent
 
@@ -56,7 +56,8 @@ class Bucket(unittest.TestCase):
         renew_secret = os.urandom(32)
         cancel_secret = os.urandom(32)
         expiration_time = time.time() + 5000
-        return (owner_num, renew_secret, cancel_secret, expiration_time)
+        return LeaseInfo(owner_num, renew_secret, cancel_secret,
+                         expiration_time, "\x00" * 20)
 
     def test_create(self):
         incoming, final = self.make_workdir("test_create")
@@ -109,7 +110,8 @@ class BucketProxy(unittest.TestCase):
         renew_secret = os.urandom(32)
         cancel_secret = os.urandom(32)
         expiration_time = time.time() + 5000
-        return (owner_num, renew_secret, cancel_secret, expiration_time)
+        return LeaseInfo(owner_num, renew_secret, cancel_secret,
+                         expiration_time, "\x00" * 20)
 
     def bucket_writer_closed(self, bw, consumed):
         pass
@@ -228,6 +230,7 @@ class Server(unittest.TestCase):
         workdir = self.workdir(name)
         ss = StorageServer(workdir, sizelimit,
                            stats_provider=FakeStatsProvider())
+        ss.setNodeID("\x00" * 20)
         ss.setServiceParent(self.sparent)
         return ss
 
@@ -450,7 +453,7 @@ class Server(unittest.TestCase):
 
         leases = list(ss.get_leases("si0"))
         self.failUnlessEqual(len(leases), 1)
-        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
+        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
 
         rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
@@ -469,7 +472,7 @@ class Server(unittest.TestCase):
 
         leases = list(ss.get_leases("si1"))
         self.failUnlessEqual(len(leases), 2)
-        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
+        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
 
         # check that si0 is readable
         readers = ss.remote_get_buckets("si0")
@@ -505,7 +508,7 @@ class Server(unittest.TestCase):
 
         leases = list(ss.get_leases("si1"))
         self.failUnlessEqual(len(leases), 1)
-        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
+        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2]))
 
         ss.remote_renew_lease("si1", rs2)
         # cancelling the second should make it go away
@@ -549,6 +552,7 @@ class Server(unittest.TestCase):
     def test_readonly(self):
         workdir = self.workdir("test_readonly")
         ss = StorageServer(workdir, readonly_storage=True)
+        ss.setNodeID("\x00" * 20)
         ss.setServiceParent(self.sparent)
 
         canary = FakeCanary()
@@ -560,6 +564,7 @@ class Server(unittest.TestCase):
         # discard is really only used for other tests, but we test it anyways
         workdir = self.workdir("test_discard")
         ss = StorageServer(workdir, discard_storage=True)
+        ss.setNodeID("\x00" * 20)
         ss.setServiceParent(self.sparent)
 
         canary = FakeCanary()
@@ -594,7 +599,7 @@ class MutableServer(unittest.TestCase):
         workdir = self.workdir(name)
         ss = StorageServer(workdir, sizelimit)
         ss.setServiceParent(self.sparent)
-        ss.setNodeID("\x00" * 32)
+        ss.setNodeID("\x00" * 20)
         return ss
 
     def test_create(self):
@@ -925,17 +930,28 @@ class MutableServer(unittest.TestCase):
                                       1: ["1"*10],
                                       2: ["2"*10]})
 
-    def compare_leases_without_timestamps(self, a, b):
-        self.failUnlessEqual(len(a), len(b))
-        for i in range(len(a)):
-            (num_a, (ownerid_a, expiration_time_a,
-                   renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
-            (num_b, (ownerid_b, expiration_time_b,
-                   renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
-            self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
-                                   cancel_secret_a, nodeid_a),
-                                  (num_b, ownerid_b, renew_secret_b,
-                                   cancel_secret_b, nodeid_b) )
+    def compare_leases_without_timestamps(self, leases_a, leases_b):
+        self.failUnlessEqual(len(leases_a), len(leases_b))
+        for i in range(len(leases_a)):
+            num_a, a = leases_a[i]
+            num_b, b = leases_b[i]
+            self.failUnlessEqual(num_a, num_b)
+            self.failUnlessEqual(a.owner_num,       b.owner_num)
+            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
+            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
+            self.failUnlessEqual(a.nodeid,          b.nodeid)
+
+    def compare_leases(self, leases_a, leases_b):
+        self.failUnlessEqual(len(leases_a), len(leases_b))
+        for i in range(len(leases_a)):
+            num_a, a = leases_a[i]
+            num_b, b = leases_b[i]
+            self.failUnlessEqual(num_a, num_b)
+            self.failUnlessEqual(a.owner_num,       b.owner_num)
+            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
+            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
+            self.failUnlessEqual(a.nodeid,          b.nodeid)
+            self.failUnlessEqual(a.expiration_time, b.expiration_time)
 
     def test_leases(self):
         ss = self.create("test_leases", sizelimit=1000*1000)
@@ -1002,20 +1018,25 @@ class MutableServer(unittest.TestCase):
         all_leases = s0.debug_get_leases()
         # renewing with a bogus token should prompt an error message
 
-        # TODO: examine the exception thus raised, make sure the old nodeid
-        # is present, to provide for share migration
-        self.failUnlessRaises(IndexError,
-                              ss.remote_renew_lease, "si1",
-                              secrets(20)[1])
+        # examine the exception thus raised, make sure the old nodeid is
+        # present, to provide for share migration
+        e = self.failUnlessRaises(IndexError,
+                                  ss.remote_renew_lease, "si1",
+                                  secrets(20)[1])
+        e_s = str(e)
+        self.failUnless("Unable to renew non-existent lease" in e_s)
+        self.failUnless("I have leases accepted by nodeids:" in e_s)
+        self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
+
         # same for cancelling
         self.failUnlessRaises(IndexError,
                               ss.remote_cancel_lease, "si1",
                               secrets(20)[2])
-        self.failUnlessEqual(all_leases, s0.debug_get_leases())
+        self.compare_leases(all_leases, s0.debug_get_leases())
 
         # reading shares should not modify the timestamp
         read("si1", [], [(0,200)])
-        self.failUnlessEqual(all_leases, s0.debug_get_leases())
+        self.compare_leases(all_leases, s0.debug_get_leases())
 
         write("si1", secrets(0),
               {0: ([], [(200, "make me bigger")], None)}, [])
@@ -1056,6 +1077,43 @@ class MutableServer(unittest.TestCase):
         self.failUnlessRaises(IndexError,
                               ss.remote_cancel_lease, "si2", "nonsecret")
 
+    def test_update_write_enabler(self):
+        ss = self.create("test_update_write_enabler", sizelimit=1000*1000)
+        secrets = ( self.write_enabler("we1"),
+                    self.renew_secret("we1-0"),
+                    self.cancel_secret("we1-0") )
+        old_write_enabler = secrets[0]
+        new_write_enabler = self.write_enabler("we2")
+        new_secrets = (new_write_enabler, secrets[1], secrets[2])
+
+        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
+        write = ss.remote_slot_testv_and_readv_and_writev
+        read = ss.remote_slot_readv
+        update_write_enabler = ss.remote_update_write_enabler
+
+        rc = write("si1", secrets, {0: ([], [(0,data)], None)}, [])
+        self.failUnlessEqual(rc, (True, {}))
+
+        rc = write("si1", secrets, {0: ([], [(1,data)], None)}, [])
+        self.failUnlessEqual(rc[0], True)
+
+        f = self.failUnlessRaises(BadWriteEnablerError,
+                                  write, "si1", new_secrets,
+                                  {}, [])
+        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
+        ss.setNodeID("\xff" * 20)
+
+        rc = update_write_enabler("si1", old_write_enabler, new_write_enabler)
+        self.failUnlessEqual(rc, None)
+
+        f = self.failUnlessRaises(BadWriteEnablerError,
+                                  write, "si1", secrets,
+                                  {}, [])
+        self.failUnless("The write enabler was recorded by nodeid '77777777777777777777777777777777'." in f, f)
+
+        rc = write("si1", new_secrets, {0: ([], [(2,data)], None)}, [])
+        self.failUnlessEqual(rc[0], True)
+
 
 class Stats(unittest.TestCase):
 
@@ -1072,6 +1130,7 @@ class Stats(unittest.TestCase):
     def create(self, name, sizelimit=None):
         workdir = self.workdir(name)
         ss = StorageServer(workdir, sizelimit)
+        ss.setNodeID("\x00" * 20)
         ss.setServiceParent(self.sparent)
         return ss