From: Brian Warner Date: Fri, 13 Jul 2007 21:04:49 +0000 (-0700) Subject: storage: use one file per share instead of 7 (#85). work-in-progress, tests still... X-Git-Url: https://git.rkrishnan.org/simplejson/%22news.html/module-simplejson.tests.html?a=commitdiff_plain;h=cd8648d39b89768459d54ed1cc3e570f9082623a;p=tahoe-lafs%2Ftahoe-lafs.git storage: use one file per share instead of 7 (#85). work-in-progress, tests still fail --- diff --git a/src/allmydata/download.py b/src/allmydata/download.py index 008daf05..e048f1d5 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -102,8 +102,17 @@ class ValidatedBucket: self.share_hash_tree = share_hash_tree self._roothash = roothash self.block_hash_tree = hashtree.IncompleteHashTree(num_blocks) + self.started = False def get_block(self, blocknum): + if not self.started: + d = self.bucket.start() + def _started(res): + self.started = True + return self.get_block(blocknum) + d.addCallback(_started) + return d + # the first time we use this bucket, we need to fetch enough elements # of the share hash tree to validate it from our share hash up to the # hashroot. @@ -380,7 +389,8 @@ class FileDownloader: bucket = sources[0] sources = sources[1:] #d = bucket.callRemote(methname, *args) - d = getattr(bucket, methname)(*args) + d = bucket.startIfNecessary() + d.addCallback(lambda res: getattr(bucket, methname)(*args)) d.addCallback(validatorfunc, bucket) def _bad(f): log.msg("%s from vbucket %s failed: %s" % (name, bucket, f)) # WEIRD diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index 830b5b63..a36b71a6 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -180,6 +180,9 @@ class Encoder(object): self.setup_codec() # TODO: duplicate call? d = defer.succeed(None) + for l in self.landlords.values(): + d.addCallback(lambda res, l=l: l.start()) + for i in range(self.num_segments-1): # note to self: this form doesn't work, because lambda only # captures the slot, not the value diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 731c22ce..9a1f2145 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -61,6 +61,38 @@ class RIClient(RemoteInterface): return Nodeid class RIBucketWriter(RemoteInterface): + def write(offset=int, data=ShareData): + return None + + def close(): + """ + If the data that has been written is incomplete or inconsistent then + the server will throw the data away, else it will store it for future + retrieval. + """ + return None + +class RIBucketReader(RemoteInterface): + def read(offset=int, length=int): + return ShareData + + +class RIStorageServer(RemoteInterface): + def allocate_buckets(storage_index=StorageIndex, + sharenums=SetOf(int, maxLength=MAX_BUCKETS), + sharesize=int, blocksize=int, canary=Referenceable): + """ + @param canary: If the canary is lost before close(), the bucket is deleted. + @return: tuple of (alreadygot, allocated), where alreadygot is what we + already have and is what we hereby agree to accept + """ + return TupleOf(SetOf(int, maxLength=MAX_BUCKETS), + DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS)) + def get_buckets(storage_index=StorageIndex): + return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS) + + +class IStorageBucketWriter(Interface): def put_block(segmentnum=int, data=ShareData): """@param data: For most segments, this data will be 'blocksize' bytes in length. The last segment might be shorter. @@ -92,16 +124,11 @@ class RIBucketWriter(RemoteInterface): write(k + ':' + netstring(dict[k])) """ return None - def close(): - """ - If the data that has been written is incomplete or inconsistent then - the server will throw the data away, else it will store it for future - retrieval. - """ - return None + pass + +class IStorageBucketReader(Interface): -class RIBucketReader(RemoteInterface): def get_block(blocknum=int): """Most blocks will be the same size. The last block might be shorter than the others. @@ -121,55 +148,6 @@ class RIBucketReader(RemoteInterface): return URIExtensionData -class RIStorageServer(RemoteInterface): - def allocate_buckets(storage_index=StorageIndex, - sharenums=SetOf(int, maxLength=MAX_BUCKETS), - sharesize=int, blocksize=int, canary=Referenceable): - """ - @param canary: If the canary is lost before close(), the bucket is deleted. - @return: tuple of (alreadygot, allocated), where alreadygot is what we - already have and is what we hereby agree to accept - """ - return TupleOf(SetOf(int, maxLength=MAX_BUCKETS), - DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS)) - def get_buckets(storage_index=StorageIndex): - return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS) - - -class IStorageBucketWriter(Interface): - def put_block(segmentnum, data): - pass - - def put_plaintext_hashes(hashes): - pass - def put_crypttext_hashes(hashes): - pass - def put_block_hashes(blockhashes): - pass - def put_share_hashes(sharehashes): - pass - def put_uri_extension(data): - pass - def close(): - pass - -class IStorageBucketReader(Interface): - - def get_block(blocknum): - pass - - def get_plaintext_hashes(): - pass - def get_crypttext_hashes(): - pass - def get_block_hashes(): - pass - def get_share_hashes(): - pass - def get_uri_extension(): - pass - - # hm, we need a solution for forward references in schemas from foolscap.schema import Any diff --git a/src/allmydata/storageserver.py b/src/allmydata/storageserver.py index 66cf33e0..4cc53e13 100644 --- a/src/allmydata/storageserver.py +++ b/src/allmydata/storageserver.py @@ -1,13 +1,14 @@ -import os, re, weakref +import os, re, weakref, stat, struct from foolscap import Referenceable from twisted.application import service +from twisted.internet import defer from zope.interface import implements from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ RIBucketReader, IStorageBucketWriter, IStorageBucketReader from allmydata import interfaces -from allmydata.util import bencode, fileutil, idlib +from allmydata.util import fileutil, idlib from allmydata.util.assertutil import precondition # store/ @@ -25,110 +26,46 @@ NUM_RE=re.compile("[0-9]*") class BucketWriter(Referenceable): implements(RIBucketWriter) - def __init__(self, ss, incominghome, finalhome, blocksize, sharesize): + def __init__(self, ss, incominghome, finalhome, size): self.ss = ss self.incominghome = incominghome self.finalhome = finalhome - self.blocksize = blocksize - self.sharesize = sharesize + self._size = size self.closed = False - self._next_segnum = 0 - fileutil.make_dirs(incominghome) - self._write_file('blocksize', str(blocksize)) + # touch the file, so later callers will see that we're working on it + f = open(self.incominghome, 'ab') + f.close() def allocated_size(self): - return self.sharesize + return self._size - def _write_file(self, fname, data): - open(os.path.join(self.incominghome, fname), 'wb').write(data) - - def remote_put_block(self, segmentnum, data): + def remote_write(self, offset, data): precondition(not self.closed) - # all blocks but the last will be of size self.blocksize, however the - # last one may be short, and we don't know the total number of - # segments so we can't tell which is which. - assert len(data) <= self.blocksize - assert segmentnum == self._next_segnum # must write in sequence - self._next_segnum = segmentnum + 1 - f = fileutil.open_or_create(os.path.join(self.incominghome, 'data')) - f.seek(self.blocksize*segmentnum) + precondition(offset >= 0) + precondition(offset+len(data) <= self._size) + f = open(self.incominghome, 'ab') + f.seek(offset) f.write(data) - - def remote_put_plaintext_hashes(self, hashes): - precondition(not self.closed) - # TODO: verify the length of blockhashes. - # TODO: tighten foolscap schema to require exactly 32 bytes. - self._write_file('plaintext_hashes', ''.join(hashes)) - - def remote_put_crypttext_hashes(self, hashes): - precondition(not self.closed) - # TODO: verify the length of blockhashes. - # TODO: tighten foolscap schema to require exactly 32 bytes. - self._write_file('crypttext_hashes', ''.join(hashes)) - - def remote_put_block_hashes(self, blockhashes): - precondition(not self.closed) - # TODO: verify the length of blockhashes. - # TODO: tighten foolscap schema to require exactly 32 bytes. - self._write_file('blockhashes', ''.join(blockhashes)) - - def remote_put_share_hashes(self, sharehashes): - precondition(not self.closed) - self._write_file('sharehashes', bencode.bencode(sharehashes)) - - def remote_put_uri_extension(self, data): - precondition(not self.closed) - self._write_file('uri_extension', data) + f.close() def remote_close(self): precondition(not self.closed) - # TODO assert or check the completeness and consistency of the data that has been written - fileutil.make_dirs(os.path.dirname(self.finalhome)) fileutil.rename(self.incominghome, self.finalhome) - try: - os.rmdir(os.path.dirname(self.incominghome)) - except OSError: - # Perhaps the directory wasn't empty. In any case, ignore the error. - pass - self.closed = True - self.ss.bucket_writer_closed(self, fileutil.du(self.finalhome)) + filelen = os.stat(self.finalhome)[stat.ST_SIZE] + self.ss.bucket_writer_closed(self, filelen) -def str2l(s): - """ split string (pulled from storage) into a list of blockids """ - return [ s[i:i+interfaces.HASH_SIZE] for i in range(0, len(s), interfaces.HASH_SIZE) ] class BucketReader(Referenceable): implements(RIBucketReader) def __init__(self, home): self.home = home - self.blocksize = int(self._read_file('blocksize')) - - def _read_file(self, fname): - return open(os.path.join(self.home, fname), 'rb').read() - - def remote_get_block(self, blocknum): - f = open(os.path.join(self.home, 'data'), 'rb') - f.seek(self.blocksize * blocknum) - return f.read(self.blocksize) # this might be short for the last block - - def remote_get_plaintext_hashes(self): - return str2l(self._read_file('plaintext_hashes')) - def remote_get_crypttext_hashes(self): - return str2l(self._read_file('crypttext_hashes')) - - def remote_get_block_hashes(self): - return str2l(self._read_file('blockhashes')) - def remote_get_share_hashes(self): - hashes = bencode.bdecode(self._read_file('sharehashes')) - # tuples come through bdecode(bencode()) as lists, which violates the - # schema - return [tuple(i) for i in hashes] - - def remote_get_uri_extension(self): - return self._read_file('uri_extension') + def remote_read(self, offset, length): + f = open(self.home, 'rb') + f.seek(offset) + return f.read(length) class StorageServer(service.MultiService, Referenceable): implements(RIStorageServer) @@ -159,7 +96,7 @@ class StorageServer(service.MultiService, Referenceable): return space def remote_allocate_buckets(self, storage_index, sharenums, sharesize, - blocksize, canary): + canary): alreadygot = set() bucketwriters = {} # k: shnum, v: BucketWriter si_s = idlib.b2a(storage_index) @@ -174,8 +111,9 @@ class StorageServer(service.MultiService, Referenceable): if os.path.exists(incominghome) or os.path.exists(finalhome): alreadygot.add(shnum) elif no_limits or remaining_space >= space_per_bucket: + fileutil.make_dirs(os.path.join(self.incomingdir, si_s)) bw = BucketWriter(self, incominghome, finalhome, - blocksize, space_per_bucket) + space_per_bucket) bucketwriters[shnum] = bw self._active_writers[bw] = 1 if yes_limits: @@ -184,6 +122,9 @@ class StorageServer(service.MultiService, Referenceable): # not enough space to accept this bucket pass + if bucketwriters: + fileutil.make_dirs(os.path.join(self.storedir, si_s)) + return alreadygot, bucketwriters def bucket_writer_closed(self, bw, consumed_size): @@ -204,24 +145,127 @@ class StorageServer(service.MultiService, Referenceable): return bucketreaders +""" +Share data is written into a single file. At the start of the file, there is +a series of four-byte big-endian offset values, which indicate where each +section starts. Each offset is measured from the beginning of the file. + +0x00: segment size +0x04: offset of data (=00 00 00 1c) +0x08: offset of plaintext_hash_tree +0x0c: offset of crypttext_hash_tree +0x10: offset of block_hashes +0x14: offset of share_hashes +0x18: offset of uri_extension_length + uri_extension +0x1c: start of data + start of plaintext_hash_tree + start of crypttext_hash_tree + start of block_hashes + start of share_hashes + each share_hash is written as a two-byte (big-endian) hashnum + followed by the 32-byte SHA-256 hash. We only store the hashes + necessary to validate the share hash root + start of uri_extension_length (four-byte big-endian value) + start of uri_extension +""" + class WriteBucketProxy: implements(IStorageBucketWriter) - def __init__(self, rref): + def __init__(self, rref, data_size, segment_size, num_segments, + num_share_hashes): self._rref = rref + self._segment_size = segment_size + + HASH_SIZE = interfaces.HASH_SIZE + self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE + # how many share hashes are included in each share? This will be + # about ln2(num_shares). + self._share_hash_size = num_share_hashes * (2+HASH_SIZE) + + offsets = self._offsets = {} + x = 0x1c + offsets['data'] = x + x += data_size + offsets['plaintext_hash_tree'] = x + x += self._segment_hash_size + offsets['crypttext_hash_tree'] = x + x += self._segment_hash_size + offsets['block_hashes'] = x + x += self._segment_hash_size + offsets['share_hashes'] = x + x += self._share_hash_size + offsets['uri_extension'] = x + + offset_data = struct.pack(">LLLLLLL", + segment_size, + offsets['data'], + offsets['plaintext_hash_tree'], + offsets['crypttext_hash_tree'], + offsets['block_hashes'], + offsets['share_hashes'], + offsets['uri_extension'] + ) + assert len(offset_data) == 7*4 + self._offset_data = offset_data + + def start(self): + return self._write(0, self._offset_data) def put_block(self, segmentnum, data): - return self._rref.callRemote("put_block", segmentnum, data) + offset = self._offsets['data'] + segmentnum * self._segment_size + assert offset + len(data) <= self._offsets['uri_extension'] + assert isinstance(data, str) + if segmentnum < self._segment_size-1: + assert len(data) == self._segment_size + else: + assert len(data) <= self._segment_size + return self._write(offset, data) def put_plaintext_hashes(self, hashes): - return self._rref.callRemote("put_plaintext_hashes", hashes) + offset = self._offsets['plaintext_hash_tree'] + assert isinstance(hashes, list) + data = "".join(hashes) + assert len(data) == self._segment_hash_size + assert offset + len(data) <= self._offsets['crypttext_hash_tree'] + return self._write(offset, data) + def put_crypttext_hashes(self, hashes): - return self._rref.callRemote("put_crypttext_hashes", hashes) + offset = self._offsets['crypttext_hash_tree'] + assert isinstance(hashes, list) + data = "".join(hashes) + assert len(data) == self._segment_hash_size + assert offset + len(data) <= self._offsets['block_hashes'] + return self._write(offset, data) + def put_block_hashes(self, blockhashes): - return self._rref.callRemote("put_block_hashes", blockhashes) + offset = self._offsets['block_hashes'] + assert isinstance(blockhashes, list) + data = "".join(blockhashes) + assert len(data) == self._segment_hash_size + assert offset + len(data) <= self._offsets['share_hashes'] + return self._write(offset, data) + def put_share_hashes(self, sharehashes): - return self._rref.callRemote("put_share_hashes", sharehashes) + # sharehashes is a list of (index, hash) tuples, so they get stored + # as 2+32=34 bytes each + offset = self._offsets['share_hashes'] + assert isinstance(sharehashes, list) + data = "".join([struct.pack(">H", hashnum) + hashvalue + for hashnum,hashvalue in sharehashes]) + assert len(data) == self._share_hash_size + assert offset + len(data) <= self._offsets['uri_extension'] + return self._write(offset, data) + def put_uri_extension(self, data): - return self._rref.callRemote("put_uri_extension", data) + offset = self._offsets['uri_extension'] + assert isinstance(data, str) + length = struct.pack(">L", len(data)) + return self._write(offset, length+data) + + def _write(self, offset, data): + # TODO: for small shares, buffer the writes and do just a single call + return self._rref.callRemote("write", offset, data) + def close(self): return self._rref.callRemote("close") @@ -230,17 +274,87 @@ class ReadBucketProxy: def __init__(self, rref): self._rref = rref + def startIfNecessary(self): + if self._started: + return defer.succeed(self) + d = self.start() + d.addCallback(lambda res: self) + return d + + def start(self): + # TODO: for small shares, read the whole bucket in start() + d = self._read(0, 7*4) + self._offsets = {} + def _got_offsets(data): + self._segment_size = struct.unpack(">L", data[0:4])[0] + x = 4 + for field in ( 'data', + 'plaintext_hash_tree', + 'crypttext_hash_tree', + 'block_hashes', + 'share_hashes', + 'uri_extension' ): + offset = struct.unpack(">L", data[x:x+4])[0] + x += 4 + self._offsets[field] = offset + d.addCallback(_got_offsets) + return d + def get_block(self, blocknum): - return self._rref.callRemote("get_block", blocknum) + offset = self._offsets['data'] + blocknum * self._segment_size + return self._read(offset, self._segment_size) + + def _str2l(self, s): + """ split string (pulled from storage) into a list of blockids """ + return [ s[i:i+interfaces.HASH_SIZE] + for i in range(0, len(s), interfaces.HASH_SIZE) ] def get_plaintext_hashes(self): - return self._rref.callRemote("get_plaintext_hashes") + offset = self._offsets['plaintext_hash_tree'] + size = self._offsets['crypttext_hash_tree'] - offset + d = self._read(offset, size) + d.addCallback(self._str2l) + return d + def get_crypttext_hashes(self): - return self._rref.callRemote("get_crypttext_hashes") + offset = self._offsets['crypttext_hash_tree'] + size = self._offsets['block_hashes'] - offset + d = self._read(offset, size) + d.addCallback(self._str2l) + return d + def get_block_hashes(self): - return self._rref.callRemote("get_block_hashes") + offset = self._offsets['block_hashes'] + size = self._offsets['share_hashes'] - offset + d = self._read(offset, size) + d.addCallback(self._str2l) + return d + def get_share_hashes(self): - return self._rref.callRemote("get_share_hashes") - def get_uri_extension(self): - return self._rref.callRemote("get_uri_extension") + offset = self._offsets['share_hashes'] + size = self._offsets['uri_extension'] - offset + HASH_SIZE = interfaces.HASH_SIZE + assert size % (2+HASH_SIZE) == 0 + d = self._read(offset, size) + def _unpack_share_hashes(data): + assert len(data) == size + hashes = [] + for i in range(0, size, 2+HASH_SIZE): + hashnum = struct.unpack(">H", data[i:i+2])[0] + hashvalue = data[i+2:i+2+HASH_SIZE] + hashes.append( (hashnum, hashvalue) ) + return hashes + d.addCallback(_unpack_share_hashes) + return d + def get_uri_extension(self): + offset = self._offsets['uri_extension'] + d = self._read(offset, 4) + def _got_length(data): + length = struct.unpack(">L", data)[0] + return self._read(offset+4, length) + d.addCallback(_got_length) + return d + + def _read(self, offset, length): + return self._rref.callRemote("read", offset, length) diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py index 59857d4c..d4a42d0f 100644 --- a/src/allmydata/test/test_encode.py +++ b/src/allmydata/test/test_encode.py @@ -61,17 +61,10 @@ class FakeBucketWriter: self.share_hashes = None self.closed = False - def callRemote(self, methname, *args, **kwargs): - # this allows FakeBucketWriter to be used either as an - # IStorageBucketWriter or as the remote reference that it wraps. This - # should be cleaned up eventually when we change RIBucketWriter to - # have just write(offset, data) and close() - def _call(): - meth = getattr(self, methname) - return meth(*args, **kwargs) - d = eventual.fireEventually() - d.addCallback(lambda res: _call()) - return d + def startIfNecessary(self): + return defer.succeed(self) + def start(self): + return defer.succeed(self) def put_block(self, segmentnum, data): def _try(): diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 7d02e178..9f76d70f 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -2,18 +2,20 @@ from twisted.trial import unittest from twisted.application import service +from twisted.internet import defer from foolscap import Referenceable import os.path -from allmydata import storageserver -from allmydata.util import fileutil +from allmydata import storageserver, interfaces +from allmydata.util import fileutil, hashutil class Bucket(unittest.TestCase): def make_workdir(self, name): - basedir = os.path.join("test_storage", "Bucket", name) + basedir = os.path.join("storage", "Bucket", name) incoming = os.path.join(basedir, "tmp", "bucket") final = os.path.join(basedir, "bucket") fileutil.make_dirs(basedir) + fileutil.make_dirs(os.path.join(basedir, "tmp")) return incoming, final def bucket_writer_closed(self, bw, consumed): @@ -21,31 +23,138 @@ class Bucket(unittest.TestCase): def test_create(self): incoming, final = self.make_workdir("test_create") - bw = storageserver.BucketWriter(self, incoming, final, 25, 57) - bw.remote_put_block(0, "a"*25) - bw.remote_put_block(1, "b"*25) - bw.remote_put_block(2, "c"*7) # last block may be short + bw = storageserver.BucketWriter(self, incoming, final, 200) + bw.remote_write(0, "a"*25) + bw.remote_write(25, "b"*25) + bw.remote_write(50, "c"*25) + bw.remote_write(75, "d"*7) bw.remote_close() def test_readwrite(self): incoming, final = self.make_workdir("test_readwrite") - bw = storageserver.BucketWriter(self, incoming, final, 25, 57) - bw.remote_put_block(0, "a"*25) - bw.remote_put_block(1, "b"*25) - bw.remote_put_block(2, "c"*7) # last block may be short - bw.remote_put_block_hashes(["1"*32, "2"*32, "3"*32, "4"*32]) - bw.remote_put_share_hashes([(5, "5"*32), (6, "6"*32)]) + bw = storageserver.BucketWriter(self, incoming, final, 200) + bw.remote_write(0, "a"*25) + bw.remote_write(25, "b"*25) + bw.remote_write(50, "c"*7) # last block may be short bw.remote_close() # now read from it br = storageserver.BucketReader(final) - self.failUnlessEqual(br.remote_get_block(0), "a"*25) - self.failUnlessEqual(br.remote_get_block(1), "b"*25) - self.failUnlessEqual(br.remote_get_block(2), "c"*7) - self.failUnlessEqual(br.remote_get_block_hashes(), - ["1"*32, "2"*32, "3"*32, "4"*32]) - self.failUnlessEqual(br.remote_get_share_hashes(), - [(5, "5"*32), (6, "6"*32)]) + self.failUnlessEqual(br.remote_read(0, 25), "a"*25) + self.failUnlessEqual(br.remote_read(25, 25), "b"*25) + self.failUnlessEqual(br.remote_read(50, 7), "c"*7) + +class RemoteBucket: + + def callRemote(self, methname, *args, **kwargs): + def _call(): + meth = getattr(self.target, "remote_" + methname) + return meth(*args, **kwargs) + return defer.maybeDeferred(_call) + +class BucketProxy(unittest.TestCase): + def make_bucket(self, name, size): + basedir = os.path.join("storage", "BucketProxy", name) + incoming = os.path.join(basedir, "tmp", "bucket") + final = os.path.join(basedir, "bucket") + fileutil.make_dirs(basedir) + fileutil.make_dirs(os.path.join(basedir, "tmp")) + bw = storageserver.BucketWriter(self, incoming, final, size) + rb = RemoteBucket() + rb.target = bw + return bw, rb, final + + def bucket_writer_closed(self, bw, consumed): + pass + + def test_create(self): + bw, rb, final = self.make_bucket("test_create", 500) + bp = storageserver.WriteBucketProxy(rb, + data_size=300, + segment_size=10, + num_segments=5, + num_share_hashes=3) + self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp)) + + def test_readwrite(self): + # Let's pretend each share has 100 bytes of data, and that there are + # 4 segments (25 bytes each), and 8 shares total. So the three + # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree, + # block_hashes) will have 4 leaves and 7 nodes each. The per-share + # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3 + # nodes. Furthermore, let's assume the uri_extension is 500 bytes + # long. That should make the whole share: + # + # 0x1c + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1406 bytes long + + plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i) + for i in range(7)] + crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i) + for i in range(7)] + block_hashes = [hashutil.tagged_hash("block", "bar%d" % i) + for i in range(7)] + share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i)) + for i in (1,9,13)] + uri_extension = "s" + "E"*498 + "e" + + bw, rb, final = self.make_bucket("test_readwrite", 1406) + bp = storageserver.WriteBucketProxy(rb, + data_size=100, + segment_size=25, + num_segments=4, + num_share_hashes=3) + + d = bp.start() + d.addCallback(lambda res: bp.put_block(0, "a"*25)) + d.addCallback(lambda res: bp.put_block(1, "b"*25)) + d.addCallback(lambda res: bp.put_block(2, "c"*25)) + d.addCallback(lambda res: bp.put_block(3, "d"*25)) + d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes)) + d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes)) + d.addCallback(lambda res: bp.put_block_hashes(block_hashes)) + d.addCallback(lambda res: bp.put_share_hashes(share_hashes)) + d.addCallback(lambda res: bp.put_uri_extension(uri_extension)) + d.addCallback(lambda res: bp.close()) + + # now read everything back + def _start_reading(res): + br = storageserver.BucketReader(final) + rb = RemoteBucket() + rb.target = br + rbp = storageserver.ReadBucketProxy(rb) + self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp)) + + d1 = rbp.start() + d1.addCallback(lambda res: rbp.get_block(0)) + d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25)) + d1.addCallback(lambda res: rbp.get_block(1)) + d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25)) + d1.addCallback(lambda res: rbp.get_block(2)) + d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25)) + d1.addCallback(lambda res: rbp.get_block(3)) + d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*25)) + + d1.addCallback(lambda res: rbp.get_plaintext_hashes()) + d1.addCallback(lambda res: + self.failUnlessEqual(res, plaintext_hashes)) + d1.addCallback(lambda res: rbp.get_crypttext_hashes()) + d1.addCallback(lambda res: + self.failUnlessEqual(res, crypttext_hashes)) + d1.addCallback(lambda res: rbp.get_block_hashes()) + d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes)) + d1.addCallback(lambda res: rbp.get_share_hashes()) + d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes)) + d1.addCallback(lambda res: rbp.get_uri_extension()) + d1.addCallback(lambda res: + self.failUnlessEqual(res, uri_extension)) + + return d1 + + d.addCallback(_start_reading) + + return d + + class Server(unittest.TestCase): @@ -74,7 +183,7 @@ class Server(unittest.TestCase): canary = Referenceable() already,writers = ss.remote_allocate_buckets("vid", [0,1,2], - 75, 25, canary) + 75, canary) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([0,1,2])) @@ -82,19 +191,18 @@ class Server(unittest.TestCase): self.failUnlessEqual(ss.remote_get_buckets("vid"), {}) for i,wb in writers.items(): - wb.remote_put_block(0, "%25d" % i) + wb.remote_write(0, "%25d" % i) wb.remote_close() # now they should be readable b = ss.remote_get_buckets("vid") self.failUnlessEqual(set(b.keys()), set([0,1,2])) - self.failUnlessEqual(b[0].remote_get_block(0), - "%25d" % 0) + self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0) # now if we about writing again, the server should offer those three # buckets as already present already,writers = ss.remote_allocate_buckets("vid", [0,1,2,3,4], - 75, 25, canary) + 75, canary) self.failUnlessEqual(already, set([0,1,2])) self.failUnlessEqual(set(writers.keys()), set([3,4])) @@ -103,7 +211,7 @@ class Server(unittest.TestCase): # upload into them a second time) already,writers = ss.remote_allocate_buckets("vid", [2,3,4,5], - 75, 25, canary) + 75, canary) self.failUnlessEqual(already, set([2,3,4])) self.failUnlessEqual(set(writers.keys()), set([5])) @@ -112,35 +220,42 @@ class Server(unittest.TestCase): canary = Referenceable() already,writers = ss.remote_allocate_buckets("vid1", [0,1,2], - 25, 5, canary) + 25, canary) self.failUnlessEqual(len(writers), 3) # now the StorageServer should have 75 bytes provisionally allocated, # allowing only 25 more to be claimed + self.failUnlessEqual(len(ss._active_writers), 3) already2,writers2 = ss.remote_allocate_buckets("vid2", [0,1,2], - 25, 5, canary) + 25, canary) self.failUnlessEqual(len(writers2), 1) + self.failUnlessEqual(len(ss._active_writers), 4) # we abandon the first set, so their provisional allocation should be # returned del already del writers + self.failUnlessEqual(len(ss._active_writers), 1) # and we close the second set, so their provisional allocation should # become real, long-term allocation for bw in writers2.values(): + bw.remote_write(0, "a"*25) bw.remote_close() del already2 del writers2 del bw + self.failUnlessEqual(len(ss._active_writers), 0) # now there should be 25 bytes allocated, and 75 free already3,writers3 = ss.remote_allocate_buckets("vid3", [0,1,2,3], - 25, 5, canary) + 25, canary) self.failUnlessEqual(len(writers3), 3) + self.failUnlessEqual(len(ss._active_writers), 3) del already3 del writers3 + self.failUnlessEqual(len(ss._active_writers), 0) ss.disownServiceParent() del ss @@ -150,5 +265,6 @@ class Server(unittest.TestCase): # would be more than 25 bytes and this test would need to be changed. ss = self.create("test_sizelimits", 100) already4,writers4 = ss.remote_allocate_buckets("vid4", [0,1,2,3], - 25, 5, canary) + 25, canary) self.failUnlessEqual(len(writers4), 3) + self.failUnlessEqual(len(ss._active_writers), 3) diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 33afc0a2..5c327da4 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -24,13 +24,16 @@ class TooFullError(Exception): class PeerTracker: def __init__(self, peerid, permutedid, connection, - sharesize, blocksize, crypttext_hash): + sharesize, blocksize, num_segments, num_share_hashes, + crypttext_hash): self.peerid = peerid self.permutedid = permutedid self.connection = connection # to an RIClient self.buckets = {} # k: shareid, v: IRemoteBucketWriter self.sharesize = sharesize self.blocksize = blocksize + self.num_segments = num_segments + self.num_share_hashes = num_share_hashes self.crypttext_hash = crypttext_hash self._storageserver = None @@ -54,8 +57,13 @@ class PeerTracker: def _got_reply(self, (alreadygot, buckets)): #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets))) - b = dict( [ (sharenum, storageserver.WriteBucketProxy(rref)) - for sharenum, rref in buckets.iteritems() ] ) + b = {} + for sharenum, rref in buckets.iteritems(): + bp = storageserver.WriteBucketProxy(rref, self.sharesize, + self.blocksize, + self.num_segments, + self.num_share_hashes) + b[sharenum] = bp self.buckets.update(b) return (alreadygot, set(b.keys())) @@ -129,8 +137,14 @@ class FileUploader: # responsible for handling the data and sending out the shares. peers = self._client.get_permuted_peers(self._crypttext_hash) assert peers + # TODO: eek, don't pull this from here, find a better way. gross. + num_segments = self._encoder.uri_extension_data['num_segments'] + from allmydata.util.mathutil import next_power_of_k + import math + num_share_hashes = max(int(math.log(next_power_of_k(self.total_shares,2),2)),1) trackers = [ PeerTracker(peerid, permutedid, conn, share_size, block_size, + num_segments, num_share_hashes, self._crypttext_hash) for permutedid, peerid, conn in peers ] self.usable_peers = set(trackers) # this set shrinks over time