self.share_hash_tree = share_hash_tree
self._roothash = roothash
self.block_hash_tree = hashtree.IncompleteHashTree(num_blocks)
+ self.started = False
def get_block(self, blocknum):
+ if not self.started:
+ d = self.bucket.start()
+ def _started(res):
+ self.started = True
+ return self.get_block(blocknum)
+ d.addCallback(_started)
+ return d
+
# the first time we use this bucket, we need to fetch enough elements
# of the share hash tree to validate it from our share hash up to the
# hashroot.
bucket = sources[0]
sources = sources[1:]
#d = bucket.callRemote(methname, *args)
- d = getattr(bucket, methname)(*args)
+ d = bucket.startIfNecessary()
+ d.addCallback(lambda res: getattr(bucket, methname)(*args))
d.addCallback(validatorfunc, bucket)
def _bad(f):
log.msg("%s from vbucket %s failed: %s" % (name, bucket, f)) # WEIRD
self.setup_codec() # TODO: duplicate call?
d = defer.succeed(None)
+ for l in self.landlords.values():
+ d.addCallback(lambda res, l=l: l.start())
+
for i in range(self.num_segments-1):
# note to self: this form doesn't work, because lambda only
# captures the slot, not the value
return Nodeid
class RIBucketWriter(RemoteInterface):
+ def write(offset=int, data=ShareData):
+ return None
+
+ def close():
+ """
+ If the data that has been written is incomplete or inconsistent then
+ the server will throw the data away, else it will store it for future
+ retrieval.
+ """
+ return None
+
+class RIBucketReader(RemoteInterface):
+ def read(offset=int, length=int):
+ return ShareData
+
+
+class RIStorageServer(RemoteInterface):
+ def allocate_buckets(storage_index=StorageIndex,
+ sharenums=SetOf(int, maxLength=MAX_BUCKETS),
+ sharesize=int, canary=Referenceable):
+ """
+ @param canary: If the canary is lost before close(), the bucket is deleted.
+ @return: tuple of (alreadygot, allocated), where alreadygot is what we
+ already have and allocated is what we hereby agree to accept
+ """
+ return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
+ DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
+ def get_buckets(storage_index=StorageIndex):
+ return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
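+ # A rough sketch of how a client drives RIStorageServer and the per-share
+ # bucket interfaces over foolscap. Illustrative only: 'server_rref', 'si',
+ # 'sharesize', 'share_data', and 'canary' are stand-in names, not objects
+ # defined in this module.
+ #
+ #   d = server_rref.callRemote("allocate_buckets", si, set([0,1,2]),
+ #                              sharesize, canary)
+ #   def _upload((alreadygot, writers)):
+ #       # 'writers' maps each newly allocated sharenum to an RIBucketWriter
+ #       dl = []
+ #       for shnum, wb in writers.items():
+ #           d1 = wb.callRemote("write", 0, share_data)
+ #           d1.addCallback(lambda res, wb=wb: wb.callRemote("close"))
+ #           dl.append(d1)
+ #       return defer.DeferredList(dl)
+ #   d.addCallback(_upload)
+ #
+ #   # later, to read a share back:
+ #   d = server_rref.callRemote("get_buckets", si)
+ #   d.addCallback(lambda buckets: buckets[0].callRemote("read", 0, 100))
+ #
+ # In practice the upload and download code wrap these remote references in
+ # WriteBucketProxy / ReadBucketProxy (storageserver.py) rather than issuing
+ # raw write/read calls.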
+
+
+class IStorageBucketWriter(Interface):
def put_block(segmentnum=int, data=ShareData):
"""@param data: For most segments, this data will be 'blocksize'
bytes in length. The last segment might be shorter.
write(k + ':' + netstring(dict[k]))
"""
return None
-
def close():
- """
- If the data that has been written is incomplete or inconsistent then
- the server will throw the data away, else it will store it for future
- retrieval.
- """
- return None
+ pass
+
+class IStorageBucketReader(Interface):
-class RIBucketReader(RemoteInterface):
def get_block(blocknum=int):
"""Most blocks will be the same size. The last block might be shorter
than the others.
return URIExtensionData
-class RIStorageServer(RemoteInterface):
- def allocate_buckets(storage_index=StorageIndex,
- sharenums=SetOf(int, maxLength=MAX_BUCKETS),
- sharesize=int, blocksize=int, canary=Referenceable):
- """
- @param canary: If the canary is lost before close(), the bucket is deleted.
- @return: tuple of (alreadygot, allocated), where alreadygot is what we
- already have and is what we hereby agree to accept
- """
- return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
- DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
- def get_buckets(storage_index=StorageIndex):
- return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
-
-
-class IStorageBucketWriter(Interface):
- def put_block(segmentnum, data):
- pass
-
- def put_plaintext_hashes(hashes):
- pass
- def put_crypttext_hashes(hashes):
- pass
- def put_block_hashes(blockhashes):
- pass
- def put_share_hashes(sharehashes):
- pass
- def put_uri_extension(data):
- pass
- def close():
- pass
-
-class IStorageBucketReader(Interface):
-
- def get_block(blocknum):
- pass
-
- def get_plaintext_hashes():
- pass
- def get_crypttext_hashes():
- pass
- def get_block_hashes():
- pass
- def get_share_hashes():
- pass
- def get_uri_extension():
- pass
-
-
# hm, we need a solution for forward references in schemas
from foolscap.schema import Any
-import os, re, weakref
+import os, re, weakref, stat, struct
from foolscap import Referenceable
from twisted.application import service
+from twisted.internet import defer
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
RIBucketReader, IStorageBucketWriter, IStorageBucketReader
from allmydata import interfaces
-from allmydata.util import bencode, fileutil, idlib
+from allmydata.util import fileutil, idlib
from allmydata.util.assertutil import precondition
# store/
class BucketWriter(Referenceable):
implements(RIBucketWriter)
- def __init__(self, ss, incominghome, finalhome, blocksize, sharesize):
+ def __init__(self, ss, incominghome, finalhome, size):
self.ss = ss
self.incominghome = incominghome
self.finalhome = finalhome
- self.blocksize = blocksize
- self.sharesize = sharesize
+ self._size = size
self.closed = False
- self._next_segnum = 0
- fileutil.make_dirs(incominghome)
- self._write_file('blocksize', str(blocksize))
+ # touch the file, so later callers will see that we're working on it
+ f = open(self.incominghome, 'ab')
+ f.close()
def allocated_size(self):
- return self.sharesize
+ return self._size
- def _write_file(self, fname, data):
- open(os.path.join(self.incominghome, fname), 'wb').write(data)
-
- def remote_put_block(self, segmentnum, data):
+ def remote_write(self, offset, data):
precondition(not self.closed)
- # all blocks but the last will be of size self.blocksize, however the
- # last one may be short, and we don't know the total number of
- # segments so we can't tell which is which.
- assert len(data) <= self.blocksize
- assert segmentnum == self._next_segnum # must write in sequence
- self._next_segnum = segmentnum + 1
- f = fileutil.open_or_create(os.path.join(self.incominghome, 'data'))
- f.seek(self.blocksize*segmentnum)
+ precondition(offset >= 0)
+ precondition(offset+len(data) <= self._size)
+ # open for update: in append mode the seek() below would be ignored
+ # and every write would land at the end of the file
+ f = open(self.incominghome, 'rb+')
+ f.seek(offset)
f.write(data)
-
- def remote_put_plaintext_hashes(self, hashes):
- precondition(not self.closed)
- # TODO: verify the length of blockhashes.
- # TODO: tighten foolscap schema to require exactly 32 bytes.
- self._write_file('plaintext_hashes', ''.join(hashes))
-
- def remote_put_crypttext_hashes(self, hashes):
- precondition(not self.closed)
- # TODO: verify the length of blockhashes.
- # TODO: tighten foolscap schema to require exactly 32 bytes.
- self._write_file('crypttext_hashes', ''.join(hashes))
-
- def remote_put_block_hashes(self, blockhashes):
- precondition(not self.closed)
- # TODO: verify the length of blockhashes.
- # TODO: tighten foolscap schema to require exactly 32 bytes.
- self._write_file('blockhashes', ''.join(blockhashes))
-
- def remote_put_share_hashes(self, sharehashes):
- precondition(not self.closed)
- self._write_file('sharehashes', bencode.bencode(sharehashes))
-
- def remote_put_uri_extension(self, data):
- precondition(not self.closed)
- self._write_file('uri_extension', data)
+ f.close()
def remote_close(self):
precondition(not self.closed)
- # TODO assert or check the completeness and consistency of the data that has been written
- fileutil.make_dirs(os.path.dirname(self.finalhome))
fileutil.rename(self.incominghome, self.finalhome)
- try:
- os.rmdir(os.path.dirname(self.incominghome))
- except OSError:
- # Perhaps the directory wasn't empty. In any case, ignore the error.
- pass
-
self.closed = True
- self.ss.bucket_writer_closed(self, fileutil.du(self.finalhome))
+ filelen = os.stat(self.finalhome)[stat.ST_SIZE]
+ self.ss.bucket_writer_closed(self, filelen)
-def str2l(s):
- """ split string (pulled from storage) into a list of blockids """
- return [ s[i:i+interfaces.HASH_SIZE] for i in range(0, len(s), interfaces.HASH_SIZE) ]
class BucketReader(Referenceable):
implements(RIBucketReader)
def __init__(self, home):
self.home = home
- self.blocksize = int(self._read_file('blocksize'))
-
- def _read_file(self, fname):
- return open(os.path.join(self.home, fname), 'rb').read()
-
- def remote_get_block(self, blocknum):
- f = open(os.path.join(self.home, 'data'), 'rb')
- f.seek(self.blocksize * blocknum)
- return f.read(self.blocksize) # this might be short for the last block
-
- def remote_get_plaintext_hashes(self):
- return str2l(self._read_file('plaintext_hashes'))
- def remote_get_crypttext_hashes(self):
- return str2l(self._read_file('crypttext_hashes'))
-
- def remote_get_block_hashes(self):
- return str2l(self._read_file('blockhashes'))
- def remote_get_share_hashes(self):
- hashes = bencode.bdecode(self._read_file('sharehashes'))
- # tuples come through bdecode(bencode()) as lists, which violates the
- # schema
- return [tuple(i) for i in hashes]
-
- def remote_get_uri_extension(self):
- return self._read_file('uri_extension')
+ def remote_read(self, offset, length):
+ f = open(self.home, 'rb')
+ f.seek(offset)
+ return f.read(length)
class StorageServer(service.MultiService, Referenceable):
implements(RIStorageServer)
return space
def remote_allocate_buckets(self, storage_index, sharenums, sharesize,
- blocksize, canary):
+ canary):
alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter
si_s = idlib.b2a(storage_index)
if os.path.exists(incominghome) or os.path.exists(finalhome):
alreadygot.add(shnum)
elif no_limits or remaining_space >= space_per_bucket:
+ fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
bw = BucketWriter(self, incominghome, finalhome,
- blocksize, space_per_bucket)
+ space_per_bucket)
bucketwriters[shnum] = bw
self._active_writers[bw] = 1
if yes_limits:
# not enough space to accept this bucket
pass
+ if bucketwriters:
+ fileutil.make_dirs(os.path.join(self.storedir, si_s))
+
return alreadygot, bucketwriters
def bucket_writer_closed(self, bw, consumed_size):
return bucketreaders
+"""
+Share data is written into a single file. At the start of the file there is
+a header of seven four-byte big-endian values: the segment size, followed by
+the offsets at which each section starts. Each offset is measured from the
+beginning of the file.
+
+0x00: segment size
+0x04: offset of data (=00 00 00 1c)
+0x08: offset of plaintext_hash_tree
+0x0c: offset of crypttext_hash_tree
+0x10: offset of block_hashes
+0x14: offset of share_hashes
+0x18: offset of uri_extension_length + uri_extension
+0x1c: start of data
+ start of plaintext_hash_tree
+ start of crypttext_hash_tree
+ start of block_hashes
+ start of share_hashes
+ each share_hash is written as a two-byte (big-endian) hashnum
+ followed by the 32-byte SHA-256 hash. We only store the hashes
+ necessary to validate the share hash root
+ start of uri_extension_length (four-byte big-endian value)
+ start of uri_extension
+"""
+
class WriteBucketProxy:
implements(IStorageBucketWriter)
- def __init__(self, rref):
+ def __init__(self, rref, data_size, segment_size, num_segments,
+ num_share_hashes):
self._rref = rref
+ self._segment_size = segment_size
+ self._num_segments = num_segments
+
+ HASH_SIZE = interfaces.HASH_SIZE
+ self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE
+ # how many share hashes are included in each share? This will be
+ # about ln2(num_shares).
+ self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
+
+ offsets = self._offsets = {}
+ x = 0x1c
+ offsets['data'] = x
+ x += data_size
+ offsets['plaintext_hash_tree'] = x
+ x += self._segment_hash_size
+ offsets['crypttext_hash_tree'] = x
+ x += self._segment_hash_size
+ offsets['block_hashes'] = x
+ x += self._segment_hash_size
+ offsets['share_hashes'] = x
+ x += self._share_hash_size
+ offsets['uri_extension'] = x
+
+ offset_data = struct.pack(">LLLLLLL",
+ segment_size,
+ offsets['data'],
+ offsets['plaintext_hash_tree'],
+ offsets['crypttext_hash_tree'],
+ offsets['block_hashes'],
+ offsets['share_hashes'],
+ offsets['uri_extension']
+ )
+ assert len(offset_data) == 7*4
+ self._offset_data = offset_data
+
+ def start(self):
+ return self._write(0, self._offset_data)
def put_block(self, segmentnum, data):
- return self._rref.callRemote("put_block", segmentnum, data)
+ offset = self._offsets['data'] + segmentnum * self._segment_size
+ assert offset + len(data) <= self._offsets['uri_extension']
+ assert isinstance(data, str)
+ if segmentnum < self._num_segments-1:
+ assert len(data) == self._segment_size
+ else:
+ assert len(data) <= self._segment_size
+ return self._write(offset, data)
def put_plaintext_hashes(self, hashes):
- return self._rref.callRemote("put_plaintext_hashes", hashes)
+ offset = self._offsets['plaintext_hash_tree']
+ assert isinstance(hashes, list)
+ data = "".join(hashes)
+ assert len(data) == self._segment_hash_size
+ assert offset + len(data) <= self._offsets['crypttext_hash_tree']
+ return self._write(offset, data)
+
def put_crypttext_hashes(self, hashes):
- return self._rref.callRemote("put_crypttext_hashes", hashes)
+ offset = self._offsets['crypttext_hash_tree']
+ assert isinstance(hashes, list)
+ data = "".join(hashes)
+ assert len(data) == self._segment_hash_size
+ assert offset + len(data) <= self._offsets['block_hashes']
+ return self._write(offset, data)
+
def put_block_hashes(self, blockhashes):
- return self._rref.callRemote("put_block_hashes", blockhashes)
+ offset = self._offsets['block_hashes']
+ assert isinstance(blockhashes, list)
+ data = "".join(blockhashes)
+ assert len(data) == self._segment_hash_size
+ assert offset + len(data) <= self._offsets['share_hashes']
+ return self._write(offset, data)
+
def put_share_hashes(self, sharehashes):
- return self._rref.callRemote("put_share_hashes", sharehashes)
+ # sharehashes is a list of (index, hash) tuples, so they get stored
+ # as 2+32=34 bytes each
+ offset = self._offsets['share_hashes']
+ assert isinstance(sharehashes, list)
+ data = "".join([struct.pack(">H", hashnum) + hashvalue
+ for hashnum,hashvalue in sharehashes])
+ assert len(data) == self._share_hash_size
+ assert offset + len(data) <= self._offsets['uri_extension']
+ return self._write(offset, data)
+
def put_uri_extension(self, data):
- return self._rref.callRemote("put_uri_extension", data)
+ offset = self._offsets['uri_extension']
+ assert isinstance(data, str)
+ length = struct.pack(">L", len(data))
+ return self._write(offset, length+data)
+
+ def _write(self, offset, data):
+ # TODO: for small shares, buffer the writes and do just a single call
+ return self._rref.callRemote("write", offset, data)
+
def close(self):
return self._rref.callRemote("close")
def __init__(self, rref):
self._rref = rref
+ self._started = False
+ def startIfNecessary(self):
+ if self._started:
+ return defer.succeed(self)
+ d = self.start()
+ def _started(res):
+ self._started = True
+ return self
+ d.addCallback(_started)
+ return d
+
+ def start(self):
+ # TODO: for small shares, read the whole bucket in start()
+ d = self._read(0, 7*4)
+ self._offsets = {}
+ def _got_offsets(data):
+ self._segment_size = struct.unpack(">L", data[0:4])[0]
+ x = 4
+ for field in ( 'data',
+ 'plaintext_hash_tree',
+ 'crypttext_hash_tree',
+ 'block_hashes',
+ 'share_hashes',
+ 'uri_extension' ):
+ offset = struct.unpack(">L", data[x:x+4])[0]
+ x += 4
+ self._offsets[field] = offset
+ d.addCallback(_got_offsets)
+ return d
+
def get_block(self, blocknum):
- return self._rref.callRemote("get_block", blocknum)
+ offset = self._offsets['data'] + blocknum * self._segment_size
+ return self._read(offset, self._segment_size)
+
+ def _str2l(self, s):
+ """ split string (pulled from storage) into a list of blockids """
+ return [ s[i:i+interfaces.HASH_SIZE]
+ for i in range(0, len(s), interfaces.HASH_SIZE) ]
def get_plaintext_hashes(self):
- return self._rref.callRemote("get_plaintext_hashes")
+ offset = self._offsets['plaintext_hash_tree']
+ size = self._offsets['crypttext_hash_tree'] - offset
+ d = self._read(offset, size)
+ d.addCallback(self._str2l)
+ return d
+
def get_crypttext_hashes(self):
- return self._rref.callRemote("get_crypttext_hashes")
+ offset = self._offsets['crypttext_hash_tree']
+ size = self._offsets['block_hashes'] - offset
+ d = self._read(offset, size)
+ d.addCallback(self._str2l)
+ return d
+
def get_block_hashes(self):
- return self._rref.callRemote("get_block_hashes")
+ offset = self._offsets['block_hashes']
+ size = self._offsets['share_hashes'] - offset
+ d = self._read(offset, size)
+ d.addCallback(self._str2l)
+ return d
+
def get_share_hashes(self):
- return self._rref.callRemote("get_share_hashes")
- def get_uri_extension(self):
- return self._rref.callRemote("get_uri_extension")
+ offset = self._offsets['share_hashes']
+ size = self._offsets['uri_extension'] - offset
+ HASH_SIZE = interfaces.HASH_SIZE
+ assert size % (2+HASH_SIZE) == 0
+ d = self._read(offset, size)
+ def _unpack_share_hashes(data):
+ assert len(data) == size
+ hashes = []
+ for i in range(0, size, 2+HASH_SIZE):
+ hashnum = struct.unpack(">H", data[i:i+2])[0]
+ hashvalue = data[i+2:i+2+HASH_SIZE]
+ hashes.append( (hashnum, hashvalue) )
+ return hashes
+ d.addCallback(_unpack_share_hashes)
+ return d
+ def get_uri_extension(self):
+ offset = self._offsets['uri_extension']
+ d = self._read(offset, 4)
+ def _got_length(data):
+ length = struct.unpack(">L", data)[0]
+ return self._read(offset+4, length)
+ d.addCallback(_got_length)
+ return d
+
+ def _read(self, offset, length):
+ return self._rref.callRemote("read", offset, length)
self.share_hashes = None
self.closed = False
- def callRemote(self, methname, *args, **kwargs):
- # this allows FakeBucketWriter to be used either as an
- # IStorageBucketWriter or as the remote reference that it wraps. This
- # should be cleaned up eventually when we change RIBucketWriter to
- # have just write(offset, data) and close()
- def _call():
- meth = getattr(self, methname)
- return meth(*args, **kwargs)
- d = eventual.fireEventually()
- d.addCallback(lambda res: _call())
- return d
+ def startIfNecessary(self):
+ return defer.succeed(self)
+ def start(self):
+ return defer.succeed(self)
def put_block(self, segmentnum, data):
def _try():
from twisted.trial import unittest
from twisted.application import service
+from twisted.internet import defer
from foolscap import Referenceable
import os.path
-from allmydata import storageserver
-from allmydata.util import fileutil
+from allmydata import storageserver, interfaces
+from allmydata.util import fileutil, hashutil
class Bucket(unittest.TestCase):
def make_workdir(self, name):
- basedir = os.path.join("test_storage", "Bucket", name)
+ basedir = os.path.join("storage", "Bucket", name)
incoming = os.path.join(basedir, "tmp", "bucket")
final = os.path.join(basedir, "bucket")
fileutil.make_dirs(basedir)
+ fileutil.make_dirs(os.path.join(basedir, "tmp"))
return incoming, final
def bucket_writer_closed(self, bw, consumed):
def test_create(self):
incoming, final = self.make_workdir("test_create")
- bw = storageserver.BucketWriter(self, incoming, final, 25, 57)
- bw.remote_put_block(0, "a"*25)
- bw.remote_put_block(1, "b"*25)
- bw.remote_put_block(2, "c"*7) # last block may be short
+ bw = storageserver.BucketWriter(self, incoming, final, 200)
+ bw.remote_write(0, "a"*25)
+ bw.remote_write(25, "b"*25)
+ bw.remote_write(50, "c"*25)
+ bw.remote_write(75, "d"*7)
bw.remote_close()
def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite")
- bw = storageserver.BucketWriter(self, incoming, final, 25, 57)
- bw.remote_put_block(0, "a"*25)
- bw.remote_put_block(1, "b"*25)
- bw.remote_put_block(2, "c"*7) # last block may be short
- bw.remote_put_block_hashes(["1"*32, "2"*32, "3"*32, "4"*32])
- bw.remote_put_share_hashes([(5, "5"*32), (6, "6"*32)])
+ bw = storageserver.BucketWriter(self, incoming, final, 200)
+ bw.remote_write(0, "a"*25)
+ bw.remote_write(25, "b"*25)
+ bw.remote_write(50, "c"*7) # last block may be short
bw.remote_close()
# now read from it
br = storageserver.BucketReader(final)
- self.failUnlessEqual(br.remote_get_block(0), "a"*25)
- self.failUnlessEqual(br.remote_get_block(1), "b"*25)
- self.failUnlessEqual(br.remote_get_block(2), "c"*7)
- self.failUnlessEqual(br.remote_get_block_hashes(),
- ["1"*32, "2"*32, "3"*32, "4"*32])
- self.failUnlessEqual(br.remote_get_share_hashes(),
- [(5, "5"*32), (6, "6"*32)])
+ self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
+ self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
+ self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
+
+class RemoteBucket:
+
+ def callRemote(self, methname, *args, **kwargs):
+ def _call():
+ meth = getattr(self.target, "remote_" + methname)
+ return meth(*args, **kwargs)
+ return defer.maybeDeferred(_call)
+
+class BucketProxy(unittest.TestCase):
+ def make_bucket(self, name, size):
+ basedir = os.path.join("storage", "BucketProxy", name)
+ incoming = os.path.join(basedir, "tmp", "bucket")
+ final = os.path.join(basedir, "bucket")
+ fileutil.make_dirs(basedir)
+ fileutil.make_dirs(os.path.join(basedir, "tmp"))
+ bw = storageserver.BucketWriter(self, incoming, final, size)
+ rb = RemoteBucket()
+ rb.target = bw
+ return bw, rb, final
+
+ def bucket_writer_closed(self, bw, consumed):
+ pass
+
+ def test_create(self):
+ bw, rb, final = self.make_bucket("test_create", 500)
+ bp = storageserver.WriteBucketProxy(rb,
+ data_size=300,
+ segment_size=10,
+ num_segments=5,
+ num_share_hashes=3)
+ self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
+
+ def test_readwrite(self):
+ # Let's pretend each share has 100 bytes of data, and that there are
+ # 4 segments (25 bytes each), and 8 shares total. So the three
+ # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
+ # block_hashes) will have 4 leaves and 7 nodes each. The per-share
+ # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
+ # nodes. Furthermore, let's assume the uri_extension is 500 bytes
+ # long. That should make the whole share:
+ #
+ # 0x1c + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1406 bytes long
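+ # With those numbers, the offsets computed by WriteBucketProxy work out to:
+ #   data                 0x1c  (28)
+ #   plaintext_hash_tree  0x80  (128 = 28 + 100)
+ #   crypttext_hash_tree  0x160 (352 = 128 + 7*32)
+ #   block_hashes         0x240 (576 = 352 + 7*32)
+ #   share_hashes         0x320 (800 = 576 + 7*32)
+ #   uri_extension        0x386 (902 = 800 + 3*(2+32))
+ # and the uri_extension section itself occupies 4+500 bytes, ending at 1406.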
+
+ plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
+ for i in range(7)]
+ crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
+ for i in range(7)]
+ block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
+ for i in range(7)]
+ share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
+ for i in (1,9,13)]
+ uri_extension = "s" + "E"*498 + "e"
+
+ bw, rb, final = self.make_bucket("test_readwrite", 1406)
+ bp = storageserver.WriteBucketProxy(rb,
+ data_size=100,
+ segment_size=25,
+ num_segments=4,
+ num_share_hashes=3)
+
+ d = bp.start()
+ d.addCallback(lambda res: bp.put_block(0, "a"*25))
+ d.addCallback(lambda res: bp.put_block(1, "b"*25))
+ d.addCallback(lambda res: bp.put_block(2, "c"*25))
+ d.addCallback(lambda res: bp.put_block(3, "d"*25))
+ d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
+ d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
+ d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
+ d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
+ d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
+ d.addCallback(lambda res: bp.close())
+
+ # now read everything back
+ def _start_reading(res):
+ br = storageserver.BucketReader(final)
+ rb = RemoteBucket()
+ rb.target = br
+ rbp = storageserver.ReadBucketProxy(rb)
+ self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
+
+ d1 = rbp.start()
+ d1.addCallback(lambda res: rbp.get_block(0))
+ d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
+ d1.addCallback(lambda res: rbp.get_block(1))
+ d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
+ d1.addCallback(lambda res: rbp.get_block(2))
+ d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
+ d1.addCallback(lambda res: rbp.get_block(3))
+ d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*25))
+
+ d1.addCallback(lambda res: rbp.get_plaintext_hashes())
+ d1.addCallback(lambda res:
+ self.failUnlessEqual(res, plaintext_hashes))
+ d1.addCallback(lambda res: rbp.get_crypttext_hashes())
+ d1.addCallback(lambda res:
+ self.failUnlessEqual(res, crypttext_hashes))
+ d1.addCallback(lambda res: rbp.get_block_hashes())
+ d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
+ d1.addCallback(lambda res: rbp.get_share_hashes())
+ d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
+ d1.addCallback(lambda res: rbp.get_uri_extension())
+ d1.addCallback(lambda res:
+ self.failUnlessEqual(res, uri_extension))
+
+ return d1
+
+ d.addCallback(_start_reading)
+
+ return d
+
+
class Server(unittest.TestCase):
canary = Referenceable()
already,writers = ss.remote_allocate_buckets("vid", [0,1,2],
- 75, 25, canary)
+ 75, canary)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
for i,wb in writers.items():
- wb.remote_put_block(0, "%25d" % i)
+ wb.remote_write(0, "%25d" % i)
wb.remote_close()
# now they should be readable
b = ss.remote_get_buckets("vid")
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
- self.failUnlessEqual(b[0].remote_get_block(0),
- "%25d" % 0)
+ self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
# now if we ask about writing again, the server should offer those three
# buckets as already present
already,writers = ss.remote_allocate_buckets("vid", [0,1,2,3,4],
- 75, 25, canary)
+ 75, canary)
self.failUnlessEqual(already, set([0,1,2]))
self.failUnlessEqual(set(writers.keys()), set([3,4]))
# upload into them a second time
already,writers = ss.remote_allocate_buckets("vid", [2,3,4,5],
- 75, 25, canary)
+ 75, canary)
self.failUnlessEqual(already, set([2,3,4]))
self.failUnlessEqual(set(writers.keys()), set([5]))
canary = Referenceable()
already,writers = ss.remote_allocate_buckets("vid1", [0,1,2],
- 25, 5, canary)
+ 25, canary)
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 75 bytes provisionally allocated,
# allowing only 25 more to be claimed
+ self.failUnlessEqual(len(ss._active_writers), 3)
already2,writers2 = ss.remote_allocate_buckets("vid2", [0,1,2],
- 25, 5, canary)
+ 25, canary)
self.failUnlessEqual(len(writers2), 1)
+ self.failUnlessEqual(len(ss._active_writers), 4)
# we abandon the first set, so their provisional allocation should be
# returned
del already
del writers
+ self.failUnlessEqual(len(ss._active_writers), 1)
# and we close the second set, so their provisional allocation should
# become real, long-term allocation
for bw in writers2.values():
+ bw.remote_write(0, "a"*25)
bw.remote_close()
del already2
del writers2
del bw
+ self.failUnlessEqual(len(ss._active_writers), 0)
# now there should be 25 bytes allocated, and 75 free
already3,writers3 = ss.remote_allocate_buckets("vid3", [0,1,2,3],
- 25, 5, canary)
+ 25, canary)
self.failUnlessEqual(len(writers3), 3)
+ self.failUnlessEqual(len(ss._active_writers), 3)
del already3
del writers3
+ self.failUnlessEqual(len(ss._active_writers), 0)
ss.disownServiceParent()
del ss
# would be more than 25 bytes and this test would need to be changed.
ss = self.create("test_sizelimits", 100)
already4,writers4 = ss.remote_allocate_buckets("vid4", [0,1,2,3],
- 25, 5, canary)
+ 25, canary)
self.failUnlessEqual(len(writers4), 3)
+ self.failUnlessEqual(len(ss._active_writers), 3)
class PeerTracker:
def __init__(self, peerid, permutedid, connection,
- sharesize, blocksize, crypttext_hash):
+ sharesize, blocksize, num_segments, num_share_hashes,
+ crypttext_hash):
self.peerid = peerid
self.permutedid = permutedid
self.connection = connection # to an RIClient
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
self.blocksize = blocksize
+ self.num_segments = num_segments
+ self.num_share_hashes = num_share_hashes
self.crypttext_hash = crypttext_hash
self._storageserver = None
def _got_reply(self, (alreadygot, buckets)):
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
- b = dict( [ (sharenum, storageserver.WriteBucketProxy(rref))
- for sharenum, rref in buckets.iteritems() ] )
+ b = {}
+ for sharenum, rref in buckets.iteritems():
+ bp = storageserver.WriteBucketProxy(rref, self.sharesize,
+ self.blocksize,
+ self.num_segments,
+ self.num_share_hashes)
+ b[sharenum] = bp
self.buckets.update(b)
return (alreadygot, set(b.keys()))
# responsible for handling the data and sending out the shares.
peers = self._client.get_permuted_peers(self._crypttext_hash)
assert peers
+ # TODO: eek, don't pull this from here, find a better way. gross.
+ num_segments = self._encoder.uri_extension_data['num_segments']
+ from allmydata.util.mathutil import next_power_of_k
+ import math
+ num_share_hashes = max(int(math.log(next_power_of_k(self.total_shares,2),2)),1)
trackers = [ PeerTracker(peerid, permutedid, conn,
share_size, block_size,
+ num_segments, num_share_hashes,
self._crypttext_hash)
for permutedid, peerid, conn in peers ]
self.usable_peers = set(trackers) # this set shrinks over time