# use the upload-side code to get this as accurate as possible
ht = IncompleteHashTree(N)
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
- wbp = make_write_bucket_proxy(None, share_size, r["block_size"],
- r["num_segments"], num_share_hashes, 0,
- None)
+ wbp = make_write_bucket_proxy(None, None, share_size, r["block_size"],
+ r["num_segments"], num_share_hashes, 0)
self._fieldsize = wbp.fieldsize
self._fieldstruct = wbp.fieldstruct
self.guessed_offsets = wbp._offsets
FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares
-def make_write_bucket_proxy(rref, data_size, block_size, num_segments,
- num_share_hashes, uri_extension_size_max, nodeid):
+def make_write_bucket_proxy(rref, server,
+ data_size, block_size, num_segments,
+ num_share_hashes, uri_extension_size_max):
# Use layout v1 for small files, so they'll be readable by older versions
# (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
# by tahoe-1.3.0 or later.
try:
if FORCE_V2:
raise FileTooLargeError
- wbp = WriteBucketProxy(rref, data_size, block_size, num_segments,
- num_share_hashes, uri_extension_size_max, nodeid)
+ wbp = WriteBucketProxy(rref, server,
+ data_size, block_size, num_segments,
+ num_share_hashes, uri_extension_size_max)
except FileTooLargeError:
- wbp = WriteBucketProxy_v2(rref, data_size, block_size, num_segments,
- num_share_hashes, uri_extension_size_max, nodeid)
+ wbp = WriteBucketProxy_v2(rref, server,
+ data_size, block_size, num_segments,
+ num_share_hashes, uri_extension_size_max)
return wbp
class WriteBucketProxy:
fieldsize = 4
fieldstruct = ">L"
- def __init__(self, rref, data_size, block_size, num_segments,
- num_share_hashes, uri_extension_size_max, nodeid,
- pipeline_size=50000):
+ def __init__(self, rref, server, data_size, block_size, num_segments,
+ num_share_hashes, uri_extension_size_max, pipeline_size=50000):
self._rref = rref
+ self._server = server
self._data_size = data_size
self._block_size = block_size
self._num_segments = num_segments
- self._nodeid = nodeid
effective_segments = mathutil.next_power_of_k(num_segments,2)
self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
self._offset_data = offset_data
def __repr__(self):
- if self._nodeid:
- nodeid_s = idlib.nodeid_b2a(self._nodeid)
- else:
- nodeid_s = "[None]"
- return "<WriteBucketProxy for node %s>" % nodeid_s
+ return "<WriteBucketProxy for node %s>" % self._server.get_name()
def put_header(self):
return self._write(0, self._offset_data)
return self._rref.callRemoteOnly("abort")
+ def get_servername(self):
+ return self._server.get_name()
def get_peerid(self):
- if self._nodeid:
- return self._nodeid
- return None
+ return self._server.get_serverid()
class WriteBucketProxy_v2(WriteBucketProxy):
fieldsize = 8
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
- wbp = layout.make_write_bucket_proxy(None, sharesize,
+ wbp = layout.make_write_bucket_proxy(None, None, sharesize,
blocksize, num_segments,
num_share_hashes,
- EXTENSION_SIZE, server.get_serverid())
+ EXTENSION_SIZE)
self.wbp_class = wbp.__class__ # to create more of them
self.allocated_size = wbp.get_allocated_size()
self.blocksize = blocksize
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
b = {}
for sharenum, rref in buckets.iteritems():
- bp = self.wbp_class(rref, self.sharesize,
+ bp = self.wbp_class(rref, self._server, self.sharesize,
self.blocksize,
self.num_segments,
self.num_share_hashes,
- EXTENSION_SIZE,
- self._server.get_serverid())
+ EXTENSION_SIZE)
b[sharenum] = bp
self.buckets.update(b)
return (alreadygot, set(b.keys()))
def str_shareloc(shnum, bucketwriter):
- return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
+ return "%s: %s" % (shnum, bucketwriter.get_servername(),)
class Tahoe2ServerSelector(log.PrefixingLogMixin):
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
# figure out how much space to ask for
- wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
- num_share_hashes, EXTENSION_SIZE,
- None)
+ wbp = layout.make_write_bucket_proxy(None, None,
+ share_size, 0, num_segments,
+ num_share_hashes, EXTENSION_SIZE)
allocated_size = wbp.get_allocated_size()
all_servers = storage_broker.get_servers_for_psi(storage_index)
if not all_servers:
def test_create(self):
bw, rb, sharefname = self.make_bucket("test_create", 500)
- bp = WriteBucketProxy(rb,
+ bp = WriteBucketProxy(rb, None,
data_size=300,
block_size=10,
num_segments=5,
num_share_hashes=3,
- uri_extension_size_max=500, nodeid=None)
+ uri_extension_size_max=500)
self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
uri_extension = "s" + "E"*498 + "e"
bw, rb, sharefname = self.make_bucket(name, sharesize)
- bp = wbp_class(rb,
+ bp = wbp_class(rb, None,
data_size=95,
block_size=25,
num_segments=4,
num_share_hashes=3,
- uri_extension_size_max=len(uri_extension),
- nodeid=None)
+ uri_extension_size_max=len(uri_extension))
d = bp.put_header()
d.addCallback(lambda res: bp.put_block(0, "a"*25))