def set_shareholders(self, landlords):
assert isinstance(landlords, dict)
for k in landlords:
- # it would be nice to:
- #assert RIBucketWriter.providedBy(landlords[k])
- assert IStorageBucketWriter(landlords[k])
- pass
+ assert IStorageBucketWriter.providedBy(landlords[k])
self.landlords = landlords.copy()
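# a minimal usage sketch (hypothetical names): each value must provide
# IStorageBucketWriter, e.g.
#   encoder.set_shareholders({0: write_proxy_0, 1: write_proxy_1})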
def start(self):
class RIStorageServer(RemoteInterface):
def allocate_buckets(storage_index=StorageIndex,
sharenums=SetOf(int, maxLength=MAX_BUCKETS),
- sharesize=int, blocksize=int, canary=Referenceable):
+ allocated_size=int, canary=Referenceable):
"""
@param canary: If the canary is lost before close(), the bucket is deleted.
@return: tuple of (alreadygot, allocated), where alreadygot is what we
space += bw.allocated_size()
return space
- def remote_allocate_buckets(self, storage_index, sharenums, sharesize,
+ def remote_allocate_buckets(self, storage_index, sharenums, allocated_size,
canary):
alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter
si_s = idlib.b2a(storage_index)
- space_per_bucket = sharesize
+ space_per_bucket = allocated_size
no_limits = self.sizelimit is None
yes_limits = not no_limits
if yes_limits:
start of uri_extension
"""
+def allocated_size(data_size, num_segments, num_share_hashes,
+ uri_extension_size):
+ wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
+ uri_extension_size)
+ uri_extension_starts_at = wbp._offsets['uri_extension']
+ return uri_extension_starts_at + 4 + uri_extension_size
+
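+# A hedged usage sketch (hypothetical numbers): the result is the
+# uri_extension offset, plus the 4-byte length prefix that
+# put_uri_extension() writes, plus the extension itself:
+#   needed = allocated_size(data_size=1000, num_segments=4,
+#                           num_share_hashes=3, uri_extension_size=1000)
+# 'needed' is what PeerTracker passes as allocated_size to the server.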
class WriteBucketProxy:
implements(IStorageBucketWriter)
def __init__(self, rref, data_size, segment_size, num_segments,
- num_share_hashes):
+ num_share_hashes, uri_extension_size):
self._rref = rref
self._segment_size = segment_size
+ self._num_segments = num_segments
HASH_SIZE = interfaces.HASH_SIZE
self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE
# how many share hashes are included in each share? This will be
# about log2(num_shares).
self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
+ # we commit to not sending a uri extension larger than this
+ self._uri_extension_size = uri_extension_size
offsets = self._offsets = {}
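# (assumption: the first 0x1c == 28 bytes hold the share's fixed header
# and offset table, which is presumably filled in by start(); the data
# section therefore begins at 0x1c)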
x = 0x1c
offset = self._offsets['data'] + segmentnum * self._segment_size
assert offset + len(data) <= self._offsets['uri_extension']
assert isinstance(data, str)
- if segmentnum < self._segment_size-1:
- assert len(data) == self._segment_size
+ if segmentnum < self._num_segments-1:
+ precondition(len(data) == self._segment_size,
+ len(data), self._segment_size)
else:
- assert len(data) <= self._segment_size
+ precondition(len(data) <= self._segment_size,
+ len(data), self._segment_size)
return self._write(offset, data)
def put_plaintext_hashes(self, hashes):
assert isinstance(sharehashes, list)
data = "".join([struct.pack(">H", hashnum) + hashvalue
for hashnum,hashvalue in sharehashes])
- assert len(data) == self._share_hash_size
+ precondition(len(data) == self._share_hash_size,
+ len(data), self._share_hash_size)
assert offset + len(data) <= self._offsets['uri_extension']
return self._write(offset, data)
def put_uri_extension(self, data):
offset = self._offsets['uri_extension']
assert isinstance(data, str)
+ assert len(data) <= self._uri_extension_size
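+ # 4-byte big-endian length prefix: this is the "+ 4" that
+ # allocated_size() accounts for above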
length = struct.pack(">L", len(data))
return self._write(offset, length+data)
implements(IStorageBucketReader)
def __init__(self, rref):
self._rref = rref
+ self._started = False
def startIfNecessary(self):
if self._started:
from twisted.trial import unittest
from twisted.internet import defer
from twisted.python.failure import Failure
-from foolscap import eventual
from allmydata import encode, download, hashtree
from allmydata.util import hashutil
from allmydata.uri import pack_uri
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader
from cStringIO import StringIO
-class FakePeer:
- def __init__(self, mode="good"):
- self.ss = FakeStorageServer(mode)
-
- def callRemote(self, methname, *args, **kwargs):
- def _call():
- meth = getattr(self, methname)
- return meth(*args, **kwargs)
- return defer.maybeDeferred(_call)
-
- def get_service(self, sname):
- assert sname == "storageserver"
- return self.ss
-
-class FakeStorageServer:
- def __init__(self, mode):
- self.mode = mode
- def callRemote(self, methname, *args, **kwargs):
- def _call():
- meth = getattr(self, methname)
- return meth(*args, **kwargs)
- d = eventual.fireEventually()
- d.addCallback(lambda res: _call())
- return d
- def allocate_buckets(self, crypttext_hash, sharenums, sharesize, blocksize, canary):
- if self.mode == "full":
- return (set(), {},)
- elif self.mode == "already got them":
- return (set(sharenums), {},)
- else:
- return (set(), dict([(shnum, FakeBucketWriter(),) for shnum in sharenums]),)
-
class LostPeerError(Exception):
pass
def flip_bit(good): # flips the last bit
return good[:-1] + chr(ord(good[-1]) ^ 0x01)
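# e.g. flip_bit("good") corrupts the final byte; the tests use it to feed
# shareholders deliberately damaged blocks and hashes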
-class FakeBucketWriter:
+class FakeBucketWriterProxy:
implements(IStorageBucketWriter, IStorageBucketReader)
# these are used for both reading and writing
def __init__(self, mode="good"):
shareholders = {}
all_shareholders = []
for shnum in range(NUM_SHARES):
- peer = FakeBucketWriter()
+ peer = FakeBucketWriterProxy()
shareholders[shnum] = peer
all_shareholders.append(peer)
e.set_shareholders(shareholders)
all_peers = []
for shnum in range(NUM_SHARES):
mode = bucket_modes.get(shnum, "good")
- peer = FakeBucketWriter(mode)
+ peer = FakeBucketWriterProxy(mode)
shareholders[shnum] = peer
e.set_shareholders(shareholders)
plaintext_hasher = hashutil.plaintext_hasher()
from twisted.internet import defer
from foolscap import Referenceable
import os.path
-from allmydata import storageserver, interfaces
+from allmydata import interfaces
from allmydata.util import fileutil, hashutil
+from allmydata.storageserver import BucketWriter, BucketReader, \
+ WriteBucketProxy, ReadBucketProxy, StorageServer
class Bucket(unittest.TestCase):
def test_create(self):
incoming, final = self.make_workdir("test_create")
- bw = storageserver.BucketWriter(self, incoming, final, 200)
+ bw = BucketWriter(self, incoming, final, 200)
bw.remote_write(0, "a"*25)
bw.remote_write(25, "b"*25)
bw.remote_write(50, "c"*25)
def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite")
- bw = storageserver.BucketWriter(self, incoming, final, 200)
+ bw = BucketWriter(self, incoming, final, 200)
bw.remote_write(0, "a"*25)
bw.remote_write(25, "b"*25)
bw.remote_write(50, "c"*7) # last block may be short
bw.remote_close()
# now read from it
- br = storageserver.BucketReader(final)
+ br = BucketReader(final)
self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
final = os.path.join(basedir, "bucket")
fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp"))
- bw = storageserver.BucketWriter(self, incoming, final, size)
+ bw = BucketWriter(self, incoming, final, size)
rb = RemoteBucket()
rb.target = bw
return bw, rb, final
def test_create(self):
bw, rb, final = self.make_bucket("test_create", 500)
- bp = storageserver.WriteBucketProxy(rb,
- data_size=300,
- segment_size=10,
- num_segments=5,
- num_share_hashes=3)
+ bp = WriteBucketProxy(rb,
+ data_size=300,
+ segment_size=10,
+ num_segments=5,
+ num_share_hashes=3,
+ uri_extension_size=500)
self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
def test_readwrite(self):
uri_extension = "s" + "E"*498 + "e"
bw, rb, final = self.make_bucket("test_readwrite", 1406)
- bp = storageserver.WriteBucketProxy(rb,
- data_size=100,
- segment_size=25,
- num_segments=4,
- num_share_hashes=3)
+ bp = WriteBucketProxy(rb,
+ data_size=100,
+ segment_size=25,
+ num_segments=4,
+ num_share_hashes=3,
+ uri_extension_size=len(uri_extension))
d = bp.start()
d.addCallback(lambda res: bp.put_block(0, "a"*25))
# now read everything back
def _start_reading(res):
- br = storageserver.BucketReader(final)
+ br = BucketReader(final)
rb = RemoteBucket()
rb.target = br
- rbp = storageserver.ReadBucketProxy(rb)
+ rbp = ReadBucketProxy(rb)
self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
- d1 = rbp.start()
+ d1 = rbp.startIfNecessary()
d1.addCallback(lambda res: rbp.get_block(0))
d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
d1.addCallback(lambda res: rbp.get_block(1))
def create(self, name, sizelimit=None):
workdir = self.workdir(name)
- ss = storageserver.StorageServer(workdir, sizelimit)
+ ss = StorageServer(workdir, sizelimit)
ss.setServiceParent(self.sparent)
return ss
from twisted.trial import unittest
from twisted.python.failure import Failure
+from twisted.internet import defer
from cStringIO import StringIO
-from allmydata import upload, encode
+from allmydata import upload, encode, storageserver
from allmydata.uri import unpack_uri, unpack_lit
+from allmydata.util.assertutil import precondition
+from foolscap import eventual
-from test_encode import FakePeer
+class FakePeer:
+ def __init__(self, mode="good"):
+ self.ss = FakeStorageServer(mode)
+
+ def callRemote(self, methname, *args, **kwargs):
+ def _call():
+ meth = getattr(self, methname)
+ return meth(*args, **kwargs)
+ return defer.maybeDeferred(_call)
+
+ def get_service(self, sname):
+ assert sname == "storageserver"
+ return self.ss
+
+class FakeStorageServer:
+ def __init__(self, mode):
+ self.mode = mode
+ def callRemote(self, methname, *args, **kwargs):
+ def _call():
+ meth = getattr(self, methname)
+ return meth(*args, **kwargs)
+ d = eventual.fireEventually()
+ d.addCallback(lambda res: _call())
+ return d
+
+ def allocate_buckets(self, crypttext_hash, sharenums,
+ share_size, canary):
+ #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
+ if self.mode == "full":
+ return (set(), {},)
+ elif self.mode == "already got them":
+ return (set(sharenums), {},)
+ else:
+ return (set(),
+ dict([( shnum, FakeBucketWriter(share_size) )
+ for shnum in sharenums]),
+ )
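+ # "full" simulates a server with no room; "already got them" simulates
+ # one already holding every requested share; otherwise each share gets
+ # a fresh FakeBucketWriter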
+
+class FakeBucketWriter:
+ # a diagnostic version of storageserver.BucketWriter
+ def __init__(self, size):
+ self.data = StringIO()
+ self.closed = False
+ self._size = size
+
+ def callRemote(self, methname, *args, **kwargs):
+ def _call():
+ meth = getattr(self, "remote_" + methname)
+ return meth(*args, **kwargs)
+ d = eventual.fireEventually()
+ d.addCallback(lambda res: _call())
+ return d
+
+ def remote_write(self, offset, data):
+ precondition(not self.closed)
+ precondition(offset >= 0)
+ precondition(offset+len(data) <= self._size,
+ "offset=%d + data=%d > size=%d" %
+ (offset, len(data), self._size))
+ self.data.seek(offset)
+ self.data.write(data)
+
+ def remote_close(self):
+ precondition(not self.closed)
+ self.closed = True
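+
+# a minimal sketch of driving the fake directly (hypothetical driver code):
+#   bw = FakeBucketWriter(100)
+#   d = bw.callRemote("write", 0, "a"*25)   # dispatches to remote_write
+#   d.addCallback(lambda res: bw.callRemote("close"))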
class FakeClient:
def __init__(self, mode="good"):
from foolscap import Referenceable
from allmydata.util import idlib, hashutil
-from allmydata import encode, storageserver
+from allmydata import encode, storageserver, hashtree
from allmydata.uri import pack_uri, pack_lit
from allmydata.interfaces import IUploadable, IUploader
from allmydata.Crypto.Cipher import AES
class TooFullError(Exception):
pass
+# our current uri_extension is 846 bytes for small files, a few bytes
+# more for larger ones (since the filesize is encoded in decimal in a
+# few places). Ask for a little bit more just in case we need it. If
+# the extension changes size, we can change EXTENSION_SIZE to
+# allocate a more accurate amount of space.
+EXTENSION_SIZE = 1000
+
class PeerTracker:
def __init__(self, peerid, permutedid, connection,
sharesize, blocksize, num_segments, num_share_hashes,
self.connection = connection # to an RIClient
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
+ #print "PeerTracker", peerid, permutedid, sharesize
+ self.allocated_size = storageserver.allocated_size(sharesize,
+ num_segments,
+ num_share_hashes,
+ EXTENSION_SIZE)
+
self.blocksize = blocksize
self.num_segments = num_segments
self.num_share_hashes = num_share_hashes
def _got_storageserver(self, storageserver):
self._storageserver = storageserver
def _query(self, sharenums):
+ #print " query", self.peerid, len(sharenums)
d = self._storageserver.callRemote("allocate_buckets",
self.crypttext_hash,
- sharenums, self.sharesize,
- self.blocksize,
+ sharenums,
+ self.allocated_size,
canary=Referenceable())
d.addCallback(self._got_reply)
return d
bp = storageserver.WriteBucketProxy(rref, self.sharesize,
self.blocksize,
self.num_segments,
- self.num_share_hashes)
+ self.num_share_hashes,
+ EXTENSION_SIZE)
b[sharenum] = bp
self.buckets.update(b)
return (alreadygot, set(b.keys()))
# responsible for handling the data and sending out the shares.
peers = self._client.get_permuted_peers(self._crypttext_hash)
assert peers
+
# TODO: eek, don't pull this from here, find a better way. gross.
num_segments = self._encoder.uri_extension_data['num_segments']
- from allmydata.util.mathutil import next_power_of_k
- import math
- num_share_hashes = max(int(math.log(next_power_of_k(self.total_shares,2),2)),1)
+ ht = hashtree.IncompleteHashTree(self.total_shares)
+ # this needed_hashes computation should mirror
+ # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
+ # (instead of a HashTree) because we don't require actual hashing
+ # just to count the levels.
+ num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
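+ # hedged illustration: with total_shares=100 the tree rounds up to 128
+ # leaves, so one leaf's chain is roughly log2(128)+1 == 8 hashes (the
+ # leaf plus one sibling per level)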
+
trackers = [ PeerTracker(peerid, permutedid, conn,
share_size, block_size,
num_segments, num_share_hashes,
if ring[0][1] == SHARE:
sharenums_to_query.add(ring[0][2])
else:
- d = peer.query(sharenums_to_query)
- d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
- outstanding_queries.append(d)
- d.addErrback(log.err)
+ if sharenums_to_query:
+ d = peer.query(sharenums_to_query)
+ d.addCallbacks(self._got_response, self._got_error,
+ callbackArgs=(peer, sharenums_to_query),
+ errbackArgs=(peer,))
+ outstanding_queries.append(d)
+ d.addErrback(log.err)
peer = ring[0][2]
sharenums_to_query = set()
ring.rotate(-1)