]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage_client.py
use hashlib module if available, thus avoiding a DeprecationWarning for importing...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage_client.py
1
2 """
3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
5 """
6
7 # roadmap:
8 #
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
13 #
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
16 #
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
19 # or Client
20 #
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
23 #
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, i.e. http or whatever). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
28 #
29 # 6: implement other sorts of IStorageClient classes: S3, etc
30
31 try:
32     from hashlib import sha1
33 except ImportError:
34     # hashlib was added in Python 2.5
35     import sha
36     def sha1(x):
37         return sha.new(x)
38
39 import time
40 from zope.interface import implements, Interface
41 from foolscap.api import eventually
42 from allmydata.interfaces import IStorageBroker
43 from allmydata.util import idlib, log
44 from allmydata.util.assertutil import _assert, precondition
45 from allmydata.util.rrefutil import add_version_to_remote_reference
46
47 # who is responsible for de-duplication?
48 #  both?
49 #  IC remembers the unpacked announcements it receives, to provide for late
50 #  subscribers and to remove duplicates
51
52 # if a client subscribes after startup, will they receive old announcements?
53 #  yes
54
55 # who will be responsible for signature checking?
56 #  make it be IntroducerClient, so they can push the filter outwards and
57 #  reduce inbound network traffic
58
59 # what should the interface between StorageFarmBroker and IntroducerClient
60 # look like?
61 #  don't pass signatures: only pass validated blessed-objects
62
63 class StorageFarmBroker:
64     implements(IStorageBroker)
65     """I live on the client, and know about storage servers. For each server
66     that is participating in a grid, I either maintain a connection to it or
67     remember enough information to establish a connection to it on demand.
68     I'm also responsible for subscribing to the IntroducerClient to find out
69     about new servers as they are announced by the Introducer.
70     """
71     def __init__(self, tub, permute_peers):
72         self.tub = tub
73         assert permute_peers # False not implemented yet
74         self.permute_peers = permute_peers
75         # self.descriptors maps serverid -> IServerDescriptor, and keeps
76         # track of all the storage servers that we've heard about. Each
77         # descriptor manages its own Reconnector, and will give us a
78         # RemoteReference when we ask them for it.
79         self.descriptors = {}
80         # self.test_servers are statically configured from unit tests
81         self.test_servers = {} # serverid -> rref
82         self.introducer_client = None
83
84     # these two are used in unit tests
85     def test_add_server(self, serverid, rref):
86         self.test_servers[serverid] = rref
87     def test_add_descriptor(self, serverid, dsc):
88         self.descriptors[serverid] = dsc
89
90     def use_introducer(self, introducer_client):
91         self.introducer_client = ic = introducer_client
92         ic.subscribe_to("storage", self._got_announcement)
93
94     def _got_announcement(self, serverid, ann_d):
95         precondition(isinstance(serverid, str), serverid)
96         precondition(len(serverid) == 20, serverid)
97         assert ann_d["service-name"] == "storage"
98         old = self.descriptors.get(serverid)
99         if old:
100             if old.get_announcement() == ann_d:
101                 return # duplicate
102             # replacement
103             del self.descriptors[serverid]
104             old.stop_connecting()
105             # now we forget about them and start using the new one
106         dsc = NativeStorageClientDescriptor(serverid, ann_d)
107         self.descriptors[serverid] = dsc
108         dsc.start_connecting(self.tub, self._trigger_connections)
109         # the descriptor will manage their own Reconnector, and each time we
110         # need servers, we'll ask them if they're connected or not.
111
112     def _trigger_connections(self):
113         # when one connection is established, reset the timers on all others,
114         # to trigger a reconnection attempt in one second. This is intended
115         # to accelerate server connections when we've been offline for a
116         # while. The goal is to avoid hanging out for a long time with
117         # connections to only a subset of the servers, which would increase
118         # the chances that we'll put shares in weird places (and not update
119         # existing shares of mutable files). See #374 for more details.
120         for dsc in self.descriptors.values():
121             dsc.try_to_connect()
122
123
124
125     def get_servers_for_index(self, peer_selection_index):
126         # first cut: return a list of (peerid, versioned-rref) tuples
127         assert self.permute_peers == True
128         servers = self.get_all_servers()
129         key = peer_selection_index
130         return sorted(servers, key=lambda x: sha1(key+x[0]).digest())
131
132     def get_all_servers(self):
133         # return a frozenset of (peerid, versioned-rref) tuples
134         servers = {}
135         for serverid,rref in self.test_servers.items():
136             servers[serverid] = rref
137         for serverid,dsc in self.descriptors.items():
138             rref = dsc.get_rref()
139             if rref:
140                 servers[serverid] = rref
141         result = frozenset(servers.items())
142         _assert(len(result) <= len(self.get_all_serverids()), result, self.get_all_serverids())
143         return result
144
145     def get_all_serverids(self):
146         serverids = set()
147         serverids.update(self.test_servers.keys())
148         serverids.update(self.descriptors.keys())
149         return frozenset(serverids)
150
151     def get_all_descriptors(self):
152         return sorted(self.descriptors.values(),
153                       key=lambda dsc: dsc.get_serverid())
154
155     def get_nickname_for_serverid(self, serverid):
156         if serverid in self.descriptors:
157             return self.descriptors[serverid].get_nickname()
158         return None
159
160
161 class IServerDescriptor(Interface):
162     def start_connecting(tub, trigger_cb):
163         pass
164     def get_nickname():
165         pass
166     def get_rref():
167         pass
168
169 class NativeStorageClientDescriptor:
170     """I hold information about a storage server that we want to connect to.
171     If we are connected, I hold the RemoteReference, their host address, and
172     the their version information. I remember information about when we were
173     last connected too, even if we aren't currently connected.
174
175     @ivar announcement_time: when we first heard about this service
176     @ivar last_connect_time: when we last established a connection
177     @ivar last_loss_time: when we last lost a connection
178
179     @ivar version: the server's versiondict, from the most recent announcement
180     @ivar nickname: the server's self-reported nickname (unicode), same
181
182     @ivar rref: the RemoteReference, if connected, otherwise None
183     @ivar remote_host: the IAddress, if connected, otherwise None
184     """
185     implements(IServerDescriptor)
186
187     VERSION_DEFAULTS = {
188         "http://allmydata.org/tahoe/protocols/storage/v1" :
189         { "maximum-immutable-share-size": 2**32,
190           "tolerates-immutable-read-overrun": False,
191           "delete-mutable-shares-with-zero-length-writev": False,
192           },
193         "application-version": "unknown: no get_version()",
194         }
195
196     def __init__(self, serverid, ann_d, min_shares=1):
197         self.serverid = serverid
198         self.announcement = ann_d
199         self.min_shares = min_shares
200
201         self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
202         self.announcement_time = time.time()
203         self.last_connect_time = None
204         self.last_loss_time = None
205         self.remote_host = None
206         self.rref = None
207         self._reconnector = None
208         self._trigger_cb = None
209
210     def get_serverid(self):
211         return self.serverid
212
213     def get_nickname(self):
214         return self.announcement["nickname"].decode("utf-8")
215     def get_announcement(self):
216         return self.announcement
217     def get_remote_host(self):
218         return self.remote_host
219     def get_last_connect_time(self):
220         return self.last_connect_time
221     def get_last_loss_time(self):
222         return self.last_loss_time
223     def get_announcement_time(self):
224         return self.announcement_time
225
226     def start_connecting(self, tub, trigger_cb):
227         furl = self.announcement["FURL"]
228         self._trigger_cb = trigger_cb
229         self._reconnector = tub.connectTo(furl, self._got_connection)
230
231     def _got_connection(self, rref):
232         lp = log.msg(format="got connection to %(serverid)s, getting versions",
233                      serverid=self.serverid_s,
234                      facility="tahoe.storage_broker", umid="coUECQ")
235         if self._trigger_cb:
236             eventually(self._trigger_cb)
237         default = self.VERSION_DEFAULTS
238         d = add_version_to_remote_reference(rref, default)
239         d.addCallback(self._got_versioned_service, lp)
240         d.addErrback(log.err, format="storageclient._got_connection",
241                      serverid=self.serverid_s, umid="Sdq3pg")
242
243     def _got_versioned_service(self, rref, lp):
244         log.msg(format="%(serverid)s provided version info %(version)s",
245                 serverid=self.serverid_s, version=rref.version,
246                 facility="tahoe.storage_broker", umid="SWmJYg",
247                 level=log.NOISY, parent=lp)
248
249         self.last_connect_time = time.time()
250         self.remote_host = rref.getPeer()
251         self.rref = rref
252         rref.notifyOnDisconnect(self._lost)
253
254     def get_rref(self):
255         return self.rref
256
257     def _lost(self):
258         log.msg(format="lost connection to %(serverid)s",
259                 serverid=self.serverid_s,
260                 facility="tahoe.storage_broker", umid="zbRllw")
261         self.last_loss_time = time.time()
262         self.rref = None
263         self.remote_host = None
264
265     def stop_connecting(self):
266         # used when this descriptor has been superceded by another
267         self._reconnector.stopConnecting()
268
269     def try_to_connect(self):
270         # used when the broker wants us to hurry up
271         self._reconnector.reset()
272
273 class UnknownServerTypeError(Exception):
274     pass