From: Brian Warner Date: Tue, 16 Jan 2007 04:22:22 +0000 (-0700) Subject: rearrange encode/upload, add URIs, switch to ReplicatingEncoder X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~374 X-Git-Url: https://git.rkrishnan.org/FOOURL?a=commitdiff_plain;h=3209fd5e0914f04b9626fde08787dba250569b30;p=tahoe-lafs%2Ftahoe-lafs.git rearrange encode/upload, add URIs, switch to ReplicatingEncoder Added metadata to the bucket store, which is used to hold the share number (but the bucket doesn't know that, it just gets a string). Modified the codec interfaces a bit. Try to pass around URIs to/from download/upload instead of verifierids. URI format is still in flux. Change the current (primitive) file encoder to use a ReplicatingEncoder because it provides ICodecEncoder. We will be moving to the (less primitive) file encoder (currently in allmydata.encode_new) eventually, but for now this change lets us test out PyRS or zooko's upcoming C-based RS codec in something larger than a single unit test. This primitive file encoder only uses a single segment, and has no merkle trees. Also added allmydata.util.deferredutil for a DeferredList wrapper that errbacks (but only when all component Deferreds have fired) if there were any errors, which unfortunately is not a behavior available from the standard DeferredList. --- diff --git a/src/allmydata/bucketstore.py b/src/allmydata/bucketstore.py index 08ea9dc9..923b8e81 100644 --- a/src/allmydata/bucketstore.py +++ b/src/allmydata/bucketstore.py @@ -2,16 +2,13 @@ import os from foolscap import Referenceable from twisted.application import service -from twisted.python.failure import Failure +from twisted.python import log from allmydata.util import idlib from zope.interface import implements from allmydata.interfaces import RIBucketWriter, RIBucketReader from allmydata.util.assertutil import precondition, _assert -class NoSuchBucketError(Failure): - pass - class BucketStore(service.MultiService, Referenceable): def __init__(self, store_dir): precondition(os.path.isdir(store_dir)) @@ -94,6 +91,8 @@ class WriteBucket(Bucket): def __init__(self, bucket_dir, verifierid, bucket_num, size): Bucket.__init__(self, bucket_dir, verifierid) precondition(not os.path.exists(bucket_dir)) + #log.msg("WriteBucket [%s]: creating bucket %s" + # % (idlib.b2a(verifierid), bucket_dir)) os.mkdir(bucket_dir) self._open = True @@ -119,6 +118,8 @@ class WriteBucket(Bucket): def close(self): precondition(self._bytes_written == self._size) + #log.msg("WriteBucket.close [%s] (%s)" + # % (idlib.b2a(self._verifierid), self._bucket_dir)) self._data.close() self._write_attr('closed', '') self._open = False diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 360cdc93..203961ec 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -140,6 +140,7 @@ class Client(node.Node, Referenceable): # sort of at most max_count elements results = [] for nodeid in self.all_peers: + assert isinstance(nodeid, str) permuted = sha.new(key + nodeid).digest() results.append((permuted, nodeid)) results.sort() diff --git a/src/allmydata/codec.py b/src/allmydata/codec.py index e8d99f18..e330fe2e 100644 --- a/src/allmydata/codec.py +++ b/src/allmydata/codec.py @@ -14,10 +14,10 @@ class ReplicatingEncoder(object): implements(ICodecEncoder) ENCODER_TYPE = 0 - def set_params(self, data_size, required_shares, total_shares): + def set_params(self, data_size, required_shares, max_shares): self.data_size = data_size self.required_shares = required_shares - self.total_shares = total_shares + self.max_shares = max_shares def get_encoder_type(self): return self.ENCODER_TYPE @@ -28,8 +28,11 @@ class ReplicatingEncoder(object): def get_share_size(self): return self.data_size - def encode(self, data): - shares = [(i,data) for i in range(self.total_shares)] + def encode(self, data, num_shares=None): + if num_shares is None: + num_shares = self.max_shares + assert num_shares <= self.max_shares + shares = [(i,data) for i in range(num_shares)] return defer.succeed(shares) class ReplicatingDecoder(object): @@ -38,6 +41,9 @@ class ReplicatingDecoder(object): def set_serialized_params(self, params): self.required_shares = int(params) + def get_required_shares(self): + return self.required_shares + def decode(self, some_shares): assert len(some_shares) >= self.required_shares data = some_shares[0][1] @@ -117,32 +123,38 @@ class PyRSEncoder(object): # than 20 minutes to run the test_encode_share tests, so I disabled most # of them. (uh, hello, it's running figleaf) - def set_params(self, data_size, required_shares, total_shares): - assert required_shares <= total_shares + def set_params(self, data_size, required_shares, max_shares): + assert required_shares <= max_shares self.data_size = data_size self.required_shares = required_shares - self.total_shares = total_shares + self.max_shares = max_shares self.chunk_size = required_shares self.num_chunks = mathutil.div_ceil(data_size, self.chunk_size) self.last_chunk_padding = mathutil.pad_size(data_size, required_shares) self.share_size = self.num_chunks - self.encoder = rs_code.RSCode(total_shares, required_shares, 8) + self.encoder = rs_code.RSCode(max_shares, required_shares, 8) def get_encoder_type(self): return self.ENCODER_TYPE def get_serialized_params(self): return "%d:%d:%d" % (self.data_size, self.required_shares, - self.total_shares) + self.max_shares) def get_share_size(self): return self.share_size - def encode(self, data): - share_data = [ [] for i in range(self.total_shares)] + def encode(self, data, num_shares=None): + if num_shares is None: + num_shares = self.max_shares + assert num_shares <= self.max_shares + # we create self.max_shares shares, then throw out any extra ones + # so that we always return exactly num_shares shares. + + share_data = [ [] for i in range(self.max_shares)] for i in range(self.num_chunks): # we take self.chunk_size bytes from the input string, and - # turn it into self.total_shares bytes. + # turn it into self.max_shares bytes. offset = i*self.chunk_size # Note string slices aren't an efficient way to use memory, so # when we upgrade from the unusably slow py_ecc prototype to a @@ -155,12 +167,12 @@ class PyRSEncoder(object): input_vector = [ord(x) for x in chunk] assert len(input_vector) == self.required_shares output_vector = self.encoder.Encode(input_vector) - assert len(output_vector) == self.total_shares + assert len(output_vector) == self.max_shares for i2,out in enumerate(output_vector): share_data[i2].append(chr(out)) shares = [ (i, "".join(share_data[i])) - for i in range(self.total_shares) ] + for i in range(num_shares) ] return defer.succeed(shares) class PyRSDecoder(object): @@ -170,23 +182,26 @@ class PyRSDecoder(object): pieces = params.split(":") self.data_size = int(pieces[0]) self.required_shares = int(pieces[1]) - self.total_shares = int(pieces[2]) + self.max_shares = int(pieces[2]) self.chunk_size = self.required_shares self.num_chunks = mathutil.div_ceil(self.data_size, self.chunk_size) self.last_chunk_padding = mathutil.pad_size(self.data_size, self.required_shares) self.share_size = self.num_chunks - self.encoder = rs_code.RSCode(self.total_shares, self.required_shares, + self.encoder = rs_code.RSCode(self.max_shares, self.required_shares, 8) if False: print "chunk_size: %d" % self.chunk_size print "num_chunks: %d" % self.num_chunks print "last_chunk_padding: %d" % self.last_chunk_padding print "share_size: %d" % self.share_size - print "total_shares: %d" % self.total_shares + print "max_shares: %d" % self.max_shares print "required_shares: %d" % self.required_shares + def get_required_shares(self): + return self.required_shares + def decode(self, some_shares): chunk_size = self.chunk_size assert len(some_shares) >= self.required_shares @@ -198,7 +213,7 @@ class PyRSDecoder(object): # this takes one byte from each share, and turns the combination # into a single chunk received_vector = [] - for j in range(self.total_shares): + for j in range(self.max_shares): share = have_shares.get(j) if share is not None: received_vector.append(ord(share[i])) diff --git a/src/allmydata/download.py b/src/allmydata/download.py index 8646f788..2b5fca76 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -1,11 +1,12 @@ -import os +import os, sha from zope.interface import Interface, implements from twisted.python import failure, log from twisted.internet import defer from twisted.application import service -from allmydata.util import idlib +from allmydata.util import idlib, bencode +from allmydata.util.deferredutil import DeferredListShouldSucceed from allmydata import codec class NotEnoughPeersError(Exception): @@ -15,13 +16,21 @@ class HaveAllPeersError(Exception): # we use this to jump out of the loop pass +def unpack_uri(uri): + assert uri.startswith("URI:") + return bencode.bdecode(uri[4:]) + class FileDownloader: debug = False - def __init__(self, peer, verifierid): + def __init__(self, peer, verifierid, encoding_params): self._peer = peer assert isinstance(verifierid, str) + assert len(verifierid) == 20 self._verifierid = verifierid + self._decoder = codec.ReplicatingDecoder() + self._decoder.set_serialized_params(encoding_params) + self.needed_shares = self._decoder.get_required_shares() def set_download_target(self, target): self._target = target @@ -30,15 +39,8 @@ class FileDownloader: def _cancel(self): pass - def make_decoder(self): - n = self._shares = 4 - k = self._desired_shares = 2 - self._target.open() - self._decoder = codec.Decoder(self._target, k, n, - self._verifierid) - def start(self): - log.msg("starting download") + log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),)) if self.debug: print "starting download" # first step: who should we download from? @@ -75,7 +77,7 @@ class FileDownloader: bucket_nums) self.landlords.append( (peerid, buckets) ) - if len(self.landlords) >= self._desired_shares: + if len(self.landlords) >= self.needed_shares: if self.debug: print " we're done!" raise HaveAllPeersError # otherwise we fall through to search more peers @@ -107,7 +109,34 @@ class FileDownloader: all_buckets = [] for peerid, buckets in self.landlords: all_buckets.extend(buckets) - d = self._decoder.start(all_buckets) + # TODO: try to avoid pulling multiple shares from the same peer + all_buckets = all_buckets[:self.needed_shares] + # retrieve all shares + dl = [] + shares = [] + for (bucket_num, bucket) in all_buckets: + d0 = bucket.callRemote("get_metadata") + d1 = bucket.callRemote("read") + d2 = DeferredListShouldSucceed([d0, d1]) + def _got(res): + sharenum_s, sharedata = res + sharenum = bencode.bdecode(sharenum_s) + shares.append((sharenum, sharedata)) + d2.addCallback(_got) + dl.append(d2) + d = DeferredListShouldSucceed(dl) + + d.addCallback(lambda res: self._decoder.decode(shares)) + + def _write(data): + self._target.open() + hasher = sha.new(netstring("allmydata_v1_verifierid")) + hasher.update(data) + vid = hasher.digest() + assert self._verifierid == vid, "%s != %s" % (idlib.b2a(self._verifierid), idlib.b2a(vid)) + self._target.write(data) + d.addCallback(_write) + def _done(res): self._target.close() return self._target.finish() @@ -204,26 +233,29 @@ class Downloader(service.MultiService): """ implements(IDownloader) name = "downloader" + debug = False - def download(self, verifierid, t): + def download(self, uri, t): + (verifierid, params) = unpack_uri(uri) assert self.parent assert self.running assert isinstance(verifierid, str) t = IDownloadTarget(t) assert t.write assert t.close - dl = FileDownloader(self.parent, verifierid) + dl = FileDownloader(self.parent, verifierid, params) dl.set_download_target(t) - dl.make_decoder() + if self.debug: + dl.debug = True d = dl.start() return d # utility functions - def download_to_data(self, verifierid): - return self.download(verifierid, Data()) - def download_to_filename(self, verifierid, filename): - return self.download(verifierid, FileName(filename)) - def download_to_filehandle(self, verifierid, filehandle): - return self.download(verifierid, FileHandle(filehandle)) + def download_to_data(self, uri): + return self.download(uri, Data()) + def download_to_filename(self, uri, filename): + return self.download(uri, FileName(filename)) + def download_to_filehandle(self, uri, filehandle): + return self.download(uri, FileHandle(filehandle)) diff --git a/src/allmydata/encode_new.py b/src/allmydata/encode_new.py index 0da54501..df09f6d4 100644 --- a/src/allmydata/encode_new.py +++ b/src/allmydata/encode_new.py @@ -129,6 +129,7 @@ class Encoder(object): segment_plaintext = self.infile.read(self.segment_size) segment_crypttext = self.cryptor.encrypt(segment_plaintext) del segment_plaintext + assert self.encoder.max_shares == self.num_shares d = self.encoder.encode(segment_crypttext) d.addCallback(self._encoded_segment) return d diff --git a/src/allmydata/filetable.py b/src/allmydata/filetable.py index bb96b02a..7b8c0e19 100644 --- a/src/allmydata/filetable.py +++ b/src/allmydata/filetable.py @@ -74,10 +74,10 @@ class MutableDirectoryNode(Referenceable): return self.make_subnode(absname) remote_add_directory = add_directory - def add_file(self, name, data): + def add_file(self, name, uri): self.validate_name(name) f = open(os.path.join(self._basedir, name), "wb") - f.write(data) + f.write(uri) f.close() remote_add_file = add_file diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 396a75d2..246e3d7f 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -6,6 +6,7 @@ from foolscap import RemoteInterface Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash PBURL = StringConstraint(150) Verifierid = StringConstraint(20) +URI = StringConstraint(100) # kind of arbitrary ShareData = StringConstraint(100000) # these four are here because Foolscap does not yet support the kind of # restriction I really want to apply to these. @@ -65,7 +66,7 @@ class RIMutableDirectoryNode(RemoteInterface): def add_directory(name=str): return RIMutableDirectoryNode_ - def add_file(name=str, data=Verifierid): + def add_file(name=str, uri=URI): return None def remove(name=str): @@ -75,7 +76,7 @@ class RIMutableDirectoryNode(RemoteInterface): class ICodecEncoder(Interface): - def set_params(data_size, required_shares, total_shares): + def set_params(data_size, required_shares, max_shares): """Set up the parameters of this encoder. See encode() for a description of how these parameters are used. @@ -109,28 +110,58 @@ class ICodecEncoder(Interface): """Return the length of the shares that encode() will produce. """ - def encode(data): + def encode(data, num_shares=None): """Encode a chunk of data. This may be called multiple times. Each call is independent. The data must be a string with a length that exactly matches the data_size promised by set_params(). + 'num_shares', if provided, must be equal or less than the + 'max_shares' set in set_params. If 'num_shares' is left at None, this + method will produce 'max_shares' shares. This can be used to minimize + the work that the encoder needs to do if we initially thought that we + would need, say, 100 shares, but now that it is time to actually + encode the data we only have 75 peers to send data to. + For each call, encode() will return a Deferred that fires with a list of 'total_shares' tuples. Each tuple is of the form (sharenum, - share), where sharenum is an int (from 0 total_shares-1), and share - is a string. The get_share_size() method can be used to determine the - length of the 'share' strings returned by encode(). + sharedata), where sharenum is an int (from 0 total_shares-1), and + sharedata is a string. The get_share_size() method can be used to + determine the length of the 'sharedata' strings returned by encode(). + + The (sharenum, sharedata) tuple must be kept together during storage + and retrieval. Specifically, the share data is useless by itself: the + decoder needs to be told which share is which by providing it with + both the share number and the actual share data. The memory usage of this function is expected to be on the order of total_shares * get_share_size(). """ + # design note: we could embed the share number in the sharedata by + # returning bencode((sharenum,sharedata)). The advantage would be + # making it easier to keep these two pieces together, and probably + # avoiding a round trip when reading the remote bucket (although this + # could be achieved by changing RIBucketReader.read to + # read_data_and_metadata). The disadvantage is that the share number + # wants to be exposed to the storage/bucket layer (specifically to + # handle the next stage of peer-selection algorithm in which we + # propose to keep share#A on a given peer and they are allowed to + # tell us that they already have share#B). Also doing this would make + # the share size somewhat variable (one-digit sharenumbers will be a + # byte shorter than two-digit sharenumbers), unless we zero-pad the + # sharenumbers based upon the max_total_shares declared in + # set_params. class ICodecDecoder(Interface): def set_serialized_params(params): """Set up the parameters of this encoder, from a string returned by encoder.get_serialized_params().""" + def get_required_shares(): + """Return the number of shares needed to reconstruct the data. + set_serialized_params() must be called before this.""" + def decode(some_shares): """Decode a partial list of shares into data. diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index c067c303..5cd01e06 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -17,3 +17,8 @@ class Basic(unittest.TestCase): self.failUnlessEqual(c.permute_peerids("two"), ['0','4','2','1','3']) c.all_peers = [] self.failUnlessEqual(c.permute_peerids("one"), []) + + c2 = client.Client("") + c2.all_peers = ["%d" % i for i in range(5)] + self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2']) + diff --git a/src/allmydata/test/test_codec.py b/src/allmydata/test/test_codec.py index cec61bb3..d073ae8c 100644 --- a/src/allmydata/test/test_codec.py +++ b/src/allmydata/test/test_codec.py @@ -10,17 +10,23 @@ class Tester: #enc_class = PyRSEncoder #dec_class = PyRSDecoder - def do_test(self, size, required_shares, total_shares): + def do_test(self, size, required_shares, max_shares, fewer_shares=None): data0 = os.urandom(size) enc = self.enc_class() - enc.set_params(size, required_shares, total_shares) + enc.set_params(size, required_shares, max_shares) serialized_params = enc.get_serialized_params() log.msg("serialized_params: %s" % serialized_params) d = enc.encode(data0) - def _done(shares): - self.failUnlessEqual(len(shares), total_shares) + def _done_encoding_all(shares): + self.failUnlessEqual(len(shares), max_shares) self.shares = shares - d.addCallback(_done) + d.addCallback(_done_encoding_all) + if fewer_shares is not None: + # also validate that the num_shares= parameter works + d.addCallback(lambda res: enc.encode(data0, fewer_shares)) + def _check_fewer_shares(some_shares): + self.failUnlessEqual(len(some_shares), fewer_shares) + d.addCallback(_check_fewer_shares) def _decode(shares): dec = self.dec_class() @@ -91,7 +97,7 @@ class Tester: def test_encode2(self): if os.uname()[1] == "slave3" and self.enc_class == PyRSEncoder: raise unittest.SkipTest("slave3 is really slow") - return self.do_test(123, 25, 100) + return self.do_test(123, 25, 100, 90) def test_sizes(self): raise unittest.SkipTest("omg this would take forever") @@ -114,13 +120,13 @@ class BenchPyRS(unittest.TestCase): def test_big(self): size = 10000 required_shares = 25 - total_shares = 100 + max_shares = 100 # this lets us use a persistent lookup table, stored outside the # _trial_temp directory (which is deleted each time trial is run) os.symlink("../ffield.lut.8", "ffield.lut.8") enc = self.enc_class() self.start() - enc.set_params(size, required_shares, total_shares) + enc.set_params(size, required_shares, max_shares) serialized_params = enc.get_serialized_params() print "encoder ready", self.stop() self.start() @@ -132,7 +138,7 @@ class BenchPyRS(unittest.TestCase): now_shares = time.time() print "shares ready", self.stop() self.start() - self.failUnlessEqual(len(shares), total_shares) + self.failUnlessEqual(len(shares), max_shares) d.addCallback(_done) d.addCallback(lambda res: enc.encode(data0)) d.addCallback(_done) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 6c43460c..5ca474e2 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -116,16 +116,20 @@ class SystemTest(unittest.TestCase): d1 = u.upload_data(DATA) return d1 d.addCallback(_do_upload) - def _upload_done(verifierid): - log.msg("upload finished: verifierid=%s" % idlib.b2a(verifierid)) + def _upload_done(uri): + log.msg("upload finished: uri is %s" % (uri,)) dl = self.clients[1].getServiceNamed("downloader") - d1 = dl.download_to_data(verifierid) + d1 = dl.download_to_data(uri) return d1 d.addCallback(_upload_done) def _download_done(data): log.msg("download finished") self.failUnlessEqual(data, DATA) d.addCallback(_download_done) + def _oops(res): + log.msg("oops, an error orccurred, finishing: %s" % res) + return res + d.addErrback(_oops) return d test_upload_and_download.timeout = 20 diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 9d3e44c8..d0f74560 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -1,9 +1,10 @@ from twisted.trial import unittest from twisted.internet import defer +from twisted.application import service from cStringIO import StringIO -from allmydata import upload +from allmydata import upload, download class StringBucketProxy: # This is for unit tests: make a StringIO look like a RIBucketWriter. @@ -64,8 +65,10 @@ class FakeClient: return defer.fail(IndexError("no connection to that peer")) return defer.succeed(peer) + class NextPeerUploader(upload.FileUploader): - def _got_all_peers(self, res): + _size = 100 + def _got_enough_peers(self, res): return res class NextPeer(unittest.TestCase): @@ -81,12 +84,12 @@ class NextPeer(unittest.TestCase): for peerid, bucketnum in expected] self.failUnlessEqual(u.landlords, exp) + VERIFIERID = "\x00" * 20 def test_0(self): c = FakeClient([]) u = NextPeerUploader(c) - u._verifierid = "verifierid" - u._shares = 2 - u._share_size = 100 + u.set_verifierid(self.VERIFIERID) + u.set_params(2, 2, 2) d = u.start() def _check(f): f.trap(upload.NotEnoughPeersError) @@ -97,9 +100,8 @@ class NextPeer(unittest.TestCase): def test_1(self): c = FakeClient(self.responses) u = NextPeerUploader(c) - u._verifierid = "verifierid" - u._shares = 2 - u._share_size = 100 + u.set_verifierid(self.VERIFIERID) + u.set_params(2, 2, 2) d = u.start() def _check(res): self.failUnlessEqual(u.goodness_points, 2) @@ -112,9 +114,8 @@ class NextPeer(unittest.TestCase): def test_2(self): c = FakeClient(self.responses) u = NextPeerUploader(c) - u._verifierid = "verifierid" - u._shares = 3 - u._share_size = 100 + u.set_verifierid(self.VERIFIERID) + u.set_params(3, 3, 3) d = u.start() def _check(res): self.failUnlessEqual(u.goodness_points, 3) @@ -135,9 +136,8 @@ class NextPeer(unittest.TestCase): def test_3(self): c = FakeClient(self.responses2) u = NextPeerUploader(c) - u._verifierid = "verifierid" - u._shares = 3 - u._share_size = 100 + u.set_verifierid(self.VERIFIERID) + u.set_params(3, 3, 3) d = u.start() def _check(res): self.failUnlessEqual(u.goodness_points, 3) @@ -158,9 +158,8 @@ class NextPeer(unittest.TestCase): def test_4(self): c = FakeClient(self.responses3) u = NextPeerUploader(c) - u._verifierid = "verifierid" - u._shares = 4 - u._share_size = 100 + u.set_verifierid(self.VERIFIERID) + u.set_params(4, 4, 4) d = u.start() def _check(res): self.failUnlessEqual(u.goodness_points, 4) @@ -171,3 +170,88 @@ class NextPeer(unittest.TestCase): ]) d.addCallback(_check) return d + + +class FakePeer2: + def __init__(self, peerid): + self.peerid = peerid + self.data = "" + + def callRemote(self, methname, *args, **kwargs): + if methname == "allocate_bucket": + return defer.maybeDeferred(self._allocate_bucket, *args, **kwargs) + if methname == "write": + return defer.maybeDeferred(self._write, *args, **kwargs) + if methname == "set_metadata": + return defer.maybeDeferred(self._set_metadata, *args, **kwargs) + if methname == "close": + return defer.maybeDeferred(self._close, *args, **kwargs) + return defer.maybeDeferred(self._bad_name, methname) + + def _allocate_bucket(self, verifierid, bucket_num, size, leaser, canary): + self.allocated_size = size + return self + def _write(self, data): + self.data = self.data + data + def _set_metadata(self, metadata): + self.metadata = metadata + def _close(self): + pass + def _bad_name(self, methname): + raise NameError("FakePeer2 has no such method named '%s'" % methname) + +class FakeClient2: + nodeid = "fakeclient" + def __init__(self, max_peers): + self.peers = [] + for peerid in range(max_peers): + self.peers.append(FakePeer2(str(peerid))) + + def permute_peerids(self, key, max_peers): + assert max_peers == None + return [str(i) for i in range(len(self.peers))] + + def get_remote_service(self, peerid, name): + peer = self.peers[int(peerid)] + if not peer: + return defer.fail(IndexError("no connection to that peer")) + return defer.succeed(peer) + +class Uploader(unittest.TestCase): + def setUp(self): + node = self.node = FakeClient2(10) + u = self.u = upload.Uploader() + u.running = 1 + u.parent = node + + def _check(self, uri): + self.failUnless(isinstance(uri, str)) + self.failUnless(uri.startswith("URI:")) + verifierid, params = download.unpack_uri(uri) + self.failUnless(isinstance(verifierid, str)) + self.failUnlessEqual(len(verifierid), 20) + self.failUnless(isinstance(params, str)) + peers = self.node.peers + self.failUnlessEqual(peers[0].allocated_size, + len(peers[0].data)) + def testData(self): + data = "This is some data to upload" + d = self.u.upload_data(data) + d.addCallback(self._check) + return d + + def testFileHandle(self): + data = "This is some data to upload" + d = self.u.upload_filehandle(StringIO(data)) + d.addCallback(self._check) + return d + + def testFilename(self): + fn = "Uploader-testFilename.data" + f = open(fn, "w") + data = "This is some data to upload" + f.write(data) + f.close() + d = self.u.upload_filename(fn) + d.addCallback(self._check) + return d diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 28908318..5121bc93 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -5,7 +5,8 @@ from twisted.internet import defer from twisted.application import service from foolscap import Referenceable -from allmydata.util import idlib +from allmydata.util import idlib, bencode +from allmydata.util.deferredutil import DeferredListShouldSucceed from allmydata import codec from cStringIO import StringIO @@ -22,54 +23,84 @@ class HaveAllPeersError(Exception): class TooFullError(Exception): pass + class FileUploader: debug = False def __init__(self, peer): self._peer = peer + def set_params(self, min_shares, target_goodness, max_shares): + self.min_shares = min_shares + self.target_goodness = target_goodness + self.max_shares = max_shares + def set_filehandle(self, filehandle): self._filehandle = filehandle filehandle.seek(0, 2) self._size = filehandle.tell() filehandle.seek(0) - def make_encoder(self): - self._needed_shares = 4 - self._shares = 4 - self._encoder = codec.Encoder(self._filehandle, self._shares) - self._share_size = self._size - def set_verifierid(self, vid): assert isinstance(vid, str) + assert len(vid) == 20 self._verifierid = vid def start(self): - log.msg("starting upload") + """Start uploading the file. + + The source of the data to be uploaded must have been set before this + point by calling set_filehandle(). + + This method returns a Deferred that will fire with the URI (a + string).""" + + log.msg("starting upload [%s]" % (idlib.b2a(self._verifierid),)) if self.debug: print "starting upload" + assert self.min_shares + assert self.target_goodness + + # create the encoder, so we can know how large the shares will be + total_shares = self.max_shares + needed_shares = self.min_shares + self._encoder = codec.ReplicatingEncoder() + self._encoder.set_params(self._size, needed_shares, total_shares) + self._share_size = self._encoder.get_share_size() + # first step: who should we upload to? - # maybe limit max_peers to 2*len(self.shares), to reduce memory - # footprint + # We will talk to at most max_peers (which can be None to mean no + # limit). Maybe limit max_peers to 2*len(self.shares), to reduce + # memory footprint. For now, make it unlimited. max_peers = None self.permuted = self._peer.permute_peerids(self._verifierid, max_peers) + self._total_peers = len(self.permuted) for p in self.permuted: assert isinstance(p, str) # we will shrink self.permuted as we give up on peers self.peer_index = 0 self.goodness_points = 0 - self.target_goodness = self._shares self.landlords = [] # list of (peerid, bucket_num, remotebucket) d = defer.maybeDeferred(self._check_next_peer) - d.addCallback(self._got_all_peers) + d.addCallback(self._got_enough_peers) + d.addCallback(self._compute_uri) return d + def _compute_uri(self, params): + return "URI:%s" % bencode.bencode((self._verifierid, params)) + def _check_next_peer(self): + if self.debug: + log.msg("FileUploader._check_next_peer: %d permuted, %d goodness" + " (want %d), have %d landlords, %d total peers" % + (len(self.permuted), self.goodness_points, + self.target_goodness, len(self.landlords), + self._total_peers)) if len(self.permuted) == 0: # there are no more to check raise NotEnoughPeersError("%s goodness, want %s, have %d " @@ -98,7 +129,8 @@ class FileUploader: print " peerid %s will grant us a lease" % idlib.b2a(peerid) self.landlords.append( (peerid, bucket_num, bucket) ) self.goodness_points += 1 - if self.goodness_points >= self.target_goodness: + if (self.goodness_points >= self.target_goodness and + len(self.landlords) >= self.min_shares): if self.debug: print " we're done!" raise HaveAllPeersError() # otherwise we fall through to allocate more peers @@ -129,11 +161,52 @@ class FileUploader: d.addBoth(_done_with_peer) return d - def _got_all_peers(self, res): - d = self._encoder.do_upload(self.landlords) - d.addCallback(lambda res: self._verifierid) + def _got_enough_peers(self, res): + landlords = self.landlords + if self.debug: + log.msg("FileUploader._got_enough_peers") + log.msg(" %d landlords" % len(landlords)) + if len(landlords) < 20: + log.msg(" peerids: %s" % " ".join([idlib.b2a(l[0]) + for l in landlords])) + log.msg(" buckets: %s" % " ".join([str(l[1]) + for l in landlords])) + # assign shares to landlords + self.sharemap = {} + for peerid, bucket_num, bucket in landlords: + self.sharemap[bucket_num] = bucket + # the sharemap should have exactly len(landlords) shares, with + # no holes + assert sorted(self.sharemap.keys()) == range(len(landlords)) + # encode all the data at once: this class does not use segmentation + data = self._filehandle.read() + d = self._encoder.encode(data, len(landlords)) + d.addCallback(self._send_all_shares) + d.addCallback(lambda res: self._encoder.get_serialized_params()) + return d + + def _send_one_share(self, bucket, sharedata, metadata): + d = bucket.callRemote("write", sharedata) + d.addCallback(lambda res: + bucket.callRemote("set_metadata", metadata)) + d.addCallback(lambda res: + bucket.callRemote("close")) return d + def _send_all_shares(self, shares): + dl = [] + for share in shares: + (sharenum,sharedata) = share + if self.debug: + log.msg(" writing share %d" % sharenum) + metadata = bencode.bencode(sharenum) + assert len(sharedata) == self._share_size + assert isinstance(sharedata, str) + bucket = self.sharemap[sharenum] + d = self._send_one_share(bucket, sharedata, metadata) + dl.append(d) + return DeferredListShouldSucceed(dl) + def netstring(s): return "%d:%s," % (len(s), s) @@ -175,24 +248,35 @@ class Uploader(service.MultiService): """I am a service that allows file uploading. """ name = "uploader" + uploader_class = FileUploader + debug = False def _compute_verifierid(self, f): hasher = sha.new(netstring("allmydata_v1_verifierid")) f.seek(0) - hasher.update(f.read()) + data = f.read() + hasher.update(data)#f.read()) f.seek(0) # note: this is only of the plaintext data, no encryption yet return hasher.digest() def upload(self, f): + # this returns (verifierid, encoding_params) assert self.parent assert self.running f = IUploadable(f) fh = f.get_filehandle() - u = FileUploader(self.parent) + u = self.uploader_class(self.parent) + if self.debug: + u.debug = True u.set_filehandle(fh) + # TODO: change this to (2,2,4) once Foolscap is fixed to allow + # connect-to-self and Client is fixed to include ourselves in the + # peerlist. Otherwise this usually fails because we give a share to + # the eventual downloader, and they won't try to get a share from + # themselves. + u.set_params(2, 3, 4) u.set_verifierid(self._compute_verifierid(fh)) - u.make_encoder() d = u.start() def _done(res): f.close_filehandle(fh) diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py new file mode 100644 index 00000000..ca62e5ae --- /dev/null +++ b/src/allmydata/util/deferredutil.py @@ -0,0 +1,17 @@ + +from twisted.internet import defer + +# utility wrapper for DeferredList +def _check_deferred_list(results): + # if any of the component Deferreds failed, return the first failure such + # that an addErrback() would fire. If all were ok, return a list of the + # results (without the success/failure booleans) + for success,f in results: + if not success: + return f + return [r[1] for r in results] +def DeferredListShouldSucceed(dl): + d = defer.DeferredList(dl) + d.addCallback(_check_deferred_list) + return d + diff --git a/src/allmydata/util/idlib.py b/src/allmydata/util/idlib.py index b447a08d..cd6882db 100644 --- a/src/allmydata/util/idlib.py +++ b/src/allmydata/util/idlib.py @@ -1,7 +1,14 @@ from base64 import b32encode, b32decode def b2a(i): + assert isinstance(i, str), "tried to idlib.b2a non-string '%s'" % (i,) return b32encode(i).lower() def a2b(i): - return b32decode(i.upper()) + assert isinstance(i, str), "tried to idlib.a2b non-string '%s'" % (i,) + try: + return b32decode(i.upper()) + except TypeError: + print "b32decode failed on a %s byte string '%s'" % (len(i), i) + raise + diff --git a/src/allmydata/vdrive.py b/src/allmydata/vdrive.py index 22e012d0..bbb3eda5 100644 --- a/src/allmydata/vdrive.py +++ b/src/allmydata/vdrive.py @@ -84,8 +84,9 @@ class VDrive(service.MultiService): d = self.dirpath(dir_or_path) def _got_dir(dirnode): d1 = ul.upload(uploadable) - d1.addCallback(lambda vid: - dirnode.callRemote("add_file", name, vid)) + def _add(uri): + return dirnode.callRemote("add_file", name, uri) + d1.addCallback(_add) return d1 d.addCallback(_got_dir) def _done(res):