]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/control.py
upload: return an UploadResults instance (with .uri) instead of just a URI
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / control.py
1
2 import os, time
3 from zope.interface import implements
4 from twisted.application import service
5 from twisted.internet import defer
6 from foolscap import Referenceable
7 from allmydata.interfaces import RIControlClient
8 from allmydata.util import testutil, fileutil, mathutil
9 from allmydata import upload, download
10 from twisted.python import log
11
def get_memory_usage(status_path="/proc/self/status"):
    """Return this process's virtual-memory statistics, in bytes.

    Parses a Linux-style 'status' file (by default /proc/self/status) and
    extracts the "VmPeak", "VmSize" and "VmData" entries, converting each
    one from kB to bytes.

    If the file cannot be read, or one of those entries is not of the form
    '<integer> kB', then "VmSize" and "VmPeak" are reported as 0 so that
    callers can always rely on those two keys being present.

    status_path: the file to parse. The default is linux-specific.

    Returns a dict mapping stat name to an integer number of bytes.
    """
    stat_names = ("VmPeak",
                  "VmSize",
                  #"VmHWM",
                  "VmData")
    stats = {}
    try:
        f = open(status_path, "r")
        try:
            for line in f:
                # Split on the first colon only: a value (for example the
                # process name in the "Name:" line) may itself contain
                # colons, and must not derail the parse of the other lines.
                fields = line.split(":", 1)
                if len(fields) != 2:
                    continue
                name, right = fields
                if name not in stat_names:
                    continue
                right = right.strip()
                if not right.endswith(" kB"):
                    raise ValueError("unexpected units in %r" % (line,))
                stats[name] = int(right[:-3]) * 1024
        finally:
            f.close()
    except (IOError, OSError, ValueError):
        # Probably not on (a compatible version of) Linux
        stats['VmSize'] = 0
        stats['VmPeak'] = 0
    return stats
31
def log_memory_usage(where=""):
    """Write the current VmSize and VmPeak figures to the twisted log.

    where: free-form text appended to the end of the log line, e.g. to say
    which point in the program the measurement was taken at.
    """
    usage = get_memory_usage()
    fields = (usage["VmSize"], usage["VmPeak"], where)
    log.msg("VmSize: %9d  VmPeak: %9d  %s" % fields)
