]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/control.py
f2f155059d09e891a7cae2ce150108bfe3a8248a
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / control.py
1
2 import os, time
3 from zope.interface import implements
4 from twisted.application import service
5 from twisted.internet import defer
6 from twisted.internet.interfaces import IConsumer
7 from foolscap.api import Referenceable
8 from allmydata.interfaces import RIControlClient, IFileNode
9 from allmydata.util import fileutil, mathutil
10 from allmydata.immutable import upload
11 from twisted.python import log
12
def get_memory_usage():
    """Return this process's virtual-memory statistics, in bytes.

    Reads /proc/self/status (Linux-specific) and collects the fields named
    in stat_names, converting each value from kB to bytes.

    Returns a dict mapping field name to an int byte count. If the file
    cannot be opened or a line cannot be parsed, 'VmSize' and 'VmPeak' are
    set to 0 so that callers indexing those two keys keep working; any
    fields parsed before the failure are left in place.
    """
    # this is obviously linux-specific
    stat_names = ("VmPeak",
                  "VmSize",
                  #"VmHWM",
                  "VmData")
    stats = {}
    try:
        f = open("/proc/self/status", "r")
        try:
            for line in f:
                # maxsplit=1: only the first colon separates the field name
                # from its value, so a value containing ':' cannot break the
                # two-name unpacking.
                name, right = line.split(":", 1)
                if name in stat_names:
                    if not right.endswith(" kB\n"):
                        raise ValueError("unexpected value format: %r"
                                         % (line,))
                    # drop the trailing " kB\n"; int() ignores the leading
                    # whitespace
                    stats[name] = int(right[:-4]) * 1024
        finally:
            f.close()
    except (EnvironmentError, ValueError):
        # Probably not on (a compatible version of) Linux
        stats['VmSize'] = 0
        stats['VmPeak'] = 0
    return stats
32
def log_memory_usage(where=""):
    """Log the current VmSize and VmPeak (as reported by get_memory_usage).

    'where' is appended verbatim to the end of the log line, so callers can
    label the point in the program at which the sample was taken.
    """
    usage = get_memory_usage()
    fields = (usage["VmSize"], usage["VmPeak"], where)
    log.msg("VmSize: %9d  VmPeak: %9d  %s" % fields)
38
class FileWritingConsumer:
    """IConsumer that writes every chunk it receives to a local file.

    The file is opened for binary writing at construction time and closed
    when the producer is unregistered.
    """
    implements(IConsumer)

    def __init__(self, filename):
        # 'done' flips to True in unregisterProducer() and terminates the
        # polling loop used for non-streaming producers.
        self.done = False
        self.f = open(filename, "wb")

    def registerProducer(self, p, streaming):
        """Start pulling data from producer 'p'.

        When 'streaming' is true the producer is resumed once; otherwise it
        is resumed repeatedly until unregisterProducer() has been called.
        """
        if streaming:
            p.resumeProducing()
            return
        while not self.done:
            p.resumeProducing()

    def write(self, data):
        """Append 'data' to the output file."""
        self.f.write(data)

    def unregisterProducer(self):
        """Mark the transfer as finished and close the output file."""
        self.done = True
        self.f.close()
