from foolscap import Referenceable
from twisted.application import service
-from twisted.python.failure import Failure
+from twisted.python import log
from allmydata.util import idlib
from zope.interface import implements
from allmydata.interfaces import RIBucketWriter, RIBucketReader
from allmydata.util.assertutil import precondition, _assert
-class NoSuchBucketError(Failure):
- pass
-
class BucketStore(service.MultiService, Referenceable):
def __init__(self, store_dir):
precondition(os.path.isdir(store_dir))
def __init__(self, bucket_dir, verifierid, bucket_num, size):
Bucket.__init__(self, bucket_dir, verifierid)
precondition(not os.path.exists(bucket_dir))
+ #log.msg("WriteBucket [%s]: creating bucket %s"
+ # % (idlib.b2a(verifierid), bucket_dir))
os.mkdir(bucket_dir)
self._open = True
def close(self):
precondition(self._bytes_written == self._size)
+ #log.msg("WriteBucket.close [%s] (%s)"
+ # % (idlib.b2a(self._verifierid), self._bucket_dir))
self._data.close()
self._write_attr('closed', '')
self._open = False
# sort of at most max_count elements
results = []
for nodeid in self.all_peers:
+ assert isinstance(nodeid, str)
permuted = sha.new(key + nodeid).digest()
results.append((permuted, nodeid))
results.sort()
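+        # example (from test_client.py below): with all_peers = ["0".."4"],
+        # permute_peerids("one") sorts the nodeids by sha1("one"+nodeid)
+        # and returns ['3','1','0','4','2']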
implements(ICodecEncoder)
ENCODER_TYPE = 0
- def set_params(self, data_size, required_shares, total_shares):
+ def set_params(self, data_size, required_shares, max_shares):
self.data_size = data_size
self.required_shares = required_shares
- self.total_shares = total_shares
+ self.max_shares = max_shares
def get_encoder_type(self):
return self.ENCODER_TYPE
def get_share_size(self):
return self.data_size
- def encode(self, data):
- shares = [(i,data) for i in range(self.total_shares)]
+ def encode(self, data, num_shares=None):
+ if num_shares is None:
+ num_shares = self.max_shares
+ assert num_shares <= self.max_shares
+ shares = [(i,data) for i in range(num_shares)]
return defer.succeed(shares)
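+
+# example: with ReplicatingEncoder, encode("abc", num_shares=3) fires with
+# [(0, "abc"), (1, "abc"), (2, "abc")] -- replication puts a complete copy
+# of the data in every share
+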
class ReplicatingDecoder(object):
def set_serialized_params(self, params):
self.required_shares = int(params)
+ def get_required_shares(self):
+ return self.required_shares
+
def decode(self, some_shares):
assert len(some_shares) >= self.required_shares
data = some_shares[0][1]
# than 20 minutes to run the test_encode_share tests, so I disabled most
# of them. (uh, hello, it's running figleaf)
- def set_params(self, data_size, required_shares, total_shares):
- assert required_shares <= total_shares
+ def set_params(self, data_size, required_shares, max_shares):
+ assert required_shares <= max_shares
self.data_size = data_size
self.required_shares = required_shares
- self.total_shares = total_shares
+ self.max_shares = max_shares
self.chunk_size = required_shares
self.num_chunks = mathutil.div_ceil(data_size, self.chunk_size)
self.last_chunk_padding = mathutil.pad_size(data_size, required_shares)
self.share_size = self.num_chunks
- self.encoder = rs_code.RSCode(total_shares, required_shares, 8)
+ self.encoder = rs_code.RSCode(max_shares, required_shares, 8)
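+        # example: data_size=100, required_shares=25, max_shares=100 gives
+        # chunk_size=25, num_chunks=4, last_chunk_padding=0, share_size=4:
+        # each share winds up with one output byte per 25-byte input chunk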
def get_encoder_type(self):
return self.ENCODER_TYPE
def get_serialized_params(self):
return "%d:%d:%d" % (self.data_size, self.required_shares,
- self.total_shares)
+ self.max_shares)
def get_share_size(self):
return self.share_size
- def encode(self, data):
- share_data = [ [] for i in range(self.total_shares)]
+ def encode(self, data, num_shares=None):
+ if num_shares is None:
+ num_shares = self.max_shares
+ assert num_shares <= self.max_shares
+ # we create self.max_shares shares, then throw out any extra ones
+ # so that we always return exactly num_shares shares.
+
+ share_data = [ [] for i in range(self.max_shares)]
for i in range(self.num_chunks):
# we take self.chunk_size bytes from the input string, and
- # turn it into self.total_shares bytes.
+ # turn it into self.max_shares bytes.
offset = i*self.chunk_size
# Note string slices aren't an efficient way to use memory, so
# when we upgrade from the unusably slow py_ecc prototype to a
input_vector = [ord(x) for x in chunk]
assert len(input_vector) == self.required_shares
output_vector = self.encoder.Encode(input_vector)
- assert len(output_vector) == self.total_shares
+ assert len(output_vector) == self.max_shares
for i2,out in enumerate(output_vector):
share_data[i2].append(chr(out))
shares = [ (i, "".join(share_data[i]))
- for i in range(self.total_shares) ]
+ for i in range(num_shares) ]
return defer.succeed(shares)
class PyRSDecoder(object):
pieces = params.split(":")
self.data_size = int(pieces[0])
self.required_shares = int(pieces[1])
- self.total_shares = int(pieces[2])
+ self.max_shares = int(pieces[2])
self.chunk_size = self.required_shares
self.num_chunks = mathutil.div_ceil(self.data_size, self.chunk_size)
self.last_chunk_padding = mathutil.pad_size(self.data_size,
self.required_shares)
self.share_size = self.num_chunks
- self.encoder = rs_code.RSCode(self.total_shares, self.required_shares,
+ self.encoder = rs_code.RSCode(self.max_shares, self.required_shares,
8)
if False:
print "chunk_size: %d" % self.chunk_size
print "num_chunks: %d" % self.num_chunks
print "last_chunk_padding: %d" % self.last_chunk_padding
print "share_size: %d" % self.share_size
- print "total_shares: %d" % self.total_shares
+ print "max_shares: %d" % self.max_shares
print "required_shares: %d" % self.required_shares
+ def get_required_shares(self):
+ return self.required_shares
+
def decode(self, some_shares):
chunk_size = self.chunk_size
assert len(some_shares) >= self.required_shares
# this takes one byte from each share, and turns the combination
# into a single chunk
received_vector = []
- for j in range(self.total_shares):
+ for j in range(self.max_shares):
share = have_shares.get(j)
if share is not None:
received_vector.append(ord(share[i]))
-import os
+import os, sha
from zope.interface import Interface, implements
from twisted.python import failure, log
from twisted.internet import defer
from twisted.application import service
-from allmydata.util import idlib
+from allmydata.util import idlib, bencode
+from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata import codec
class NotEnoughPeersError(Exception):
    pass

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass
+def netstring(s):
+    # same helper as in upload.py; needed by FileDownloader._write below
+    return "%d:%s," % (len(s), s)
+
+def unpack_uri(uri):
+    assert uri.startswith("URI:")
+    return bencode.bdecode(uri[4:])
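+# example (hypothetical values): with verifierid="\x00"*20 and encoding
+# params "2", the uri is "URI:" + bencode(("\x00"*20, "2")), which comes out
+# as "URI:l20:<twenty NUL bytes>1:2e"; unpack_uri() strips the "URI:" prefix
+# and bdecodes the remainder back into (verifierid, params)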
+
class FileDownloader:
debug = False
- def __init__(self, peer, verifierid):
+ def __init__(self, peer, verifierid, encoding_params):
self._peer = peer
assert isinstance(verifierid, str)
+ assert len(verifierid) == 20
self._verifierid = verifierid
+ self._decoder = codec.ReplicatingDecoder()
+ self._decoder.set_serialized_params(encoding_params)
+ self.needed_shares = self._decoder.get_required_shares()
def set_download_target(self, target):
self._target = target
def _cancel(self):
pass
- def make_decoder(self):
- n = self._shares = 4
- k = self._desired_shares = 2
- self._target.open()
- self._decoder = codec.Decoder(self._target, k, n,
- self._verifierid)
-
def start(self):
- log.msg("starting download")
+ log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),))
if self.debug:
print "starting download"
# first step: who should we download from?
bucket_nums)
self.landlords.append( (peerid, buckets) )
- if len(self.landlords) >= self._desired_shares:
+ if len(self.landlords) >= self.needed_shares:
if self.debug: print " we're done!"
raise HaveAllPeersError
# otherwise we fall through to search more peers
all_buckets = []
for peerid, buckets in self.landlords:
all_buckets.extend(buckets)
- d = self._decoder.start(all_buckets)
+ # TODO: try to avoid pulling multiple shares from the same peer
+ all_buckets = all_buckets[:self.needed_shares]
+ # retrieve all shares
+ dl = []
+ shares = []
+ for (bucket_num, bucket) in all_buckets:
+ d0 = bucket.callRemote("get_metadata")
+ d1 = bucket.callRemote("read")
+ d2 = DeferredListShouldSucceed([d0, d1])
+ def _got(res):
+ sharenum_s, sharedata = res
+ sharenum = bencode.bdecode(sharenum_s)
+ shares.append((sharenum, sharedata))
+ d2.addCallback(_got)
+ dl.append(d2)
+ d = DeferredListShouldSucceed(dl)
+
+ d.addCallback(lambda res: self._decoder.decode(shares))
+
+ def _write(data):
+ self._target.open()
+ hasher = sha.new(netstring("allmydata_v1_verifierid"))
+ hasher.update(data)
+ vid = hasher.digest()
+            assert self._verifierid == vid, \
+                   "%s != %s" % (idlib.b2a(self._verifierid), idlib.b2a(vid))
+ self._target.write(data)
+ d.addCallback(_write)
+
def _done(res):
self._target.close()
return self._target.finish()
"""
implements(IDownloader)
name = "downloader"
+ debug = False
- def download(self, verifierid, t):
+ def download(self, uri, t):
+ (verifierid, params) = unpack_uri(uri)
assert self.parent
assert self.running
assert isinstance(verifierid, str)
t = IDownloadTarget(t)
assert t.write
assert t.close
- dl = FileDownloader(self.parent, verifierid)
+ dl = FileDownloader(self.parent, verifierid, params)
dl.set_download_target(t)
- dl.make_decoder()
+ if self.debug:
+ dl.debug = True
d = dl.start()
return d
# utility functions
- def download_to_data(self, verifierid):
- return self.download(verifierid, Data())
- def download_to_filename(self, verifierid, filename):
- return self.download(verifierid, FileName(filename))
- def download_to_filehandle(self, verifierid, filehandle):
- return self.download(verifierid, FileHandle(filehandle))
+ def download_to_data(self, uri):
+ return self.download(uri, Data())
+ def download_to_filename(self, uri, filename):
+ return self.download(uri, FileName(filename))
+ def download_to_filehandle(self, uri, filehandle):
+ return self.download(uri, FileHandle(filehandle))
segment_plaintext = self.infile.read(self.segment_size)
segment_crypttext = self.cryptor.encrypt(segment_plaintext)
del segment_plaintext
+ assert self.encoder.max_shares == self.num_shares
d = self.encoder.encode(segment_crypttext)
d.addCallback(self._encoded_segment)
return d
return self.make_subnode(absname)
remote_add_directory = add_directory
- def add_file(self, name, data):
+ def add_file(self, name, uri):
self.validate_name(name)
f = open(os.path.join(self._basedir, name), "wb")
- f.write(data)
+ f.write(uri)
f.close()
remote_add_file = add_file
Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash
PBURL = StringConstraint(150)
Verifierid = StringConstraint(20)
+URI = StringConstraint(100) # kind of arbitrary
ShareData = StringConstraint(100000)
# these four are here because Foolscap does not yet support the kind of
# restriction I really want to apply to these.
def add_directory(name=str):
return RIMutableDirectoryNode_
- def add_file(name=str, data=Verifierid):
+ def add_file(name=str, uri=URI):
return None
def remove(name=str):
class ICodecEncoder(Interface):
- def set_params(data_size, required_shares, total_shares):
+ def set_params(data_size, required_shares, max_shares):
"""Set up the parameters of this encoder.
See encode() for a description of how these parameters are used.
"""Return the length of the shares that encode() will produce.
"""
- def encode(data):
+ def encode(data, num_shares=None):
"""Encode a chunk of data. This may be called multiple times. Each
call is independent.
The data must be a string with a length that exactly matches the
data_size promised by set_params().
+        'num_shares', if provided, must be less than or equal to the
+        'max_shares' set in set_params. If 'num_shares' is left at None, this
+ method will produce 'max_shares' shares. This can be used to minimize
+ the work that the encoder needs to do if we initially thought that we
+ would need, say, 100 shares, but now that it is time to actually
+ encode the data we only have 75 peers to send data to.
+
        For each call, encode() will return a Deferred that fires with a list
-        of 'total_shares' tuples. Each tuple is of the form (sharenum,
-        share), where sharenum is an int (from 0 total_shares-1), and share
-        is a string. The get_share_size() method can be used to determine the
-        length of the 'share' strings returned by encode().
+        of 'num_shares' tuples. Each tuple is of the form (sharenum,
+        sharedata), where sharenum is an int (from 0 to num_shares-1), and
+        sharedata is a string. The get_share_size() method can be used to
+        determine the length of the 'sharedata' strings returned by encode().
+
+ The (sharenum, sharedata) tuple must be kept together during storage
+ and retrieval. Specifically, the share data is useless by itself: the
+ decoder needs to be told which share is which by providing it with
+ both the share number and the actual share data.
The memory usage of this function is expected to be on the order of
-        total_shares * get_share_size().
+        max_shares * get_share_size().
"""
+ # design note: we could embed the share number in the sharedata by
+ # returning bencode((sharenum,sharedata)). The advantage would be
+ # making it easier to keep these two pieces together, and probably
+ # avoiding a round trip when reading the remote bucket (although this
+ # could be achieved by changing RIBucketReader.read to
+ # read_data_and_metadata). The disadvantage is that the share number
+ # wants to be exposed to the storage/bucket layer (specifically to
+ # handle the next stage of peer-selection algorithm in which we
+ # propose to keep share#A on a given peer and they are allowed to
+ # tell us that they already have share#B). Also doing this would make
+ # the share size somewhat variable (one-digit sharenumbers will be a
+ # byte shorter than two-digit sharenumbers), unless we zero-pad the
+    # sharenumbers based upon the max_shares declared in set_params.
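+
+    # usage sketch (with the concrete PyRSEncoder from codec.py):
+    #   enc = codec.PyRSEncoder()
+    #   enc.set_params(1000, 25, 100)       # 25-of-100 encoding of 1000 bytes
+    #   d = enc.encode(data)                # fires with 100 (num,data) tuples
+    #   d = enc.encode(data, num_shares=75) # or only the first 75 shares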
class ICodecDecoder(Interface):
def set_serialized_params(params):
"""Set up the parameters of this encoder, from a string returned by
encoder.get_serialized_params()."""
+ def get_required_shares():
+ """Return the number of shares needed to reconstruct the data.
+ set_serialized_params() must be called before this."""
+
def decode(some_shares):
"""Decode a partial list of shares into data.
self.failUnlessEqual(c.permute_peerids("two"), ['0','4','2','1','3'])
c.all_peers = []
self.failUnlessEqual(c.permute_peerids("one"), [])
+
+ c2 = client.Client("")
+ c2.all_peers = ["%d" % i for i in range(5)]
+ self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2'])
+
#enc_class = PyRSEncoder
#dec_class = PyRSDecoder
- def do_test(self, size, required_shares, total_shares):
+ def do_test(self, size, required_shares, max_shares, fewer_shares=None):
data0 = os.urandom(size)
enc = self.enc_class()
- enc.set_params(size, required_shares, total_shares)
+ enc.set_params(size, required_shares, max_shares)
serialized_params = enc.get_serialized_params()
log.msg("serialized_params: %s" % serialized_params)
d = enc.encode(data0)
- def _done(shares):
- self.failUnlessEqual(len(shares), total_shares)
+ def _done_encoding_all(shares):
+ self.failUnlessEqual(len(shares), max_shares)
self.shares = shares
- d.addCallback(_done)
+ d.addCallback(_done_encoding_all)
+ if fewer_shares is not None:
+ # also validate that the num_shares= parameter works
+ d.addCallback(lambda res: enc.encode(data0, fewer_shares))
+ def _check_fewer_shares(some_shares):
+ self.failUnlessEqual(len(some_shares), fewer_shares)
+ d.addCallback(_check_fewer_shares)
def _decode(shares):
dec = self.dec_class()
def test_encode2(self):
if os.uname()[1] == "slave3" and self.enc_class == PyRSEncoder:
raise unittest.SkipTest("slave3 is really slow")
- return self.do_test(123, 25, 100)
+ return self.do_test(123, 25, 100, 90)
def test_sizes(self):
raise unittest.SkipTest("omg this would take forever")
def test_big(self):
size = 10000
required_shares = 25
- total_shares = 100
+ max_shares = 100
# this lets us use a persistent lookup table, stored outside the
# _trial_temp directory (which is deleted each time trial is run)
os.symlink("../ffield.lut.8", "ffield.lut.8")
enc = self.enc_class()
self.start()
- enc.set_params(size, required_shares, total_shares)
+ enc.set_params(size, required_shares, max_shares)
serialized_params = enc.get_serialized_params()
print "encoder ready", self.stop()
self.start()
now_shares = time.time()
print "shares ready", self.stop()
self.start()
- self.failUnlessEqual(len(shares), total_shares)
+ self.failUnlessEqual(len(shares), max_shares)
d.addCallback(_done)
d.addCallback(lambda res: enc.encode(data0))
d.addCallback(_done)
d1 = u.upload_data(DATA)
return d1
d.addCallback(_do_upload)
- def _upload_done(verifierid):
- log.msg("upload finished: verifierid=%s" % idlib.b2a(verifierid))
+ def _upload_done(uri):
+ log.msg("upload finished: uri is %s" % (uri,))
dl = self.clients[1].getServiceNamed("downloader")
- d1 = dl.download_to_data(verifierid)
+ d1 = dl.download_to_data(uri)
return d1
d.addCallback(_upload_done)
def _download_done(data):
log.msg("download finished")
self.failUnlessEqual(data, DATA)
d.addCallback(_download_done)
+ def _oops(res):
+            log.msg("oops, an error occurred, finishing: %s" % res)
+ return res
+ d.addErrback(_oops)
return d
test_upload_and_download.timeout = 20
from twisted.trial import unittest
from twisted.internet import defer
+from twisted.application import service
from cStringIO import StringIO
-from allmydata import upload
+from allmydata import upload, download
class StringBucketProxy:
# This is for unit tests: make a StringIO look like a RIBucketWriter.
return defer.fail(IndexError("no connection to that peer"))
return defer.succeed(peer)
+
class NextPeerUploader(upload.FileUploader):
- def _got_all_peers(self, res):
+ _size = 100
+ def _got_enough_peers(self, res):
return res
class NextPeer(unittest.TestCase):
for peerid, bucketnum in expected]
self.failUnlessEqual(u.landlords, exp)
+ VERIFIERID = "\x00" * 20
def test_0(self):
c = FakeClient([])
u = NextPeerUploader(c)
- u._verifierid = "verifierid"
- u._shares = 2
- u._share_size = 100
+ u.set_verifierid(self.VERIFIERID)
+ u.set_params(2, 2, 2)
d = u.start()
def _check(f):
f.trap(upload.NotEnoughPeersError)
def test_1(self):
c = FakeClient(self.responses)
u = NextPeerUploader(c)
- u._verifierid = "verifierid"
- u._shares = 2
- u._share_size = 100
+ u.set_verifierid(self.VERIFIERID)
+ u.set_params(2, 2, 2)
d = u.start()
def _check(res):
self.failUnlessEqual(u.goodness_points, 2)
def test_2(self):
c = FakeClient(self.responses)
u = NextPeerUploader(c)
- u._verifierid = "verifierid"
- u._shares = 3
- u._share_size = 100
+ u.set_verifierid(self.VERIFIERID)
+ u.set_params(3, 3, 3)
d = u.start()
def _check(res):
self.failUnlessEqual(u.goodness_points, 3)
def test_3(self):
c = FakeClient(self.responses2)
u = NextPeerUploader(c)
- u._verifierid = "verifierid"
- u._shares = 3
- u._share_size = 100
+ u.set_verifierid(self.VERIFIERID)
+ u.set_params(3, 3, 3)
d = u.start()
def _check(res):
self.failUnlessEqual(u.goodness_points, 3)
def test_4(self):
c = FakeClient(self.responses3)
u = NextPeerUploader(c)
- u._verifierid = "verifierid"
- u._shares = 4
- u._share_size = 100
+ u.set_verifierid(self.VERIFIERID)
+ u.set_params(4, 4, 4)
d = u.start()
def _check(res):
self.failUnlessEqual(u.goodness_points, 4)
])
d.addCallback(_check)
return d
+
+
+class FakePeer2:
+ def __init__(self, peerid):
+ self.peerid = peerid
+ self.data = ""
+
+ def callRemote(self, methname, *args, **kwargs):
+ if methname == "allocate_bucket":
+ return defer.maybeDeferred(self._allocate_bucket, *args, **kwargs)
+ if methname == "write":
+ return defer.maybeDeferred(self._write, *args, **kwargs)
+ if methname == "set_metadata":
+ return defer.maybeDeferred(self._set_metadata, *args, **kwargs)
+ if methname == "close":
+ return defer.maybeDeferred(self._close, *args, **kwargs)
+ return defer.maybeDeferred(self._bad_name, methname)
+
+ def _allocate_bucket(self, verifierid, bucket_num, size, leaser, canary):
+ self.allocated_size = size
+ return self
+ def _write(self, data):
+ self.data = self.data + data
+ def _set_metadata(self, metadata):
+ self.metadata = metadata
+ def _close(self):
+ pass
+ def _bad_name(self, methname):
+ raise NameError("FakePeer2 has no such method named '%s'" % methname)
+
+class FakeClient2:
+ nodeid = "fakeclient"
+ def __init__(self, max_peers):
+ self.peers = []
+ for peerid in range(max_peers):
+ self.peers.append(FakePeer2(str(peerid)))
+
+ def permute_peerids(self, key, max_peers):
+        assert max_peers is None
+ return [str(i) for i in range(len(self.peers))]
+
+ def get_remote_service(self, peerid, name):
+ peer = self.peers[int(peerid)]
+ if not peer:
+ return defer.fail(IndexError("no connection to that peer"))
+ return defer.succeed(peer)
+
+class Uploader(unittest.TestCase):
+ def setUp(self):
+ node = self.node = FakeClient2(10)
+ u = self.u = upload.Uploader()
+ u.running = 1
+ u.parent = node
+
+ def _check(self, uri):
+ self.failUnless(isinstance(uri, str))
+ self.failUnless(uri.startswith("URI:"))
+ verifierid, params = download.unpack_uri(uri)
+ self.failUnless(isinstance(verifierid, str))
+ self.failUnlessEqual(len(verifierid), 20)
+ self.failUnless(isinstance(params, str))
+ peers = self.node.peers
+ self.failUnlessEqual(peers[0].allocated_size,
+ len(peers[0].data))
+ def testData(self):
+ data = "This is some data to upload"
+ d = self.u.upload_data(data)
+ d.addCallback(self._check)
+ return d
+
+ def testFileHandle(self):
+ data = "This is some data to upload"
+ d = self.u.upload_filehandle(StringIO(data))
+ d.addCallback(self._check)
+ return d
+
+ def testFilename(self):
+ fn = "Uploader-testFilename.data"
+ f = open(fn, "w")
+ data = "This is some data to upload"
+ f.write(data)
+ f.close()
+ d = self.u.upload_filename(fn)
+ d.addCallback(self._check)
+ return d
from twisted.application import service
from foolscap import Referenceable
-from allmydata.util import idlib
+from allmydata.util import idlib, bencode
+from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata import codec
from cStringIO import StringIO
class TooFullError(Exception):
pass
+
class FileUploader:
debug = False
def __init__(self, peer):
self._peer = peer
+ def set_params(self, min_shares, target_goodness, max_shares):
+ self.min_shares = min_shares
+ self.target_goodness = target_goodness
+ self.max_shares = max_shares
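+        # min_shares: how many shares the encoder needs to reconstruct the
+        # file, and the minimum number of landlords we must find;
+        # target_goodness: how many lease "goodness points" we want before
+        # we stop searching for peers (see _check_next_peer);
+        # max_shares: the most shares the encoder will be asked to produce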
+
def set_filehandle(self, filehandle):
self._filehandle = filehandle
filehandle.seek(0, 2)
self._size = filehandle.tell()
filehandle.seek(0)
- def make_encoder(self):
- self._needed_shares = 4
- self._shares = 4
- self._encoder = codec.Encoder(self._filehandle, self._shares)
- self._share_size = self._size
-
def set_verifierid(self, vid):
assert isinstance(vid, str)
+ assert len(vid) == 20
self._verifierid = vid
def start(self):
- log.msg("starting upload")
+ """Start uploading the file.
+
+        The source of the data to be uploaded must have been set before this
+        point by calling set_filehandle(); set_params() and set_verifierid()
+        must have been called as well.
+
+ This method returns a Deferred that will fire with the URI (a
+ string)."""
+
+ log.msg("starting upload [%s]" % (idlib.b2a(self._verifierid),))
if self.debug:
print "starting upload"
+ assert self.min_shares
+ assert self.target_goodness
+
+ # create the encoder, so we can know how large the shares will be
+ total_shares = self.max_shares
+ needed_shares = self.min_shares
+ self._encoder = codec.ReplicatingEncoder()
+ self._encoder.set_params(self._size, needed_shares, total_shares)
+ self._share_size = self._encoder.get_share_size()
+
# first step: who should we upload to?
- # maybe limit max_peers to 2*len(self.shares), to reduce memory
- # footprint
+ # We will talk to at most max_peers (which can be None to mean no
+ # limit). Maybe limit max_peers to 2*len(self.shares), to reduce
+ # memory footprint. For now, make it unlimited.
max_peers = None
self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
+
self._total_peers = len(self.permuted)
for p in self.permuted:
assert isinstance(p, str)
# we will shrink self.permuted as we give up on peers
self.peer_index = 0
self.goodness_points = 0
- self.target_goodness = self._shares
self.landlords = [] # list of (peerid, bucket_num, remotebucket)
d = defer.maybeDeferred(self._check_next_peer)
- d.addCallback(self._got_all_peers)
+ d.addCallback(self._got_enough_peers)
+ d.addCallback(self._compute_uri)
return d
+ def _compute_uri(self, params):
+ return "URI:%s" % bencode.bencode((self._verifierid, params))
+
def _check_next_peer(self):
+ if self.debug:
+ log.msg("FileUploader._check_next_peer: %d permuted, %d goodness"
+ " (want %d), have %d landlords, %d total peers" %
+ (len(self.permuted), self.goodness_points,
+ self.target_goodness, len(self.landlords),
+ self._total_peers))
if len(self.permuted) == 0:
# there are no more to check
raise NotEnoughPeersError("%s goodness, want %s, have %d "
print " peerid %s will grant us a lease" % idlib.b2a(peerid)
self.landlords.append( (peerid, bucket_num, bucket) )
self.goodness_points += 1
- if self.goodness_points >= self.target_goodness:
+ if (self.goodness_points >= self.target_goodness and
+ len(self.landlords) >= self.min_shares):
if self.debug: print " we're done!"
raise HaveAllPeersError()
# otherwise we fall through to allocate more peers
d.addBoth(_done_with_peer)
return d
- def _got_all_peers(self, res):
- d = self._encoder.do_upload(self.landlords)
- d.addCallback(lambda res: self._verifierid)
+ def _got_enough_peers(self, res):
+ landlords = self.landlords
+ if self.debug:
+ log.msg("FileUploader._got_enough_peers")
+ log.msg(" %d landlords" % len(landlords))
+ if len(landlords) < 20:
+ log.msg(" peerids: %s" % " ".join([idlib.b2a(l[0])
+ for l in landlords]))
+ log.msg(" buckets: %s" % " ".join([str(l[1])
+ for l in landlords]))
+ # assign shares to landlords
+ self.sharemap = {}
+ for peerid, bucket_num, bucket in landlords:
+ self.sharemap[bucket_num] = bucket
+ # the sharemap should have exactly len(landlords) shares, with
+ # no holes
+ assert sorted(self.sharemap.keys()) == range(len(landlords))
+ # encode all the data at once: this class does not use segmentation
+ data = self._filehandle.read()
+ d = self._encoder.encode(data, len(landlords))
+ d.addCallback(self._send_all_shares)
+ d.addCallback(lambda res: self._encoder.get_serialized_params())
+ return d
+
+ def _send_one_share(self, bucket, sharedata, metadata):
+ d = bucket.callRemote("write", sharedata)
+ d.addCallback(lambda res:
+ bucket.callRemote("set_metadata", metadata))
+ d.addCallback(lambda res:
+ bucket.callRemote("close"))
return d
+ def _send_all_shares(self, shares):
+ dl = []
+ for share in shares:
+ (sharenum,sharedata) = share
+ if self.debug:
+ log.msg(" writing share %d" % sharenum)
+ metadata = bencode.bencode(sharenum)
+ assert len(sharedata) == self._share_size
+ assert isinstance(sharedata, str)
+ bucket = self.sharemap[sharenum]
+ d = self._send_one_share(bucket, sharedata, metadata)
+ dl.append(d)
+ return DeferredListShouldSucceed(dl)
+
def netstring(s):
return "%d:%s," % (len(s), s)
"""I am a service that allows file uploading.
"""
name = "uploader"
+ uploader_class = FileUploader
+ debug = False
def _compute_verifierid(self, f):
hasher = sha.new(netstring("allmydata_v1_verifierid"))
f.seek(0)
- hasher.update(f.read())
+ data = f.read()
+        hasher.update(data)
f.seek(0)
# note: this is only of the plaintext data, no encryption yet
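+        # (i.e. verifierid = sha1("23:allmydata_v1_verifierid," + plaintext),
+        # always a 20-byte binary digest)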
return hasher.digest()
def upload(self, f):
+        # this returns a Deferred that fires with the URI
assert self.parent
assert self.running
f = IUploadable(f)
fh = f.get_filehandle()
- u = FileUploader(self.parent)
+ u = self.uploader_class(self.parent)
+ if self.debug:
+ u.debug = True
u.set_filehandle(fh)
+ # TODO: change this to (2,2,4) once Foolscap is fixed to allow
+ # connect-to-self and Client is fixed to include ourselves in the
+ # peerlist. Otherwise this usually fails because we give a share to
+ # the eventual downloader, and they won't try to get a share from
+ # themselves.
+ u.set_params(2, 3, 4)
u.set_verifierid(self._compute_verifierid(fh))
- u.make_encoder()
d = u.start()
def _done(res):
f.close_filehandle(fh)
--- /dev/null
+
+from twisted.internet import defer
+
+# utility wrapper for DeferredList
+def _check_deferred_list(results):
+ # if any of the component Deferreds failed, return the first failure such
+    # if any of the component Deferreds failed, return the first Failure, so
+    # that a subsequent addErrback() will fire. If all succeeded, return a
+    # list of the results (without the success/failure booleans)
+ if not success:
+ return f
+ return [r[1] for r in results]
+def DeferredListShouldSucceed(dl):
+ d = defer.DeferredList(dl)
+ d.addCallback(_check_deferred_list)
+ return d
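+
+# usage sketch: gather several Deferreds; the wrapped Deferred either
+# errbacks with the first Failure, or callbacks with the bare results
+# in order:
+#   d = DeferredListShouldSucceed([d0, d1])
+#   d.addCallback(lambda (metadata, data): ...)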
+
from base64 import b32encode, b32decode
def b2a(i):
+ assert isinstance(i, str), "tried to idlib.b2a non-string '%s'" % (i,)
return b32encode(i).lower()
def a2b(i):
- return b32decode(i.upper())
+ assert isinstance(i, str), "tried to idlib.a2b non-string '%s'" % (i,)
+ try:
+ return b32decode(i.upper())
+ except TypeError:
+        print "b32decode failed on a %d-byte string '%s'" % (len(i), i)
+ raise
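+
+# example: b2a("\x00"*5) == "aaaaaaaa" (lowercased base32), and
+# a2b("aaaaaaaa") round-trips back to "\x00"*5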
+
d = self.dirpath(dir_or_path)
def _got_dir(dirnode):
d1 = ul.upload(uploadable)
- d1.addCallback(lambda vid:
- dirnode.callRemote("add_file", name, vid))
+ def _add(uri):
+ return dirnode.callRemote("add_file", name, uri)
+ d1.addCallback(_add)
return d1
d.addCallback(_got_dir)
def _done(res):