From: Brian Warner Date: Fri, 10 Oct 2008 00:08:00 +0000 (-0700) Subject: storage: split WriteBucketProxy and ReadBucketProxy out into immutable/layout.py... X-Git-Url: https://git.rkrishnan.org/simplejson/components/com_hotproperty/somewhere?a=commitdiff_plain;h=288d55825c4f676e6fd631544b63a464c96702ca;p=tahoe-lafs%2Ftahoe-lafs.git storage: split WriteBucketProxy and ReadBucketProxy out into immutable/layout.py . No behavioral changes. --- diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py index 80b9ce65..a2216bee 100644 --- a/src/allmydata/immutable/download.py +++ b/src/allmydata/immutable/download.py @@ -12,6 +12,7 @@ from allmydata.util.assertutil import _assert from allmydata import codec, hashtree, storage, uri from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \ IDownloadStatus, IDownloadResults +from allmydata.immutable import layout from allmydata.immutable.encode import NotEnoughSharesError from pycryptopp.cipher.aes import AES @@ -580,7 +581,7 @@ class FileDownloader: (self._responses_received, self._queries_sent)) for sharenum, bucket in buckets.iteritems(): - b = storage.ReadBucketProxy(bucket, peerid, self._si_s) + b = layout.ReadBucketProxy(bucket, peerid, self._si_s) self.add_share_bucket(sharenum, b) self._uri_extension_sources.append(b) if self._results: diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py new file mode 100644 index 00000000..9f7be655 --- /dev/null +++ b/src/allmydata/immutable/layout.py @@ -0,0 +1,301 @@ + +import struct +from zope.interface import implements +from twisted.internet import defer +from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \ + FileTooLargeError, HASH_SIZE +from allmydata.util import mathutil, idlib +from allmydata.util.assertutil import _assert, precondition + + +""" +Share data is written into a single file. At the start of the file, there is +a series of four-byte big-endian offset values, which indicate where each +section starts. Each offset is measured from the beginning of the file. + +0x00: version number (=00 00 00 01) +0x04: segment size +0x08: data size +0x0c: offset of data (=00 00 00 24) +0x10: offset of plaintext_hash_tree +0x14: offset of crypttext_hash_tree +0x18: offset of block_hashes +0x1c: offset of share_hashes +0x20: offset of uri_extension_length + uri_extension +0x24: start of data +? : start of plaintext_hash_tree +? : start of crypttext_hash_tree +? : start of block_hashes +? : start of share_hashes + each share_hash is written as a two-byte (big-endian) hashnum + followed by the 32-byte SHA-256 hash. We only store the hashes + necessary to validate the share hash root +? : start of uri_extension_length (four-byte big-endian value) +? : start of uri_extension +""" + +def allocated_size(data_size, num_segments, num_share_hashes, + uri_extension_size): + wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes, + uri_extension_size, None) + uri_extension_starts_at = wbp._offsets['uri_extension'] + return uri_extension_starts_at + 4 + uri_extension_size + +class WriteBucketProxy: + implements(IStorageBucketWriter) + def __init__(self, rref, data_size, segment_size, num_segments, + num_share_hashes, uri_extension_size, nodeid): + self._rref = rref + self._data_size = data_size + self._segment_size = segment_size + self._num_segments = num_segments + self._nodeid = nodeid + + if segment_size >= 2**32 or data_size >= 2**32: + raise FileTooLargeError("This file is too large to be uploaded (data_size).") + + effective_segments = mathutil.next_power_of_k(num_segments,2) + self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE + # how many share hashes are included in each share? This will be + # about ln2(num_shares). + self._share_hash_size = num_share_hashes * (2+HASH_SIZE) + # we commit to not sending a uri extension larger than this + self._uri_extension_size = uri_extension_size + + offsets = self._offsets = {} + x = 0x24 + offsets['data'] = x + x += data_size + offsets['plaintext_hash_tree'] = x + x += self._segment_hash_size + offsets['crypttext_hash_tree'] = x + x += self._segment_hash_size + offsets['block_hashes'] = x + x += self._segment_hash_size + offsets['share_hashes'] = x + x += self._share_hash_size + offsets['uri_extension'] = x + + if x >= 2**32: + raise FileTooLargeError("This file is too large to be uploaded (offsets).") + + offset_data = struct.pack(">LLLLLLLLL", + 1, # version number + segment_size, + data_size, + offsets['data'], + offsets['plaintext_hash_tree'], + offsets['crypttext_hash_tree'], + offsets['block_hashes'], + offsets['share_hashes'], + offsets['uri_extension'], + ) + assert len(offset_data) == 0x24 + self._offset_data = offset_data + + def __repr__(self): + if self._nodeid: + nodeid_s = idlib.nodeid_b2a(self._nodeid) + else: + nodeid_s = "[None]" + return "" % nodeid_s + + def start(self): + return self._write(0, self._offset_data) + + def put_block(self, segmentnum, data): + offset = self._offsets['data'] + segmentnum * self._segment_size + assert offset + len(data) <= self._offsets['uri_extension'] + assert isinstance(data, str) + if segmentnum < self._num_segments-1: + precondition(len(data) == self._segment_size, + len(data), self._segment_size) + else: + precondition(len(data) == (self._data_size - + (self._segment_size * + (self._num_segments - 1))), + len(data), self._segment_size) + return self._write(offset, data) + + def put_plaintext_hashes(self, hashes): + offset = self._offsets['plaintext_hash_tree'] + assert isinstance(hashes, list) + data = "".join(hashes) + precondition(len(data) == self._segment_hash_size, + len(data), self._segment_hash_size) + precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'], + offset, len(data), offset+len(data), + self._offsets['crypttext_hash_tree']) + return self._write(offset, data) + + def put_crypttext_hashes(self, hashes): + offset = self._offsets['crypttext_hash_tree'] + assert isinstance(hashes, list) + data = "".join(hashes) + precondition(len(data) == self._segment_hash_size, + len(data), self._segment_hash_size) + precondition(offset + len(data) <= self._offsets['block_hashes'], + offset, len(data), offset+len(data), + self._offsets['block_hashes']) + return self._write(offset, data) + + def put_block_hashes(self, blockhashes): + offset = self._offsets['block_hashes'] + assert isinstance(blockhashes, list) + data = "".join(blockhashes) + precondition(len(data) == self._segment_hash_size, + len(data), self._segment_hash_size) + precondition(offset + len(data) <= self._offsets['share_hashes'], + offset, len(data), offset+len(data), + self._offsets['share_hashes']) + return self._write(offset, data) + + def put_share_hashes(self, sharehashes): + # sharehashes is a list of (index, hash) tuples, so they get stored + # as 2+32=34 bytes each + offset = self._offsets['share_hashes'] + assert isinstance(sharehashes, list) + data = "".join([struct.pack(">H", hashnum) + hashvalue + for hashnum,hashvalue in sharehashes]) + precondition(len(data) == self._share_hash_size, + len(data), self._share_hash_size) + precondition(offset + len(data) <= self._offsets['uri_extension'], + offset, len(data), offset+len(data), + self._offsets['uri_extension']) + return self._write(offset, data) + + def put_uri_extension(self, data): + offset = self._offsets['uri_extension'] + assert isinstance(data, str) + precondition(len(data) <= self._uri_extension_size, + len(data), self._uri_extension_size) + length = struct.pack(">L", len(data)) + return self._write(offset, length+data) + + def _write(self, offset, data): + # TODO: for small shares, buffer the writes and do just a single call + return self._rref.callRemote("write", offset, data) + + def close(self): + return self._rref.callRemote("close") + + def abort(self): + return self._rref.callRemoteOnly("abort") + +class ReadBucketProxy: + implements(IStorageBucketReader) + def __init__(self, rref, peerid=None, storage_index_s=None): + self._rref = rref + self._peerid = peerid + self._si_s = storage_index_s + self._started = False + + def get_peerid(self): + return self._peerid + + def __repr__(self): + peerid_s = idlib.shortnodeid_b2a(self._peerid) + return "" % (peerid_s, + self._si_s) + + def startIfNecessary(self): + if self._started: + return defer.succeed(self) + d = self.start() + d.addCallback(lambda res: self) + return d + + def start(self): + # TODO: for small shares, read the whole bucket in start() + d = self._read(0, 0x24) + d.addCallback(self._parse_offsets) + def _started(res): + self._started = True + return res + d.addCallback(_started) + return d + + def _parse_offsets(self, data): + precondition(len(data) == 0x24) + self._offsets = {} + (version, self._segment_size, self._data_size) = \ + struct.unpack(">LLL", data[0:0xc]) + _assert(version == 1) + x = 0x0c + for field in ( 'data', + 'plaintext_hash_tree', + 'crypttext_hash_tree', + 'block_hashes', + 'share_hashes', + 'uri_extension', + ): + offset = struct.unpack(">L", data[x:x+4])[0] + x += 4 + self._offsets[field] = offset + return self._offsets + + def get_block(self, blocknum): + num_segments = mathutil.div_ceil(self._data_size, self._segment_size) + if blocknum < num_segments-1: + size = self._segment_size + else: + size = self._data_size % self._segment_size + if size == 0: + size = self._segment_size + offset = self._offsets['data'] + blocknum * self._segment_size + return self._read(offset, size) + + def _str2l(self, s): + """ split string (pulled from storage) into a list of blockids """ + return [ s[i:i+HASH_SIZE] + for i in range(0, len(s), HASH_SIZE) ] + + def get_plaintext_hashes(self): + offset = self._offsets['plaintext_hash_tree'] + size = self._offsets['crypttext_hash_tree'] - offset + d = self._read(offset, size) + d.addCallback(self._str2l) + return d + + def get_crypttext_hashes(self): + offset = self._offsets['crypttext_hash_tree'] + size = self._offsets['block_hashes'] - offset + d = self._read(offset, size) + d.addCallback(self._str2l) + return d + + def get_block_hashes(self): + offset = self._offsets['block_hashes'] + size = self._offsets['share_hashes'] - offset + d = self._read(offset, size) + d.addCallback(self._str2l) + return d + + def get_share_hashes(self): + offset = self._offsets['share_hashes'] + size = self._offsets['uri_extension'] - offset + assert size % (2+HASH_SIZE) == 0 + d = self._read(offset, size) + def _unpack_share_hashes(data): + assert len(data) == size + hashes = [] + for i in range(0, size, 2+HASH_SIZE): + hashnum = struct.unpack(">H", data[i:i+2])[0] + hashvalue = data[i+2:i+2+HASH_SIZE] + hashes.append( (hashnum, hashvalue) ) + return hashes + d.addCallback(_unpack_share_hashes) + return d + + def get_uri_extension(self): + offset = self._offsets['uri_extension'] + d = self._read(offset, 4) + def _got_length(data): + length = struct.unpack(">L", data)[0] + return self._read(offset+4, length) + d.addCallback(_got_length) + return d + + def _read(self, offset, length): + return self._rref.callRemote("read", offset, length) + diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index cf275b89..40fed09b 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -18,6 +18,7 @@ from allmydata.util import base32, idlib, mathutil from allmydata.util.assertutil import precondition from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus +from allmydata.immutable import layout from pycryptopp.cipher.aes import AES from cStringIO import StringIO @@ -76,10 +77,10 @@ class PeerTracker: self._storageserver = storage_server # to an RIStorageServer self.buckets = {} # k: shareid, v: IRemoteBucketWriter self.sharesize = sharesize - self.allocated_size = storage.allocated_size(sharesize, - num_segments, - num_share_hashes, - EXTENSION_SIZE) + self.allocated_size = layout.allocated_size(sharesize, + num_segments, + num_share_hashes, + EXTENSION_SIZE) self.blocksize = blocksize self.num_segments = num_segments @@ -109,12 +110,12 @@ class PeerTracker: #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets))) b = {} for sharenum, rref in buckets.iteritems(): - bp = storage.WriteBucketProxy(rref, self.sharesize, - self.blocksize, - self.num_segments, - self.num_share_hashes, - EXTENSION_SIZE, - self.peerid) + bp = layout.WriteBucketProxy(rref, self.sharesize, + self.blocksize, + self.num_segments, + self.num_share_hashes, + EXTENSION_SIZE, + self.peerid) b[sharenum] = bp self.buckets.update(b) return (alreadygot, set(b.keys())) diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index f2a8afad..52a9f4fe 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -7,6 +7,7 @@ from foolscap import Referenceable, DeadReferenceError from foolscap.eventual import eventually from allmydata import interfaces, storage, uri from allmydata.immutable import upload +from allmydata.immutable.layout import ReadBucketProxy from allmydata.util import idlib, log, observer, fileutil, hashutil @@ -85,8 +86,7 @@ class CHKCheckerAndUEBFetcher: self.log("no readers, so no UEB", level=log.NOISY) return b,peerid = self._readers.pop() - rbp = storage.ReadBucketProxy(b, peerid, - storage.si_b2a(self._storage_index)) + rbp = ReadBucketProxy(b, peerid, storage.si_b2a(self._storage_index)) d = rbp.startIfNecessary() d.addCallback(lambda res: rbp.get_uri_extension()) d.addCallback(self._got_uri_extension) diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py index 243e49d8..863e1e65 100644 --- a/src/allmydata/scripts/debug.py +++ b/src/allmydata/scripts/debug.py @@ -47,11 +47,12 @@ def dump_share(options): def dump_immutable_share(options): from allmydata import uri, storage from allmydata.util import base32 + from allmydata.immutable.layout import ReadBucketProxy out = options.stdout f = storage.ShareFile(options['filename']) # use a ReadBucketProxy to parse the bucket and find the uri extension - bp = storage.ReadBucketProxy(None) + bp = ReadBucketProxy(None) offsets = bp._parse_offsets(f.read_share_data(0, 0x24)) seek = offsets['uri_extension'] length = struct.unpack(">L", f.read_share_data(seek, 4))[0] @@ -516,6 +517,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): from allmydata import uri, storage from allmydata.mutable.layout import unpack_share from allmydata.mutable.common import NeedMoreDataError + from allmydata.immutable.layout import ReadBucketProxy from allmydata.util import base32 import struct @@ -569,7 +571,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): sf = storage.ShareFile(abs_sharefile) # use a ReadBucketProxy to parse the bucket and find the uri extension - bp = storage.ReadBucketProxy(None) + bp = ReadBucketProxy(None) offsets = bp._parse_offsets(sf.read_share_data(0, 0x24)) seek = offsets['uri_extension'] length = struct.unpack(">L", sf.read_share_data(seek, 4))[0] @@ -689,7 +691,7 @@ def corrupt_share(options): else: # otherwise assume it's immutable f = storage.ShareFile(fn) - bp = storage.ReadBucketProxy(None) + bp = ReadBucketProxy(None) offsets = bp._parse_offsets(f.read_share_data(0, 0x24)) start = f._data_offset + offsets["data"] end = f._data_offset + offsets["plaintext_hash_tree"] diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index b2e9685b..29b4019d 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -3,14 +3,12 @@ from distutils.version import LooseVersion from foolscap import Referenceable from twisted.application import service -from twisted.internet import defer from zope.interface import implements from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ - RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \ - BadWriteEnablerError, IStatsProducer, FileTooLargeError -from allmydata.util import base32, fileutil, idlib, mathutil, log -from allmydata.util.assertutil import precondition, _assert + RIBucketReader, BadWriteEnablerError, IStatsProducer +from allmydata.util import base32, fileutil, idlib, log +from allmydata.util.assertutil import precondition import allmydata # for __version__ class DataTooLargeError(Exception): @@ -38,7 +36,7 @@ NUM_RE=re.compile("^[0-9]+$") # 0x00: share file version number, four bytes, current version is 1 # 0x04: share data length, four bytes big-endian = A # 0x08: number of leases, four bytes big-endian -# 0x0c: beginning of share data (described below, at WriteBucketProxy) +# 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy) # A+0x0c = B: first lease. Lease format is: # B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner # B+0x04: renew secret, 32 bytes (SHA256) @@ -1210,295 +1208,3 @@ class StorageServer(service.MultiService, Referenceable): # the code before here runs on the storage server, not the client -# the code beyond here runs on the client, not the storage server - -""" -Share data is written into a single file. At the start of the file, there is -a series of four-byte big-endian offset values, which indicate where each -section starts. Each offset is measured from the beginning of the file. - -0x00: version number (=00 00 00 01) -0x04: segment size -0x08: data size -0x0c: offset of data (=00 00 00 24) -0x10: offset of plaintext_hash_tree -0x14: offset of crypttext_hash_tree -0x18: offset of block_hashes -0x1c: offset of share_hashes -0x20: offset of uri_extension_length + uri_extension -0x24: start of data -? : start of plaintext_hash_tree -? : start of crypttext_hash_tree -? : start of block_hashes -? : start of share_hashes - each share_hash is written as a two-byte (big-endian) hashnum - followed by the 32-byte SHA-256 hash. We only store the hashes - necessary to validate the share hash root -? : start of uri_extension_length (four-byte big-endian value) -? : start of uri_extension -""" - -def allocated_size(data_size, num_segments, num_share_hashes, - uri_extension_size): - wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes, - uri_extension_size, None) - uri_extension_starts_at = wbp._offsets['uri_extension'] - return uri_extension_starts_at + 4 + uri_extension_size - -class WriteBucketProxy: - implements(IStorageBucketWriter) - def __init__(self, rref, data_size, segment_size, num_segments, - num_share_hashes, uri_extension_size, nodeid): - self._rref = rref - self._data_size = data_size - self._segment_size = segment_size - self._num_segments = num_segments - self._nodeid = nodeid - - if segment_size >= 2**32 or data_size >= 2**32: - raise FileTooLargeError("This file is too large to be uploaded (data_size).") - - effective_segments = mathutil.next_power_of_k(num_segments,2) - self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE - # how many share hashes are included in each share? This will be - # about ln2(num_shares). - self._share_hash_size = num_share_hashes * (2+HASH_SIZE) - # we commit to not sending a uri extension larger than this - self._uri_extension_size = uri_extension_size - - offsets = self._offsets = {} - x = 0x24 - offsets['data'] = x - x += data_size - offsets['plaintext_hash_tree'] = x - x += self._segment_hash_size - offsets['crypttext_hash_tree'] = x - x += self._segment_hash_size - offsets['block_hashes'] = x - x += self._segment_hash_size - offsets['share_hashes'] = x - x += self._share_hash_size - offsets['uri_extension'] = x - - if x >= 2**32: - raise FileTooLargeError("This file is too large to be uploaded (offsets).") - - offset_data = struct.pack(">LLLLLLLLL", - 1, # version number - segment_size, - data_size, - offsets['data'], - offsets['plaintext_hash_tree'], - offsets['crypttext_hash_tree'], - offsets['block_hashes'], - offsets['share_hashes'], - offsets['uri_extension'], - ) - assert len(offset_data) == 0x24 - self._offset_data = offset_data - - def __repr__(self): - if self._nodeid: - nodeid_s = idlib.nodeid_b2a(self._nodeid) - else: - nodeid_s = "[None]" - return "" % nodeid_s - - def start(self): - return self._write(0, self._offset_data) - - def put_block(self, segmentnum, data): - offset = self._offsets['data'] + segmentnum * self._segment_size - assert offset + len(data) <= self._offsets['uri_extension'] - assert isinstance(data, str) - if segmentnum < self._num_segments-1: - precondition(len(data) == self._segment_size, - len(data), self._segment_size) - else: - precondition(len(data) == (self._data_size - - (self._segment_size * - (self._num_segments - 1))), - len(data), self._segment_size) - return self._write(offset, data) - - def put_plaintext_hashes(self, hashes): - offset = self._offsets['plaintext_hash_tree'] - assert isinstance(hashes, list) - data = "".join(hashes) - precondition(len(data) == self._segment_hash_size, - len(data), self._segment_hash_size) - precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'], - offset, len(data), offset+len(data), - self._offsets['crypttext_hash_tree']) - return self._write(offset, data) - - def put_crypttext_hashes(self, hashes): - offset = self._offsets['crypttext_hash_tree'] - assert isinstance(hashes, list) - data = "".join(hashes) - precondition(len(data) == self._segment_hash_size, - len(data), self._segment_hash_size) - precondition(offset + len(data) <= self._offsets['block_hashes'], - offset, len(data), offset+len(data), - self._offsets['block_hashes']) - return self._write(offset, data) - - def put_block_hashes(self, blockhashes): - offset = self._offsets['block_hashes'] - assert isinstance(blockhashes, list) - data = "".join(blockhashes) - precondition(len(data) == self._segment_hash_size, - len(data), self._segment_hash_size) - precondition(offset + len(data) <= self._offsets['share_hashes'], - offset, len(data), offset+len(data), - self._offsets['share_hashes']) - return self._write(offset, data) - - def put_share_hashes(self, sharehashes): - # sharehashes is a list of (index, hash) tuples, so they get stored - # as 2+32=34 bytes each - offset = self._offsets['share_hashes'] - assert isinstance(sharehashes, list) - data = "".join([struct.pack(">H", hashnum) + hashvalue - for hashnum,hashvalue in sharehashes]) - precondition(len(data) == self._share_hash_size, - len(data), self._share_hash_size) - precondition(offset + len(data) <= self._offsets['uri_extension'], - offset, len(data), offset+len(data), - self._offsets['uri_extension']) - return self._write(offset, data) - - def put_uri_extension(self, data): - offset = self._offsets['uri_extension'] - assert isinstance(data, str) - precondition(len(data) <= self._uri_extension_size, - len(data), self._uri_extension_size) - length = struct.pack(">L", len(data)) - return self._write(offset, length+data) - - def _write(self, offset, data): - # TODO: for small shares, buffer the writes and do just a single call - return self._rref.callRemote("write", offset, data) - - def close(self): - return self._rref.callRemote("close") - - def abort(self): - return self._rref.callRemoteOnly("abort") - -class ReadBucketProxy: - implements(IStorageBucketReader) - def __init__(self, rref, peerid=None, storage_index_s=None): - self._rref = rref - self._peerid = peerid - self._si_s = storage_index_s - self._started = False - - def get_peerid(self): - return self._peerid - - def __repr__(self): - peerid_s = idlib.shortnodeid_b2a(self._peerid) - return "" % (peerid_s, - self._si_s) - - def startIfNecessary(self): - if self._started: - return defer.succeed(self) - d = self.start() - d.addCallback(lambda res: self) - return d - - def start(self): - # TODO: for small shares, read the whole bucket in start() - d = self._read(0, 0x24) - d.addCallback(self._parse_offsets) - def _started(res): - self._started = True - return res - d.addCallback(_started) - return d - - def _parse_offsets(self, data): - precondition(len(data) == 0x24) - self._offsets = {} - (version, self._segment_size, self._data_size) = \ - struct.unpack(">LLL", data[0:0xc]) - _assert(version == 1) - x = 0x0c - for field in ( 'data', - 'plaintext_hash_tree', - 'crypttext_hash_tree', - 'block_hashes', - 'share_hashes', - 'uri_extension', - ): - offset = struct.unpack(">L", data[x:x+4])[0] - x += 4 - self._offsets[field] = offset - return self._offsets - - def get_block(self, blocknum): - num_segments = mathutil.div_ceil(self._data_size, self._segment_size) - if blocknum < num_segments-1: - size = self._segment_size - else: - size = self._data_size % self._segment_size - if size == 0: - size = self._segment_size - offset = self._offsets['data'] + blocknum * self._segment_size - return self._read(offset, size) - - def _str2l(self, s): - """ split string (pulled from storage) into a list of blockids """ - return [ s[i:i+HASH_SIZE] - for i in range(0, len(s), HASH_SIZE) ] - - def get_plaintext_hashes(self): - offset = self._offsets['plaintext_hash_tree'] - size = self._offsets['crypttext_hash_tree'] - offset - d = self._read(offset, size) - d.addCallback(self._str2l) - return d - - def get_crypttext_hashes(self): - offset = self._offsets['crypttext_hash_tree'] - size = self._offsets['block_hashes'] - offset - d = self._read(offset, size) - d.addCallback(self._str2l) - return d - - def get_block_hashes(self): - offset = self._offsets['block_hashes'] - size = self._offsets['share_hashes'] - offset - d = self._read(offset, size) - d.addCallback(self._str2l) - return d - - def get_share_hashes(self): - offset = self._offsets['share_hashes'] - size = self._offsets['uri_extension'] - offset - assert size % (2+HASH_SIZE) == 0 - d = self._read(offset, size) - def _unpack_share_hashes(data): - assert len(data) == size - hashes = [] - for i in range(0, size, 2+HASH_SIZE): - hashnum = struct.unpack(">H", data[i:i+2])[0] - hashvalue = data[i+2:i+2+HASH_SIZE] - hashes.append( (hashnum, hashvalue) ) - return hashes - d.addCallback(_unpack_share_hashes) - return d - - def get_uri_extension(self): - offset = self._offsets['uri_extension'] - d = self._read(offset, 4) - def _got_length(data): - length = struct.unpack(">L", data)[0] - return self._read(offset+4, length) - d.addCallback(_got_length) - return d - - def _read(self, offset, length): - return self._rref.callRemote("read", offset, length) diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 71626252..70db7ab7 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -7,8 +7,9 @@ import itertools from allmydata import interfaces from allmydata.util import fileutil, hashutil from allmydata.storage import BucketWriter, BucketReader, \ - WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \ + StorageServer, MutableShareFile, \ storage_index_to_dir, DataTooLargeError, LeaseInfo +from allmydata.immutable.layout import WriteBucketProxy, ReadBucketProxy from allmydata.interfaces import BadWriteEnablerError from allmydata.test.common import LoggingServiceParent