-import os, time
+import os, time, tempfile
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
-from foolscap import Referenceable
-from allmydata.interfaces import RIControlClient
-from allmydata.util import testutil, fileutil, mathutil
-from allmydata import upload, download
+from twisted.internet.interfaces import IConsumer
+from foolscap.api import Referenceable
+from allmydata.interfaces import RIControlClient, IFileNode
+from allmydata.util import fileutil, mathutil
+from allmydata.immutable import upload
+from allmydata.mutable.publish import MutableData
from twisted.python import log
def get_memory_usage():
stats["VmPeak"],
where))
+class FileWritingConsumer:
+ implements(IConsumer)
+ def __init__(self, filename):
+ self.done = False
+ self.f = open(filename, "wb")
+ def registerProducer(self, p, streaming):
+ if streaming:
+ p.resumeProducing()
+ else:
+ while not self.done:
+ p.resumeProducing()
+ def write(self, data):
+ self.f.write(data)
+ def unregisterProducer(self):
+ self.done = True
+ self.f.close()
-class ControlServer(Referenceable, service.Service, testutil.PollMixin):
+class ControlServer(Referenceable, service.Service):
implements(RIControlClient)
def remote_wait_for_client_connections(self, num_clients):
return self.parent.debug_wait_for_client_connections(num_clients)
- def remote_upload_from_file_to_uri(self, filename):
+ def remote_upload_random_data_from_file(self, size, convergence):
+ filename = tempfile.NamedTemporaryFile(delete=False).name
+ f = open(filename, "wb")
+ block = "a" * 8192
+ while size > 0:
+ l = min(size, 8192)
+ f.write(block[:l])
+ size -= l
+ f.close()
uploader = self.parent.getServiceNamed("uploader")
- u = upload.FileName(filename)
+ u = upload.FileName(filename, convergence=convergence)
d = uploader.upload(u)
- d.addCallback(lambda results: results.uri)
+ d.addCallback(lambda results: results.get_uri())
+ def _done(uri):
+ os.remove(filename)
+ return uri
+ d.addCallback(_done)
return d
- def remote_download_from_uri_to_file(self, uri, filename):
- downloader = self.parent.getServiceNamed("downloader")
- d = downloader.download_to_filename(uri, filename)
- d.addCallback(lambda res: filename)
+ def remote_download_to_tempfile_and_delete(self, uri):
+ tempdir = tempfile.mkdtemp()
+ filename = os.path.join(tempdir, "data")
+ filenode = self.parent.create_node_from_uri(uri, name=filename)
+ if not IFileNode.providedBy(filenode):
+ raise AssertionError("The URI does not reference a file.")
+ c = FileWritingConsumer(filename)
+ d = filenode.read(c)
+ def _done(res):
+ os.remove(filename)
+ os.rmdir(tempdir)
+ return None
+ d.addCallback(_done)
return d
def remote_speed_test(self, count, size, mutable):
# phase to take more than 10 seconds. Expect worst-case latency to be
# 300ms.
results = {}
- conns = self.parent.introducer_client.get_all_connections_for("storage")
- everyone = [(peerid,rref) for (peerid, service_name, rref) in conns]
+ sb = self.parent.get_storage_broker()
+ everyone = sb.get_connected_servers()
num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
- everyone = everyone * num_pings
+ everyone = list(everyone) * num_pings
d = self._do_one_ping(None, everyone, results)
return d
def _do_one_ping(self, res, everyone_left, results):
if not everyone_left:
return results
- peerid, connection = everyone_left.pop(0)
+ server = everyone_left.pop(0)
+ server_name = server.get_longname()
+ connection = server.get_rref()
start = time.time()
- d = connection.callRemote("get_versions")
+ d = connection.callRemote("get_buckets", "\x00"*16)
def _done(ignored):
stop = time.time()
elapsed = stop - start
- if peerid in results:
- results[peerid].append(elapsed)
+ if server_name in results:
+ results[server_name].append(elapsed)
else:
- results[peerid] = [elapsed]
+ results[server_name] = [elapsed]
d.addCallback(_done)
d.addCallback(self._do_one_ping, everyone_left, results)
def _average(res):
averaged = {}
- for peerid,times in results.iteritems():
- averaged[peerid] = sum(times) / len(times)
+ for server_name,times in results.iteritems():
+ averaged[server_name] = sum(times) / len(times)
return averaged
d.addCallback(_average)
return d
d1.addCallback(lambda n: n.get_uri())
elif self.mutable_mode == "upload":
data = open(fn,"rb").read()
- d1 = self._n.replace(data)
+ d1 = self._n.overwrite(MutableData(data))
d1.addCallback(lambda res: self._n.get_uri())
else:
- up = upload.FileName(fn)
+ up = upload.FileName(fn, convergence=None)
d1 = self.parent.upload(up)
- d1.addCallback(lambda results: results.uri)
+ d1.addCallback(lambda results: results.get_uri())
d1.addCallback(_record_uri, i)
d1.addCallback(_upload_one_file, i+1)
return d1
if i >= self.count:
return
n = self.parent.create_node_from_uri(self.uris[i])
- d1 = n.download(download.FileHandle(Discard()))
+ if not IFileNode.providedBy(n):
+ raise AssertionError("The URI does not reference a file.")
+ if n.is_mutable():
+ d1 = n.download_best_version()
+ else:
+ d1 = n.read(DiscardingConsumer())
d1.addCallback(_download_one_file, i+1)
return d1
d.addCallback(_download_one_file, 0)
os.unlink(fn)
return res
-class Discard:
+class DiscardingConsumer:
+ implements(IConsumer)
+ def __init__(self):
+ self.done = False
+ def registerProducer(self, p, streaming):
+ if streaming:
+ p.resumeProducing()
+ else:
+ while not self.done:
+ p.resumeProducing()
def write(self, data):
pass
- # download_to_filehandle explicitly does not close the filehandle it was
- # given: that is reserved for the provider of the filehandle. Therefore
- # the lack of a close() method on this otherwise filehandle-like object
- # is a part of the test.
+ def unregisterProducer(self):
+ self.done = True