import os, time
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable
from allmydata.interfaces import RIControlClient
from allmydata.util import testutil, fileutil, mathutil
from allmydata import upload, download
from twisted.python import log

def get_memory_usage():
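    """Return a dict of process memory statistics, in bytes, parsed from
    /proc/self/status. On platforms without a compatible /proc, VmSize and
    VmPeak are reported as 0."""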
    # this is obviously linux-specific
    stat_names = ("VmPeak",
                  "VmSize",
                  #"VmHWM",
                  "VmData")
    stats = {}
    try:
        for line in open("/proc/self/status", "r").readlines():
            name, right = line.split(":",2)
            if name in stat_names:
                assert right.endswith(" kB\n")
                right = right[:-4]
                stats[name] = int(right) * 1024
    except:
        # Probably not on (a compatible version of) Linux
        stats['VmSize'] = 0
        stats['VmPeak'] = 0
    return stats

def log_memory_usage(where=""):
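    """Log the current VmSize and VmPeak values via twisted's log, tagged
    with the 'where' string."""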
    stats = get_memory_usage()
    log.msg("VmSize: %9d  VmPeak: %9d  %s" % (stats["VmSize"],
                                              stats["VmPeak"],
                                              where))


class ControlServer(Referenceable, service.Service, testutil.PollMixin):
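    """Foolscap-reachable control object (RIControlClient) that lets an
    external caller drive this client node: wait for peer connections,
    upload and download files, run speed tests, report memory usage, and
    measure peer response times."""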
    implements(RIControlClient)

    def remote_wait_for_client_connections(self, num_clients):
        return self.parent.debug_wait_for_client_connections(num_clients)

    def remote_upload_from_file_to_uri(self, filename):
        uploader = self.parent.getServiceNamed("uploader")
        d = uploader.upload_filename(filename)
        return d

    def remote_download_from_uri_to_file(self, uri, filename):
        downloader = self.parent.getServiceNamed("downloader")
        d = downloader.download_to_filename(uri, filename)
        d.addCallback(lambda res: filename)
        return d

    def remote_speed_test(self, count, size, mutable):
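        """Upload and then download 'count' files of 'size' bytes each (as
        mutable files if 'mutable' is set) and return a Deferred that fires
        with an (upload_time, download_time) tuple of elapsed seconds."""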
        assert size > 8
        log.msg("speed_test: count=%d, size=%d, mutable=%d" % (count, size,
                                                               mutable))
        st = SpeedTest(self.parent, count, size, mutable)
        return st.run()

    def remote_get_memory_usage(self):
        return get_memory_usage()

    def remote_measure_peer_response_time(self):
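        """Ping each known peer (via a 'get_nodeid' call) and return a
        Deferred that fires with a dict mapping peerid to its average
        response time in seconds."""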
        # I'd like to average together several pings, but I don't want this
        # phase to take more than 10 seconds. Expect worst-case latency to be
        # 300ms.
        results = {}
        everyone = list(self.parent.introducer_client.get_all_peers())
        num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
        everyone = everyone * num_pings
        d = self._do_one_ping(None, everyone, results)
        return d

    def _do_one_ping(self, res, everyone_left, results):
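        # Ping the next peer on the list, record the elapsed time, then
        # chain to the following entry; once everyone_left is exhausted,
        # the accumulated results are averaged per peer.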
        if not everyone_left:
            return results
        peerid, connection = everyone_left.pop(0)
        start = time.time()
        d = connection.callRemote("get_nodeid")
        def _done(ignored):
            stop = time.time()
            elapsed = stop - start
            if peerid in results:
                results[peerid].append(elapsed)
            else:
                results[peerid] = [elapsed]
        d.addCallback(_done)
        d.addCallback(self._do_one_ping, everyone_left, results)
        def _average(res):
            averaged = {}
            for peerid, times in results.iteritems():
                averaged[peerid] = sum(times) / len(times)
            return averaged
        d.addCallback(_average)
        return d

class SpeedTest:
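    """Measure upload and download times for this node. Writes 'count'
    files of 'size' bytes under the client's basedir, uploads them (as
    mutable or immutable files), downloads them again, and records the
    total elapsed time of each phase."""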
    def __init__(self, parent, count, size, mutable):
        self.parent = parent
        self.count = count
        self.size = size
        self.mutable = mutable
        self.uris = {}
        self.basedir = os.path.join(self.parent.basedir, "_speed_test_data")

    def run(self):
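        """Create the test data, upload it, download it, then clean up.
        Returns a Deferred that fires with (upload_time, download_time)."""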
        self.create_data()
        d = self.do_upload()
        d.addCallback(lambda res: self.do_download())
        d.addBoth(self.do_cleanup)
        d.addCallback(lambda res: (self.upload_time, self.download_time))
        return d

    def create_data(self):
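        """Write self.count test files of self.size bytes each: an 8-byte
        random prefix followed by zero padding. The random prefix
        presumably keeps the files distinct from one another, so each
        upload is of a different file."""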
        fileutil.make_dirs(self.basedir)
        for i in range(self.count):
            s = self.size
            fn = os.path.join(self.basedir, str(i))
            if os.path.exists(fn):
                os.unlink(fn)
            f = open(fn, "w")
            f.write(os.urandom(8))
            s -= 8
            while s > 0:
                chunk = min(s, 4096)
                f.write("\x00" * chunk)
                s -= chunk
            f.close()

    def do_upload(self):
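        """Upload the test files one at a time, recording each resulting
        URI in self.uris and the total elapsed time in self.upload_time."""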
        start = time.time()
        d = defer.succeed(None)
        def _record_uri(uri, i):
            self.uris[i] = uri
        def _upload_one_file(ignored, i):
            if i >= self.count:
                return
            fn = os.path.join(self.basedir, str(i))
            if self.mutable:
                data = open(fn, "rb").read()
                d1 = self.parent.create_mutable_file(data)
                d1.addCallback(lambda n: n.get_uri())
            else:
                up = upload.FileName(fn)
                d1 = self.parent.upload(up)
            d1.addCallback(_record_uri, i)
            d1.addCallback(_upload_one_file, i+1)
            return d1
        d.addCallback(_upload_one_file, 0)
        def _upload_done(ignored):
            stop = time.time()
            self.upload_time = stop - start
        d.addCallback(_upload_done)
        return d

    def do_download(self):
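        """Download each uploaded file in turn, discarding the plaintext,
        and record the total elapsed time in self.download_time."""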
        start = time.time()
        d = defer.succeed(None)
        def _download_one_file(ignored, i):
            if i >= self.count:
                return
            n = self.parent.create_node_from_uri(self.uris[i])
            d1 = n.download(download.FileHandle(Discard()))
            d1.addCallback(_download_one_file, i+1)
            return d1
        d.addCallback(_download_one_file, 0)
        def _download_done(ignored):
            stop = time.time()
            self.download_time = stop - start
        d.addCallback(_download_done)
        return d

    def do_cleanup(self, res):
        for i in range(self.count):
            fn = os.path.join(self.basedir, str(i))
            os.unlink(fn)
        return res

class Discard:
    def write(self, data):
        pass
    # download_to_filehandle explicitly does not close the filehandle it was
    # given: that is reserved for the provider of the filehandle. Therefore
    # the lack of a close() method on this otherwise filehandle-like object
    # is a part of the test.