From: Brian Warner Date: Fri, 13 Jul 2007 22:09:01 +0000 (-0700) Subject: more #85 work, system test still fails X-Git-Url: https://git.rkrishnan.org/class-simplejson.JSONEncoder-index.html?a=commitdiff_plain;h=1f8e407d9cda19ed65a42f98cb0869f213ef9705;p=tahoe-lafs%2Ftahoe-lafs.git more #85 work, system test still fails --- diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index a36b71a6..f3af448b 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -162,10 +162,7 @@ class Encoder(object): def set_shareholders(self, landlords): assert isinstance(landlords, dict) for k in landlords: - # it would be nice to: - #assert RIBucketWriter.providedBy(landlords[k]) - assert IStorageBucketWriter(landlords[k]) - pass + assert IStorageBucketWriter.providedBy(landlords[k]) self.landlords = landlords.copy() def start(self): diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 9a1f2145..979a4e32 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -80,7 +80,7 @@ class RIBucketReader(RemoteInterface): class RIStorageServer(RemoteInterface): def allocate_buckets(storage_index=StorageIndex, sharenums=SetOf(int, maxLength=MAX_BUCKETS), - sharesize=int, blocksize=int, canary=Referenceable): + allocated_size=int, canary=Referenceable): """ @param canary: If the canary is lost before close(), the bucket is deleted. @return: tuple of (alreadygot, allocated), where alreadygot is what we diff --git a/src/allmydata/storageserver.py b/src/allmydata/storageserver.py index 4cc53e13..74b4c894 100644 --- a/src/allmydata/storageserver.py +++ b/src/allmydata/storageserver.py @@ -95,12 +95,12 @@ class StorageServer(service.MultiService, Referenceable): space += bw.allocated_size() return space - def remote_allocate_buckets(self, storage_index, sharenums, sharesize, + def remote_allocate_buckets(self, storage_index, sharenums, allocated_size, canary): alreadygot = set() bucketwriters = {} # k: shnum, v: BucketWriter si_s = idlib.b2a(storage_index) - space_per_bucket = sharesize + space_per_bucket = allocated_size no_limits = self.sizelimit is None yes_limits = not no_limits if yes_limits: @@ -169,18 +169,28 @@ section starts. Each offset is measured from the beginning of the file. start of uri_extension """ +def allocated_size(data_size, num_segments, num_share_hashes, + uri_extension_size): + wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes, + uri_extension_size) + uri_extension_starts_at = wbp._offsets['uri_extension'] + return uri_extension_starts_at + 4 + uri_extension_size + class WriteBucketProxy: implements(IStorageBucketWriter) def __init__(self, rref, data_size, segment_size, num_segments, - num_share_hashes): + num_share_hashes, uri_extension_size): self._rref = rref self._segment_size = segment_size + self._num_segments = num_segments HASH_SIZE = interfaces.HASH_SIZE self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE # how many share hashes are included in each share? This will be # about ln2(num_shares). self._share_hash_size = num_share_hashes * (2+HASH_SIZE) + # we commit to not sending a uri extension larger than this + self._uri_extension_size = uri_extension_size offsets = self._offsets = {} x = 0x1c @@ -215,10 +225,12 @@ class WriteBucketProxy: offset = self._offsets['data'] + segmentnum * self._segment_size assert offset + len(data) <= self._offsets['uri_extension'] assert isinstance(data, str) - if segmentnum < self._segment_size-1: - assert len(data) == self._segment_size + if segmentnum < self._num_segments-1: + precondition(len(data) == self._segment_size, + len(data), self._segment_size) else: - assert len(data) <= self._segment_size + precondition(len(data) <= self._segment_size, + len(data), self._segment_size) return self._write(offset, data) def put_plaintext_hashes(self, hashes): @@ -252,13 +264,15 @@ class WriteBucketProxy: assert isinstance(sharehashes, list) data = "".join([struct.pack(">H", hashnum) + hashvalue for hashnum,hashvalue in sharehashes]) - assert len(data) == self._share_hash_size + precondition(len(data) == self._share_hash_size, + len(data), self._share_hash_size) assert offset + len(data) <= self._offsets['uri_extension'] return self._write(offset, data) def put_uri_extension(self, data): offset = self._offsets['uri_extension'] assert isinstance(data, str) + assert len(data) <= self._uri_extension_size length = struct.pack(">L", len(data)) return self._write(offset, length+data) @@ -273,6 +287,7 @@ class ReadBucketProxy: implements(IStorageBucketReader) def __init__(self, rref): self._rref = rref + self._started = False def startIfNecessary(self): if self._started: diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py index d4a42d0f..a70bfd6e 100644 --- a/src/allmydata/test/test_encode.py +++ b/src/allmydata/test/test_encode.py @@ -3,7 +3,6 @@ from zope.interface import implements from twisted.trial import unittest from twisted.internet import defer from twisted.python.failure import Failure -from foolscap import eventual from allmydata import encode, download, hashtree from allmydata.util import hashutil from allmydata.uri import pack_uri @@ -11,45 +10,13 @@ from allmydata.Crypto.Cipher import AES from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader from cStringIO import StringIO -class FakePeer: - def __init__(self, mode="good"): - self.ss = FakeStorageServer(mode) - - def callRemote(self, methname, *args, **kwargs): - def _call(): - meth = getattr(self, methname) - return meth(*args, **kwargs) - return defer.maybeDeferred(_call) - - def get_service(self, sname): - assert sname == "storageserver" - return self.ss - -class FakeStorageServer: - def __init__(self, mode): - self.mode = mode - def callRemote(self, methname, *args, **kwargs): - def _call(): - meth = getattr(self, methname) - return meth(*args, **kwargs) - d = eventual.fireEventually() - d.addCallback(lambda res: _call()) - return d - def allocate_buckets(self, crypttext_hash, sharenums, shareize, blocksize, canary): - if self.mode == "full": - return (set(), {},) - elif self.mode == "already got them": - return (set(sharenums), {},) - else: - return (set(), dict([(shnum, FakeBucketWriter(),) for shnum in sharenums]),) - class LostPeerError(Exception): pass def flip_bit(good): # flips the last bit return good[:-1] + chr(ord(good[-1]) ^ 0x01) -class FakeBucketWriter: +class FakeBucketWriterProxy: implements(IStorageBucketWriter, IStorageBucketReader) # these are used for both reading and writing def __init__(self, mode="good"): @@ -195,7 +162,7 @@ class Encode(unittest.TestCase): shareholders = {} all_shareholders = [] for shnum in range(NUM_SHARES): - peer = FakeBucketWriter() + peer = FakeBucketWriterProxy() shareholders[shnum] = peer all_shareholders.append(peer) e.set_shareholders(shareholders) @@ -322,7 +289,7 @@ class Roundtrip(unittest.TestCase): all_peers = [] for shnum in range(NUM_SHARES): mode = bucket_modes.get(shnum, "good") - peer = FakeBucketWriter(mode) + peer = FakeBucketWriterProxy(mode) shareholders[shnum] = peer e.set_shareholders(shareholders) plaintext_hasher = hashutil.plaintext_hasher() diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 9f76d70f..7d6afd42 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -5,8 +5,10 @@ from twisted.application import service from twisted.internet import defer from foolscap import Referenceable import os.path -from allmydata import storageserver, interfaces +from allmydata import interfaces from allmydata.util import fileutil, hashutil +from allmydata.storageserver import BucketWriter, BucketReader, \ + WriteBucketProxy, ReadBucketProxy, StorageServer class Bucket(unittest.TestCase): @@ -23,7 +25,7 @@ class Bucket(unittest.TestCase): def test_create(self): incoming, final = self.make_workdir("test_create") - bw = storageserver.BucketWriter(self, incoming, final, 200) + bw = BucketWriter(self, incoming, final, 200) bw.remote_write(0, "a"*25) bw.remote_write(25, "b"*25) bw.remote_write(50, "c"*25) @@ -32,14 +34,14 @@ class Bucket(unittest.TestCase): def test_readwrite(self): incoming, final = self.make_workdir("test_readwrite") - bw = storageserver.BucketWriter(self, incoming, final, 200) + bw = BucketWriter(self, incoming, final, 200) bw.remote_write(0, "a"*25) bw.remote_write(25, "b"*25) bw.remote_write(50, "c"*7) # last block may be short bw.remote_close() # now read from it - br = storageserver.BucketReader(final) + br = BucketReader(final) self.failUnlessEqual(br.remote_read(0, 25), "a"*25) self.failUnlessEqual(br.remote_read(25, 25), "b"*25) self.failUnlessEqual(br.remote_read(50, 7), "c"*7) @@ -59,7 +61,7 @@ class BucketProxy(unittest.TestCase): final = os.path.join(basedir, "bucket") fileutil.make_dirs(basedir) fileutil.make_dirs(os.path.join(basedir, "tmp")) - bw = storageserver.BucketWriter(self, incoming, final, size) + bw = BucketWriter(self, incoming, final, size) rb = RemoteBucket() rb.target = bw return bw, rb, final @@ -69,11 +71,12 @@ class BucketProxy(unittest.TestCase): def test_create(self): bw, rb, final = self.make_bucket("test_create", 500) - bp = storageserver.WriteBucketProxy(rb, - data_size=300, - segment_size=10, - num_segments=5, - num_share_hashes=3) + bp = WriteBucketProxy(rb, + data_size=300, + segment_size=10, + num_segments=5, + num_share_hashes=3, + uri_extension_size=500) self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp)) def test_readwrite(self): @@ -98,11 +101,12 @@ class BucketProxy(unittest.TestCase): uri_extension = "s" + "E"*498 + "e" bw, rb, final = self.make_bucket("test_readwrite", 1406) - bp = storageserver.WriteBucketProxy(rb, - data_size=100, - segment_size=25, - num_segments=4, - num_share_hashes=3) + bp = WriteBucketProxy(rb, + data_size=100, + segment_size=25, + num_segments=4, + num_share_hashes=3, + uri_extension_size=len(uri_extension)) d = bp.start() d.addCallback(lambda res: bp.put_block(0, "a"*25)) @@ -118,13 +122,13 @@ class BucketProxy(unittest.TestCase): # now read everything back def _start_reading(res): - br = storageserver.BucketReader(final) + br = BucketReader(final) rb = RemoteBucket() rb.target = br - rbp = storageserver.ReadBucketProxy(rb) + rbp = ReadBucketProxy(rb) self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp)) - d1 = rbp.start() + d1 = rbp.startIfNecessary() d1.addCallback(lambda res: rbp.get_block(0)) d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25)) d1.addCallback(lambda res: rbp.get_block(1)) @@ -169,7 +173,7 @@ class Server(unittest.TestCase): def create(self, name, sizelimit=None): workdir = self.workdir(name) - ss = storageserver.StorageServer(workdir, sizelimit) + ss = StorageServer(workdir, sizelimit) ss.setServiceParent(self.sparent) return ss diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 6b508513..92a08763 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -1,12 +1,79 @@ from twisted.trial import unittest from twisted.python.failure import Failure +from twisted.internet import defer from cStringIO import StringIO -from allmydata import upload, encode +from allmydata import upload, encode, storageserver from allmydata.uri import unpack_uri, unpack_lit +from allmydata.util.assertutil import precondition +from foolscap import eventual -from test_encode import FakePeer +class FakePeer: + def __init__(self, mode="good"): + self.ss = FakeStorageServer(mode) + + def callRemote(self, methname, *args, **kwargs): + def _call(): + meth = getattr(self, methname) + return meth(*args, **kwargs) + return defer.maybeDeferred(_call) + + def get_service(self, sname): + assert sname == "storageserver" + return self.ss + +class FakeStorageServer: + def __init__(self, mode): + self.mode = mode + def callRemote(self, methname, *args, **kwargs): + def _call(): + meth = getattr(self, methname) + return meth(*args, **kwargs) + d = eventual.fireEventually() + d.addCallback(lambda res: _call()) + return d + + def allocate_buckets(self, crypttext_hash, sharenums, + share_size, blocksize, canary): + #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size) + if self.mode == "full": + return (set(), {},) + elif self.mode == "already got them": + return (set(sharenums), {},) + else: + return (set(), + dict([( shnum, FakeBucketWriter(share_size) ) + for shnum in sharenums]), + ) + +class FakeBucketWriter: + # a diagnostic version of storageserver.BucketWriter + def __init__(self, size): + self.data = StringIO() + self.closed = False + self._size = size + + def callRemote(self, methname, *args, **kwargs): + def _call(): + meth = getattr(self, "remote_" + methname) + return meth(*args, **kwargs) + d = eventual.fireEventually() + d.addCallback(lambda res: _call()) + return d + + def remote_write(self, offset, data): + precondition(not self.closed) + precondition(offset >= 0) + precondition(offset+len(data) <= self._size, + "offset=%d + data=%d > size=%d" % + (offset, len(data), self._size)) + self.data.seek(offset) + self.data.write(data) + + def remote_close(self): + precondition(not self.closed) + self.closed = True class FakeClient: def __init__(self, mode="good"): diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 5c327da4..404b86de 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -5,7 +5,7 @@ from twisted.application import service from foolscap import Referenceable from allmydata.util import idlib, hashutil -from allmydata import encode, storageserver +from allmydata import encode, storageserver, hashtree from allmydata.uri import pack_uri, pack_lit from allmydata.interfaces import IUploadable, IUploader from allmydata.Crypto.Cipher import AES @@ -22,6 +22,13 @@ class HaveAllPeersError(Exception): class TooFullError(Exception): pass +# our current uri_extension is 846 bytes for small files, a few bytes +# more for larger ones (since the filesize is encoded in decimal in a +# few places). Ask for a little bit more just in case we need it. If +# the extension changes size, we can change EXTENSION_SIZE to +# allocate a more accurate amount of space. +EXTENSION_SIZE = 1000 + class PeerTracker: def __init__(self, peerid, permutedid, connection, sharesize, blocksize, num_segments, num_share_hashes, @@ -31,6 +38,13 @@ class PeerTracker: self.connection = connection # to an RIClient self.buckets = {} # k: shareid, v: IRemoteBucketWriter self.sharesize = sharesize + #print "PeerTracker", peerid, permutedid, sharesize + as = storageserver.allocated_size(sharesize, + num_segments, + num_share_hashes, + EXTENSION_SIZE) + self.allocated_size = as + self.blocksize = blocksize self.num_segments = num_segments self.num_share_hashes = num_share_hashes @@ -47,10 +61,11 @@ class PeerTracker: def _got_storageserver(self, storageserver): self._storageserver = storageserver def _query(self, sharenums): + #print " query", self.peerid, len(sharenums) d = self._storageserver.callRemote("allocate_buckets", self.crypttext_hash, - sharenums, self.sharesize, - self.blocksize, + sharenums, + self.allocated_size, canary=Referenceable()) d.addCallback(self._got_reply) return d @@ -62,7 +77,8 @@ class PeerTracker: bp = storageserver.WriteBucketProxy(rref, self.sharesize, self.blocksize, self.num_segments, - self.num_share_hashes) + self.num_share_hashes, + EXTENSION_SIZE) b[sharenum] = bp self.buckets.update(b) return (alreadygot, set(b.keys())) @@ -137,11 +153,16 @@ class FileUploader: # responsible for handling the data and sending out the shares. peers = self._client.get_permuted_peers(self._crypttext_hash) assert peers + # TODO: eek, don't pull this from here, find a better way. gross. num_segments = self._encoder.uri_extension_data['num_segments'] - from allmydata.util.mathutil import next_power_of_k - import math - num_share_hashes = max(int(math.log(next_power_of_k(self.total_shares,2),2)),1) + ht = hashtree.IncompleteHashTree(self.total_shares) + # this needed_hashes computation should mirror + # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree + # (instead of a HashTree) because we don't require actual hashing + # just to count the levels. + num_share_hashes = len(ht.needed_hashes(0, include_leaf=True)) + trackers = [ PeerTracker(peerid, permutedid, conn, share_size, block_size, num_segments, num_share_hashes, @@ -217,10 +238,11 @@ class FileUploader: if ring[0][1] == SHARE: sharenums_to_query.add(ring[0][2]) else: - d = peer.query(sharenums_to_query) - d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,)) - outstanding_queries.append(d) - d.addErrback(log.err) + if True or sharenums_to_query: + d = peer.query(sharenums_to_query) + d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,)) + outstanding_queries.append(d) + d.addErrback(log.err) peer = ring[0][2] sharenums_to_query = set() ring.rotate(-1)