]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/control.py
check_speed: average multiple pings when measuring RTT
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / control.py
1
2 import os, time
3 from zope.interface import implements
4 from twisted.application import service
5 from twisted.internet import defer
6 from foolscap import Referenceable
7 from allmydata.interfaces import RIControlClient
8 from allmydata.util import testutil, fileutil, mathutil
9 from twisted.python import log
10
def get_memory_usage():
    """Return selected memory statistics for the current process.

    Parses /proc/self/status and returns a dict mapping each of the names
    "VmPeak", "VmSize" and "VmData" that appears there to its value
    converted from kB to bytes. Names that are absent from the status file
    are simply missing from the result.
    """
    # this is obviously linux-specific
    stat_names = ("VmPeak",
                  "VmSize",
                  #"VmHWM",
                  "VmData")
    stats = {}
    # Use a context manager so the file handle is closed promptly instead of
    # being left for the garbage collector.
    with open("/proc/self/status", "r") as status_file:
        for line in status_file:
            # Split on the first colon only, so a value that itself contains
            # a colon cannot break the unpacking.
            name, _sep, right = line.partition(":")
            if name in stat_names:
                assert right.endswith(" kB\n")
                right = right[:-4]  # drop the trailing " kB\n"
                stats[name] = int(right) * 1024
    return stats
25
def log_memory_usage(where=""):
    """Log the current VmSize and VmPeak, followed by the `where` label.

    The numbers come from get_memory_usage(); `where` is appended verbatim
    so callers can tag the point in the program being measured.
    """
    usage = get_memory_usage()
    fields = (usage["VmSize"], usage["VmPeak"], where)
    log.msg("VmSize: %9d  VmPeak: %9d  %s" % fields)
31
32
class ControlServer(Referenceable, service.Service, testutil.PollMixin):
    """Control interface for a node, declared to implement RIControlClient.

    The remote_* methods mostly delegate to services looked up on
    self.parent (the "uploader" and "downloader" services, and the
    introducer client), and are used for testing and measurement.
    """
    implements(RIControlClient)

    def remote_wait_for_client_connections(self, num_clients):
        """Delegate to the parent's debug_wait_for_client_connections()."""
        return self.parent.debug_wait_for_client_connections(num_clients)

    def remote_upload_from_file_to_uri(self, filename):
        """Upload the local file `filename` via the "uploader" service.

        Returns whatever uploader.upload_filename() returns (a Deferred).
        """
        uploader = self.parent.getServiceNamed("uploader")
        d = uploader.upload_filename(filename)
        return d

    def remote_download_from_uri_to_file(self, uri, filename):
        """Download `uri` into the local file `filename`.

        Returns a Deferred that fires with `filename` once the "downloader"
        service has finished.
        """
        downloader = self.parent.getServiceNamed("downloader")
        d = downloader.download_to_filename(uri, filename)
        d.addCallback(lambda res: filename)
        return d

    def remote_speed_test(self, count, size):
        """Run a SpeedTest with `count` files of `size` bytes each.

        `size` must be greater than 8. Returns the Deferred produced by
        SpeedTest.run(), which fires with (upload_time, download_time).
        """
        assert size > 8
        log.msg("speed_test: count=%d, size=%d" % (count, size))
        st = SpeedTest(self.parent, count, size)
        return st.run()

    def remote_get_memory_usage(self):
        """Return the dict produced by get_memory_usage()."""
        return get_memory_usage()

    def remote_measure_peer_response_time(self):
        """Measure the average round-trip time to every known peer.

        Each peer reported by the introducer client is pinged one or more
        times, sequentially, by calling its remote "get_nodeid" method.
        Returns a Deferred that fires with a dict mapping peerid to the mean
        elapsed time in seconds. When there are no peers at all, an empty
        dict is returned directly.
        """
        # I'd like to average together several pings, but I don't want this
        # phase to take more than 10 seconds. Expect worst-case latency to be
        # 300ms.
        results = {}
        everyone = list(self.parent.introducer_client.get_all_peers())
        if not everyone:
            # Nothing to ping. Bail out before the num_pings computation,
            # which would otherwise divide by zero.
            return results
        num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
        everyone = everyone * num_pings
        d = self._do_one_ping(None, everyone, results)
        def _average(res):
            averaged = {}
            for peerid, times in results.items():
                averaged[peerid] = sum(times) / len(times)
            return averaged
        # Average exactly once, after the final ping has completed.
        d.addCallback(_average)
        return d

    def _do_one_ping(self, res, everyone_left, results):
        """Ping the next peer in `everyone_left`, then recurse for the rest.

        `res` is the ignored result of the previous callback.
        `everyone_left` is a list of (peerid, connection) pairs and is
        consumed from the front. Each elapsed time is appended to
        results[peerid]. Returns `results` once the list is empty, otherwise
        a Deferred that eventually fires with `results`.
        """
        if not everyone_left:
            return results
        peerid, connection = everyone_left.pop(0)
        start = time.time()
        d = connection.callRemote("get_nodeid")
        def _done(ignored):
            elapsed = time.time() - start
            results.setdefault(peerid, []).append(elapsed)
        d.addCallback(_done)
        d.addCallback(self._do_one_ping, everyone_left, results)
        return d
