MAX_BUCKETS = 200 # per peer
ShareData = StringConstraint(400000) # 1MB segment / k=3 = 334kB
URIExtensionData = StringConstraint(1000)
+LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
+LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
+
class RIIntroducerClient(RemoteInterface):
def new_peers(furls=SetOf(FURL)):
class RIStorageServer(RemoteInterface):
def allocate_buckets(storage_index=StorageIndex,
+ renew_secret=LeaseRenewSecret,
+ cancel_secret=LeaseCancelSecret,
sharenums=SetOf(int, maxLength=MAX_BUCKETS),
allocated_size=int, canary=Referenceable):
"""
- @param canary: If the canary is lost before close(), the bucket is deleted.
+ @param storage_index: the index of the bucket to be created or
+ increfed.
+ @param sharenums: these are the share numbers (probably between 0 and
+ 99) that the sender is proposing to store on this
+ server.
+        @param renew_secret: This is the secret used to protect bucket
+                             refresh requests. This secret is generated by
+                             the client and stored for later comparison by
+                             the server. Each server is given a different
+                             secret.
+ @param cancel_secret: Like renew_secret, but protects bucket decref.
+ @param canary: If the canary is lost before close(), the bucket is
+ deleted.
@return: tuple of (alreadygot, allocated), where alreadygot is what we
- already have and is what we hereby agree to accept
+                 already have and allocated is what we hereby agree to
+                 accept. New leases are added for shares in both lists.
"""
return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
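
For illustration, a client of this interface would derive the per-server
secrets and then invoke the method roughly as sketched below. This sketch is
not part of the patch: "my_secret" stands for the client's private lease
secret, and the helpers are the hashutil functions added later in this patch.

    renew_secret = hashutil.bucket_renewal_secret_hash(my_secret,
                                                       storage_index, peerid)
    cancel_secret = hashutil.bucket_cancel_secret_hash(my_secret,
                                                       storage_index, peerid)
    d = storageserver.callRemote("allocate_buckets", storage_index,
                                 renew_secret, cancel_secret,
                                 sharenums=set([0,1]), allocated_size=1000,
                                 canary=Referenceable())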
space += bw.allocated_size()
return space
- def remote_allocate_buckets(self, storage_index, sharenums, allocated_size,
+ def remote_allocate_buckets(self, storage_index,
+ renew_secret, cancel_secret,
+ sharenums, allocated_size,
canary):
alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter
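
Note that this patch only delivers and records the secrets at allocation
time. A later lease-renewal RPC would compare a submitted secret against the
stored one; a hypothetical sketch of that check follows, where "self.leases",
"_extend_lease", and "remote_renew_lease" do not exist yet and are assumptions:

    def remote_renew_lease(self, storage_index, renew_secret):
        # look up the (renew_secret, cancel_secret) pair recorded by
        # allocate_buckets, and refuse to renew unless the secrets match
        stored_renew, stored_cancel = self.leases[storage_index]
        if stored_renew != renew_secret:
            raise IndexError("bad renew_secret for %s" % storage_index)
        self._extend_lease(storage_index) # hypothetical expiry bump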
from allmydata.storage import BucketWriter, BucketReader, \
WriteBucketProxy, ReadBucketProxy, StorageServer
+# dummy renew/cancel lease secrets, shared by all of these tests
+RS = hashutil.tagged_hash("blah", "foo") # renew_secret
+CS = RS                                  # cancel_secret
+
class Bucket(unittest.TestCase):
def make_workdir(self, name):
self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
canary = Referenceable()
- already,writers = ss.remote_allocate_buckets("vid", [0,1,2],
+ already,writers = ss.remote_allocate_buckets("vid", RS, CS, [0,1,2],
75, canary)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
# now if we ask about writing again, the server should offer those three
# buckets as already present
- already,writers = ss.remote_allocate_buckets("vid", [0,1,2,3,4],
+ already,writers = ss.remote_allocate_buckets("vid", RS, CS, [0,1,2,3,4],
75, canary)
self.failUnlessEqual(already, set([0,1,2]))
self.failUnlessEqual(set(writers.keys()), set([3,4]))
# tell new uploaders that they already exist (so that we don't try to
# upload into them a second time)
- already,writers = ss.remote_allocate_buckets("vid", [2,3,4,5],
+ already,writers = ss.remote_allocate_buckets("vid", RS, CS, [2,3,4,5],
75, canary)
self.failUnlessEqual(already, set([2,3,4]))
self.failUnlessEqual(set(writers.keys()), set([5]))
ss = self.create("test_sizelimits", 100)
canary = Referenceable()
- already,writers = ss.remote_allocate_buckets("vid1", [0,1,2],
+ already,writers = ss.remote_allocate_buckets("vid1", RS, CS, [0,1,2],
25, canary)
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 75 bytes provisionally allocated,
# allowing only 25 more to be claimed
self.failUnlessEqual(len(ss._active_writers), 3)
- already2,writers2 = ss.remote_allocate_buckets("vid2", [0,1,2],
+ already2,writers2 = ss.remote_allocate_buckets("vid2", RS, CS, [0,1,2],
25, canary)
self.failUnlessEqual(len(writers2), 1)
self.failUnlessEqual(len(ss._active_writers), 4)
self.failUnlessEqual(len(ss._active_writers), 0)
# now there should be 25 bytes allocated, and 75 free
- already3,writers3 = ss.remote_allocate_buckets("vid3", [0,1,2,3],
+ already3,writers3 = ss.remote_allocate_buckets("vid3", RS, CS,
+ [0,1,2,3],
25, canary)
self.failUnlessEqual(len(writers3), 3)
self.failUnlessEqual(len(ss._active_writers), 3)
# during runtime, so if we were creating any metadata, the allocation
# would be more than 25 bytes and this test would need to be changed.
ss = self.create("test_sizelimits", 100)
- already4,writers4 = ss.remote_allocate_buckets("vid4", [0,1,2,3],
+ already4,writers4 = ss.remote_allocate_buckets("vid4",
+ RS, CS, [0,1,2,3],
25, canary)
self.failUnlessEqual(len(writers4), 3)
self.failUnlessEqual(len(ss._active_writers), 3)
d.addCallback(lambda res: _call())
return d
- def allocate_buckets(self, crypttext_hash, sharenums,
- share_size, canary):
+ def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
+ sharenums, share_size, canary):
#print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
if self.mode == "full":
return (set(), {},)
class PeerTracker:
def __init__(self, peerid, permutedid, connection,
sharesize, blocksize, num_segments, num_share_hashes,
- crypttext_hash):
+ storage_index):
self.peerid = peerid
self.permutedid = permutedid
self.connection = connection # to an RIClient
self.blocksize = blocksize
self.num_segments = num_segments
self.num_share_hashes = num_share_hashes
- self.crypttext_hash = crypttext_hash
+ self.storage_index = storage_index
self._storageserver = None
+        # derive a different lease secret for each (file, server) pair, so
+        # that one server cannot present our secrets to another server
+        h = hashutil.bucket_renewal_secret_hash
+        # XXX placeholder: a real client should supply its own private
+        # lease secret here
+        self.my_secret = "secret"
+        self.renew_secret = h(self.my_secret, self.storage_index, self.peerid)
+        h = hashutil.bucket_cancel_secret_hash
+        self.cancel_secret = h(self.my_secret, self.storage_index, self.peerid)
+
def query(self, sharenums):
if not self._storageserver:
d = self.connection.callRemote("get_service", "storageserver")
def _query(self, sharenums):
#print " query", self.peerid, len(sharenums)
d = self._storageserver.callRemote("allocate_buckets",
- self.crypttext_hash,
+ self.storage_index,
+ self.renew_secret,
+ self.cancel_secret,
sharenums,
self.allocated_size,
canary=Referenceable())
def random_key():
return os.urandom(KEYLEN)
+def file_renewal_secret_hash(my_secret, storage_index):
+ my_renewal_secret = tagged_hash(my_secret, "bucket_renewal_secret")
+ file_renewal_secret = tagged_pair_hash("file_renewal_secret",
+ my_renewal_secret, storage_index)
+ return file_renewal_secret
+
+def file_cancel_secret_hash(my_secret, storage_index):
+ my_cancel_secret = tagged_hash(my_secret, "bucket_cancel_secret")
+ file_cancel_secret = tagged_pair_hash("file_cancel_secret",
+ my_cancel_secret, storage_index)
+ return file_cancel_secret
+
+def bucket_renewal_secret_hash(my_secret, storage_index, peerid):
+    # extend the file-level secret with the peerid, so that each server is
+    # given a different secret
+    file_renewal_secret = file_renewal_secret_hash(my_secret, storage_index)
+    bucket_renewal_secret = tagged_pair_hash("bucket_renewal_secret",
+                                             file_renewal_secret, peerid)
+    return bucket_renewal_secret
+
+def bucket_cancel_secret_hash(my_secret, storage_index, peerid):
+    file_cancel_secret = file_cancel_secret_hash(my_secret, storage_index)
+    bucket_cancel_secret = tagged_pair_hash("bucket_cancel_secret",
+                                            file_cancel_secret, peerid)
+    return bucket_cancel_secret
+
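The derivations above form a chain: my_secret -> file secret -> bucket
secret, so a client can delegate renewal rights for a single file without
revealing its master secret. A sketch of the equivalence, assuming
"my_secret", "storage_index", and "peerid" are already in hand:

    # handing out frs lets an agent renew leases for this one file only
    frs = file_renewal_secret_hash(my_secret, storage_index)
    # the per-server secret is one more tagged hash down the chain
    brs = bucket_renewal_secret_hash(my_secret, storage_index, peerid)
    assert brs == tagged_pair_hash("bucket_renewal_secret", frs, peerid)
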
def dir_write_enabler_hash(write_key):
return tagged_hash("allmydata_dir_write_enabler_v1", write_key)
def dir_read_key_hash(write_key):