From: Zooko O'Whielacronx Date: Fri, 30 Mar 2007 17:52:19 +0000 (-0700) Subject: finish storage server and write new download X-Git-Url: https://git.rkrishnan.org/components/COPYING.GPL?a=commitdiff_plain;h=f4a718c5b6684c2031832107fdc047a447fff610;p=tahoe-lafs%2Ftahoe-lafs.git finish storage server and write new download --- diff --git a/src/allmydata/bucketstore.py b/src/allmydata/bucketstore.py deleted file mode 100644 index 6c055e78..00000000 --- a/src/allmydata/bucketstore.py +++ /dev/null @@ -1,149 +0,0 @@ -import os - -from foolscap import Referenceable -from twisted.application import service -#from twisted.python import log -from allmydata.util import idlib -from zope.interface import implements -from allmydata.interfaces import RIBucketWriter, RIBucketReader - -from allmydata.util.assertutil import precondition, _assert - -class BucketStore(service.MultiService, Referenceable): - def __init__(self, store_dir): - precondition(os.path.isdir(store_dir)) - service.MultiService.__init__(self) - self._store_dir = store_dir - - self._leases = set() # should do weakref dances. - - def _get_bucket_dir(self, verifierid): - avid = idlib.b2a(verifierid) - return os.path.join(self._store_dir, avid) - - def has_bucket(self, verifierid): - return os.path.exists(self._get_bucket_dir(verifierid)) - - def allocate_bucket(self, verifierid, bucket_num, size, - leaser_credentials, canary): - bucket_dir = self._get_bucket_dir(verifierid) - precondition(not os.path.exists(bucket_dir)) - precondition(isinstance(bucket_num, int)) - bucket = WriteBucket(bucket_dir, verifierid, bucket_num, size) - bucket.set_leaser(leaser_credentials) - lease = Lease(verifierid, leaser_credentials, bucket, canary) - self._leases.add(lease) - return lease - - def get_buckets(self, verifierid): - # for now, only returns those created by this process, in this run - bucket_dir = self._get_bucket_dir(verifierid) - if os.path.exists(bucket_dir): - b = ReadBucket(bucket_dir, verifierid) - return [(b.get_bucket_num(), b)] - else: - return [] - -class Lease(Referenceable): - implements(RIBucketWriter) - - def __init__(self, verifierid, leaser, bucket, canary): - self._leaser = leaser - self._verifierid = verifierid - self._bucket = bucket - canary.notifyOnDisconnect(self._lost_canary) - - def get_bucket(self): - return self._bucket - - def remote_write(self, data): - self._bucket.write(data) - - def remote_set_metadata(self, metadata): - self._bucket.set_metadata(metadata) - - def remote_close(self): - self._bucket.close() - - def _lost_canary(self): - pass - -class Bucket: - def __init__(self, bucket_dir, verifierid): - self._bucket_dir = bucket_dir - self._verifierid = verifierid - - def _write_attr(self, name, val): - f = file(os.path.join(self._bucket_dir, name), 'wb') - f.write(val) - f.close() - - def _read_attr(self, name): - f = file(os.path.join(self._bucket_dir, name), 'rb') - data = f.read() - f.close() - return data - - def is_complete(self): - return os.path.exists(os.path.join(self._bucket_dir, 'closed')) - -class WriteBucket(Bucket): - def __init__(self, bucket_dir, verifierid, bucket_num, size): - Bucket.__init__(self, bucket_dir, verifierid) - precondition(not os.path.exists(bucket_dir)) - #log.msg("WriteBucket [%s]: creating bucket %s" - # % (idlib.b2a(verifierid), bucket_dir)) - os.mkdir(bucket_dir) - - self._open = True - self._size = size - self._data = file(os.path.join(self._bucket_dir, 'data'), 'wb') - self._bytes_written = 0 - - self._write_attr('bucket_num', str(bucket_num)) - - def set_leaser(self, leaser): - self._write_attr('leases', leaser) - - def write(self, data): - precondition(self._open) - precondition(len(data) + self._bytes_written <= self._size) - self._data.write(data) - self._data.flush() - self._bytes_written += len(data) - - def set_metadata(self, metadata): - precondition(self._open) - self._write_attr('metadata', metadata) - - def close(self): - precondition(self._bytes_written == self._size) - #log.msg("WriteBucket.close [%s] (%s)" - # % (idlib.b2a(self._verifierid), self._bucket_dir)) - self._data.close() - self._write_attr('closed', '') - self._open = False - - def is_complete(self): - complete = Bucket.is_complete(self) - if complete: - _assert(os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size) - return complete - -class ReadBucket(Bucket, Referenceable): - implements(RIBucketReader) - - def __init__(self, bucket_dir, verifierid): - Bucket.__init__(self, bucket_dir, verifierid) - precondition(self.is_complete()) # implicitly asserts bucket_dir exists - - def get_bucket_num(self): - return int(self._read_attr('bucket_num')) - - def read(self): - return self._read_attr('data') - remote_read = read - - def get_metadata(self): - return self._read_attr('metadata') - remote_get_metadata = get_metadata diff --git a/src/allmydata/codec.py b/src/allmydata/codec.py index f75235f6..7cbd9f4d 100644 --- a/src/allmydata/codec.py +++ b/src/allmydata/codec.py @@ -42,6 +42,9 @@ class ReplicatingEncoder(object): def get_share_size(self): return self.data_size + def get_block_size(self): + return self.data_size + def encode(self, inshares, desired_shareids=None): assert isinstance(inshares, list) for inshare in inshares: @@ -59,7 +62,7 @@ class ReplicatingDecoder(object): def set_serialized_params(self, params): self.required_shares = int(params) - def get_required_shares(self): + def get_needed_shares(self): return self.required_shares def decode(self, some_shares, their_shareids): @@ -97,6 +100,9 @@ class CRSEncoder(object): def get_share_size(self): return self.share_size + def get_block_size(self): + return self.share_size + def encode(self, inshares, desired_share_ids=None): precondition(desired_share_ids is None or len(desired_share_ids) <= self.max_shares, desired_share_ids, self.max_shares) @@ -129,7 +135,7 @@ class CRSDecoder(object): print "max_shares: %d" % self.max_shares print "required_shares: %d" % self.required_shares - def get_required_shares(self): + def get_needed_shares(self): return self.required_shares def decode(self, some_shares, their_shareids): diff --git a/src/allmydata/debugshell.py b/src/allmydata/debugshell.py index b075e1f5..9e3145f2 100644 --- a/src/allmydata/debugshell.py +++ b/src/allmydata/debugshell.py @@ -1,21 +1,5 @@ -import os # 'app' is overwritten by manhole when the connection is established. We set # it to None now to keep pyflakes from complaining. app = None -def get_random_bucket_on(nodeid, size=200): - d = app.get_remote_service(nodeid, 'storageserver') - def get_bucket(rss): - return rss.callRemote('allocate_bucket', - verifierid=os.urandom(20), - bucket_num=26, - size=size, - leaser=app.tub.tubID, - ) - d.addCallback(get_bucket) - return d - -def write_to_bucket(bucket, bytes=100): - return bucket.callRemote('write', data=os.urandom(bytes)) - diff --git a/src/allmydata/download.py b/src/allmydata/download.py index f08f3a28..d7ed7d9b 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -1,13 +1,14 @@ -import os, sha +import os, random, sha from zope.interface import implements -from twisted.python import failure, log +from twisted.python import log from twisted.internet import defer from twisted.application import service -from allmydata.util import idlib, bencode +from allmydata.util import idlib, bencode, mathutil from allmydata.util.deferredutil import DeferredListShouldSucceed from allmydata import codec +from allmydata.Crypto.Cipher import AES from allmydata.uri import unpack_uri from allmydata.interfaces import IDownloadTarget, IDownloader @@ -18,92 +19,187 @@ class HaveAllPeersError(Exception): # we use this to jump out of the loop pass + +class Output: + def __init__(self, downloadable, key): + self.downloadable = downloadable + self._decryptor = AES.new(key=key, mode=AES.MODE_CTR, + counterstart="\x00"*16) + self._verifierid_hasher = sha.new(netstring("allmydata_v1_verifierid")) + self._fileid_hasher = sha.new(netstring("allmydata_v1_fileid")) + def write(self, crypttext): + self._verifierid_hasher.update(crypttext) + plaintext = self._decryptor.decrypt(crypttext) + self._fileid_hasher.update(plaintext) + self.downloadable.write(plaintext) + def finish(self): + self.downloadable.close() + return self.downloadable.finish() + +class BlockDownloader: + def __init__(self, bucket, blocknum, parent): + self.bucket = bucket + self.blocknum = blocknum + self.parent = parent + + def start(self, segnum): + d = self.bucket.callRemote('get_block', segnum) + d.addCallbacks(self._hold_block, self._got_block_error) + return d + + def _hold_block(self, data): + self.parent.hold_block(self.blocknum, data) + + def _got_block_error(self, f): + self.parent.bucket_failed(self.blocknum, self.bucket) + +class SegmentDownloader: + def __init__(self, segmentnumber, needed_shares): + self.segmentnumber = segmentnumber + self.needed_blocks = needed_shares + self.blocks = {} # k: blocknum, v: data + + def start(self): + return self._download() + + def _download(self): + d = self._try() + def _done(res): + if len(self.blocks) >= self.needed_blocks: + return self.blocks + else: + return self._download() + d.addCallback(_done) + return d + + def _try(self): + while len(self.parent.active_buckets) < self.needed_blocks: + # need some more + otherblocknums = list(set(self.parent._share_buckets.keys()) - set(self.parent.active_buckets.keys())) + if not otherblocknums: + raise NotEnoughPeersError + blocknum = random.choice(otherblocknums) + self.parent.active_buckets[blocknum] = random.choice(self.parent._share_buckets[blocknum]) + + # Now we have enough buckets, in self.parent.active_buckets. + l = [] + for blocknum, bucket in self.parent.active_buckets.iteritems(): + bd = BlockDownloader(bucket, blocknum, self) + d = bd.start(self.segmentnumber) + l.append(d) + return defer.DeferredList(l) + + def hold_block(self, blocknum, data): + self.blocks[blocknum] = data + + def bucket_failed(self, shnum, bucket): + del self.parent.active_buckets[shnum] + s = self.parent._share_buckets[shnum] + s.remove(bucket) + if not s: + del self.parent._share_buckets[shnum] + class FileDownloader: debug = False - def __init__(self, peer, uri): - self._peer = peer - (codec_name, codec_params, verifierid) = unpack_uri(uri) + def __init__(self, client, uri, downloadable): + self._client = client + self._downloadable = downloadable + (codec_name, codec_params, verifierid, roothash, needed_shares, total_shares, size, segment_size) = unpack_uri(uri) assert isinstance(verifierid, str) assert len(verifierid) == 20 self._verifierid = verifierid + self._roothash = roothash self._decoder = codec.get_decoder_by_name(codec_name) self._decoder.set_serialized_params(codec_params) - self.needed_shares = self._decoder.get_required_shares() + self._total_segments = mathutil.div_ceil(size, segment_size) + self._current_segnum = 0 + self._segment_size = segment_size + self._needed_shares = self._decoder.get_needed_shares() - def set_download_target(self, target): - self._target = target - self._target.register_canceller(self._cancel) - - def _cancel(self): - pass + # future: + # self._share_hash_tree = ?? + # self._subshare_hash_trees = {} # k:shnum, v: hashtree + # each time we start using a new shnum, we must acquire a share hash + # from one of the buckets that provides that shnum, then validate it against + # the rest of the share hash tree that they provide. Then, each time we + # get a block in that share, we must validate the block against the rest + # of the subshare hash tree that that bucket will provide. def start(self): log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),)) if self.debug: print "starting download" # first step: who should we download from? + self.active_buckets = {} # k: shnum, v: bucket + self._share_buckets = {} # k: shnum, v: set of buckets + + key = "\x00" * 16 + self._output = Output(self._downloadable, key) + + d = defer.maybeDeferred(self._get_all_shareholders) + d.addCallback(self._got_all_shareholders) + d.addCallback(self._download_all_segments) + d.addCallback(self._done) + return d - # maybe limit max_peers to 2*len(self.shares), to reduce memory - # footprint - max_peers = None + def _get_all_shareholders(self): + dl = [] + for (permutedpeerid, peerid, connection) in self._client.get_permuted_peers(self._verifierid): + d = connection.callRemote("get_buckets", self._verifierid) + d.addCallbacks(self._got_response, self._got_error, + callbackArgs=(connection,)) + dl.append(d) + return defer.DeferredList(dl) + + def _got_response(self, buckets, connection): + for sharenum, bucket in buckets: + self._share_buckets.setdefault(sharenum, set()).add(bucket) + + def _got_error(self, f): + self._client.log("Somebody failed. -- %s" % (f,)) - self.permuted = self._peer.get_permuted_connections(self._verifierid, max_peers) - for p in self.permuted: - assert isinstance(p, str) - self.landlords = [] # list of (peerid, bucket_num, remotebucket) + def _got_all_shareholders(self, res): + if len(self._share_buckets) < self._needed_shares: + raise NotEnoughPeersError - d = defer.maybeDeferred(self._check_next_peer) - d.addCallback(self._got_all_peers) + self.active_buckets = {} + + def _download_all_segments(self): + d = self._download_segment(self._current_segnum) + def _done(res): + if self._current_segnum == self._total_segments: + return None + return self._download_segment(self._current_segnum) + d.addCallback(_done) return d - def _check_next_peer(self): - if len(self.permuted) == 0: - # there are no more to check - raise NotEnoughPeersError - peerid = self.permuted.pop(0) - - d = self._peer.get_remote_service(peerid, "storageserver") - def _got_peer(service): - bucket_num = len(self.landlords) - if self.debug: print "asking %s" % idlib.b2a(peerid) - d2 = service.callRemote("get_buckets", verifierid=self._verifierid) - def _got_response(buckets): - if buckets: - bucket_nums = [num for (num,bucket) in buckets] - if self.debug: - print " peerid %s has buckets %s" % (idlib.b2a(peerid), - bucket_nums) - - self.landlords.append( (peerid, buckets) ) - if len(self.landlords) >= self.needed_shares: - if self.debug: print " we're done!" - raise HaveAllPeersError - # otherwise we fall through to search more peers - d2.addCallback(_got_response) - return d2 - d.addCallback(_got_peer) - - def _done_with_peer(res): - if self.debug: print "done with peer %s:" % idlib.b2a(peerid) - if isinstance(res, failure.Failure): - if res.check(HaveAllPeersError): - if self.debug: print " all done" - # we're done! - return - if res.check(IndexError): - if self.debug: print " no connection" - else: - if self.debug: print " other error:", res + def _download_segment(self, segnum): + segmentdler = SegmentDownloader(segnum, self._needed_shares) + d = segmentdler.start() + d.addCallback(self._decoder.decode) + def _done(res): + self._current_segnum += 1 + if self._current_segnum == self._total_segments: + data = ''.join(res) + padsize = mathutil.pad_size(self._size, self._segment_size) + data = data[:-padsize] + self.output.write(data) else: - if self.debug: print " they had data for us" - # we get here for either good peers (when we still need more), or - # after checking a bad peer (and thus still need more). So now we - # need to grab a new peer. - return self._check_next_peer() - d.addBoth(_done_with_peer) + for buf in res: + self.output.write(buf) + d.addCallback(_done) return d + def _done(self, res): + return self._output.finish() + + def _write_data(self, data): + self._verifierid_hasher.update(data) + + + +# old stuff def _got_all_peers(self, res): all_buckets = [] for peerid, buckets in self.landlords: diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 1cd60a7e..1cfb5dae 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -57,7 +57,11 @@ class RIBucketWriter(RemoteInterface): class RIStorageServer(RemoteInterface): def allocate_buckets(verifierid=Verifierid, sharenums=SetOf(int), sharesize=int, blocksize=int, canary=Referenceable_): - # if the canary is lost before close(), the bucket is deleted + """ + @param canary: If the canary is lost before close(), the bucket is deleted. + @return: tuple of (alreadygot, allocated), where alreadygot is what we + already have and is what we hereby agree to accept + """ return TupleOf(SetOf(int), DictOf(int, RIBucketWriter)) def get_buckets(verifierid=Verifierid): return DictOf(int, RIBucketReader_) @@ -66,7 +70,7 @@ class RIBucketReader(RemoteInterface): def get_block(blocknum=int): return ShareData def get_block_hashes(): - return ListOf(Hash)) + return ListOf(Hash) def get_share_hashes(): return ListOf(TupleOf(int, Hash)) @@ -157,6 +161,10 @@ class ICodecEncoder(Interface): compatible decoder. """ + def get_block_size(): + """Return the length of the shares that encode() will produce. + """ + def get_share_size(): """Return the length of the shares that encode() will produce. """ @@ -271,7 +279,7 @@ class ICodecDecoder(Interface): """Set up the parameters of this encoder, from a string returned by encoder.get_serialized_params().""" - def get_required_shares(): + def get_needed_shares(): """Return the number of shares needed to reconstruct the data. set_serialized_params() is required to be called before this.""" diff --git a/src/allmydata/storageserver.py b/src/allmydata/storageserver.py index 4286accb..bb9622d1 100644 --- a/src/allmydata/storageserver.py +++ b/src/allmydata/storageserver.py @@ -1,4 +1,4 @@ -import os +import os, re from foolscap import Referenceable from twisted.application import service @@ -10,7 +10,7 @@ from allmydata.util import bencode, fileutil, idlib from allmydata.util.assertutil import _assert, precondition # store/ -# store/tmp # temp dirs named $VERIFIERID/$SHARENUM that will be moved to store/ on success +# store/incoming # temp dirs named $VERIFIERID/$SHARENUM that will be moved to store/ on success # store/$VERIFIERID # store/$VERIFIERID/$SHARENUM # store/$VERIFIERID/$SHARENUM/blocksize @@ -24,20 +24,20 @@ NUM_RE=re.compile("[1-9][0-9]*") class BucketWriter(Referenceable): implements(RIBucketWriter) - def __init__(self, tmphome, finalhome, blocksize): - self.tmphome = tmphome + def __init__(self, incominghome, finalhome, blocksize): + self.incominghome = incominghome self.finalhome = finalhome self.blocksize = blocksize self.closed = False self._write_file('blocksize', str(blocksize)) def _write_file(self, fname, data): - open(os.path.join(tmphome, fname), 'wb').write(data) + open(os.path.join(self.incominghome, fname), 'wb').write(data) def remote_put_block(self, segmentnum, data): precondition(not self.closed) assert len(data) == self.blocksize - f = open(os.path.join(self.tmphome, 'data'), 'wb') + f = open(os.path.join(self.incominghome, 'data'), 'wb') f.seek(self.blocksize*segmentnum) f.write(data) @@ -54,7 +54,7 @@ class BucketWriter(Referenceable): def close(self): precondition(not self.closed) # TODO assert or check the completeness and consistency of the data that has been written - fileutil.rename(self.tmphome, self.finalhome) + fileutil.rename(self.incominghome, self.finalhome) self.closed = True def str2l(s): @@ -87,24 +87,28 @@ class StorageServer(service.MultiService, Referenceable): def __init__(self, storedir): fileutil.make_dirs(storedir) self.storedir = storedir - self.tmpdir = os.path.join(storedir, 'tmp') - self._clean_trash() - fileutil.make_dirs(self.tmpdir) + self.incomingdir = os.path.join(storedir, 'incoming') + self._clean_incomplete() + fileutil.make_dirs(self.incomingdir) service.MultiService.__init__(self) - def _clean_trash(self): - fileutil.rm_dir(self.tmpdir) + def _clean_incomplete(self): + fileutil.rm_dir(self.incomingdir) def remote_allocate_buckets(self, verifierid, sharenums, sharesize, blocksize, canary): - bucketwriters = {} # k: sharenum, v: BucketWriter - for sharenum in sharenums: - tmphome = os.path.join(self.tmpdir, idlib.a2b(verifierid), "%d"%sharenum) - finalhome = os.path.join(self.storedir, idlib.a2b(verifierid), "%d"%sharenum) - bucketwriters[sharenum] = BucketWriter(tmphome, finalhome, blocksize) + alreadygot = set() + bucketwriters = {} # k: shnum, v: BucketWriter + for shnum in sharenums: + incominghome = os.path.join(self.incomingdir, idlib.a2b(verifierid), "%d"%shnum) + finalhome = os.path.join(self.storedir, idlib.a2b(verifierid), "%d"%shnum) + if os.path.exists(incominghome) or os.path.exists(finalhome): + alreadygot.add(shnum) + else: + bucketwriters[shnum] = BucketWriter(incominghome, finalhome, blocksize) - return bucketwriters + return alreadygot, bucketwriters def remote_get_buckets(self, verifierid): bucketreaders = {} # k: sharenum, v: BucketReader diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 686f23b0..0328ae9e 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -6,221 +6,19 @@ from cStringIO import StringIO from allmydata import upload from allmydata.uri import unpack_uri -class StringBucketProxy: - # This is for unit tests: make a StringIO look like a RIBucketWriter. - - def __init__(self): - self.data = StringIO() - self.size = None - self.done = False - - def callRemote(self, methname, **kwargs): - if methname == "write": - return defer.maybeDeferred(self.write, **kwargs) - elif methname == "close": - return defer.maybeDeferred(self.close, **kwargs) - else: - return defer.fail(NameError("no such method named %s" % methname)) - - def write(self, data): - self.data.write(data) - def close(self): - self.done = True - - -class FakePeer: - def __init__(self, peerid, response): - self.peerid = peerid - self.response = response - - def callRemote(self, methname, *args, **kwargs): - assert not args - return defer.maybeDeferred(self._callRemote, methname, **kwargs) - - def _callRemote(self, methname, **kwargs): - assert methname == "allocate_bucket" - #assert kwargs["size"] == 100 - assert kwargs["leaser"] == "fakeclient" - if self.response == "good": - return self - raise upload.TooFullError() +class FakeStorageServer: + pass class FakeClient: - nodeid = "fakeclient" - def __init__(self, responses): - self.peers = [] - for peerid,r in enumerate(responses): - if r == "disconnected": - self.peers.append(None) - else: - self.peers.append(FakePeer(str(peerid), r)) - - def get_permuted_connections(self, key): - return [str(i) for i in range(len(self.peers))] - - def get_remote_service(self, peerid, name): - peer = self.peers[int(peerid)] - if not peer: - return defer.fail(IndexError("no connection to that peer")) - return defer.succeed(peer) - - -class NextPeerUploader(upload.FileUploader): - _size = 100 - def _got_enough_peers(self, res): - return res - -class NextPeer(unittest.TestCase): - responses = ["good", # 0 - "full", # 1 - "full", # 2 - "disconnected", # 3 - "good", # 4 - ] - - def compare_landlords(self, u, c, expected): - exp = [(str(peerid), bucketnum, c.peers[peerid]) - for peerid, bucketnum in expected] - self.failUnlessEqual(u.landlords, exp) - - VERIFIERID = "\x00" * 20 - def test_0(self): - c = FakeClient([]) - u = NextPeerUploader(c) - u.set_verifierid(self.VERIFIERID) - u.set_params(2, 2, 2) - d = u.start() - def _check(f): - f.trap(upload.NotEnoughPeersError) - d.addCallbacks(lambda res: self.fail("this was supposed to fail"), - _check) - return d - - def test_1(self): - c = FakeClient(self.responses) - u = NextPeerUploader(c) - u.set_verifierid(self.VERIFIERID) - u.set_params(2, 2, 2) - d = u.start() - def _check(res): - self.failUnlessEqual(u.goodness_points, 2) - self.compare_landlords(u, c, [(0, 0), - (4, 1), - ]) - d.addCallback(_check) - return d - - def test_2(self): - c = FakeClient(self.responses) - u = NextPeerUploader(c) - u.set_verifierid(self.VERIFIERID) - u.set_params(3, 3, 3) - d = u.start() - def _check(res): - self.failUnlessEqual(u.goodness_points, 3) - self.compare_landlords(u, c, [(0, 0), - (4, 1), - (0, 2), - ]) - d.addCallback(_check) - return d - - responses2 = ["good", # 0 - "full", # 1 - "full", # 2 - "good", # 3 - "full", # 4 - ] - - def test_3(self): - c = FakeClient(self.responses2) - u = NextPeerUploader(c) - u.set_verifierid(self.VERIFIERID) - u.set_params(3, 3, 3) - d = u.start() - def _check(res): - self.failUnlessEqual(u.goodness_points, 3) - self.compare_landlords(u, c, [(0, 0), - (3, 1), - (0, 2), - ]) - d.addCallback(_check) - return d - - responses3 = ["good", # 0 - "good", # 1 - "good", # 2 - "good", # 3 - "good", # 4 - ] - - def test_4(self): - c = FakeClient(self.responses3) - u = NextPeerUploader(c) - u.set_verifierid(self.VERIFIERID) - u.set_params(4, 4, 4) - d = u.start() - def _check(res): - self.failUnlessEqual(u.goodness_points, 4) - self.compare_landlords(u, c, [(0, 0), - (1, 1), - (2, 2), - (3, 3), - ]) - d.addCallback(_check) - return d - - -class FakePeer2: - def __init__(self, peerid): - self.peerid = peerid - self.data = "" - - def callRemote(self, methname, *args, **kwargs): - if methname == "allocate_bucket": - return defer.maybeDeferred(self._allocate_bucket, *args, **kwargs) - if methname == "write": - return defer.maybeDeferred(self._write, *args, **kwargs) - if methname == "set_metadata": - return defer.maybeDeferred(self._set_metadata, *args, **kwargs) - if methname == "close": - return defer.maybeDeferred(self._close, *args, **kwargs) - return defer.maybeDeferred(self._bad_name, methname) - - def _allocate_bucket(self, verifierid, bucket_num, size, leaser, canary): - self.allocated_size = size - return self - def _write(self, data): - self.data = self.data + data - def _set_metadata(self, metadata): - self.metadata = metadata - def _close(self): - pass - def _bad_name(self, methname): - raise NameError("FakePeer2 has no such method named '%s'" % methname) - -class FakeClient2: - nodeid = "fakeclient" - def __init__(self, num_peers): - self.peers = [] - for peerid in range(num_peers): - self.peers.append(FakePeer2(str(peerid))) - - def get_permuted_connections(self, key): - return [str(i) for i in range(len(self.peers))] - - def get_remote_service(self, peerid, name): - peer = self.peers[int(peerid)] - if not peer: - return defer.fail(IndexError("no connection to that peer")) - return defer.succeed(peer) + def get_permuted_peers(self, verifierid): + return [ ("%20d"%fakeid, "%20d"%fakeid, FakeStorageServer(),) for fakeid in range(50) ] class Uploader(unittest.TestCase): def setUp(self): - node = self.node = FakeClient2(10) - u = self.u = upload.Uploader() - u.running = 1 - u.parent = node + self.node = FakeClient() + self.u = upload.Uploader() + self.u.running = True + self.u.parent = self.node def _check(self, uri): self.failUnless(isinstance(uri, str)) diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index c994c9c0..d0f36c20 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -84,15 +84,19 @@ class FileUploader: # create the encoder, so we can know how large the shares will be self._encoder = self.ENCODERCLASS() + self._last_seg_encoder = self.ENCODERCLASS() # This one is for encoding the final segment, which might be shorter than the others. self._codec_name = self._encoder.get_encoder_type() + self._encoder.set_params(self.segment_size, self.needed_shares, self.total_shares) +xyz + paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares) - self._encoder.set_params(paddedsize, self.needed_shares, self.total_shares) - self._share_size = self._encoder.get_share_size() + + self._block_size = self._encoder.get_block_size() # first step: who should we upload to? peers = self._client.get_permuted_peers(self._verifierid) assert peers - trackers = [ (permutedid, PeerTracker(peerid, conn),) + trackers = [ (permutedid, PeerTracker(peerid, conn, self._share_size, self._block_size, self._verifierid),) for permutedid, peerid, conn in peers ] ring_things = [] # a list of (position_in_ring, whatami, x) where whatami is 0 if x is a sharenum or else 1 if x is a PeerTracker instance ring_things.extend([ (permutedpeerid, 1, peer,) for permutedpeerid, peer in trackers ]) @@ -192,7 +196,7 @@ class FileUploader: def _compute_uri(self, roothash): params = self._encoder.get_serialized_params() - return pack_uri(self._codec_name, params, self._verifierid, roothash) + return pack_uri(self._codec_name, params, self._verifierid, roothash, self.needed_shares, self.total_shares, self._size, self._encoder.segment_size) def netstring(s): diff --git a/src/allmydata/uri.py b/src/allmydata/uri.py index 3a96d2c1..4f2f0d9d 100644 --- a/src/allmydata/uri.py +++ b/src/allmydata/uri.py @@ -5,7 +5,7 @@ from allmydata.util import idlib # enough information to retrieve and validate the contents. It shall be # expressed in a limited character set (namely [TODO]). -def pack_uri(codec_name, codec_params, verifierid): +def pack_uri(codec_name, codec_params, verifierid, roothash, needed_shares, total_shares, size, segment_size): assert isinstance(codec_name, str) assert len(codec_name) < 10 assert ":" not in codec_name @@ -13,13 +13,18 @@ def pack_uri(codec_name, codec_params, verifierid): assert ":" not in codec_params assert isinstance(verifierid, str) assert len(verifierid) == 20 # sha1 hash - return "URI:%s:%s:%s" % (codec_name, codec_params, idlib.b2a(verifierid)) + return "URI:%s:%s:%s:%s:%s:%s:%s:%s" % (codec_name, codec_params, idlib.b2a(verifierid), idlib.b2a(roothash), needed_shares, total_shares, size, segment_size) def unpack_uri(uri): assert uri.startswith("URI:") - header, codec_name, codec_params, verifierid_s = uri.split(":") + header, codec_name, codec_params, verifierid_s, roothash_s, needed_shares_s, total_shares_s, size_s, segment_size_s = uri.split(":") verifierid = idlib.a2b(verifierid_s) - return codec_name, codec_params, verifierid + roothash = idlib.a2b(roothash_s) + needed_shares = idlib.a2b(needed_shares_s) + total_shares = idlib.a2b(total_shares_s) + size = int(size_s) + segment_size = int(segment_size_s) + return codec_name, codec_params, verifierid, roothash, needed_shares, total_shares, size, segment_size