37
38
class ControlServer(Referenceable, service.Service, testutil.PollMixin):
    """Service offering test and measurement operations to remote callers.

    The remote_* methods are the ones reachable through this Referenceable.
    Most of them delegate to the parent service (self.parent) or to one of
    the parent's named child services.
    """
    implements(RIControlClient)

    def remote_wait_for_client_connections(self, num_clients):
        """Delegate to the parent's debug_wait_for_client_connections()."""
        return self.parent.debug_wait_for_client_connections(num_clients)

    def remote_upload_from_file_to_uri(self, filename):
        """Upload the local file `filename` via the "uploader" service.

        Returns a Deferred that fires with the .uri attribute of the upload
        results.
        """
        uploader = self.parent.getServiceNamed("uploader")
        u = upload.FileName(filename)
        d = uploader.upload(u)
        d.addCallback(lambda results: results.uri)
        return d

    def remote_download_from_uri_to_file(self, uri, filename):
        """Download `uri` into the local file `filename`.

        Returns a Deferred that fires with `filename` once the "downloader"
        service has finished.
        """
        downloader = self.parent.getServiceNamed("downloader")
        d = downloader.download_to_filename(uri, filename)
        d.addCallback(lambda res: filename)
        return d

    def remote_speed_test(self, count, size, mutable):
        """Run a SpeedTest with `count` files of `size` bytes each.

        `mutable` is handed to SpeedTest unchanged and selects its upload
        mode. Returns whatever SpeedTest.run() returns: a Deferred that
        fires with an (upload_time, download_time) tuple.
        """
        # SpeedTest.create_data() always starts each file with 8 random
        # bytes, so anything smaller cannot be generated at the right size.
        assert size > 8
        log.msg("speed_test: count=%d, size=%d, mutable=%s" % (count, size,
                                                               mutable))
        st = SpeedTest(self.parent, count, size, mutable)
        return st.run()

    def remote_get_memory_usage(self):
        """Return the dict produced by get_memory_usage()."""
        return get_memory_usage()

    def remote_measure_peer_response_time(self):
        """Measure how long each storage peer takes to answer a ping.

        Every peer is sent one or more "get_versions" messages, one at a
        time. Returns a Deferred that fires with a dict mapping peerid to
        the average elapsed time (in seconds, from time.time()) of that
        peer's pings. When there are no storage connections at all, an
        empty dict is returned directly.
        """
        # I'd like to average together several pings, but I don't want this
        # phase to take more than 10 seconds. Expect worst-case latency to be
        # 300ms.
        results = {}
        conns = self.parent.introducer_client.get_all_connections_for("storage")
        everyone = [(peerid,rref) for (peerid, service_name, rref) in conns]
        if not everyone:
            # Nobody to ping. Stop here: the num_pings computation below
            # would otherwise be handed a zero denominator.
            return results
        num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
        everyone = everyone * num_pings
        d = self._do_one_ping(None, everyone, results)
        # Average exactly once, after the last ping has been recorded.
        d.addCallback(self._average_ping_times)
        return d

    def _do_one_ping(self, res, everyone_left, results):
        """Ping the first peer in `everyone_left`, then handle the rest.

        res: ignored; present so this method can be used as a Deferred
            callback.
        everyone_left: list of (peerid, connection) pairs still to be
            pinged. It is consumed (mutated) as the pings proceed.
        results: dict mapping peerid to a list of elapsed times; each
            completed ping appends to it.

        Returns `results` itself once `everyone_left` is empty, otherwise a
        Deferred that fires with `results` after all remaining pings are
        done.
        """
        if not everyone_left:
            return results
        peerid, connection = everyone_left.pop(0)
        start = time.time()
        d = connection.callRemote("get_versions")
        def _done(ignored):
            elapsed = time.time() - start
            results.setdefault(peerid, []).append(elapsed)
        d.addCallback(_done)
        # Pings are serialized: the next one starts only after this one has
        # been answered.
        d.addCallback(self._do_one_ping, everyone_left, results)
        return d

    def _average_ping_times(self, results):
        """Turn {peerid: [elapsed, ...]} into {peerid: mean elapsed}."""
        averaged = {}
        for peerid, times in results.items():
            averaged[peerid] = sum(times) / len(times)
        return averaged