55
class ControlServer(Referenceable, service.Service):
    """Service exposing the RIControlClient remote methods.

    Each remote_* method delegates to self.parent (the service this one is
    attached to) or to helpers in this module.
    """
    implements(RIControlClient)

    def remote_wait_for_client_connections(self, num_clients):
        """Delegate to the parent's debug_wait_for_client_connections()."""
        return self.parent.debug_wait_for_client_connections(num_clients)

    def remote_upload_from_file_to_uri(self, filename, convergence):
        """Upload the local file 'filename' and fire with the resulting URI.

        Returns a Deferred that fires with the 'uri' attribute of the upload
        results.
        """
        uploader = self.parent.getServiceNamed("uploader")
        u = upload.FileName(filename, convergence=convergence)
        d = uploader.upload(u)
        d.addCallback(lambda results: results.uri)
        return d

    def remote_download_from_uri_to_file(self, uri, filename):
        """Download 'uri' into the local file 'filename'.

        Returns a Deferred that fires with 'filename' once the read has
        completed. Raises AssertionError if the URI does not refer to a file
        node.
        """
        filenode = self.parent.create_node_from_uri(uri, name=filename)
        if not IFileNode.providedBy(filenode):
            raise AssertionError("The URI does not reference a file.")
        c = FileWritingConsumer(filename)
        d = filenode.read(c)
        d.addCallback(lambda res: filename)
        return d

    def remote_speed_test(self, count, size, mutable):
        """Run a SpeedTest and return its Deferred.

        The Deferred fires with an (upload_time, download_time) tuple.
        'size' must exceed 8 because each test file starts with 8 random
        bytes.
        """
        assert size > 8
        log.msg("speed_test: count=%d, size=%d, mutable=%s" % (count, size,
                                                               mutable))
        st = SpeedTest(self.parent, count, size, mutable)
        return st.run()

    def remote_get_memory_usage(self):
        """Return the dict produced by get_memory_usage()."""
        return get_memory_usage()

    def remote_measure_peer_response_time(self):
        """Ping every connected storage server and average the round trips.

        Returns a Deferred that fires with a dict mapping each server's
        longname to its mean response time in seconds. With no connected
        servers the Deferred fires with an empty dict.
        """
        # I'd like to average together several pings, but I don't want this
        # phase to take more than 10 seconds. Expect worst-case latency to be
        # 300ms.
        results = {}
        sb = self.parent.get_storage_broker()
        everyone = list(sb.get_connected_servers())
        if not everyone:
            # Nothing to ping; also avoids dividing by zero below.
            return defer.succeed({})
        num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
        everyone = everyone * num_pings
        d = self._do_one_ping(None, everyone, results)
        def _average(all_times):
            averaged = {}
            for server_name, times in all_times.items():
                averaged[server_name] = sum(times) / len(times)
            return averaged
        # Average once, after the last ping, instead of after every ping.
        d.addCallback(_average)
        return d

    def _do_one_ping(self, res, everyone_left, results):
        """Ping the first server in 'everyone_left', then recurse on the rest.

        'res' is ignored; it is present so this method can be used directly
        as a Deferred callback. 'everyone_left' is consumed destructively.
        Each elapsed time is appended to results[server longname].

        Returns 'results' itself once 'everyone_left' is empty, otherwise a
        Deferred that fires with 'results' after all remaining pings.
        """
        if not everyone_left:
            return results
        server = everyone_left.pop(0)
        server_name = server.get_longname()
        connection = server.get_rref()
        start = time.time()
        d = connection.callRemote("get_buckets", "\x00"*16)
        def _done(ignored):
            elapsed = time.time() - start
            results.setdefault(server_name, []).append(elapsed)
        d.addCallback(_done)
        d.addCallback(self._do_one_ping, everyone_left, results)
        return d
