From: Brian Warner Date: Sat, 14 Jul 2007 00:25:45 +0000 (-0700) Subject: rename storageserver.py to just storage.py, since it has both server and client sides now X-Git-Url: https://git.rkrishnan.org/pf/content/statistics?a=commitdiff_plain;h=c6f52e379a3a5c907d60f5c6a49a7393e56d29b7;p=tahoe-lafs%2Ftahoe-lafs.git rename storageserver.py to just storage.py, since it has both server and client sides now --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 21c66f3a..d0099e3e 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -11,7 +11,7 @@ from twisted.python import log import allmydata from allmydata.Crypto.Util.number import bytes_to_long -from allmydata.storageserver import StorageServer +from allmydata.storage import StorageServer from allmydata.upload import Uploader from allmydata.download import Downloader from allmydata.webish import WebishServer diff --git a/src/allmydata/download.py b/src/allmydata/download.py index 59649c9c..f76b13c3 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -7,7 +7,7 @@ from twisted.application import service from allmydata.util import idlib, mathutil, hashutil from allmydata.util.assertutil import _assert -from allmydata import codec, hashtree, storageserver, uri +from allmydata import codec, hashtree, storage, uri from allmydata.Crypto.Cipher import AES from allmydata.interfaces import IDownloadTarget, IDownloader from allmydata.encode import NotEnoughPeersError @@ -345,7 +345,7 @@ class FileDownloader: def _got_response(self, buckets, connection): _assert(isinstance(buckets, dict), buckets) # soon foolscap will check this for us with its DictOf schema constraint for sharenum, bucket in buckets.iteritems(): - b = storageserver.ReadBucketProxy(bucket) + b = storage.ReadBucketProxy(bucket) self.add_share_bucket(sharenum, b) self._uri_extension_sources.append(b) diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py index d6a53885..773c6748 100644 --- a/src/allmydata/scripts/debug.py +++ b/src/allmydata/scripts/debug.py @@ -43,12 +43,12 @@ class DumpDirnodeOptions(BasedirMixin, usage.Options): raise usage.UsageError(" parameter is required") def dump_uri_extension(config, out=sys.stdout, err=sys.stderr): - from allmydata import uri, storageserver + from allmydata import uri, storage filename = config['filename'] f = open(filename,"rb") # use a ReadBucketProxy to parse the bucket and find the uri extension - bp = storageserver.ReadBucketProxy(None) + bp = storage.ReadBucketProxy(None) offsets = bp._parse_offsets(f.read(8*4)) f.seek(offsets['uri_extension']) length = struct.unpack(">L", f.read(4))[0] diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py new file mode 100644 index 00000000..b5acdee4 --- /dev/null +++ b/src/allmydata/storage.py @@ -0,0 +1,389 @@ +import os, re, weakref, stat, struct + +from foolscap import Referenceable +from twisted.application import service +from twisted.internet import defer + +from zope.interface import implements +from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ + RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE +from allmydata.util import fileutil, idlib, mathutil +from allmydata.util.assertutil import precondition + +# store/ +# store/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/$STORAGEINDEX/$SHARENUM on success +# store/$STORAGEINDEX +# store/$STORAGEINDEX/$SHARENUM +# store/$STORAGEINDEX/$SHARENUM/blocksize +# store/$STORAGEINDEX/$SHARENUM/data +# 
store/$STORAGEINDEX/$SHARENUM/blockhashes +# store/$STORAGEINDEX/$SHARENUM/sharehashtree + +# $SHARENUM matches this regex: +NUM_RE=re.compile("[0-9]*") + +class BucketWriter(Referenceable): + implements(RIBucketWriter) + + def __init__(self, ss, incominghome, finalhome, size): + self.ss = ss + self.incominghome = incominghome + self.finalhome = finalhome + self._size = size + self.closed = False + # touch the file, so later callers will see that we're working on it + f = open(self.incominghome, 'ab') + f.close() + + def allocated_size(self): + return self._size + + def remote_write(self, offset, data): + precondition(not self.closed) + precondition(offset >= 0) + precondition(offset+len(data) <= self._size) + f = open(self.incominghome, 'ab') + f.seek(offset) + f.write(data) + f.close() + + def remote_close(self): + precondition(not self.closed) + fileutil.rename(self.incominghome, self.finalhome) + self.closed = True + filelen = os.stat(self.finalhome)[stat.ST_SIZE] + self.ss.bucket_writer_closed(self, filelen) + + +class BucketReader(Referenceable): + implements(RIBucketReader) + + def __init__(self, home): + self.home = home + + def remote_read(self, offset, length): + f = open(self.home, 'rb') + f.seek(offset) + return f.read(length) + +class StorageServer(service.MultiService, Referenceable): + implements(RIStorageServer) + name = 'storageserver' + + def __init__(self, storedir, sizelimit=None): + service.MultiService.__init__(self) + fileutil.make_dirs(storedir) + self.storedir = storedir + self.sizelimit = sizelimit + self.incomingdir = os.path.join(storedir, 'incoming') + self._clean_incomplete() + fileutil.make_dirs(self.incomingdir) + self._active_writers = weakref.WeakKeyDictionary() + + self.measure_size() + + def _clean_incomplete(self): + fileutil.rm_dir(self.incomingdir) + + def measure_size(self): + self.consumed = fileutil.du(self.storedir) + + def allocated_size(self): + space = self.consumed + for bw in self._active_writers: + space += bw.allocated_size() + return space + + def remote_allocate_buckets(self, storage_index, sharenums, allocated_size, + canary): + alreadygot = set() + bucketwriters = {} # k: shnum, v: BucketWriter + si_s = idlib.b2a(storage_index) + space_per_bucket = allocated_size + no_limits = self.sizelimit is None + yes_limits = not no_limits + if yes_limits: + remaining_space = self.sizelimit - self.allocated_size() + for shnum in sharenums: + incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum) + finalhome = os.path.join(self.storedir, si_s, "%d" % shnum) + if os.path.exists(incominghome) or os.path.exists(finalhome): + alreadygot.add(shnum) + elif no_limits or remaining_space >= space_per_bucket: + fileutil.make_dirs(os.path.join(self.incomingdir, si_s)) + bw = BucketWriter(self, incominghome, finalhome, + space_per_bucket) + bucketwriters[shnum] = bw + self._active_writers[bw] = 1 + if yes_limits: + remaining_space -= space_per_bucket + else: + # not enough space to accept this bucket + pass + + if bucketwriters: + fileutil.make_dirs(os.path.join(self.storedir, si_s)) + + return alreadygot, bucketwriters + + def bucket_writer_closed(self, bw, consumed_size): + self.consumed += consumed_size + del self._active_writers[bw] + + def remote_get_buckets(self, storage_index): + bucketreaders = {} # k: sharenum, v: BucketReader + storagedir = os.path.join(self.storedir, idlib.b2a(storage_index)) + try: + for f in os.listdir(storagedir): + if NUM_RE.match(f): + br = BucketReader(os.path.join(storagedir, f)) + bucketreaders[int(f)] = br + 
except OSError: + # Commonly caused by there being no buckets at all. + pass + + return bucketreaders + +""" +Share data is written into a single file. At the start of the file, there is +a series of four-byte big-endian offset values, which indicate where each +section starts. Each offset is measured from the beginning of the file. + +0x00: segment size +0x04: data size +0x08: offset of data (=00 00 00 1c) +0x0c: offset of plaintext_hash_tree +0x10: offset of crypttext_hash_tree +0x14: offset of block_hashes +0x18: offset of share_hashes +0x1c: offset of uri_extension_length + uri_extension +0x20: start of data +? : start of plaintext_hash_tree +? : start of crypttext_hash_tree +? : start of block_hashes +? : start of share_hashes + each share_hash is written as a two-byte (big-endian) hashnum + followed by the 32-byte SHA-256 hash. We only store the hashes + necessary to validate the share hash root +? : start of uri_extension_length (four-byte big-endian value) +? : start of uri_extension +""" + +def allocated_size(data_size, num_segments, num_share_hashes, + uri_extension_size): + wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes, + uri_extension_size) + uri_extension_starts_at = wbp._offsets['uri_extension'] + return uri_extension_starts_at + 4 + uri_extension_size + +class WriteBucketProxy: + implements(IStorageBucketWriter) + def __init__(self, rref, data_size, segment_size, num_segments, + num_share_hashes, uri_extension_size): + self._rref = rref + self._data_size = data_size + self._segment_size = segment_size + self._num_segments = num_segments + + self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE + # how many share hashes are included in each share? This will be + # about ln2(num_shares). + self._share_hash_size = num_share_hashes * (2+HASH_SIZE) + # we commit to not sending a uri extension larger than this + self._uri_extension_size = uri_extension_size + + offsets = self._offsets = {} + x = 0x20 + offsets['data'] = x + x += data_size + offsets['plaintext_hash_tree'] = x + x += self._segment_hash_size + offsets['crypttext_hash_tree'] = x + x += self._segment_hash_size + offsets['block_hashes'] = x + x += self._segment_hash_size + offsets['share_hashes'] = x + x += self._share_hash_size + offsets['uri_extension'] = x + + offset_data = struct.pack(">LLLLLLLL", + segment_size, + data_size, + offsets['data'], + offsets['plaintext_hash_tree'], + offsets['crypttext_hash_tree'], + offsets['block_hashes'], + offsets['share_hashes'], + offsets['uri_extension'], + ) + assert len(offset_data) == 8*4 + self._offset_data = offset_data + + def start(self): + return self._write(0, self._offset_data) + + def put_block(self, segmentnum, data): + offset = self._offsets['data'] + segmentnum * self._segment_size + assert offset + len(data) <= self._offsets['uri_extension'] + assert isinstance(data, str) + if segmentnum < self._num_segments-1: + precondition(len(data) == self._segment_size, + len(data), self._segment_size) + else: + precondition(len(data) == (self._data_size - + (self._segment_size * + (self._num_segments - 1))), + len(data), self._segment_size) + return self._write(offset, data) + + def put_plaintext_hashes(self, hashes): + offset = self._offsets['plaintext_hash_tree'] + assert isinstance(hashes, list) + data = "".join(hashes) + assert len(data) == self._segment_hash_size + assert offset + len(data) <= self._offsets['crypttext_hash_tree'] + return self._write(offset, data) + + def put_crypttext_hashes(self, hashes): + offset = 
self._offsets['crypttext_hash_tree'] + assert isinstance(hashes, list) + data = "".join(hashes) + assert len(data) == self._segment_hash_size + assert offset + len(data) <= self._offsets['block_hashes'] + return self._write(offset, data) + + def put_block_hashes(self, blockhashes): + offset = self._offsets['block_hashes'] + assert isinstance(blockhashes, list) + data = "".join(blockhashes) + assert len(data) == self._segment_hash_size + assert offset + len(data) <= self._offsets['share_hashes'] + return self._write(offset, data) + + def put_share_hashes(self, sharehashes): + # sharehashes is a list of (index, hash) tuples, so they get stored + # as 2+32=34 bytes each + offset = self._offsets['share_hashes'] + assert isinstance(sharehashes, list) + data = "".join([struct.pack(">H", hashnum) + hashvalue + for hashnum,hashvalue in sharehashes]) + precondition(len(data) == self._share_hash_size, + len(data), self._share_hash_size) + assert offset + len(data) <= self._offsets['uri_extension'] + return self._write(offset, data) + + def put_uri_extension(self, data): + offset = self._offsets['uri_extension'] + assert isinstance(data, str) + assert len(data) <= self._uri_extension_size + length = struct.pack(">L", len(data)) + return self._write(offset, length+data) + + def _write(self, offset, data): + # TODO: for small shares, buffer the writes and do just a single call + return self._rref.callRemote("write", offset, data) + + def close(self): + return self._rref.callRemote("close") + +class ReadBucketProxy: + implements(IStorageBucketReader) + def __init__(self, rref): + self._rref = rref + self._started = False + + def startIfNecessary(self): + if self._started: + return defer.succeed(self) + d = self.start() + d.addCallback(lambda res: self) + return d + + def start(self): + # TODO: for small shares, read the whole bucket in start() + d = self._read(0, 8*4) + d.addCallback(self._parse_offsets) + return d + + def _parse_offsets(self, data): + precondition(len(data) == 8*4) + self._offsets = {} + self._segment_size = struct.unpack(">L", data[0:4])[0] + self._data_size = struct.unpack(">L", data[4:8])[0] + x = 0x08 + for field in ( 'data', + 'plaintext_hash_tree', + 'crypttext_hash_tree', + 'block_hashes', + 'share_hashes', + 'uri_extension', + ): + offset = struct.unpack(">L", data[x:x+4])[0] + x += 4 + self._offsets[field] = offset + return self._offsets + + def get_block(self, blocknum): + num_segments = mathutil.div_ceil(self._data_size, self._segment_size) + if blocknum < num_segments-1: + size = self._segment_size + else: + size = self._data_size % self._segment_size + if size == 0: + size = self._segment_size + offset = self._offsets['data'] + blocknum * self._segment_size + return self._read(offset, size) + + def _str2l(self, s): + """ split string (pulled from storage) into a list of blockids """ + return [ s[i:i+HASH_SIZE] + for i in range(0, len(s), HASH_SIZE) ] + + def get_plaintext_hashes(self): + offset = self._offsets['plaintext_hash_tree'] + size = self._offsets['crypttext_hash_tree'] - offset + d = self._read(offset, size) + d.addCallback(self._str2l) + return d + + def get_crypttext_hashes(self): + offset = self._offsets['crypttext_hash_tree'] + size = self._offsets['block_hashes'] - offset + d = self._read(offset, size) + d.addCallback(self._str2l) + return d + + def get_block_hashes(self): + offset = self._offsets['block_hashes'] + size = self._offsets['share_hashes'] - offset + d = self._read(offset, size) + d.addCallback(self._str2l) + return d + + def 
get_share_hashes(self): + offset = self._offsets['share_hashes'] + size = self._offsets['uri_extension'] - offset + assert size % (2+HASH_SIZE) == 0 + d = self._read(offset, size) + def _unpack_share_hashes(data): + assert len(data) == size + hashes = [] + for i in range(0, size, 2+HASH_SIZE): + hashnum = struct.unpack(">H", data[i:i+2])[0] + hashvalue = data[i+2:i+2+HASH_SIZE] + hashes.append( (hashnum, hashvalue) ) + return hashes + d.addCallback(_unpack_share_hashes) + return d + + def get_uri_extension(self): + offset = self._offsets['uri_extension'] + d = self._read(offset, 4) + def _got_length(data): + length = struct.unpack(">L", data)[0] + return self._read(offset+4, length) + d.addCallback(_got_length) + return d + + def _read(self, offset, length): + return self._rref.callRemote("read", offset, length) diff --git a/src/allmydata/storageserver.py b/src/allmydata/storageserver.py deleted file mode 100644 index b5acdee4..00000000 --- a/src/allmydata/storageserver.py +++ /dev/null @@ -1,389 +0,0 @@ -import os, re, weakref, stat, struct - -from foolscap import Referenceable -from twisted.application import service -from twisted.internet import defer - -from zope.interface import implements -from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ - RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE -from allmydata.util import fileutil, idlib, mathutil -from allmydata.util.assertutil import precondition - -# store/ -# store/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/$STORAGEINDEX/$SHARENUM on success -# store/$STORAGEINDEX -# store/$STORAGEINDEX/$SHARENUM -# store/$STORAGEINDEX/$SHARENUM/blocksize -# store/$STORAGEINDEX/$SHARENUM/data -# store/$STORAGEINDEX/$SHARENUM/blockhashes -# store/$STORAGEINDEX/$SHARENUM/sharehashtree - -# $SHARENUM matches this regex: -NUM_RE=re.compile("[0-9]*") - -class BucketWriter(Referenceable): - implements(RIBucketWriter) - - def __init__(self, ss, incominghome, finalhome, size): - self.ss = ss - self.incominghome = incominghome - self.finalhome = finalhome - self._size = size - self.closed = False - # touch the file, so later callers will see that we're working on it - f = open(self.incominghome, 'ab') - f.close() - - def allocated_size(self): - return self._size - - def remote_write(self, offset, data): - precondition(not self.closed) - precondition(offset >= 0) - precondition(offset+len(data) <= self._size) - f = open(self.incominghome, 'ab') - f.seek(offset) - f.write(data) - f.close() - - def remote_close(self): - precondition(not self.closed) - fileutil.rename(self.incominghome, self.finalhome) - self.closed = True - filelen = os.stat(self.finalhome)[stat.ST_SIZE] - self.ss.bucket_writer_closed(self, filelen) - - -class BucketReader(Referenceable): - implements(RIBucketReader) - - def __init__(self, home): - self.home = home - - def remote_read(self, offset, length): - f = open(self.home, 'rb') - f.seek(offset) - return f.read(length) - -class StorageServer(service.MultiService, Referenceable): - implements(RIStorageServer) - name = 'storageserver' - - def __init__(self, storedir, sizelimit=None): - service.MultiService.__init__(self) - fileutil.make_dirs(storedir) - self.storedir = storedir - self.sizelimit = sizelimit - self.incomingdir = os.path.join(storedir, 'incoming') - self._clean_incomplete() - fileutil.make_dirs(self.incomingdir) - self._active_writers = weakref.WeakKeyDictionary() - - self.measure_size() - - def _clean_incomplete(self): - fileutil.rm_dir(self.incomingdir) 
- - def measure_size(self): - self.consumed = fileutil.du(self.storedir) - - def allocated_size(self): - space = self.consumed - for bw in self._active_writers: - space += bw.allocated_size() - return space - - def remote_allocate_buckets(self, storage_index, sharenums, allocated_size, - canary): - alreadygot = set() - bucketwriters = {} # k: shnum, v: BucketWriter - si_s = idlib.b2a(storage_index) - space_per_bucket = allocated_size - no_limits = self.sizelimit is None - yes_limits = not no_limits - if yes_limits: - remaining_space = self.sizelimit - self.allocated_size() - for shnum in sharenums: - incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum) - finalhome = os.path.join(self.storedir, si_s, "%d" % shnum) - if os.path.exists(incominghome) or os.path.exists(finalhome): - alreadygot.add(shnum) - elif no_limits or remaining_space >= space_per_bucket: - fileutil.make_dirs(os.path.join(self.incomingdir, si_s)) - bw = BucketWriter(self, incominghome, finalhome, - space_per_bucket) - bucketwriters[shnum] = bw - self._active_writers[bw] = 1 - if yes_limits: - remaining_space -= space_per_bucket - else: - # not enough space to accept this bucket - pass - - if bucketwriters: - fileutil.make_dirs(os.path.join(self.storedir, si_s)) - - return alreadygot, bucketwriters - - def bucket_writer_closed(self, bw, consumed_size): - self.consumed += consumed_size - del self._active_writers[bw] - - def remote_get_buckets(self, storage_index): - bucketreaders = {} # k: sharenum, v: BucketReader - storagedir = os.path.join(self.storedir, idlib.b2a(storage_index)) - try: - for f in os.listdir(storagedir): - if NUM_RE.match(f): - br = BucketReader(os.path.join(storagedir, f)) - bucketreaders[int(f)] = br - except OSError: - # Commonly caused by there being no buckets at all. - pass - - return bucketreaders - -""" -Share data is written into a single file. At the start of the file, there is -a series of four-byte big-endian offset values, which indicate where each -section starts. Each offset is measured from the beginning of the file. - -0x00: segment size -0x04: data size -0x08: offset of data (=00 00 00 1c) -0x0c: offset of plaintext_hash_tree -0x10: offset of crypttext_hash_tree -0x14: offset of block_hashes -0x18: offset of share_hashes -0x1c: offset of uri_extension_length + uri_extension -0x20: start of data -? : start of plaintext_hash_tree -? : start of crypttext_hash_tree -? : start of block_hashes -? : start of share_hashes - each share_hash is written as a two-byte (big-endian) hashnum - followed by the 32-byte SHA-256 hash. We only store the hashes - necessary to validate the share hash root -? : start of uri_extension_length (four-byte big-endian value) -? : start of uri_extension -""" - -def allocated_size(data_size, num_segments, num_share_hashes, - uri_extension_size): - wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes, - uri_extension_size) - uri_extension_starts_at = wbp._offsets['uri_extension'] - return uri_extension_starts_at + 4 + uri_extension_size - -class WriteBucketProxy: - implements(IStorageBucketWriter) - def __init__(self, rref, data_size, segment_size, num_segments, - num_share_hashes, uri_extension_size): - self._rref = rref - self._data_size = data_size - self._segment_size = segment_size - self._num_segments = num_segments - - self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE - # how many share hashes are included in each share? This will be - # about ln2(num_shares). 
- self._share_hash_size = num_share_hashes * (2+HASH_SIZE) - # we commit to not sending a uri extension larger than this - self._uri_extension_size = uri_extension_size - - offsets = self._offsets = {} - x = 0x20 - offsets['data'] = x - x += data_size - offsets['plaintext_hash_tree'] = x - x += self._segment_hash_size - offsets['crypttext_hash_tree'] = x - x += self._segment_hash_size - offsets['block_hashes'] = x - x += self._segment_hash_size - offsets['share_hashes'] = x - x += self._share_hash_size - offsets['uri_extension'] = x - - offset_data = struct.pack(">LLLLLLLL", - segment_size, - data_size, - offsets['data'], - offsets['plaintext_hash_tree'], - offsets['crypttext_hash_tree'], - offsets['block_hashes'], - offsets['share_hashes'], - offsets['uri_extension'], - ) - assert len(offset_data) == 8*4 - self._offset_data = offset_data - - def start(self): - return self._write(0, self._offset_data) - - def put_block(self, segmentnum, data): - offset = self._offsets['data'] + segmentnum * self._segment_size - assert offset + len(data) <= self._offsets['uri_extension'] - assert isinstance(data, str) - if segmentnum < self._num_segments-1: - precondition(len(data) == self._segment_size, - len(data), self._segment_size) - else: - precondition(len(data) == (self._data_size - - (self._segment_size * - (self._num_segments - 1))), - len(data), self._segment_size) - return self._write(offset, data) - - def put_plaintext_hashes(self, hashes): - offset = self._offsets['plaintext_hash_tree'] - assert isinstance(hashes, list) - data = "".join(hashes) - assert len(data) == self._segment_hash_size - assert offset + len(data) <= self._offsets['crypttext_hash_tree'] - return self._write(offset, data) - - def put_crypttext_hashes(self, hashes): - offset = self._offsets['crypttext_hash_tree'] - assert isinstance(hashes, list) - data = "".join(hashes) - assert len(data) == self._segment_hash_size - assert offset + len(data) <= self._offsets['block_hashes'] - return self._write(offset, data) - - def put_block_hashes(self, blockhashes): - offset = self._offsets['block_hashes'] - assert isinstance(blockhashes, list) - data = "".join(blockhashes) - assert len(data) == self._segment_hash_size - assert offset + len(data) <= self._offsets['share_hashes'] - return self._write(offset, data) - - def put_share_hashes(self, sharehashes): - # sharehashes is a list of (index, hash) tuples, so they get stored - # as 2+32=34 bytes each - offset = self._offsets['share_hashes'] - assert isinstance(sharehashes, list) - data = "".join([struct.pack(">H", hashnum) + hashvalue - for hashnum,hashvalue in sharehashes]) - precondition(len(data) == self._share_hash_size, - len(data), self._share_hash_size) - assert offset + len(data) <= self._offsets['uri_extension'] - return self._write(offset, data) - - def put_uri_extension(self, data): - offset = self._offsets['uri_extension'] - assert isinstance(data, str) - assert len(data) <= self._uri_extension_size - length = struct.pack(">L", len(data)) - return self._write(offset, length+data) - - def _write(self, offset, data): - # TODO: for small shares, buffer the writes and do just a single call - return self._rref.callRemote("write", offset, data) - - def close(self): - return self._rref.callRemote("close") - -class ReadBucketProxy: - implements(IStorageBucketReader) - def __init__(self, rref): - self._rref = rref - self._started = False - - def startIfNecessary(self): - if self._started: - return defer.succeed(self) - d = self.start() - d.addCallback(lambda res: self) - return d - - 
def start(self): - # TODO: for small shares, read the whole bucket in start() - d = self._read(0, 8*4) - d.addCallback(self._parse_offsets) - return d - - def _parse_offsets(self, data): - precondition(len(data) == 8*4) - self._offsets = {} - self._segment_size = struct.unpack(">L", data[0:4])[0] - self._data_size = struct.unpack(">L", data[4:8])[0] - x = 0x08 - for field in ( 'data', - 'plaintext_hash_tree', - 'crypttext_hash_tree', - 'block_hashes', - 'share_hashes', - 'uri_extension', - ): - offset = struct.unpack(">L", data[x:x+4])[0] - x += 4 - self._offsets[field] = offset - return self._offsets - - def get_block(self, blocknum): - num_segments = mathutil.div_ceil(self._data_size, self._segment_size) - if blocknum < num_segments-1: - size = self._segment_size - else: - size = self._data_size % self._segment_size - if size == 0: - size = self._segment_size - offset = self._offsets['data'] + blocknum * self._segment_size - return self._read(offset, size) - - def _str2l(self, s): - """ split string (pulled from storage) into a list of blockids """ - return [ s[i:i+HASH_SIZE] - for i in range(0, len(s), HASH_SIZE) ] - - def get_plaintext_hashes(self): - offset = self._offsets['plaintext_hash_tree'] - size = self._offsets['crypttext_hash_tree'] - offset - d = self._read(offset, size) - d.addCallback(self._str2l) - return d - - def get_crypttext_hashes(self): - offset = self._offsets['crypttext_hash_tree'] - size = self._offsets['block_hashes'] - offset - d = self._read(offset, size) - d.addCallback(self._str2l) - return d - - def get_block_hashes(self): - offset = self._offsets['block_hashes'] - size = self._offsets['share_hashes'] - offset - d = self._read(offset, size) - d.addCallback(self._str2l) - return d - - def get_share_hashes(self): - offset = self._offsets['share_hashes'] - size = self._offsets['uri_extension'] - offset - assert size % (2+HASH_SIZE) == 0 - d = self._read(offset, size) - def _unpack_share_hashes(data): - assert len(data) == size - hashes = [] - for i in range(0, size, 2+HASH_SIZE): - hashnum = struct.unpack(">H", data[i:i+2])[0] - hashvalue = data[i+2:i+2+HASH_SIZE] - hashes.append( (hashnum, hashvalue) ) - return hashes - d.addCallback(_unpack_share_hashes) - return d - - def get_uri_extension(self): - offset = self._offsets['uri_extension'] - d = self._read(offset, 4) - def _got_length(data): - length = struct.unpack(">L", data)[0] - return self._read(offset+4, length) - d.addCallback(_got_length) - return d - - def _read(self, offset, length): - return self._rref.callRemote("read", offset, length) diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 01e282a8..d58b064b 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -7,7 +7,7 @@ from foolscap import Referenceable import os.path from allmydata import interfaces from allmydata.util import fileutil, hashutil -from allmydata.storageserver import BucketWriter, BucketReader, \ +from allmydata.storage import BucketWriter, BucketReader, \ WriteBucketProxy, ReadBucketProxy, StorageServer diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 404b86de..bebb4649 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -5,7 +5,7 @@ from twisted.application import service from foolscap import Referenceable from allmydata.util import idlib, hashutil -from allmydata import encode, storageserver, hashtree +from allmydata import encode, storage, hashtree from allmydata.uri import pack_uri, pack_lit from 
allmydata.interfaces import IUploadable, IUploader from allmydata.Crypto.Cipher import AES @@ -39,10 +39,10 @@ class PeerTracker: self.buckets = {} # k: shareid, v: IRemoteBucketWriter self.sharesize = sharesize #print "PeerTracker", peerid, permutedid, sharesize - as = storageserver.allocated_size(sharesize, - num_segments, - num_share_hashes, - EXTENSION_SIZE) + as = storage.allocated_size(sharesize, + num_segments, + num_share_hashes, + EXTENSION_SIZE) self.allocated_size = as self.blocksize = blocksize @@ -74,11 +74,11 @@ class PeerTracker: #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets))) b = {} for sharenum, rref in buckets.iteritems(): - bp = storageserver.WriteBucketProxy(rref, self.sharesize, - self.blocksize, - self.num_segments, - self.num_share_hashes, - EXTENSION_SIZE) + bp = storage.WriteBucketProxy(rref, self.sharesize, + self.blocksize, + self.num_segments, + self.num_share_hashes, + EXTENSION_SIZE) b[sharenum] = bp self.buckets.update(b) return (alreadygot, set(b.keys()))
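
For reference, the share-file header that WriteBucketProxy emits is eight big-endian 4-byte fields: segment size, data size, and the offsets of the six sections, so share data always begins at 0x20. Below is a minimal standalone sketch of packing and re-parsing that header with the same struct format as the patch (the sample sizes are made up, not taken from any real share):

import struct

HEADER_FMT = ">LLLLLLLL"   # eight big-endian uint32s, 32 bytes total
HASH_SIZE = 32             # SHA-256, as in allmydata.interfaces

def pack_header(segment_size, data_size, num_segments, num_share_hashes):
    # Sections are laid out back-to-back after the 0x20-byte header,
    # mirroring WriteBucketProxy.__init__.
    segment_hash_size = (2 * num_segments - 1) * HASH_SIZE
    share_hash_size = num_share_hashes * (2 + HASH_SIZE)
    offsets = {}
    x = 0x20
    offsets['data'] = x;                 x += data_size
    offsets['plaintext_hash_tree'] = x;  x += segment_hash_size
    offsets['crypttext_hash_tree'] = x;  x += segment_hash_size
    offsets['block_hashes'] = x;         x += segment_hash_size
    offsets['share_hashes'] = x;         x += share_hash_size
    offsets['uri_extension'] = x
    return struct.pack(HEADER_FMT, segment_size, data_size,
                       offsets['data'],
                       offsets['plaintext_hash_tree'],
                       offsets['crypttext_hash_tree'],
                       offsets['block_hashes'],
                       offsets['share_hashes'],
                       offsets['uri_extension'])

def parse_header(header):
    # Same field order that ReadBucketProxy._parse_offsets expects.
    fields = struct.unpack(HEADER_FMT, header)
    names = ('segment_size', 'data_size', 'data', 'plaintext_hash_tree',
             'crypttext_hash_tree', 'block_hashes', 'share_hashes',
             'uri_extension')
    return dict(zip(names, fields))

header = pack_header(segment_size=1024, data_size=3000,
                     num_segments=3, num_share_hashes=4)
assert len(header) == 8 * 4
print(parse_header(header))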
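
The new storage.allocated_size() computes how many bytes a share will occupy on disk by building a throwaway WriteBucketProxy and adding 4 bytes (the uri_extension length prefix) plus the extension itself. The same arithmetic worked out by hand, under assumed parameters rather than values from the patch:

HASH_SIZE = 32

def share_size_by_hand(data_size, num_segments, num_share_hashes,
                       uri_extension_size):
    # 0x20-byte header + data + three segment-hash sections
    # + share-hash section + 4-byte length prefix + uri extension
    segment_hash_size = (2 * num_segments - 1) * HASH_SIZE
    share_hash_size = num_share_hashes * (2 + HASH_SIZE)
    return (0x20 + data_size
            + 3 * segment_hash_size
            + share_hash_size
            + 4 + uri_extension_size)

# e.g. a 3000-byte share split into 3 segments, with 4 share hashes and a
# 300-byte URI extension (all assumed numbers):
print(share_size_by_hand(3000, 3, 4, 300))
# 32 + 3000 + 3*160 + 4*34 + 4 + 300 = 3952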
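
put_share_hashes() and get_share_hashes() store each (hashnum, hash) pair as a 2-byte big-endian index followed by the 32-byte SHA-256 value, 34 bytes per entry. A small round-trip sketch of that encoding (the hash values here are dummies, not real digests):

import struct

HASH_SIZE = 32

def pack_share_hashes(sharehashes):
    # same layout as WriteBucketProxy.put_share_hashes: ">H" index + raw hash
    return b"".join(struct.pack(">H", hashnum) + hashvalue
                    for hashnum, hashvalue in sharehashes)

def unpack_share_hashes(data):
    # inverse transform, as in ReadBucketProxy.get_share_hashes
    assert len(data) % (2 + HASH_SIZE) == 0
    hashes = []
    for i in range(0, len(data), 2 + HASH_SIZE):
        hashnum = struct.unpack(">H", data[i:i+2])[0]
        hashes.append((hashnum, data[i+2:i+2+HASH_SIZE]))
    return hashes

pairs = [(0, b"\x00" * HASH_SIZE), (3, b"\x01" * HASH_SIZE)]
packed = pack_share_hashes(pairs)
assert len(packed) == len(pairs) * (2 + HASH_SIZE)
assert unpack_share_hashes(packed) == pairs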
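
Since the module now carries the client-side proxies as well as the server, the two halves can be exercised together without a real foolscap connection: WriteBucketProxy and ReadBucketProxy only need an object whose callRemote() returns a Deferred. The sketch below does a write/read round trip through in-memory stand-ins; it assumes the patched allmydata tree and its dependencies are importable under Python 2, and the Fake* classes and sample sizes are illustrative test doubles, not part of the patch:

from twisted.internet import defer
from allmydata import storage

class FakeBucketWriter:
    """Stands in for a remote RIBucketWriter; collects writes in memory."""
    def __init__(self):
        self.data = ""
    def callRemote(self, methname, *args):
        if methname == "write":
            offset, data = args
            self.data = (self.data.ljust(offset, "\x00")[:offset] + data
                         + self.data[offset+len(data):])
        return defer.succeed(None)   # "close" is a no-op here

class FakeBucketReader:
    """Stands in for a remote RIBucketReader, serving bytes from a string."""
    def __init__(self, data):
        self.data = data
    def callRemote(self, methname, offset, length):
        assert methname == "read"
        return defer.succeed(self.data[offset:offset+length])

# one 25-byte segment, one share hash, a 14-byte uri extension
writer = FakeBucketWriter()
wbp = storage.WriteBucketProxy(writer, data_size=25, segment_size=25,
                               num_segments=1, num_share_hashes=1,
                               uri_extension_size=14)
wbp.start()
wbp.put_block(0, "x" * 25)
wbp.put_plaintext_hashes(["\x01" * 32])
wbp.put_crypttext_hashes(["\x02" * 32])
wbp.put_block_hashes(["\x03" * 32])
wbp.put_share_hashes([(0, "\x04" * 32)])
wbp.put_uri_extension("fake extension")
wbp.close()
assert len(writer.data) == storage.allocated_size(25, 1, 1, 14)

results = []
rbp = storage.ReadBucketProxy(FakeBucketReader(writer.data))
d = rbp.startIfNecessary()
d.addCallback(lambda ign: rbp.get_block(0))
d.addCallback(results.append)
d.addCallback(lambda ign: rbp.get_uri_extension())
d.addCallback(results.append)
assert results == ["x" * 25, "fake extension"]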