91
class SpeedTest:
    """Time the upload and subsequent download of a set of generated files.

    `count` files of `size` bytes each are created under
    <parent.basedir>/_speed_test_data, uploaded one after another through
    the parent's "uploader" service, downloaded again through its
    "downloader" service, and finally deleted.
    """

    def __init__(self, parent, count, size):
        """Remember the parameters; no files are touched until run()."""
        self.parent = parent
        self.count = count
        self.size = size
        self.uris = {}  # maps file index -> result of uploading that file
        self.basedir = os.path.join(self.parent.basedir, "_speed_test_data")

    def run(self):
        """Run the whole test.

        Returns a Deferred that fires with (upload_time, download_time) in
        seconds. The generated files are removed whether or not the
        upload/download phase succeeded.
        """
        self.create_data()
        d = self.do_upload()
        d.addCallback(lambda res: self.do_download())
        d.addBoth(self.do_cleanup)
        d.addCallback(lambda res: (self.upload_time, self.download_time))
        return d

    def create_data(self):
        """Create the test files, named "0" .. str(count-1), in basedir.

        Each file starts with 8 random bytes, so that no two files have the
        same contents, and is padded with zero bytes up to `size`.
        """
        fileutil.make_dirs(self.basedir)
        for i in range(self.count):
            s = self.size
            fn = os.path.join(self.basedir, str(i))
            if os.path.exists(fn):
                os.unlink(fn)
            # Binary mode: the payload is raw bytes, and text mode would
            # translate newline bytes on some platforms. The context manager
            # closes the file even if a write fails.
            with open(fn, "wb") as f:
                f.write(os.urandom(8))
                s -= 8
                while s > 0:
                    chunk = min(s, 4096)
                    f.write(b"\x00" * chunk)
                    s -= chunk

    def do_upload(self):
        """Upload the files sequentially and record the total elapsed time.

        Stores each upload result in self.uris[i] and the elapsed seconds in
        self.upload_time. Returns a Deferred that fires when all uploads are
        done.
        """
        uploader = self.parent.getServiceNamed("uploader")
        start = time.time()
        d = defer.succeed(None)
        def _record_uri(uri, i):
            self.uris[i] = uri
        def _upload_one_file(ignored, i):
            # Each upload chains the next one, so only one is in flight.
            if i >= self.count:
                return
            fn = os.path.join(self.basedir, str(i))
            d1 = uploader.upload_filename(fn)
            d1.addCallback(_record_uri, i)
            d1.addCallback(_upload_one_file, i+1)
            return d1
        d.addCallback(_upload_one_file, 0)
        def _upload_done(ignored):
            stop = time.time()
            self.upload_time = stop - start
        d.addCallback(_upload_done)
        return d

    def do_download(self):
        """Download every uploaded file sequentially, discarding the data.

        Stores the elapsed seconds in self.download_time. Returns a Deferred
        that fires when all downloads are done.
        """
        downloader = self.parent.getServiceNamed("downloader")
        start = time.time()
        d = defer.succeed(None)
        def _download_one_file(ignored, i):
            # Each download chains the next one, so only one is in flight.
            if i >= self.count:
                return
            d1 = downloader.download_to_filehandle(self.uris[i], Discard())
            d1.addCallback(_download_one_file, i+1)
            return d1
        d.addCallback(_download_one_file, 0)
        def _download_done(ignored):
            stop = time.time()
            self.download_time = stop - start
        d.addCallback(_download_done)
        return d

    def do_cleanup(self, res):
        """Delete the generated files and pass `res` through unchanged.

        This runs on both the success and the failure path, so a file that
        is already gone is skipped rather than allowed to raise and replace
        the original result or failure.
        """
        for i in range(self.count):
            fn = os.path.join(self.basedir, str(i))
            if os.path.exists(fn):
                os.unlink(fn)
        return res
167
class Discard:
    """Write-only sink that throws away everything written to it."""

    # There is deliberately no close() method here. download_to_filehandle
    # must leave closing to whoever supplied the filehandle, so handing it
    # an object that cannot be closed is itself part of the test.

    def write(self, data):
        """Accept `data` and drop it."""
        return None