123
class SpeedTest:
    """Time the upload and subsequent download of a batch of test files.

    'parent' supplies basedir, upload(), create_mutable_file() and
    create_node_from_uri(). 'count' files of 'size' bytes each are created
    under <parent.basedir>/_speed_test_data. 'mutable' selects the upload
    strategy: "create" makes a new mutable file per test file, "upload"
    overwrites a single mutable file repeatedly, and any other value uploads
    immutable files.
    """
    def __init__(self, parent, count, size, mutable):
        self.parent = parent
        self.count = count
        self.size = size
        self.mutable_mode = mutable
        # maps file index -> URI recorded after that file's upload
        self.uris = {}
        self.basedir = os.path.join(self.parent.basedir, "_speed_test_data")

    def run(self):
        """Create the data, upload it, download it, then clean up.

        Returns a Deferred that fires with (upload_time, download_time) in
        seconds. Cleanup runs whether or not the transfers succeeded; on
        failure the original failure is passed through.
        """
        self.create_data()
        d = self.do_upload()
        d.addCallback(lambda res: self.do_download())
        d.addBoth(self.do_cleanup)
        d.addCallback(lambda res: (self.upload_time, self.download_time))
        return d

    def create_data(self):
        """Write the test files: 8 random bytes followed by zero padding."""
        fileutil.make_dirs(self.basedir)
        for i in range(self.count):
            s = self.size
            fn = os.path.join(self.basedir, str(i))
            if os.path.exists(fn):
                os.unlink(fn)
            # Binary mode: the payload is raw bytes, and text mode would
            # translate newline bytes on some platforms and change the size.
            f = open(fn, "wb")
            try:
                # the random prefix makes each file's contents distinct
                f.write(os.urandom(8))
                s -= 8
                while s > 0:
                    chunk = min(s, 4096)
                    f.write("\x00" * chunk)
                    s -= chunk
            finally:
                f.close()

    def _read_file(self, fn):
        """Return the full contents of 'fn', closing the handle promptly."""
        f = open(fn, "rb")
        try:
            return f.read()
        finally:
            f.close()

    def do_upload(self):
        """Upload the test files one after another.

        Returns a Deferred that fires (with None) after the last upload,
        once self.upload_time has been set. The URI of file i is stored in
        self.uris[i].
        """
        d = defer.succeed(None)
        def _create_slot(res):
            d1 = self.parent.create_mutable_file("")
            def _created(n):
                self._n = n
            d1.addCallback(_created)
            return d1
        if self.mutable_mode == "upload":
            # the slot is created before the clock starts
            d.addCallback(_create_slot)
        def _start(res):
            self._start = time.time()
        d.addCallback(_start)

        def _record_uri(uri, i):
            self.uris[i] = uri
        def _upload_one_file(ignored, i):
            if i >= self.count:
                return
            fn = os.path.join(self.basedir, str(i))
            if self.mutable_mode == "create":
                data = self._read_file(fn)
                d1 = self.parent.create_mutable_file(data)
                d1.addCallback(lambda n: n.get_uri())
            elif self.mutable_mode == "upload":
                data = self._read_file(fn)
                d1 = self._n.overwrite(data)
                d1.addCallback(lambda res: self._n.get_uri())
            else:
                up = upload.FileName(fn, convergence=None)
                d1 = self.parent.upload(up)
                d1.addCallback(lambda results: results.uri)
            d1.addCallback(_record_uri, i)
            # chain the next file so uploads run strictly in sequence
            d1.addCallback(_upload_one_file, i+1)
            return d1
        d.addCallback(_upload_one_file, 0)
        def _upload_done(ignored):
            stop = time.time()
            self.upload_time = stop - self._start
        d.addCallback(_upload_done)
        return d

    def do_download(self):
        """Download every uploaded file in sequence, discarding the data.

        Returns a Deferred that fires (with None) after the last download,
        once self.download_time has been set. Raises AssertionError (inside
        the callback chain) if a recorded URI does not refer to a file node.
        """
        start = time.time()
        d = defer.succeed(None)
        def _download_one_file(ignored, i):
            if i >= self.count:
                return
            n = self.parent.create_node_from_uri(self.uris[i])
            if not IFileNode.providedBy(n):
                raise AssertionError("The URI does not reference a file.")
            if n.is_mutable():
                d1 = n.download_best_version()
            else:
                d1 = n.read(DiscardingConsumer())
            d1.addCallback(_download_one_file, i+1)
            return d1
        d.addCallback(_download_one_file, 0)
        def _download_done(ignored):
            stop = time.time()
            self.download_time = stop - start
        d.addCallback(_download_done)
        return d

    def do_cleanup(self, res):
        """Delete the test files and return 'res' unchanged.

        Used with addBoth, so 'res' may be a result or a Failure. Files that
        are already gone are skipped, so a missing file cannot replace the
        original result or failure with an OSError.
        """
        for i in range(self.count):
            fn = os.path.join(self.basedir, str(i))
            if os.path.exists(fn):
                os.unlink(fn)
        return res
226
class DiscardingConsumer:
    """IConsumer that throws away everything written to it."""
    implements(IConsumer)

    def __init__(self):
        # set by unregisterProducer(); ends the polling loop used for
        # non-streaming producers
        self.done = False

    def registerProducer(self, p, streaming):
        """Start pulling data from producer 'p'.

        When 'streaming' is true the producer is resumed once; otherwise it
        is resumed repeatedly until unregisterProducer() has been called.
        """
        if streaming:
            p.resumeProducing()
            return
        while not self.done:
            p.resumeProducing()

    def write(self, data):
        """Ignore 'data'."""
        pass

    def unregisterProducer(self):
        """Mark the transfer as finished."""
        self.done = True