This will make it easier to change RIBucketWriter in the future to reduce the wire
protocol to just open/write(offset,data)/close, and do all the structuring on the
client end. The ultimate goal is to store each bucket in a single file, to reduce
the considerable filesystem-quantization/inode overhead on the storage servers.
from allmydata.util import idlib, mathutil, hashutil
from allmydata.util.assertutil import _assert
-from allmydata import codec, hashtree
+from allmydata import codec, hashtree, storageserver
from allmydata.Crypto.Cipher import AES
from allmydata.uri import unpack_uri, unpack_extension
from allmydata.interfaces import IDownloadTarget, IDownloader
# of the share hash tree to validate it from our share hash up to the
# hashroot.
if not self._share_hash:
- d1 = self.bucket.callRemote('get_share_hashes')
+ d1 = self.bucket.get_share_hashes()
else:
d1 = defer.succeed(None)
# validate the requested block up to the share hash
needed = self.block_hash_tree.needed_hashes(blocknum)
if needed:
- # TODO: get fewer hashes, callRemote('get_block_hashes', needed)
- d2 = self.bucket.callRemote('get_block_hashes')
+ # TODO: get fewer hashes, use get_block_hashes(needed)
+ d2 = self.bucket.get_block_hashes()
else:
d2 = defer.succeed([])
- d3 = self.bucket.callRemote('get_block', blocknum)
+ d3 = self.bucket.get_block(blocknum)
d = defer.gatherResults([d1, d2, d3])
d.addCallback(self._got_data, blocknum)
def _got_response(self, buckets, connection):
_assert(isinstance(buckets, dict), buckets) # soon foolscap will check this for us with its DictOf schema constraint
for sharenum, bucket in buckets.iteritems():
- self.add_share_bucket(sharenum, bucket)
- self._uri_extension_sources.append(bucket)
+ b = storageserver.ReadBucketProxy(bucket)
+ self.add_share_bucket(sharenum, b)
+ self._uri_extension_sources.append(b)
def add_share_bucket(self, sharenum, bucket):
# this is split out for the benefit of test_encode.py
"%s" % name)
bucket = sources[0]
sources = sources[1:]
- d = bucket.callRemote(methname, *args)
+ #d = bucket.callRemote(methname, *args)
+ d = getattr(bucket, methname)(*args)
d.addCallback(validatorfunc, bucket)
def _bad(f):
log.msg("%s from vbucket %s failed: %s" % (name, bucket, f)) # WEIRD
from allmydata.util import mathutil, hashutil
from allmydata.util.assertutil import _assert
from allmydata.codec import CRSEncoder
-from allmydata.interfaces import IEncoder
+from allmydata.interfaces import IEncoder, IStorageBucketWriter
"""
for k in landlords:
# it would be nice to:
#assert RIBucketWriter.providedBy(landlords[k])
+ assert IStorageBucketWriter(landlords[k])
pass
self.landlords = landlords.copy()
if shareid not in self.landlords:
return defer.succeed(None)
sh = self.landlords[shareid]
- d = sh.callRemote("put_block", segment_num, subshare)
+ d = sh.put_block(segment_num, subshare)
d.addErrback(self._remove_shareholder, shareid,
"segnum=%d" % segment_num)
return d
if shareid not in self.landlords:
return defer.succeed(None)
sh = self.landlords[shareid]
- d = sh.callRemote("put_plaintext_hashes", all_hashes)
+ d = sh.put_plaintext_hashes(all_hashes)
d.addErrback(self._remove_shareholder, shareid, "put_plaintext_hashes")
return d
if shareid not in self.landlords:
return defer.succeed(None)
sh = self.landlords[shareid]
- d = sh.callRemote("put_crypttext_hashes", all_hashes)
+ d = sh.put_crypttext_hashes(all_hashes)
d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
return d
if shareid not in self.landlords:
return defer.succeed(None)
sh = self.landlords[shareid]
- d = sh.callRemote("put_block_hashes", all_hashes)
+ d = sh.put_block_hashes(all_hashes)
d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
return d
if shareid not in self.landlords:
return defer.succeed(None)
sh = self.landlords[shareid]
- d = sh.callRemote("put_share_hashes", needed_hashes)
+ d = sh.put_share_hashes(needed_hashes)
d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
return d
def send_uri_extension(self, shareid, uri_extension):
sh = self.landlords[shareid]
- d = sh.callRemote("put_uri_extension", uri_extension)
+ d = sh.put_uri_extension(uri_extension)
d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
return d
log.msg("%s: closing shareholders" % self)
dl = []
for shareid in self.landlords:
- d = self.landlords[shareid].callRemote("close")
+ d = self.landlords[shareid].close()
d.addErrback(self._remove_shareholder, shareid, "close")
dl.append(d)
return self._gather_responses(dl)
def get_buckets(storage_index=StorageIndex):
return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
+
+class IStorageBucketWriter(Interface):
+ def put_block(segmentnum, data):
+ pass
+
+ def put_plaintext_hashes(hashes):
+ pass
+ def put_crypttext_hashes(hashes):
+ pass
+ def put_block_hashes(blockhashes):
+ pass
+ def put_share_hashes(sharehashes):
+ pass
+ def put_uri_extension(data):
+ pass
+ def close():
+ pass
+
+class IStorageBucketReader(Interface):
+
+ def get_block(blocknum):
+ pass
+
+ def get_plaintext_hashes():
+ pass
+ def get_crypttext_hashes():
+ pass
+ def get_block_hashes():
+ pass
+ def get_share_hashes():
+ pass
+ def get_uri_extension():
+ pass
+
+
+
# hm, we need a solution for forward references in schemas
from foolscap.schema import Any
RIMutableDirectoryNode_ = Any() # TODO: how can we avoid this?
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
- RIBucketReader
+ RIBucketReader, IStorageBucketWriter, IStorageBucketReader
from allmydata import interfaces
from allmydata.util import bencode, fileutil, idlib
from allmydata.util.assertutil import precondition
pass
return bucketreaders
+
+class WriteBucketProxy:
+ implements(IStorageBucketWriter)
+ def __init__(self, rref):
+ self._rref = rref
+
+ def put_block(self, segmentnum, data):
+ return self._rref.callRemote("put_block", segmentnum, data)
+
+ def put_plaintext_hashes(self, hashes):
+ return self._rref.callRemote("put_plaintext_hashes", hashes)
+ def put_crypttext_hashes(self, hashes):
+ return self._rref.callRemote("put_crypttext_hashes", hashes)
+ def put_block_hashes(self, blockhashes):
+ return self._rref.callRemote("put_block_hashes", blockhashes)
+ def put_share_hashes(self, sharehashes):
+ return self._rref.callRemote("put_share_hashes", sharehashes)
+ def put_uri_extension(self, data):
+ return self._rref.callRemote("put_uri_extension", data)
+ def close(self):
+ return self._rref.callRemote("close")
+
+class ReadBucketProxy:
+ implements(IStorageBucketReader)
+ def __init__(self, rref):
+ self._rref = rref
+
+ def get_block(self, blocknum):
+ return self._rref.callRemote("get_block", blocknum)
+
+ def get_plaintext_hashes(self):
+ return self._rref.callRemote("get_plaintext_hashes")
+ def get_crypttext_hashes(self):
+ return self._rref.callRemote("get_crypttext_hashes")
+ def get_block_hashes(self):
+ return self._rref.callRemote("get_block_hashes")
+ def get_share_hashes(self):
+ return self._rref.callRemote("get_share_hashes")
+ def get_uri_extension(self):
+ return self._rref.callRemote("get_uri_extension")
+
+from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer
from twisted.python.failure import Failure
from allmydata.util import hashutil
from allmydata.uri import pack_uri
from allmydata.Crypto.Cipher import AES
+from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader
from cStringIO import StringIO
class FakePeer:
return good[:-1] + chr(ord(good[-1]) ^ 0x01)
class FakeBucketWriter:
+ implements(IStorageBucketWriter, IStorageBucketReader)
# these are used for both reading and writing
def __init__(self, mode="good"):
self.mode = mode
self.closed = False
def callRemote(self, methname, *args, **kwargs):
+ # this allows FakeBucketWriter to be used either as an
+ # IStorageBucketWriter or as the remote reference that it wraps. This
+ # should be cleaned up eventually when we change RIBucketWriter to
+ # have just write(offset, data) and close()
def _call():
meth = getattr(self, methname)
return meth(*args, **kwargs)
- return defer.maybeDeferred(_call)
+ d = eventual.fireEventually()
+ d.addCallback(lambda res: _call())
+ return d
def put_block(self, segmentnum, data):
- assert not self.closed
- assert segmentnum not in self.blocks
- if self.mode == "lost" and segmentnum >= 1:
- raise LostPeerError("I'm going away now")
- self.blocks[segmentnum] = data
+ def _try():
+ assert not self.closed
+ assert segmentnum not in self.blocks
+ if self.mode == "lost" and segmentnum >= 1:
+ raise LostPeerError("I'm going away now")
+ self.blocks[segmentnum] = data
+ return defer.maybeDeferred(_try)
def put_plaintext_hashes(self, hashes):
- assert not self.closed
- assert self.plaintext_hashes is None
- self.plaintext_hashes = hashes
+ def _try():
+ assert not self.closed
+ assert self.plaintext_hashes is None
+ self.plaintext_hashes = hashes
+ return defer.maybeDeferred(_try)
def put_crypttext_hashes(self, hashes):
- assert not self.closed
- assert self.crypttext_hashes is None
- self.crypttext_hashes = hashes
+ def _try():
+ assert not self.closed
+ assert self.crypttext_hashes is None
+ self.crypttext_hashes = hashes
+ return defer.maybeDeferred(_try)
def put_block_hashes(self, blockhashes):
- assert not self.closed
- assert self.block_hashes is None
- self.block_hashes = blockhashes
+ def _try():
+ assert not self.closed
+ assert self.block_hashes is None
+ self.block_hashes = blockhashes
+ return defer.maybeDeferred(_try)
def put_share_hashes(self, sharehashes):
- assert not self.closed
- assert self.share_hashes is None
- self.share_hashes = sharehashes
+ def _try():
+ assert not self.closed
+ assert self.share_hashes is None
+ self.share_hashes = sharehashes
+ return defer.maybeDeferred(_try)
def put_uri_extension(self, uri_extension):
- assert not self.closed
- self.uri_extension = uri_extension
+ def _try():
+ assert not self.closed
+ self.uri_extension = uri_extension
+ return defer.maybeDeferred(_try)
def close(self):
- assert not self.closed
- self.closed = True
+ def _try():
+ assert not self.closed
+ self.closed = True
+ return defer.maybeDeferred(_try)
def get_block(self, blocknum):
- assert isinstance(blocknum, (int, long))
- if self.mode == "bad block":
- return flip_bit(self.blocks[blocknum])
- return self.blocks[blocknum]
+ def _try():
+ assert isinstance(blocknum, (int, long))
+ if self.mode == "bad block":
+ return flip_bit(self.blocks[blocknum])
+ return self.blocks[blocknum]
+ return defer.maybeDeferred(_try)
def get_plaintext_hashes(self):
- hashes = self.plaintext_hashes[:]
- if self.mode == "bad plaintext hashroot":
- hashes[0] = flip_bit(hashes[0])
- if self.mode == "bad plaintext hash":
- hashes[1] = flip_bit(hashes[1])
- return hashes
+ def _try():
+ hashes = self.plaintext_hashes[:]
+ if self.mode == "bad plaintext hashroot":
+ hashes[0] = flip_bit(hashes[0])
+ if self.mode == "bad plaintext hash":
+ hashes[1] = flip_bit(hashes[1])
+ return hashes
+ return defer.maybeDeferred(_try)
def get_crypttext_hashes(self):
- hashes = self.crypttext_hashes[:]
- if self.mode == "bad crypttext hashroot":
- hashes[0] = flip_bit(hashes[0])
- if self.mode == "bad crypttext hash":
- hashes[1] = flip_bit(hashes[1])
- return hashes
+ def _try():
+ hashes = self.crypttext_hashes[:]
+ if self.mode == "bad crypttext hashroot":
+ hashes[0] = flip_bit(hashes[0])
+ if self.mode == "bad crypttext hash":
+ hashes[1] = flip_bit(hashes[1])
+ return hashes
+ return defer.maybeDeferred(_try)
def get_block_hashes(self):
- if self.mode == "bad blockhash":
- hashes = self.block_hashes[:]
- hashes[1] = flip_bit(hashes[1])
- return hashes
- return self.block_hashes
+ def _try():
+ if self.mode == "bad blockhash":
+ hashes = self.block_hashes[:]
+ hashes[1] = flip_bit(hashes[1])
+ return hashes
+ return self.block_hashes
+ return defer.maybeDeferred(_try)
+
def get_share_hashes(self):
- if self.mode == "bad sharehash":
- hashes = self.share_hashes[:]
- hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
- return hashes
- if self.mode == "missing sharehash":
- # one sneaky attack would be to pretend we don't know our own
- # sharehash, which could manage to frame someone else.
- # download.py is supposed to guard against this case.
- return []
- return self.share_hashes
+ def _try():
+ if self.mode == "bad sharehash":
+ hashes = self.share_hashes[:]
+ hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
+ return hashes
+ if self.mode == "missing sharehash":
+ # one sneaky attack would be to pretend we don't know our own
+ # sharehash, which could manage to frame someone else.
+ # download.py is supposed to guard against this case.
+ return []
+ return self.share_hashes
+ return defer.maybeDeferred(_try)
def get_uri_extension(self):
- if self.mode == "bad uri_extension":
- return flip_bit(self.uri_extension)
- return self.uri_extension
+ def _try():
+ if self.mode == "bad uri_extension":
+ return flip_bit(self.uri_extension)
+ return self.uri_extension
+ return defer.maybeDeferred(_try)
def make_data(length):
from foolscap import Referenceable
from allmydata.util import idlib, hashutil
-from allmydata import encode
+from allmydata import encode, storageserver
from allmydata.uri import pack_uri
from allmydata.interfaces import IUploadable, IUploader
from allmydata.Crypto.Cipher import AES
def _got_reply(self, (alreadygot, buckets)):
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
- self.buckets.update(buckets)
- return (alreadygot, set(buckets.keys()))
+ b = dict( [ (sharenum, storageserver.WriteBucketProxy(rref))
+ for sharenum, rref in buckets.iteritems() ] )
+ self.buckets.update(b)
+ return (alreadygot, set(b.keys()))
class FileUploader: