

import os, struct, time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from allmydata.interfaces import IPublishStatus
from allmydata.util import base32, hashutil, mathutil, idlib, log
from allmydata import hashtree, codec, storage
from pycryptopp.cipher.aes import AES

from common import MODE_WRITE, UncoordinatedWriteError, DictOfSets
from servermap import ServerMap
from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
     unpack_checkstring, SIGNED_PREFIX

class PublishStatus:
    implements(IPublishStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.privkey_from = None
        self.peers_queried = None
        self.sharemap = None # DictOfSets
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?", "?")
        self.initial_read_size = None
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, peerid, op, elapsed):
        assert op in ("read", "write")
        if peerid not in self.timings["per_server"]:
            self.timings["per_server"][peerid] = []
        self.timings["per_server"][peerid].append((op,elapsed))

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

class Publish:
    """I represent a single act of publishing the mutable file to the grid. I
    will only publish my data if the servermap I am using still represents
    the current state of the world.

    To make the initial publish, set servermap to None.
    """

    # we limit the segment size as usual to constrain our memory footprint.
    # The max segsize is higher for mutable files, because we want to support
    # dirnodes with up to 10k children, and each child uses about 330 bytes.
    # If you actually put that much into a directory you'll be using a
    # footprint of around 14MB, which is higher than we'd like, but it is
    # more important right now to support large directories than to make
    # memory usage small when you use them. Once we implement MDMF (with
    # multiple segments), we will drop this back down, probably to 128KiB.
    MAX_SEGMENT_SIZE = 3500000

    def __init__(self, filenode, servermap):
        self._node = filenode
        self._servermap = servermap
        self._storage_index = self._node.get_storage_index()
        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
        num = self._node._client.log("Publish(%s): starting" % prefix)
        self._log_number = num
        self._status = PublishStatus() # so get_status() below has something to return
        self._running = True

    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        return log.msg(*args, **kwargs)

    def log_err(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        return log.err(*args, **kwargs)

    def publish(self, newdata):
        """Publish the filenode's current contents.  Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with UncoordinatedWriteError if it detects a
        simultaneous write.
        """

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        #  2a: send queries to n+epsilon servers, to determine current shares
        #  2b: based upon responses, create target map
        # 3: send slot_testv_and_readv_and_writev messages
        # 4: as responses return, update share-dispatch table
        # 4a: may need to run recovery algorithm
        # 5: when enough responses are back, we're done

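        # (share generation happens in _encrypt_and_encode and
        #  _generate_shares below; placement and the test-and-write messages
        #  are driven by loop(), update_goal(), _send_shares(), and
        #  _got_write_answer() further down)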
        self.log("starting publish, datalen is %s" % len(newdata))

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # peerlist computed by that process instead of computing our own.
        if self._servermap:
            assert self._servermap.last_update_mode == MODE_WRITE
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.bad_peers = set() # peerids who have errbacked/refused requests

        self.newdata = newdata
        self.salt = os.urandom(16)

        self.setup_encoding_parameters()

        self.surprised = False

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (peerid, shnum) tuples

        # the second table is our list of outstanding queries: those which
        # are in flight and may or may not be delivered, accepted, or
        # acknowledged. Items are added to this table when the request is
        # sent, and removed when the response returns (or errbacks).
        self.outstanding = set() # (peerid, shnum) tuples

        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (peerid, shnum) tuples
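        # loop() below considers the publish finished once every (peerid,
        # shnum) pair in self.goal appears in self.placed and nothing is
        # left in self.outstanding.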

        # we also keep a mapping from peerid to RemoteReference. Each time we
        # pull a connection out of the full peerlist, we add it to this for
        # use later.
        self.connections = {}

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        for (peerid, shnum) in self._servermap.servermap:
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = self._servermap.connections[peerid]

        # create the shares. We'll discard these as they are delivered. SDMF:
        # we're allowed to hold everything in memory.

        d = self._encrypt_and_encode()
        d.addCallback(self._generate_shares)
        d.addCallback(self.loop) # trigger delivery
        d.addErrback(self._fatal_error)

        return self.done_deferred

    def setup_encoding_parameters(self):
        segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
        # this must be a multiple of self.required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              self.required_shares)
        self.segment_size = segment_size
        if segment_size:
            self.num_segments = mathutil.div_ceil(len(self.newdata),
                                                  segment_size)
        else:
            self.num_segments = 0
        assert self.num_segments in [0, 1,] # SDMF restrictions
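        # Illustrative numbers: with required_shares=3 and 1000 bytes of
        # newdata, segment_size is next_multiple(1000, 3) = 1002 and
        # num_segments is div_ceil(1000, 1002) = 1. Empty files yield
        # segment_size=0 and num_segments=0.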

    def _fatal_error(self, f):
        self.log("error during loop", failure=f, level=log.SCARY)
        self._done(f)

    def loop(self, ignored=None):
        self.log("entering loop", level=log.NOISY)
        self.update_goal()
        # how far are we from our goal?
        needed = self.goal - self.placed - self.outstanding

        if needed:
            # we need to send out new shares
            self.log(format="need to send %(needed)d new shares",
                     needed=len(needed), level=log.NOISY)
            d = self._send_shares(needed)
            d.addCallback(self.loop)
            d.addErrback(self._fatal_error)
            return

        if self.outstanding:
            # queries are still pending, keep waiting
            self.log(format="%(outstanding)d queries still outstanding",
                     outstanding=len(self.outstanding),
                     level=log.NOISY)
            return

        # no queries outstanding, no placements needed: we're done
        self.log("no queries outstanding, no placements needed: done",
                 level=log.OPERATIONAL)
        return self._done(None)

    def log_goal(self, goal):
        logmsg = []
        for (peerid, shnum) in goal:
            logmsg.append("sh%d to [%s]" % (shnum,
                                            idlib.shortnodeid_b2a(peerid)))
        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
                 level=log.NOISY)

    def update_goal(self):
        # first, remove any bad peers from our goal
        self.goal = set([ (peerid, shnum)
                          for (peerid, shnum) in self.goal
                          if peerid not in self.bad_peers ])

        # find the homeless shares:
        homefull_shares = set([shnum for (peerid, shnum) in self.goal])
        homeless_shares = set(range(self.total_shares)) - homefull_shares
        homeless_shares = sorted(list(homeless_shares))
        # place them somewhere. We prefer unused servers at the beginning of
        # the available peer list.

        if not homeless_shares:
            return

        # if log.recording_noisy
        if False:
            self.log_goal(self.goal)

        # if an old share X is on a node, put the new share X there too.
        # TODO: 1: redistribute shares to achieve one-per-peer, by copying
        #       shares from existing peers to new (less-crowded) ones. The
        #       old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce future
        #       update work

        # this is a bit CPU intensive but easy to analyze. We create a sort
        # order for each peerid. If the peerid is marked as bad, we don't
        # even put them in the list. Then we care about the number of shares
        # which have already been assigned to them. After that we care about
        # their permutation order.
        old_assignments = DictOfSets()
        for (peerid, shnum) in self.goal:
            old_assignments.add(peerid, shnum)

        peerlist = []
        for i, (peerid, ss) in enumerate(self.full_peerlist):
            if peerid in self.bad_peers:
                continue # per the comment above, skip bad peers entirely
            entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
            peerlist.append(entry)
        peerlist.sort()

        new_assignments = []
        # we then index this peerlist with an integer, because we may have to
        # wrap. We update the goal as we go.
        i = 0
        for shnum in homeless_shares:
            (ignored1, ignored2, peerid, ss) = peerlist[i]
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = ss
            i += 1
            if i >= len(peerlist):
                i = 0

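        # If there are more homeless shares than usable peers, the index
        # wraps around and some peers end up holding more than one share.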


    def _encrypt_and_encode(self):
        # this returns a Deferred that fires with a list of (sharedata,
        # sharenum) tuples. TODO: cache the ciphertext, only produce the
        # shares that we care about.
        self.log("_encrypt_and_encode")

        #started = time.time()

        key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
        enc = AES(key)
        crypttext = enc.process(self.newdata)
        assert len(crypttext) == len(self.newdata)

        #now = time.time()
        #self._status.timings["encrypt"] = now - started
        #started = now

        # now apply FEC

        fec = codec.CRSEncoder()
        fec.set_params(self.segment_size,
                       self.required_shares, self.total_shares)
        piece_size = fec.get_block_size()
        crypttext_pieces = [None] * self.required_shares
        for i in range(len(crypttext_pieces)):
            offset = i * piece_size
            piece = crypttext[offset:offset+piece_size]
            piece = piece + "\x00"*(piece_size - len(piece)) # padding
            crypttext_pieces[i] = piece
            assert len(piece) == piece_size

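        # The zero padding above is needed because segment_size was rounded
        # up to a multiple of required_shares, so the crypttext can be a few
        # bytes shorter than required_shares * piece_size, while the encoder
        # expects every input piece to be exactly piece_size long.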
        d = fec.encode(crypttext_pieces)
        def _done_encoding(res):
            #elapsed = time.time() - started
            #self._status.timings["encode"] = elapsed
            return res
        d.addCallback(_done_encoding)
        return d

    def _generate_shares(self, shares_and_shareids):
        # this sets self.shares and self.root_hash
        self.log("_generate_shares")
        #started = time.time()

        # we should know these by now
        privkey = self._privkey
        encprivkey = self._encprivkey
        pubkey = self._pubkey

        (shares, share_ids) = shares_and_shareids

        assert len(shares) == len(share_ids)
        assert len(shares) == self.total_shares
        all_shares = {}
        block_hash_trees = {}
        share_hash_leaves = [None] * len(shares)
        for i in range(len(shares)):
            share_data = shares[i]
            shnum = share_ids[i]
            all_shares[shnum] = share_data

            # build the block hash tree. SDMF has only one leaf.
            leaves = [hashutil.block_hash(share_data)]
            t = hashtree.HashTree(leaves)
            block_hash_trees[shnum] = block_hash_tree = list(t)
            share_hash_leaves[shnum] = t[0]
        for leaf in share_hash_leaves:
            assert leaf is not None
        share_hash_tree = hashtree.HashTree(share_hash_leaves)
        share_hash_chain = {}
        for shnum in range(self.total_shares):
            needed_hashes = share_hash_tree.needed_hashes(shnum)
            share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_hashes ] )
        root_hash = share_hash_tree[0]
        assert len(root_hash) == 32
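        # Each share_hash_chain[shnum] maps tree-node index to hash: the
        # sibling hashes a reader needs to validate that share's leaf against
        # root_hash, so a downloader can check a single share without
        # fetching the whole share hash tree.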
        self.log("my new root_hash is %s" % base32.b2a(root_hash))

        prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
                             self.required_shares, self.total_shares,
                             self.segment_size, len(self.newdata))

        # now pack the beginning of the share. All shares are the same up
        # to the signature, then they have divergent share hash chains,
        # then completely different block hash trees + salt + share data,
        # then they all share the same encprivkey at the end. The sizes
        # of everything are the same for all shares.
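        # (concretely, pack_share below is handed the signed prefix, the
        #  verification key, the signature, this share's hash chain and block
        #  hash tree, the share data, and the encrypted private key)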

        #sign_started = time.time()
        signature = privkey.sign(prefix)
        #self._status.timings["sign"] = time.time() - sign_started

        verification_key = pubkey.serialize()

        final_shares = {}
        for shnum in range(self.total_shares):
            final_share = pack_share(prefix,
                                     verification_key,
                                     signature,
                                     share_hash_chain[shnum],
                                     block_hash_trees[shnum],
                                     all_shares[shnum],
                                     encprivkey)
            final_shares[shnum] = final_share
        #elapsed = time.time() - started
        #self._status.timings["pack"] = elapsed
        self.shares = final_shares
        self.root_hash = root_hash

        # we also need to build up the version identifier for what we're
        # pushing. Extract the offsets from one of our shares.
        assert final_shares
        offsets = unpack_header(final_shares.values()[0])[-1]
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
        verinfo = (self._new_seqnum, root_hash, self.salt,
                   self.segment_size, len(self.newdata),
                   self.required_shares, self.total_shares,
                   prefix, offsets_tuple)
        self.versioninfo = verinfo
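        # self.versioninfo is later handed to the ServerMap (via
        # add_new_share in _got_write_answer) so the map records the version
        # we just pushed to each successful server.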



    def _send_shares(self, needed):
        self.log("_send_shares")
        #started = time.time()

        # we're finally ready to send out our shares. If we encounter any
        # surprises here, it's because somebody else is writing at the same
        # time. (Note: in the future, when we remove the _query_peers() step
        # and instead speculate about [or remember] which shares are where,
        # surprises here are *not* indications of UncoordinatedWriteError,
        # and we'll need to respond to them more gracefully.)

        # needed is a set of (peerid, shnum) tuples. The first thing we do is
        # organize it by peerid.

        peermap = DictOfSets()
        for (peerid, shnum) in needed:
            peermap.add(peerid, shnum)

        # the next thing is to build up a bunch of test vectors. The
        # semantics of Publish are that we perform the operation if the world
        # hasn't changed since the ServerMap was constructed (more or less).
        # For every share we're trying to place, we create a test vector that
        # tests to see if the server*share still corresponds to the
        # map.

        all_tw_vectors = {} # maps peerid to tw_vectors
        sm = self._servermap.servermap

        for key in needed:
            (peerid, shnum) = key

            if key in sm:
                # an old version of that share already exists on the
                # server, according to our servermap. We will create a
                # request that attempts to replace it.
                old_versionid, old_timestamp = sm[key]
                (old_seqnum, old_root_hash, old_salt, old_segsize,
                 old_datalength, old_k, old_N, old_prefix,
                 old_offsets_tuple) = old_versionid
                old_checkstring = pack_checkstring(old_seqnum,
                                                   old_root_hash,
                                                   old_salt)
                testv = (0, len(old_checkstring), "eq", old_checkstring)

            else:
                # add a testv that requires the share not exist
                #testv = (0, 1, 'eq', "")

                # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
                # constraints are handled. If the same object is referenced
                # multiple times inside the arguments, foolscap emits a
                # 'reference' token instead of a distinct copy of the
                # argument. The bug is that these 'reference' tokens are not
                # accepted by the inbound constraint code. To work around
                # this, we need to prevent python from interning the
                # (constant) tuple, by creating a new copy of this vector
                # each time. This bug is fixed in later versions of foolscap.
                testv = tuple([0, 1, 'eq', ""])

            testvs = [testv]
            # the write vector is simply the share
            writev = [(0, self.shares[shnum])]

            if peerid not in all_tw_vectors:
                all_tw_vectors[peerid] = {}
                # maps shnum to (testvs, writevs, new_length)
            assert shnum not in all_tw_vectors[peerid]

            all_tw_vectors[peerid][shnum] = (testvs, writev, None)

        # we read the checkstring back from each share, but we only use it
        # to detect whether there was a new share that we didn't know
        # about. The success or failure of the write will tell us whether
        # there was a collision or not. If there is a collision, the first
        # thing we'll do is update the servermap, which will find out what
        # happened. We could conceivably reduce a roundtrip by using the
        # readv checkstring to populate the servermap, but really we'd have
        # to read enough data to validate the signatures too, so it wouldn't
        # be an overall win.
        read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
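        # For illustration, a single entry of all_tw_vectors looks roughly
        # like:
        #   { peerid: { shnum: ( [ (0, len(checkstring), "eq", checkstring) ],
        #                        [ (0, new_share_data) ],
        #                        None ) } }
        # i.e. test vectors, write vectors, and an unchanged new_length,
        # while read_vector asks each server to return the signed-prefix
        # region of every share it currently holds for this storage index.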

        # ok, send the messages!
        started = time.time()
        dl = []
        for (peerid, tw_vectors) in all_tw_vectors.items():

            write_enabler = self._node.get_write_enabler(peerid)
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            secrets = (write_enabler, renew_secret, cancel_secret)
            shnums = tw_vectors.keys()

            # mark these queries as in-flight; _got_write_answer and
            # _got_write_error discard them again when the response arrives
            for shnum in shnums:
                self.outstanding.add( (peerid, shnum) )

            d = self._do_testreadwrite(peerid, secrets,
                                       tw_vectors, read_vector)
            d.addCallbacks(self._got_write_answer, self._got_write_error,
                           callbackArgs=(peerid, shnums, started),
                           errbackArgs=(peerid, shnums, started))
            d.addErrback(self._fatal_error)
            dl.append(d)

        return defer.DeferredList(dl) # purely for testing

    def _do_testreadwrite(self, peerid, secrets,
                          tw_vectors, read_vector):
        storage_index = self._storage_index
        ss = self.connections[peerid]

        #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
        d = ss.callRemote("slot_testv_and_readv_and_writev",
                          storage_index,
                          secrets,
                          tw_vectors,
                          read_vector)
        return d

    def _got_write_answer(self, answer, peerid, shnums, started):
        lp = self.log("_got_write_answer from %s" %
                      idlib.shortnodeid_b2a(peerid))
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )

        wrote, read_data = answer

        if not wrote:
            self.log("our testv failed, so the write did not happen",
                     parent=lp, level=log.WEIRD)
            self.surprised = True
            self.bad_peers.add(peerid) # don't ask them again
            # use the checkstring to add information to the log message
            for (shnum,readv) in read_data.items():
                checkstring = readv[0]
                (other_seqnum,
                 other_roothash,
                 other_salt) = unpack_checkstring(checkstring)
                expected_version = self._servermap.version_on_peer(peerid,
                                                                   shnum)
                (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                 offsets_tuple) = expected_version
                self.log("somebody modified the share on us:"
                         " shnum=%d: I thought they had #%d:R=%s,"
                         " but testv reported #%d:R=%s" %
                         (shnum,
                          seqnum, base32.b2a(root_hash)[:4],
                          other_seqnum, base32.b2a(other_roothash)[:4]),
                         parent=lp, level=log.NOISY)
            # self.loop() will take care of finding new homes
            return

        for shnum in shnums:
            self.placed.add( (peerid, shnum) )
            # and update the servermap
            self._servermap.add_new_share(peerid, shnum,
                                          self.versioninfo, started)

        surprise_shares = set(read_data.keys()) - set(shnums)
        if surprise_shares:
            self.log("they had shares %s that we didn't know about" %
                     (list(surprise_shares),),
                     parent=lp, level=log.WEIRD)
            self.surprised = True
            return

        # self.loop() will take care of checking to see if we're done
        return
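        # (there is no explicit re-entry into loop() here: loop() chained
        #  itself onto the DeferredList returned by _send_shares(), so it
        #  runs again once every outstanding query has been answered or has
        #  errbacked)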

    def _got_write_error(self, f, peerid, shnums, started):
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )
        self.bad_peers.add(peerid)
        self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
                 shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
                 failure=f,
                 level=log.UNUSUAL)
        # self.loop() will take care of checking to see if we're done
        return



    def _log_dispatch_map(self, dispatch_map):
        for shnum, places in dispatch_map.items():
            sent_to = [(idlib.shortnodeid_b2a(peerid),
                        seqnum,
                        base32.b2a(root_hash)[:4])
                       for (peerid,seqnum,root_hash) in places]
            self.log(" share %d sent to: %s" % (shnum, sent_to),
                     level=log.NOISY)

    def _maybe_recover(self, (surprised, dispatch_map)):
        self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised,
                 level=log.NOISY)
        self._log_dispatch_map(dispatch_map)
        if not surprised:
            self.log(" no recovery needed")
            return
        self.log("We need recovery!", level=log.WEIRD)
        print "RECOVERY NOT YET IMPLEMENTED"
        # but dispatch_map will help us do it
        raise UncoordinatedWriteError("I was surprised!")

    def _done(self, res):
        if not self._running:
            return
        self._running = False
        #now = time.time()
        #self._status.timings["total"] = now - self._started
        #self._status.set_active(False)
        #self._status.set_status("Done")
        #self._status.set_progress(1.0)
        self.done_deferred.callback(res)
        return None

    def get_status(self):
        return self._status
