From: Brian Warner Date: Fri, 30 Mar 2007 18:53:03 +0000 (-0700) Subject: switch upload to use encode_new, fix a few things (but not nearly all of them) X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/file/URI:LIT:krugkidfnzsc4/(%5B%5E?a=commitdiff_plain;h=9a2e0cf28e792d6ddd407e5a8ea7128e6f00fdbb;p=tahoe-lafs%2Ftahoe-lafs.git switch upload to use encode_new, fix a few things (but not nearly all of them) --- diff --git a/src/allmydata/codec.py b/src/allmydata/codec.py index 7cbd9f4d..2e9efa84 100644 --- a/src/allmydata/codec.py +++ b/src/allmydata/codec.py @@ -39,9 +39,6 @@ class ReplicatingEncoder(object): def get_serialized_params(self): return "%d" % self.required_shares - def get_share_size(self): - return self.data_size - def get_block_size(self): return self.data_size @@ -97,9 +94,6 @@ class CRSEncoder(object): return "%d-%d-%d" % (self.data_size, self.required_shares, self.max_shares) - def get_share_size(self): - return self.share_size - def get_block_size(self): return self.share_size diff --git a/src/allmydata/encode_new.py b/src/allmydata/encode_new.py index 2105d260..5d0458d7 100644 --- a/src/allmydata/encode_new.py +++ b/src/allmydata/encode_new.py @@ -75,6 +75,8 @@ PiB=1024*TiB class Encoder(object): implements(IEncoder) + NEEDED_SHARES = 25 + TOTAL_SHARES = 100 def setup(self, infile): self.infile = infile @@ -82,28 +84,37 @@ class Encoder(object): self.file_size = infile.tell() infile.seek(0, 0) - self.num_shares = 100 - self.required_shares = 25 + self.num_shares = self.TOTAL_SHARES + self.required_shares = self.NEEDED_SHARES self.segment_size = min(2*MiB, self.file_size) + self.setup_codec() - def get_reservation_size(self): + def setup_codec(self): + self._codec = CRSEncoder() + self._codec.set_params(self.segment_size, self.required_shares, + self.num_shares) + + def get_share_size(self): share_size = mathutil.div_ceil(self.file_size, self.required_shares) overhead = self.compute_overhead() return share_size + overhead def compute_overhead(self): return 0 + def get_block_size(self): + return self._codec.get_block_size() def set_shareholders(self, landlords): self.landlords = landlords.copy() def start(self): + #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares) self.num_segments = mathutil.div_ceil(self.file_size, self.segment_size) self.share_size = mathutil.div_ceil(self.file_size, self.required_shares) self.setup_encryption() - self.setup_encoder() + self.setup_codec() d = defer.succeed(None) for i in range(self.num_segments): d.addCallback(lambda res: self.do_segment(i)) @@ -124,11 +135,6 @@ class Encoder(object): # that we sent to that landlord. self.share_root_hashes = [None] * self.num_shares - def setup_encoder(self): - self.encoder = CRSEncoder() - self.encoder.set_params(self.segment_size, self.required_shares, - self.num_shares) - def do_segment(self, segnum): chunks = [] # the ICodecEncoder API wants to receive a total of self.segment_size @@ -137,7 +143,7 @@ class Encoder(object): # these pieces need to be the same size as the share which the codec # will generate. Therefore we must feed it with input_piece_size that # equals the output share size. - input_piece_size = self.encoder.get_share_size() + input_piece_size = self._codec.get_block_size() # as a result, the number of input pieces per encode() call will be # equal to the number of required shares with which the codec was @@ -154,7 +160,7 @@ class Encoder(object): input_piece += ('\x00' * (input_piece_size - len(input_piece))) encrypted_piece = self.cryptor.encrypt(input_piece) chunks.append(encrypted_piece) - d = self.encoder.encode(chunks) + d = self._codec.encode(chunks) d.addCallback(self._encoded_segment) return d diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 1cfb5dae..cc235779 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -165,10 +165,6 @@ class ICodecEncoder(Interface): """Return the length of the shares that encode() will produce. """ - def get_share_size(): - """Return the length of the shares that encode() will produce. - """ - def encode_proposal(data, desired_share_ids=None): """Encode some data. @@ -332,11 +328,25 @@ class IEncoder(Interface): before calling get_reservation_size(). """ - def get_reservation_size(): + def get_share_size(): """I return the size of the data that will be stored on each - shareholder. It is useful to determine this size before asking - potential shareholders whether they will grant a lease or not, since - their answers will depend upon how much space we need. + shareholder. This is aggregate amount of data that will be sent to + the shareholder, summed over all the put_block() calls I will ever + make. + + TODO: this might also include some amount of overhead, like the size + of all the hashes. We need to decide whether this is useful or not. + + It is useful to determine this size before asking potential + shareholders whether they will grant a lease or not, since their + answers will depend upon how much space we need. + """ + + def get_block_size(): # TODO: can we avoid exposing this? + """I return the size of the individual blocks that will be delivered + to a shareholder's put_block() method. By knowing this, the + shareholder will be able to keep all blocks in a single file and + still provide random access when reading them. """ def set_shareholders(shareholders): diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py index aa176434..0eb54222 100644 --- a/src/allmydata/test/test_encode.py +++ b/src/allmydata/test/test_encode.py @@ -62,6 +62,7 @@ class UpDown(unittest.TestCase): NUM_SHARES = 100 assert e.num_shares == NUM_SHARES # else we'll be completely confused e.segment_size = 25 # force use of multiple segments + e.setup_codec() # need to rebuild the codec for that change NUM_SEGMENTS = 4 assert (NUM_SEGMENTS-1)*e.segment_size < len(data) <= NUM_SEGMENTS*e.segment_size shareholders = {} diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index d0f36c20..2fbfd0c3 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -5,7 +5,7 @@ from twisted.application import service from foolscap import Referenceable from allmydata.util import idlib, mathutil -from allmydata import codec +from allmydata import encode_new from allmydata.uri import pack_uri from allmydata.interfaces import IUploadable, IUploader @@ -45,8 +45,6 @@ class PeerTracker: class FileUploader: debug = False - ENCODERCLASS = codec.CRSEncoder - def __init__(self, client): self._client = client @@ -83,20 +81,16 @@ class FileUploader: assert self.needed_shares # create the encoder, so we can know how large the shares will be - self._encoder = self.ENCODERCLASS() - self._last_seg_encoder = self.ENCODERCLASS() # This one is for encoding the final segment, which might be shorter than the others. - self._codec_name = self._encoder.get_encoder_type() - self._encoder.set_params(self.segment_size, self.needed_shares, self.total_shares) -xyz - - paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares) - - self._block_size = self._encoder.get_block_size() + self._encoder = encode_new.Encoder() + self._encoder.setup(infile) + share_size = self._encoder.get_share_size() + block_size = self._encoder.get_block_size() - # first step: who should we upload to? + # we are responsible for locating the shareholders. self._encoder is + # responsible for handling the data and sending out the shares. peers = self._client.get_permuted_peers(self._verifierid) assert peers - trackers = [ (permutedid, PeerTracker(peerid, conn, self._share_size, self._block_size, self._verifierid),) + trackers = [ (permutedid, PeerTracker(peerid, conn, share_size, block_size, self._verifierid),) for permutedid, peerid, conn in peers ] ring_things = [] # a list of (position_in_ring, whatami, x) where whatami is 0 if x is a sharenum or else 1 if x is a PeerTracker instance ring_things.extend([ (permutedpeerid, 1, peer,) for permutedpeerid, peer in trackers ]) @@ -196,7 +190,7 @@ xyz def _compute_uri(self, roothash): params = self._encoder.get_serialized_params() - return pack_uri(self._codec_name, params, self._verifierid, roothash, self.needed_shares, self.total_shares, self._size, self._encoder.segment_size) + return pack_uri(self._encoder.get_encoder_type(), params, self._verifierid, roothash, self.needed_shares, self.total_shares, self._size, self._encoder.segment_size) def netstring(s):