can be used to pre-allocate space for a series of upcoming writes, or
truncate existing data. If the container is growing, new_length will
be applied before datav. If the container is shrinking, it will be
- applied afterwards.
+ applied afterwards. If new_length==0, the share will be deleted.
The read vector is used to extract data from all known shares,
*before* any writes have been applied. The same vector is used for
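
Under these semantics, a client deletes a single share by sending an
empty test vector, an empty data vector, and new_length=0. A minimal
sketch, assuming "server" is a storage-server reference and the three
secrets have already been derived (the call itself matches the one
exercised in the new test below):

    # delete share 0 of this slot: empty testv, empty datav, new_length=0
    answer = server.remote_slot_testv_and_readv_and_writev(
        storage_index,
        (write_enabler, renew_secret, cancel_secret),
        {0: ([], [], 0)},  # sharenum -> (testv, datav, new_length)
        [])                # no read vector

The empty test vector always passes, so the delete is unconditional.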
"storage": { "http://allmydata.org/tahoe/protocols/storage/v1" :
{ "maximum-immutable-share-size": 2**32,
"tolerates-immutable-read-overrun": False,
+ "delete-mutable-shares-with-zero-length-writev": False,
},
"application-version": "unknown: no get_version()",
},
version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
{ "maximum-immutable-share-size": remaining_space,
"tolerates-immutable-read-overrun": True,
+ "delete-mutable-shares-with-zero-length-writev": True,
},
"application-version": str(allmydata.__version__),
}
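
A server that predates this change will not advertise the new flag and
will treat new_length=0 as an ordinary truncation to zero bytes rather
than a delete. A client that depends on deletion should therefore check
the version dict first; a minimal sketch, assuming "server" exposes the
dict above via get_version():

    v = server.get_version()
    v1 = v["http://allmydata.org/tahoe/protocols/storage/v1"]
    can_delete = v1.get("delete-mutable-shares-with-zero-length-writev",
                        False)
    # when False (or absent), a zero-length writev merely truncates the
    # share to zero bytes instead of removing it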
for sharenum, share in shares.items():
read_data[sharenum] = share.readv(read_vector)
+ ownerid = 1 # TODO
+ expire_time = time.time() + 31*24*60*60 # one month
+ lease_info = LeaseInfo(ownerid,
+ renew_secret, cancel_secret,
+ expire_time, self.my_nodeid)
+
if testv_is_good:
# now apply the write vectors
for sharenum in test_and_write_vectors:
(testv, datav, new_length) = test_and_write_vectors[sharenum]
- if sharenum not in shares:
- # allocate a new share
- allocated_size = 2000 # arbitrary, really
- share = self._allocate_slot_share(bucketdir, secrets,
- sharenum,
- allocated_size,
- owner_num=0)
- shares[sharenum] = share
- shares[sharenum].writev(datav, new_length)
- # and update the leases on all shares
- ownerid = 1 # TODO
- expire_time = time.time() + 31*24*60*60 # one month
- lease_info = LeaseInfo(ownerid,
- renew_secret, cancel_secret,
- expire_time, self.my_nodeid)
- for share in shares.values():
- share.add_or_renew_lease(lease_info)
+ if new_length == 0:
+ if sharenum in shares:
+ shares[sharenum].unlink()
+ else:
+ if sharenum not in shares:
+ # allocate a new share
+ allocated_size = 2000 # arbitrary, really
+ share = self._allocate_slot_share(bucketdir, secrets,
+ sharenum,
+ allocated_size,
+ owner_num=0)
+ shares[sharenum] = share
+ shares[sharenum].writev(datav, new_length)
+ # and update the lease
+ shares[sharenum].add_or_renew_lease(lease_info)
+
+            # if the writes deleted the last remaining share, remove the
+            # now-empty bucket directory as well. Checking the directory
+            # itself (instead of the final loop iteration's new_length)
+            # covers deletes that were not the last vector processed.
+            if os.path.exists(bucketdir) and not os.listdir(bucketdir):
+                os.rmdir(bucketdir)
+
# all done
self.add_latency("writev", time.time() - start)
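
Combined with the bucket-directory cleanup above, deleting every share
removes the slot entirely. An illustrative helper, not part of this
patch, built from the two remote calls exercised in the tests below:

    def delete_all_shares(server, storage_index, secrets):
        # a readv against an empty share list reports all existing shares
        shares = server.remote_slot_readv(storage_index, [], [(0, 1)])
        for sharenum in shares:
            # the empty testv always passes, so each delete is unconditional
            server.remote_slot_testv_and_readv_and_writev(
                storage_index, secrets, {sharenum: ([], [], 0)}, [])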
self.failUnlessRaises(IndexError,
ss.remote_cancel_lease, "si2", "nonsecret")
+ def test_remove(self):
+ ss = self.create("test_remove")
+ self.allocate(ss, "si1", "we1", self._lease_secret.next(),
+ set([0,1,2]), 100)
+ readv = ss.remote_slot_readv
+ writev = ss.remote_slot_testv_and_readv_and_writev
+ secrets = ( self.write_enabler("we1"),
+ self.renew_secret("we1"),
+ self.cancel_secret("we1") )
+ # delete sh0 by setting its size to zero
+ answer = writev("si1", secrets,
+ {0: ([], [], 0)},
+ [])
+ # the answer should mention all the shares that existed before the
+ # write
+ self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
+ # but a new read should show only sh1 and sh2
+ self.failUnlessEqual(readv("si1", [], [(0,10)]),
+ {1: [""], 2: [""]})
+
+ # delete sh1 by setting its size to zero
+ answer = writev("si1", secrets,
+ {1: ([], [], 0)},
+ [])
+ self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
+ self.failUnlessEqual(readv("si1", [], [(0,10)]),
+ {2: [""]})
+
+ # delete sh2 by setting its size to zero
+ answer = writev("si1", secrets,
+ {2: ([], [], 0)},
+ [])
+ self.failUnlessEqual(answer, (True, {2:[]}) )
+ self.failUnlessEqual(readv("si1", [], [(0,10)]),
+ {})
+ # and the bucket directory should now be gone
+ si = base32.b2a("si1")
+ # note: this is a detail of the storage server implementation, and
+ # may change in the future
+ prefix = si[:2]
+ prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
+ bucketdir = os.path.join(prefixdir, si)
+ self.failUnless(os.path.exists(prefixdir))
+ self.failIf(os.path.exists(bucketdir))
class Stats(unittest.TestCase):