+++ /dev/null
-import os
-
-from foolscap import Referenceable
-from twisted.application import service
-#from twisted.python import log
-from allmydata.util import idlib
-from zope.interface import implements
-from allmydata.interfaces import RIBucketWriter, RIBucketReader
-
-from allmydata.util.assertutil import precondition, _assert
-
-class BucketStore(service.MultiService, Referenceable):
- def __init__(self, store_dir):
- precondition(os.path.isdir(store_dir))
- service.MultiService.__init__(self)
- self._store_dir = store_dir
-
- self._leases = set() # should do weakref dances.
-
- def _get_bucket_dir(self, verifierid):
- avid = idlib.b2a(verifierid)
- return os.path.join(self._store_dir, avid)
-
- def has_bucket(self, verifierid):
- return os.path.exists(self._get_bucket_dir(verifierid))
-
- def allocate_bucket(self, verifierid, bucket_num, size,
- leaser_credentials, canary):
- bucket_dir = self._get_bucket_dir(verifierid)
- precondition(not os.path.exists(bucket_dir))
- precondition(isinstance(bucket_num, int))
- bucket = WriteBucket(bucket_dir, verifierid, bucket_num, size)
- bucket.set_leaser(leaser_credentials)
- lease = Lease(verifierid, leaser_credentials, bucket, canary)
- self._leases.add(lease)
- return lease
-
- def get_buckets(self, verifierid):
- # for now, only returns those created by this process, in this run
- bucket_dir = self._get_bucket_dir(verifierid)
- if os.path.exists(bucket_dir):
- b = ReadBucket(bucket_dir, verifierid)
- return [(b.get_bucket_num(), b)]
- else:
- return []
-
-class Lease(Referenceable):
- implements(RIBucketWriter)
-
- def __init__(self, verifierid, leaser, bucket, canary):
- self._leaser = leaser
- self._verifierid = verifierid
- self._bucket = bucket
- canary.notifyOnDisconnect(self._lost_canary)
-
- def get_bucket(self):
- return self._bucket
-
- def remote_write(self, data):
- self._bucket.write(data)
-
- def remote_set_metadata(self, metadata):
- self._bucket.set_metadata(metadata)
-
- def remote_close(self):
- self._bucket.close()
-
- def _lost_canary(self):
- pass
-
-class Bucket:
- def __init__(self, bucket_dir, verifierid):
- self._bucket_dir = bucket_dir
- self._verifierid = verifierid
-
- def _write_attr(self, name, val):
- f = file(os.path.join(self._bucket_dir, name), 'wb')
- f.write(val)
- f.close()
-
- def _read_attr(self, name):
- f = file(os.path.join(self._bucket_dir, name), 'rb')
- data = f.read()
- f.close()
- return data
-
- def is_complete(self):
- return os.path.exists(os.path.join(self._bucket_dir, 'closed'))
-
-class WriteBucket(Bucket):
- def __init__(self, bucket_dir, verifierid, bucket_num, size):
- Bucket.__init__(self, bucket_dir, verifierid)
- precondition(not os.path.exists(bucket_dir))
- #log.msg("WriteBucket [%s]: creating bucket %s"
- # % (idlib.b2a(verifierid), bucket_dir))
- os.mkdir(bucket_dir)
-
- self._open = True
- self._size = size
- self._data = file(os.path.join(self._bucket_dir, 'data'), 'wb')
- self._bytes_written = 0
-
- self._write_attr('bucket_num', str(bucket_num))
-
- def set_leaser(self, leaser):
- self._write_attr('leases', leaser)
-
- def write(self, data):
- precondition(self._open)
- precondition(len(data) + self._bytes_written <= self._size)
- self._data.write(data)
- self._data.flush()
- self._bytes_written += len(data)
-
- def set_metadata(self, metadata):
- precondition(self._open)
- self._write_attr('metadata', metadata)
-
- def close(self):
- precondition(self._bytes_written == self._size)
- #log.msg("WriteBucket.close [%s] (%s)"
- # % (idlib.b2a(self._verifierid), self._bucket_dir))
- self._data.close()
- self._write_attr('closed', '')
- self._open = False
-
- def is_complete(self):
- complete = Bucket.is_complete(self)
- if complete:
- _assert(os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size)
- return complete
-
-class ReadBucket(Bucket, Referenceable):
- implements(RIBucketReader)
-
- def __init__(self, bucket_dir, verifierid):
- Bucket.__init__(self, bucket_dir, verifierid)
- precondition(self.is_complete()) # implicitly asserts bucket_dir exists
-
- def get_bucket_num(self):
- return int(self._read_attr('bucket_num'))
-
- def read(self):
- return self._read_attr('data')
- remote_read = read
-
- def get_metadata(self):
- return self._read_attr('metadata')
- remote_get_metadata = get_metadata
def get_share_size(self):
return self.data_size
+ def get_block_size(self):
+ return self.data_size
+
def encode(self, inshares, desired_shareids=None):
assert isinstance(inshares, list)
for inshare in inshares:
def set_serialized_params(self, params):
self.required_shares = int(params)
- def get_required_shares(self):
+ def get_needed_shares(self):
return self.required_shares
def decode(self, some_shares, their_shareids):
def get_share_size(self):
return self.share_size
+ def get_block_size(self):
+ return self.share_size
+
def encode(self, inshares, desired_share_ids=None):
precondition(desired_share_ids is None or len(desired_share_ids) <= self.max_shares, desired_share_ids, self.max_shares)
print "max_shares: %d" % self.max_shares
print "required_shares: %d" % self.required_shares
- def get_required_shares(self):
+ def get_needed_shares(self):
return self.required_shares
def decode(self, some_shares, their_shareids):
-import os
# 'app' is overwritten by manhole when the connection is established. We set
# it to None now to keep pyflakes from complaining.
app = None
-def get_random_bucket_on(nodeid, size=200):
- d = app.get_remote_service(nodeid, 'storageserver')
- def get_bucket(rss):
- return rss.callRemote('allocate_bucket',
- verifierid=os.urandom(20),
- bucket_num=26,
- size=size,
- leaser=app.tub.tubID,
- )
- d.addCallback(get_bucket)
- return d
-
-def write_to_bucket(bucket, bytes=100):
- return bucket.callRemote('write', data=os.urandom(bytes))
-
-import os, sha
+import os, random, sha
from zope.interface import implements
-from twisted.python import failure, log
+from twisted.python import log
from twisted.internet import defer
from twisted.application import service
-from allmydata.util import idlib, bencode
+from allmydata.util import idlib, bencode, mathutil
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata import codec
+from allmydata.Crypto.Cipher import AES
from allmydata.uri import unpack_uri
from allmydata.interfaces import IDownloadTarget, IDownloader
# we use this to jump out of the loop
pass
+
+class Output:
+ def __init__(self, downloadable, key):
+ self.downloadable = downloadable
+ self._decryptor = AES.new(key=key, mode=AES.MODE_CTR,
+ counterstart="\x00"*16)
+ self._verifierid_hasher = sha.new(netstring("allmydata_v1_verifierid"))
+ self._fileid_hasher = sha.new(netstring("allmydata_v1_fileid"))
+ def write(self, crypttext):
+ self._verifierid_hasher.update(crypttext)
+ plaintext = self._decryptor.decrypt(crypttext)
+ self._fileid_hasher.update(plaintext)
+ self.downloadable.write(plaintext)
+ def finish(self):
+ self.downloadable.close()
+ return self.downloadable.finish()
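+
+# A usage sketch of Output (illustrative only; 'downloadable', 'key', and
+# 'chunks' are assumed to come from the caller): crypttext goes in,
+# plaintext goes to the downloadable, and both hashes accumulate on the way.
+#
+#   out = Output(downloadable, key)
+#   for chunk in chunks:
+#       out.write(chunk)
+#   d = out.finish()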
+
+class BlockDownloader:
+ def __init__(self, bucket, blocknum, parent):
+ self.bucket = bucket
+ self.blocknum = blocknum
+ self.parent = parent
+
+ def start(self, segnum):
+ d = self.bucket.callRemote('get_block', segnum)
+ d.addCallbacks(self._hold_block, self._got_block_error)
+ return d
+
+ def _hold_block(self, data):
+ self.parent.hold_block(self.blocknum, data)
+
+ def _got_block_error(self, f):
+ self.parent.bucket_failed(self.blocknum, self.bucket)
+
+class SegmentDownloader:
+ def __init__(self, parent, segmentnumber, needed_shares):
+ # the parent FileDownloader owns active_buckets and _share_buckets
+ self.parent = parent
+ self.segmentnumber = segmentnumber
+ self.needed_blocks = needed_shares
+ self.blocks = {} # k: blocknum, v: data
+
+ def start(self):
+ return self._download()
+
+ def _download(self):
+ d = self._try()
+ def _done(res):
+ if len(self.blocks) >= self.needed_blocks:
+ return self.blocks
+ else:
+ return self._download()
+ d.addCallback(_done)
+ return d
+
+ def _try(self):
+ while len(self.parent.active_buckets) < self.needed_blocks:
+ # need some more
+ otherblocknums = list(set(self.parent._share_buckets.keys()) - set(self.parent.active_buckets.keys()))
+ if not otherblocknums:
+ raise NotEnoughPeersError
+ blocknum = random.choice(otherblocknums)
+ # random.choice needs a sequence, so listify the set of buckets
+ self.parent.active_buckets[blocknum] = random.choice(list(self.parent._share_buckets[blocknum]))
+
+ # Now we have enough buckets, in self.parent.active_buckets.
+ l = []
+ for blocknum, bucket in self.parent.active_buckets.iteritems():
+ bd = BlockDownloader(bucket, blocknum, self)
+ d = bd.start(self.segmentnumber)
+ l.append(d)
+ return defer.DeferredList(l)
+
+ def hold_block(self, blocknum, data):
+ self.blocks[blocknum] = data
+
+ def bucket_failed(self, shnum, bucket):
+ del self.parent.active_buckets[shnum]
+ s = self.parent._share_buckets[shnum]
+ s.remove(bucket)
+ if not s:
+ del self.parent._share_buckets[shnum]
+
class FileDownloader:
debug = False
- def __init__(self, peer, uri):
- self._peer = peer
- (codec_name, codec_params, verifierid) = unpack_uri(uri)
+ def __init__(self, client, uri, downloadable):
+ self._client = client
+ self._downloadable = downloadable
+ (codec_name, codec_params, verifierid, roothash, needed_shares, total_shares, size, segment_size) = unpack_uri(uri)
assert isinstance(verifierid, str)
assert len(verifierid) == 20
self._verifierid = verifierid
+ self._roothash = roothash
self._decoder = codec.get_decoder_by_name(codec_name)
self._decoder.set_serialized_params(codec_params)
- self.needed_shares = self._decoder.get_required_shares()
+ self._total_segments = mathutil.div_ceil(size, segment_size)
+ self._current_segnum = 0
+ self._segment_size = segment_size
+ self._size = size
+ self._needed_shares = self._decoder.get_needed_shares()
- def set_download_target(self, target):
- self._target = target
- self._target.register_canceller(self._cancel)
-
- def _cancel(self):
- pass
+ # future:
+ # self._share_hash_tree = ??
+ # self._subshare_hash_trees = {} # k:shnum, v: hashtree
+ # each time we start using a new shnum, we must acquire a share hash
+ # from one of the buckets that provides that shnum, then validate it against
+ # the rest of the share hash tree that they provide. Then, each time we
+ # get a block in that share, we must validate the block against the rest
+ # of the subshare hash tree that that bucket will provide.
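+ # A comment-form sketch of that future check (HashTree, get_leaf, and
+ # BadBlockError are assumed names, not part of this patch):
+ #
+ #   def _validate_block(self, shnum, blocknum, blockdata):
+ #       tree = self._subshare_hash_trees[shnum]
+ #       if sha.new(blockdata).digest() != tree.get_leaf(blocknum):
+ #           raise BadBlockError(shnum, blocknum)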
def start(self):
log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),))
if self.debug:
print "starting download"
# first step: who should we download from?
+ self.active_buckets = {} # k: shnum, v: bucket
+ self._share_buckets = {} # k: shnum, v: set of buckets
+
+ key = "\x00" * 16
+ self._output = Output(self._downloadable, key)
+
+ d = defer.maybeDeferred(self._get_all_shareholders)
+ d.addCallback(self._got_all_shareholders)
+ d.addCallback(self._download_all_segments)
+ d.addCallback(self._done)
+ return d
- # maybe limit max_peers to 2*len(self.shares), to reduce memory
- # footprint
- max_peers = None
+ def _get_all_shareholders(self):
+ dl = []
+ for (permutedpeerid, peerid, connection) in self._client.get_permuted_peers(self._verifierid):
+ d = connection.callRemote("get_buckets", self._verifierid)
+ d.addCallbacks(self._got_response, self._got_error,
+ callbackArgs=(connection,))
+ dl.append(d)
+ return defer.DeferredList(dl)
+
+ def _got_response(self, buckets, connection):
+ # remote_get_buckets returns a dict mapping sharenum to bucket
+ for sharenum, bucket in buckets.iteritems():
+ self._share_buckets.setdefault(sharenum, set()).add(bucket)
+
+ def _got_error(self, f):
+ self._client.log("Somebody failed. -- %s" % (f,))
- self.permuted = self._peer.get_permuted_connections(self._verifierid, max_peers)
- for p in self.permuted:
- assert isinstance(p, str)
- self.landlords = [] # list of (peerid, bucket_num, remotebucket)
+ def _got_all_shareholders(self, res):
+ if len(self._share_buckets) < self._needed_shares:
+ raise NotEnoughPeersError
- d = defer.maybeDeferred(self._check_next_peer)
- d.addCallback(self._got_all_peers)
+ self.active_buckets = {}
+
+ def _download_all_segments(self, res):
+ d = self._download_segment(self._current_segnum)
+ def _done(res):
+ if self._current_segnum == self._total_segments:
+ return None
+ # keep looping until every segment has been fetched and decoded
+ return self._download_all_segments(None)
+ d.addCallback(_done)
return d
- def _check_next_peer(self):
- if len(self.permuted) == 0:
- # there are no more to check
- raise NotEnoughPeersError
- peerid = self.permuted.pop(0)
-
- d = self._peer.get_remote_service(peerid, "storageserver")
- def _got_peer(service):
- bucket_num = len(self.landlords)
- if self.debug: print "asking %s" % idlib.b2a(peerid)
- d2 = service.callRemote("get_buckets", verifierid=self._verifierid)
- def _got_response(buckets):
- if buckets:
- bucket_nums = [num for (num,bucket) in buckets]
- if self.debug:
- print " peerid %s has buckets %s" % (idlib.b2a(peerid),
- bucket_nums)
-
- self.landlords.append( (peerid, buckets) )
- if len(self.landlords) >= self.needed_shares:
- if self.debug: print " we're done!"
- raise HaveAllPeersError
- # otherwise we fall through to search more peers
- d2.addCallback(_got_response)
- return d2
- d.addCallback(_got_peer)
-
- def _done_with_peer(res):
- if self.debug: print "done with peer %s:" % idlib.b2a(peerid)
- if isinstance(res, failure.Failure):
- if res.check(HaveAllPeersError):
- if self.debug: print " all done"
- # we're done!
- return
- if res.check(IndexError):
- if self.debug: print " no connection"
- else:
- if self.debug: print " other error:", res
+ def _download_segment(self, segnum):
+ segmentdler = SegmentDownloader(self, segnum, self._needed_shares)
+ d = segmentdler.start()
+ # the blocks dict maps blocknum to data; decode() wants parallel lists
+ d.addCallback(lambda blocks: self._decoder.decode(blocks.values(), blocks.keys()))
+ def _done(res):
+ self._current_segnum += 1
+ if self._current_segnum == self._total_segments:
+ data = ''.join(res)
+ padsize = mathutil.pad_size(self._size, self._segment_size)
+ if padsize:
+ data = data[:-padsize]
+ self._output.write(data)
else:
- if self.debug: print " they had data for us"
- # we get here for either good peers (when we still need more), or
- # after checking a bad peer (and thus still need more). So now we
- # need to grab a new peer.
- return self._check_next_peer()
- d.addBoth(_done_with_peer)
+ for buf in res:
+ self._output.write(buf)
+ d.addCallback(_done)
return d
+ def _done(self, res):
+ return self._output.finish()
+
+
+# old stuff
def _got_all_peers(self, res):
all_buckets = []
for peerid, buckets in self.landlords:
class RIStorageServer(RemoteInterface):
def allocate_buckets(verifierid=Verifierid, sharenums=SetOf(int),
sharesize=int, blocksize=int, canary=Referenceable_):
- # if the canary is lost before close(), the bucket is deleted
+ """
+ @param canary: If the canary is lost before close(), the bucket is deleted.
+ @return: tuple of (alreadygot, allocated), where alreadygot is what we
+ already have and allocated is what we hereby agree to accept
+ """
return TupleOf(SetOf(int), DictOf(int, RIBucketWriter))
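+ # Caller-side sketch (hypothetical names, not part of this interface):
+ # use 'alreadygot' to skip the shares the server already holds.
+ #
+ #   d = server.callRemote("allocate_buckets", verifierid=vid,
+ #                         sharenums=sharenums, sharesize=sharesize,
+ #                         blocksize=blocksize, canary=canary)
+ #   def _got((alreadygot, writers)):
+ #       for shnum, writer in writers.items():
+ #           send_blocks(shnum, writer)
+ #   d.addCallback(_got)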
def get_buckets(verifierid=Verifierid):
return DictOf(int, RIBucketReader_)
def get_block(blocknum=int):
return ShareData
def get_block_hashes():
- return ListOf(Hash))
+ return ListOf(Hash)
def get_share_hashes():
return ListOf(TupleOf(int, Hash))
compatible decoder.
"""
+ def get_block_size():
+ """Return the length of the shares that encode() will produce.
+ """
+
def get_share_size():
"""Return the length of the shares that encode() will produce.
"""
"""Set up the parameters of this encoder, from a string returned by
encoder.get_serialized_params()."""
- def get_required_shares():
+ def get_needed_shares():
"""Return the number of shares needed to reconstruct the data.
set_serialized_params() is required to be called before this."""
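+# A minimal encode/decode round trip under this interface (the class
+# names here are assumptions for illustration):
+#
+#   enc = ReplicatingEncoder()
+#   enc.set_params(data_size, required_shares, max_shares)
+#   d = enc.encode(shares_in)
+#   dec = ReplicatingDecoder()
+#   dec.set_serialized_params(enc.get_serialized_params())
+#   k = dec.get_needed_shares()
+#   d2 = dec.decode(some_shares[:k], their_shareids[:k])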
-import os
+import os, re
from foolscap import Referenceable
from twisted.application import service
from allmydata.util.assertutil import _assert, precondition
# store/
-# store/tmp # temp dirs named $VERIFIERID/$SHARENUM that will be moved to store/ on success
+# store/incoming # temp dirs named $VERIFIERID/$SHARENUM that will be moved to store/ on success
# store/$VERIFIERID
# store/$VERIFIERID/$SHARENUM
# store/$VERIFIERID/$SHARENUM/blocksize
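+# e.g. (hypothetical base32 verifierid) share 3 of a bucket lives at
+# store/2tgsp6cpwqvgli5mkf4i4dho7ghtmpqm/3/ with its block data in 'data'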
class BucketWriter(Referenceable):
implements(RIBucketWriter)
- def __init__(self, tmphome, finalhome, blocksize):
- self.tmphome = tmphome
+ def __init__(self, incominghome, finalhome, blocksize):
+ self.incominghome = incominghome
self.finalhome = finalhome
self.blocksize = blocksize
self.closed = False
self._write_file('blocksize', str(blocksize))
def _write_file(self, fname, data):
- open(os.path.join(tmphome, fname), 'wb').write(data)
+ open(os.path.join(self.incominghome, fname), 'wb').write(data)
def remote_put_block(self, segmentnum, data):
precondition(not self.closed)
assert len(data) == self.blocksize
- f = open(os.path.join(self.tmphome, 'data'), 'wb')
+ fname = os.path.join(self.incominghome, 'data')
+ # open without truncating, so blocks written by earlier calls survive
+ if os.path.exists(fname):
+ f = open(fname, 'r+b')
+ else:
+ f = open(fname, 'wb')
f.seek(self.blocksize*segmentnum)
f.write(data)
def close(self):
precondition(not self.closed)
# TODO assert or check the completeness and consistency of the data that has been written
- fileutil.rename(self.tmphome, self.finalhome)
+ fileutil.rename(self.incominghome, self.finalhome)
self.closed = True
def str2l(s):
def __init__(self, storedir):
fileutil.make_dirs(storedir)
self.storedir = storedir
- self.tmpdir = os.path.join(storedir, 'tmp')
- self._clean_trash()
- fileutil.make_dirs(self.tmpdir)
+ self.incomingdir = os.path.join(storedir, 'incoming')
+ self._clean_incomplete()
+ fileutil.make_dirs(self.incomingdir)
service.MultiService.__init__(self)
- def _clean_trash(self):
- fileutil.rm_dir(self.tmpdir)
+ def _clean_incomplete(self):
+ fileutil.rm_dir(self.incomingdir)
def remote_allocate_buckets(self, verifierid, sharenums, sharesize,
blocksize, canary):
- bucketwriters = {} # k: sharenum, v: BucketWriter
- for sharenum in sharenums:
- tmphome = os.path.join(self.tmpdir, idlib.a2b(verifierid), "%d"%sharenum)
- finalhome = os.path.join(self.storedir, idlib.a2b(verifierid), "%d"%sharenum)
- bucketwriters[sharenum] = BucketWriter(tmphome, finalhome, blocksize)
+ alreadygot = set()
+ bucketwriters = {} # k: shnum, v: BucketWriter
+ for shnum in sharenums:
+ incominghome = os.path.join(self.incomingdir, idlib.b2a(verifierid), "%d"%shnum)
+ finalhome = os.path.join(self.storedir, idlib.b2a(verifierid), "%d"%shnum)
+ if os.path.exists(incominghome) or os.path.exists(finalhome):
+ alreadygot.add(shnum)
+ else:
+ bucketwriters[shnum] = BucketWriter(incominghome, finalhome, blocksize)
- return bucketwriters
+ return alreadygot, bucketwriters
def remote_get_buckets(self, verifierid):
bucketreaders = {} # k: sharenum, v: BucketReader
from allmydata import upload
from allmydata.uri import unpack_uri
-class StringBucketProxy:
- # This is for unit tests: make a StringIO look like a RIBucketWriter.
-
- def __init__(self):
- self.data = StringIO()
- self.size = None
- self.done = False
-
- def callRemote(self, methname, **kwargs):
- if methname == "write":
- return defer.maybeDeferred(self.write, **kwargs)
- elif methname == "close":
- return defer.maybeDeferred(self.close, **kwargs)
- else:
- return defer.fail(NameError("no such method named %s" % methname))
-
- def write(self, data):
- self.data.write(data)
- def close(self):
- self.done = True
-
-
-class FakePeer:
- def __init__(self, peerid, response):
- self.peerid = peerid
- self.response = response
-
- def callRemote(self, methname, *args, **kwargs):
- assert not args
- return defer.maybeDeferred(self._callRemote, methname, **kwargs)
-
- def _callRemote(self, methname, **kwargs):
- assert methname == "allocate_bucket"
- #assert kwargs["size"] == 100
- assert kwargs["leaser"] == "fakeclient"
- if self.response == "good":
- return self
- raise upload.TooFullError()
+class FakeStorageServer:
+ pass
class FakeClient:
- nodeid = "fakeclient"
- def __init__(self, responses):
- self.peers = []
- for peerid,r in enumerate(responses):
- if r == "disconnected":
- self.peers.append(None)
- else:
- self.peers.append(FakePeer(str(peerid), r))
-
- def get_permuted_connections(self, key):
- return [str(i) for i in range(len(self.peers))]
-
- def get_remote_service(self, peerid, name):
- peer = self.peers[int(peerid)]
- if not peer:
- return defer.fail(IndexError("no connection to that peer"))
- return defer.succeed(peer)
-
-
-class NextPeerUploader(upload.FileUploader):
- _size = 100
- def _got_enough_peers(self, res):
- return res
-
-class NextPeer(unittest.TestCase):
- responses = ["good", # 0
- "full", # 1
- "full", # 2
- "disconnected", # 3
- "good", # 4
- ]
-
- def compare_landlords(self, u, c, expected):
- exp = [(str(peerid), bucketnum, c.peers[peerid])
- for peerid, bucketnum in expected]
- self.failUnlessEqual(u.landlords, exp)
-
- VERIFIERID = "\x00" * 20
- def test_0(self):
- c = FakeClient([])
- u = NextPeerUploader(c)
- u.set_verifierid(self.VERIFIERID)
- u.set_params(2, 2, 2)
- d = u.start()
- def _check(f):
- f.trap(upload.NotEnoughPeersError)
- d.addCallbacks(lambda res: self.fail("this was supposed to fail"),
- _check)
- return d
-
- def test_1(self):
- c = FakeClient(self.responses)
- u = NextPeerUploader(c)
- u.set_verifierid(self.VERIFIERID)
- u.set_params(2, 2, 2)
- d = u.start()
- def _check(res):
- self.failUnlessEqual(u.goodness_points, 2)
- self.compare_landlords(u, c, [(0, 0),
- (4, 1),
- ])
- d.addCallback(_check)
- return d
-
- def test_2(self):
- c = FakeClient(self.responses)
- u = NextPeerUploader(c)
- u.set_verifierid(self.VERIFIERID)
- u.set_params(3, 3, 3)
- d = u.start()
- def _check(res):
- self.failUnlessEqual(u.goodness_points, 3)
- self.compare_landlords(u, c, [(0, 0),
- (4, 1),
- (0, 2),
- ])
- d.addCallback(_check)
- return d
-
- responses2 = ["good", # 0
- "full", # 1
- "full", # 2
- "good", # 3
- "full", # 4
- ]
-
- def test_3(self):
- c = FakeClient(self.responses2)
- u = NextPeerUploader(c)
- u.set_verifierid(self.VERIFIERID)
- u.set_params(3, 3, 3)
- d = u.start()
- def _check(res):
- self.failUnlessEqual(u.goodness_points, 3)
- self.compare_landlords(u, c, [(0, 0),
- (3, 1),
- (0, 2),
- ])
- d.addCallback(_check)
- return d
-
- responses3 = ["good", # 0
- "good", # 1
- "good", # 2
- "good", # 3
- "good", # 4
- ]
-
- def test_4(self):
- c = FakeClient(self.responses3)
- u = NextPeerUploader(c)
- u.set_verifierid(self.VERIFIERID)
- u.set_params(4, 4, 4)
- d = u.start()
- def _check(res):
- self.failUnlessEqual(u.goodness_points, 4)
- self.compare_landlords(u, c, [(0, 0),
- (1, 1),
- (2, 2),
- (3, 3),
- ])
- d.addCallback(_check)
- return d
-
-
-class FakePeer2:
- def __init__(self, peerid):
- self.peerid = peerid
- self.data = ""
-
- def callRemote(self, methname, *args, **kwargs):
- if methname == "allocate_bucket":
- return defer.maybeDeferred(self._allocate_bucket, *args, **kwargs)
- if methname == "write":
- return defer.maybeDeferred(self._write, *args, **kwargs)
- if methname == "set_metadata":
- return defer.maybeDeferred(self._set_metadata, *args, **kwargs)
- if methname == "close":
- return defer.maybeDeferred(self._close, *args, **kwargs)
- return defer.maybeDeferred(self._bad_name, methname)
-
- def _allocate_bucket(self, verifierid, bucket_num, size, leaser, canary):
- self.allocated_size = size
- return self
- def _write(self, data):
- self.data = self.data + data
- def _set_metadata(self, metadata):
- self.metadata = metadata
- def _close(self):
- pass
- def _bad_name(self, methname):
- raise NameError("FakePeer2 has no such method named '%s'" % methname)
-
-class FakeClient2:
- nodeid = "fakeclient"
- def __init__(self, num_peers):
- self.peers = []
- for peerid in range(num_peers):
- self.peers.append(FakePeer2(str(peerid)))
-
- def get_permuted_connections(self, key):
- return [str(i) for i in range(len(self.peers))]
-
- def get_remote_service(self, peerid, name):
- peer = self.peers[int(peerid)]
- if not peer:
- return defer.fail(IndexError("no connection to that peer"))
- return defer.succeed(peer)
+ def get_permuted_peers(self, verifierid):
+ return [ ("%20d"%fakeid, "%20d"%fakeid, FakeStorageServer(),) for fakeid in range(50) ]
class Uploader(unittest.TestCase):
def setUp(self):
- node = self.node = FakeClient2(10)
- u = self.u = upload.Uploader()
- u.running = 1
- u.parent = node
+ self.node = FakeClient()
+ self.u = upload.Uploader()
+ self.u.running = True
+ self.u.parent = self.node
def _check(self, uri):
self.failUnless(isinstance(uri, str))
# create the encoder, so we can know how large the shares will be
self._encoder = self.ENCODERCLASS()
+ self._last_seg_encoder = self.ENCODERCLASS() # This one is for encoding the final segment, which might be shorter than the others.
self._codec_name = self._encoder.get_encoder_type()
+ self._encoder.set_params(self.segment_size, self.needed_shares, self.total_shares)
+
paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
- self._encoder.set_params(paddedsize, self.needed_shares, self.total_shares)
- self._share_size = self._encoder.get_share_size()
+
+ self._share_size = self._encoder.get_share_size()
+ self._block_size = self._encoder.get_block_size()
# first step: who should we upload to?
peers = self._client.get_permuted_peers(self._verifierid)
assert peers
- trackers = [ (permutedid, PeerTracker(peerid, conn),)
+ trackers = [ (permutedid, PeerTracker(peerid, conn, self._share_size, self._block_size, self._verifierid),)
for permutedid, peerid, conn in peers ]
ring_things = [] # a list of (position_in_ring, whatami, x) where whatami is 0 if x is a sharenum or else 1 if x is a PeerTracker instance
ring_things.extend([ (permutedpeerid, 1, peer,) for permutedpeerid, peer in trackers ])
def _compute_uri(self, roothash):
params = self._encoder.get_serialized_params()
- return pack_uri(self._codec_name, params, self._verifierid, roothash)
+ return pack_uri(self._codec_name, params, self._verifierid, roothash, self.needed_shares, self.total_shares, self._size, self.segment_size)
def netstring(s):
# enough information to retrieve and validate the contents. It shall be
# expressed in a limited character set (namely [TODO]).
-def pack_uri(codec_name, codec_params, verifierid):
+def pack_uri(codec_name, codec_params, verifierid, roothash, needed_shares, total_shares, size, segment_size):
assert isinstance(codec_name, str)
assert len(codec_name) < 10
assert ":" not in codec_name
assert ":" not in codec_params
assert isinstance(verifierid, str)
assert len(verifierid) == 20 # sha1 hash
- return "URI:%s:%s:%s" % (codec_name, codec_params, idlib.b2a(verifierid))
+ return "URI:%s:%s:%s:%s:%s:%s:%s:%s" % (codec_name, codec_params, idlib.b2a(verifierid), idlib.b2a(roothash), needed_shares, total_shares, size, segment_size)
def unpack_uri(uri):
assert uri.startswith("URI:")
- header, codec_name, codec_params, verifierid_s = uri.split(":")
+ header, codec_name, codec_params, verifierid_s, roothash_s, needed_shares_s, total_shares_s, size_s, segment_size_s = uri.split(":")
verifierid = idlib.a2b(verifierid_s)
- return codec_name, codec_params, verifierid
+ roothash = idlib.a2b(roothash_s)
+ needed_shares = int(needed_shares_s)
+ total_shares = int(total_shares_s)
+ size = int(size_s)
+ segment_size = int(segment_size_s)
+ return codec_name, codec_params, verifierid, roothash, needed_shares, total_shares, size, segment_size