101
class SpeedTest:
    """Time the upload and subsequent download of a batch of files.

    `count` files of `size` bytes each are created under
    <parent.basedir>/_speed_test_data, uploaded one after another,
    downloaded again one after another (discarding the data), and finally
    deleted from local disk.

    The `mutable` argument selects how each file is uploaded:
      "create": every file becomes a new mutable file
          (parent.create_mutable_file)
      "upload": a single mutable file is created first and its contents
          are replaced by each file in turn
      anything else: every file is uploaded with parent.upload()
    """

    def __init__(self, parent, count, size, mutable):
        self.parent = parent
        self.count = count
        self.size = size
        self.mutable_mode = mutable
        # maps file index -> URI recorded by do_upload()
        self.uris = {}
        self.basedir = os.path.join(self.parent.basedir, "_speed_test_data")

    def run(self):
        """Run the whole test.

        Returns a Deferred that fires with (upload_time, download_time),
        both in seconds. The local data files are removed whether or not
        the upload and download succeeded.
        """
        self.create_data()
        d = self.do_upload()
        d.addCallback(lambda res: self.do_download())
        # addBoth: clean up after failures too. do_cleanup passes its
        # argument through, so a failure still propagates to the caller.
        d.addBoth(self.do_cleanup)
        d.addCallback(lambda res: (self.upload_time, self.download_time))
        return d

    def create_data(self):
        """Create the local input files, named "0" .. str(count-1).

        Each file starts with 8 random bytes and is padded with zero bytes
        up to self.size. Any existing file of the same name is replaced.
        """
        fileutil.make_dirs(self.basedir)
        for i in range(self.count):
            s = self.size
            fn = os.path.join(self.basedir, str(i))
            if os.path.exists(fn):
                os.unlink(fn)
            # The contents are raw bytes and are read back in "rb" mode, so
            # write them in binary mode too: a text-mode handle may
            # translate newline bytes on some platforms and change the
            # file's size.
            f = open(fn, "wb")
            try:
                f.write(os.urandom(8))
                s -= 8
                while s > 0:
                    chunk = min(s, 4096)
                    f.write("\x00" * chunk)
                    s -= chunk
            finally:
                f.close()

    def _read_file(self, fn):
        """Return the full contents of `fn`, closing the file afterwards."""
        f = open(fn, "rb")
        try:
            return f.read()
        finally:
            f.close()

    def do_upload(self):
        """Upload all the files in sequence.

        Records the URI of file i in self.uris[i] and the total elapsed
        time in self.upload_time. In "upload" mode the initial creation of
        the mutable slot happens before the clock is started. Returns a
        Deferred that fires when every file has been uploaded.
        """
        d = defer.succeed(None)
        def _create_slot(res):
            d1 = self.parent.create_mutable_file("")
            def _created(n):
                self._n = n
            d1.addCallback(_created)
            return d1
        if self.mutable_mode == "upload":
            d.addCallback(_create_slot)
        def _start(res):
            self._start = time.time()
        d.addCallback(_start)

        def _record_uri(uri, i):
            self.uris[i] = uri
        def _upload_one_file(ignored, i):
            # Uploads file i, then chains itself for file i+1, so that only
            # one upload is in progress at a time.
            if i >= self.count:
                return
            fn = os.path.join(self.basedir, str(i))
            if self.mutable_mode == "create":
                data = self._read_file(fn)
                d1 = self.parent.create_mutable_file(data)
                d1.addCallback(lambda n: n.get_uri())
            elif self.mutable_mode == "upload":
                data = self._read_file(fn)
                d1 = self._n.replace(data)
                d1.addCallback(lambda res: self._n.get_uri())
            else:
                up = upload.FileName(fn)
                d1 = self.parent.upload(up)
                d1.addCallback(lambda results: results.uri)
            d1.addCallback(_record_uri, i)
            d1.addCallback(_upload_one_file, i+1)
            return d1
        d.addCallback(_upload_one_file, 0)
        def _upload_done(ignored):
            stop = time.time()
            self.upload_time = stop - self._start
        d.addCallback(_upload_done)
        return d

    def do_download(self):
        """Download every URI recorded by do_upload(), in sequence.

        The downloaded data is thrown away (written to a Discard object).
        The total elapsed time is stored in self.download_time. Returns a
        Deferred that fires when every file has been downloaded.
        """
        start = time.time()
        d = defer.succeed(None)
        def _download_one_file(ignored, i):
            # Downloads file i, then chains itself for file i+1.
            if i >= self.count:
                return
            n = self.parent.create_node_from_uri(self.uris[i])
            d1 = n.download(download.FileHandle(Discard()))
            d1.addCallback(_download_one_file, i+1)
            return d1
        d.addCallback(_download_one_file, 0)
        def _download_done(ignored):
            stop = time.time()
            self.download_time = stop - start
        d.addCallback(_download_done)
        return d

    def do_cleanup(self, res):
        """Delete the local input files and return `res` unchanged.

        Intended for use with addBoth(): passing `res` through leaves the
        outcome of the preceding steps (result or failure) intact.
        """
        for i in range(self.count):
            fn = os.path.join(self.basedir, str(i))
            os.unlink(fn)
        return res
199
class Discard:
    """Write-only sink that throws away everything written to it."""

    # There is deliberately no close() method here, and that omission is
    # part of the test: download_to_filehandle explicitly does not close the
    # filehandle it was given, since closing is reserved for whoever
    # provided the filehandle. This otherwise filehandle-like object would
    # break if something tried to close it.
    def write(self, data):
        return None