1
2
3 import os, time
4 from StringIO import StringIO
5 from itertools import count
6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.python import failure
9 from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
10                                  IMutableUploadable
11 from allmydata.util import base32, hashutil, mathutil, log
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from foolscap.api import eventually, fireEventually
17
18 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
19      UncoordinatedWriteError, NotEnoughServersError
20 from allmydata.mutable.servermap import ServerMap
21 from allmydata.mutable.layout import get_version_from_checkstring,\
22                                      unpack_mdmf_checkstring, \
23                                      unpack_sdmf_checkstring, \
24                                      MDMFSlotWriteProxy, \
25                                      SDMFSlotWriteProxy
26
27 KiB = 1024
28 DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
29 PUSHING_BLOCKS_STATE = 0
30 PUSHING_EVERYTHING_ELSE_STATE = 1
31 DONE_STATE = 2
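# The publish proceeds through these states in order: push the encrypted and
# encoded blocks for every segment, then push everything else a share needs
# (encrypted private key, block/share hash trees, signature, verification
# key), then finish.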
32
33 class PublishStatus:
34     implements(IPublishStatus)
35     statusid_counter = count(0)
36     def __init__(self):
37         self.timings = {}
38         self.timings["send_per_server"] = {}
39         self.timings["encrypt"] = 0.0
40         self.timings["encode"] = 0.0
41         self.servermap = None
42         self._problems = {}
43         self.active = True
44         self.storage_index = None
45         self.helper = False
46         self.encoding = ("?", "?")
47         self.size = None
48         self.status = "Not started"
49         self.progress = 0.0
50         self.counter = self.statusid_counter.next()
51         self.started = time.time()
52
53     def add_per_server_time(self, server, elapsed):
54         if server not in self.timings["send_per_server"]:
55             self.timings["send_per_server"][server] = []
56         self.timings["send_per_server"][server].append(elapsed)
57     def accumulate_encode_time(self, elapsed):
58         self.timings["encode"] += elapsed
59     def accumulate_encrypt_time(self, elapsed):
60         self.timings["encrypt"] += elapsed
61
62     def get_started(self):
63         return self.started
64     def get_storage_index(self):
65         return self.storage_index
66     def get_encoding(self):
67         return self.encoding
68     def using_helper(self):
69         return self.helper
70     def get_servermap(self):
71         return self.servermap
72     def get_size(self):
73         return self.size
74     def get_status(self):
75         return self.status
76     def get_progress(self):
77         return self.progress
78     def get_active(self):
79         return self.active
80     def get_counter(self):
81         return self.counter
82     def get_problems(self):
83         return self._problems
84
85     def set_storage_index(self, si):
86         self.storage_index = si
87     def set_helper(self, helper):
88         self.helper = helper
89     def set_servermap(self, servermap):
90         self.servermap = servermap
91     def set_encoding(self, k, n):
92         self.encoding = (k, n)
93     def set_size(self, size):
94         self.size = size
95     def set_status(self, status):
96         self.status = status
97     def set_progress(self, value):
98         self.progress = value
99     def set_active(self, value):
100         self.active = value
101
102 class LoopLimitExceededError(Exception):
103     pass
104
105 class Publish:
106     """I represent a single act of publishing the mutable file to the grid. I
107     will only publish my data if the servermap I am using still represents
108     the current state of the world.
109
110     To make the initial publish, set servermap to None.
111     """
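    # Rough usage sketch (illustrative only; in practice the mutable filenode
    # machinery constructs and drives Publish):
    #   p = Publish(filenode, storage_broker, servermap)
    #   d = p.publish(MutableData("new file contents"))
    #   # d fires with None on success, or with a Failure if the publish fails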
112
113     def __init__(self, filenode, storage_broker, servermap):
114         self._node = filenode
115         self._storage_broker = storage_broker
116         self._servermap = servermap
117         self._storage_index = self._node.get_storage_index()
118         self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
119         num = self.log("Publish(%s): starting" % prefix, parent=None)
120         self._log_number = num
121         self._running = True
122         self._first_write_error = None
123         self._last_failure = None
124
125         self._status = PublishStatus()
126         self._status.set_storage_index(self._storage_index)
127         self._status.set_helper(False)
128         self._status.set_progress(0.0)
129         self._status.set_active(True)
130         self._version = self._node.get_version()
131         assert self._version in (SDMF_VERSION, MDMF_VERSION)
132
133
134     def get_status(self):
135         return self._status
136
137     def log(self, *args, **kwargs):
138         if 'parent' not in kwargs:
139             kwargs['parent'] = self._log_number
140         if "facility" not in kwargs:
141             kwargs["facility"] = "tahoe.mutable.publish"
142         return log.msg(*args, **kwargs)
143
144
145     def update(self, data, offset, blockhashes, version):
146         """
147         I replace the contents of this file with the contents of data,
148         starting at offset. I return a Deferred that fires with None
149         when the replacement has been completed, or with an error if
150         something went wrong during the process.
151
152         Note that this process will not upload new shares. If the file
153         being updated is in need of repair, callers will have to repair
154         it on their own.
155         """
156         # How this works:
157         # 1. Make server assignments. We'll assign each share that we know
158         # about on the grid to the server that currently holds that
159         # share, and will not place any new shares.
160         # 2. Set up encoding parameters. Most of these will stay the same
161         # -- datalength will change, as will some of the offsets.
162         # 3. Upload the new segments.
163         # 4. Be done.
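        # Illustrative call (hypothetical values; the MutableFileVersion
        # update path normally supplies the per-share block-hash dict and the
        # verinfo tuple):
        #   d = publish.update(MutableData("replacement bytes"), offset=150000,
        #                      blockhashes=per_share_blockhashes, version=verinfo)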
164         assert IMutableUploadable.providedBy(data)
165
166         self.data = data
167
168         # XXX: Use the MutableFileVersion instead.
169         self.datalength = self._node.get_size()
170         if data.get_size() > self.datalength:
171             self.datalength = data.get_size()
172
173         self.log("starting update")
174         self.log("adding new data of length %d at offset %d" % \
175                     (data.get_size(), offset))
176         self.log("new data length is %d" % self.datalength)
177         self._status.set_size(self.datalength)
178         self._status.set_status("Started")
179         self._started = time.time()
180
181         self.done_deferred = defer.Deferred()
182
183         self._writekey = self._node.get_writekey()
184         assert self._writekey, "need write capability to publish"
185
186         # first, which servers will we publish to? We require that the
187         # servermap was updated in MODE_WRITE, so we can depend upon the
188         # serverlist computed by that process instead of computing our own.
189         assert self._servermap
190         assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
191         # we will push a version that is one larger than anything present
192         # in the grid, according to the servermap.
193         self._new_seqnum = self._servermap.highest_seqnum() + 1
194         self._status.set_servermap(self._servermap)
195
196         self.log(format="new seqnum will be %(seqnum)d",
197                  seqnum=self._new_seqnum, level=log.NOISY)
198
199         # We're updating an existing file, so all of the following
200         # should be available.
201         self.readkey = self._node.get_readkey()
202         self.required_shares = self._node.get_required_shares()
203         assert self.required_shares is not None
204         self.total_shares = self._node.get_total_shares()
205         assert self.total_shares is not None
206         self._status.set_encoding(self.required_shares, self.total_shares)
207
208         self._pubkey = self._node.get_pubkey()
209         assert self._pubkey
210         self._privkey = self._node.get_privkey()
211         assert self._privkey
212         self._encprivkey = self._node.get_encprivkey()
213
214         sb = self._storage_broker
215         full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
216         self.full_serverlist = full_serverlist # for use later, immutable
217         self.bad_servers = set() # servers who have errbacked/refused requests
218
219         # This will set self.segment_size, self.num_segments, and
220         # self.fec. TODO: Does it know how to do the offset? Probably
221         # not. So do that part next.
222         self.setup_encoding_parameters(offset=offset)
223
224         # if we experience any surprises (writes which were rejected because
225         # our test vector did not match, or shares which we didn't expect to
226         # see), we set this flag and report an UncoordinatedWriteError at the
227         # end of the publish process.
228         self.surprised = False
229
230         # we keep track of three tables. The first is our goal: which share
231         # we want to see on which servers. This is initially populated by the
232         # existing servermap.
233         self.goal = set() # pairs of (server, shnum) tuples
234
235         # the second is the number of outstanding queries: those that are
236         # in flight and may or may not be delivered, accepted, or
237         # acknowledged. This is incremented when a query is sent, and
238         # decremented when the response returns or errbacks.
239         self.num_outstanding = 0
240
241         # the third is a table of successes: shares which have actually been
242         # placed. These are populated when responses come back with success.
243         # When self.placed == self.goal, we're done.
244         self.placed = set() # (server, shnum) tuples
245
246         self.bad_share_checkstrings = {}
247
248         # This is set at the last step of the publishing process.
249         self.versioninfo = ""
250
251         # we use the servermap to populate the initial goal: this way we will
252         # try to update each existing share in place. Since we're
253         # updating, we ignore damaged and missing shares -- callers must
254         # do a repair to recreate these.
255         self.goal = set(self._servermap.get_known_shares())
256         self.writers = {}
257
258         # SDMF files are updated differently.
259         self._version = MDMF_VERSION
260         writer_class = MDMFSlotWriteProxy
261
262         # For each (server, shnum) in self.goal, we make a
263         # write proxy for that server. We'll use this to write
264         # shares to the server.
265         for (server,shnum) in self.goal:
266             write_enabler = self._node.get_write_enabler(server)
267             renew_secret = self._node.get_renewal_secret(server)
268             cancel_secret = self._node.get_cancel_secret(server)
269             secrets = (write_enabler, renew_secret, cancel_secret)
270
271             writer = writer_class(shnum,
272                                   server.get_rref(),
273                                   self._storage_index,
274                                   secrets,
275                                   self._new_seqnum,
276                                   self.required_shares,
277                                   self.total_shares,
278                                   self.segment_size,
279                                   self.datalength)
280
281             self.writers.setdefault(shnum, []).append(writer)
282             writer.server = server
283             known_shares = self._servermap.get_known_shares()
284             assert (server, shnum) in known_shares
285             old_versionid, old_timestamp = known_shares[(server,shnum)]
286             (old_seqnum, old_root_hash, old_salt, old_segsize,
287              old_datalength, old_k, old_N, old_prefix,
288              old_offsets_tuple) = old_versionid
289             writer.set_checkstring(old_seqnum,
290                                    old_root_hash,
291                                    old_salt)
292
293         # Our remote shares will not have a complete checkstring until
294         # after we are done writing share data and have started to write
295         # blocks. In the meantime, we need to know what to look for when
296         # writing, so that we can detect UncoordinatedWriteErrors.
297         self._checkstring = self.writers.values()[0][0].get_checkstring()
298
299         # Now, we start pushing shares.
300         self._status.timings["setup"] = time.time() - self._started
301         # First, we encrypt, encode, and publish the shares that we need
302         # to encrypt, encode, and publish.
303
304         # Our update process fetched these for us. We need to update
305         # them in place as publishing happens.
306         self.blockhashes = {} # (shnum, [blockhashes])
307         for (i, bht) in blockhashes.iteritems():
308             # We need to extract the leaves from our old hash tree.
309             old_segcount = mathutil.div_ceil(version[4],
310                                              version[3])
311             h = hashtree.IncompleteHashTree(old_segcount)
312             bht = dict(enumerate(bht))
313             h.set_hashes(bht)
314             leaves = h[h.get_leaf_index(0):]
315             for j in xrange(self.num_segments - len(leaves)):
316                 leaves.append(None)
317
318             assert len(leaves) >= self.num_segments
319             self.blockhashes[i] = leaves
320             # This list will now be the leaves that were set during the
321             # initial upload + enough empty hashes to make it a
322             # power-of-two. If we exceed a power of two boundary, we
323             # should be encoding the file over again, and should not be
324             # here. So, we have
325             #assert len(self.blockhashes[i]) == \
326             #    hashtree.roundup_pow2(self.num_segments), \
327             #        len(self.blockhashes[i])
328             # XXX: Except this doesn't work. Figure out why.
329
330         # These are filled in later, after we've modified the block hash
331         # tree suitably.
332         self.sharehash_leaves = None # eventually [sharehashes]
333         self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
334                               # validate the share]
335
336         self.log("Starting push")
337
338         self._state = PUSHING_BLOCKS_STATE
339         self._push()
340
341         return self.done_deferred
342
343
344     def publish(self, newdata):
345         """Publish the filenode's current contents.  Returns a Deferred that
346         fires (with None) when the publish has done as much work as it's ever
347         going to do, or errbacks with ConsistencyError if it detects a
348         simultaneous write.
349         """
350
351         # 0. Setup encoding parameters, encoder, and other such things.
352         # 1. Encrypt, encode, and publish segments.
353         assert IMutableUploadable.providedBy(newdata)
354
355         self.data = newdata
356         self.datalength = newdata.get_size()
357         #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
358         #    self._version = MDMF_VERSION
359         #else:
360         #    self._version = SDMF_VERSION
361
362         self.log("starting publish, datalen is %s" % self.datalength)
363         self._status.set_size(self.datalength)
364         self._status.set_status("Started")
365         self._started = time.time()
366
367         self.done_deferred = defer.Deferred()
368
369         self._writekey = self._node.get_writekey()
370         assert self._writekey, "need write capability to publish"
371
372         # first, which servers will we publish to? We require that the
373         # servermap was updated in MODE_WRITE, so we can depend upon the
374         # serverlist computed by that process instead of computing our own.
375         if self._servermap:
376             assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
377             # we will push a version that is one larger than anything present
378             # in the grid, according to the servermap.
379             self._new_seqnum = self._servermap.highest_seqnum() + 1
380         else:
381             # If we don't have a servermap, that's because we're doing the
382             # initial publish
383             self._new_seqnum = 1
384             self._servermap = ServerMap()
385         self._status.set_servermap(self._servermap)
386
387         self.log(format="new seqnum will be %(seqnum)d",
388                  seqnum=self._new_seqnum, level=log.NOISY)
389
390         # having an up-to-date servermap (or using a filenode that was just
391         # created for the first time) also guarantees that the following
392         # fields are available
393         self.readkey = self._node.get_readkey()
394         self.required_shares = self._node.get_required_shares()
395         assert self.required_shares is not None
396         self.total_shares = self._node.get_total_shares()
397         assert self.total_shares is not None
398         self._status.set_encoding(self.required_shares, self.total_shares)
399
400         self._pubkey = self._node.get_pubkey()
401         assert self._pubkey
402         self._privkey = self._node.get_privkey()
403         assert self._privkey
404         self._encprivkey = self._node.get_encprivkey()
405
406         sb = self._storage_broker
407         full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
408         self.full_serverlist = full_serverlist # for use later, immutable
409         self.bad_servers = set() # servers who have errbacked/refused requests
410
411         # This will set self.segment_size, self.num_segments, and
412         # self.fec.
413         self.setup_encoding_parameters()
414
415         # if we experience any surprises (writes which were rejected because
416         # our test vector did not match, or shares which we didn't expect to
417         # see), we set this flag and report an UncoordinatedWriteError at the
418         # end of the publish process.
419         self.surprised = False
420
421         # we keep track of three tables. The first is our goal: which share
422         # we want to see on which servers. This is initially populated by the
423         # existing servermap.
424         self.goal = set() # pairs of (server, shnum) tuples
425
426         # the second is the number of outstanding queries: those that are
427         # in flight and may or may not be delivered, accepted, or
428         # acknowledged. This is incremented when a query is sent, and
429         # decremented when the response returns or errbacks.
430         self.num_outstanding = 0
431
432         # the third is a table of successes: shares which have actually been
433         # placed. These are populated when responses come back with success.
434         # When self.placed == self.goal, we're done.
435         self.placed = set() # (server, shnum) tuples
436
437         self.bad_share_checkstrings = {}
438
439         # This is set at the last step of the publishing process.
440         self.versioninfo = ""
441
442         # we use the servermap to populate the initial goal: this way we will
443         # try to update each existing share in place.
444         self.goal = set(self._servermap.get_known_shares())
445
446         # then we add in all the shares that were bad (corrupted, bad
447         # signatures, etc). We want to replace these.
448         for key, old_checkstring in self._servermap.get_bad_shares().items():
449             (server, shnum) = key
450             self.goal.add( (server,shnum) )
451             self.bad_share_checkstrings[(server,shnum)] = old_checkstring
452
453         # TODO: Make this part do server selection.
454         self.update_goal()
455         self.writers = {}
456         if self._version == MDMF_VERSION:
457             writer_class = MDMFSlotWriteProxy
458         else:
459             writer_class = SDMFSlotWriteProxy
460
461         # For each (server, shnum) in self.goal, we make a
462         # write proxy for that server. We'll use this to write
463         # shares to the server.
464         for (server,shnum) in self.goal:
465             write_enabler = self._node.get_write_enabler(server)
466             renew_secret = self._node.get_renewal_secret(server)
467             cancel_secret = self._node.get_cancel_secret(server)
468             secrets = (write_enabler, renew_secret, cancel_secret)
469
470             writer =  writer_class(shnum,
471                                    server.get_rref(),
472                                    self._storage_index,
473                                    secrets,
474                                    self._new_seqnum,
475                                    self.required_shares,
476                                    self.total_shares,
477                                    self.segment_size,
478                                    self.datalength)
479             self.writers.setdefault(shnum, []).append(writer)
480             writer.server = server
481             known_shares = self._servermap.get_known_shares()
482             if (server, shnum) in known_shares:
483                 old_versionid, old_timestamp = known_shares[(server,shnum)]
484                 (old_seqnum, old_root_hash, old_salt, old_segsize,
485                  old_datalength, old_k, old_N, old_prefix,
486                  old_offsets_tuple) = old_versionid
487                 writer.set_checkstring(old_seqnum,
488                                        old_root_hash,
489                                        old_salt)
490             elif (server, shnum) in self.bad_share_checkstrings:
491                 old_checkstring = self.bad_share_checkstrings[(server, shnum)]
492                 writer.set_checkstring(old_checkstring)
493
494         # Our remote shares will not have a complete checkstring until
495         # after we are done writing share data and have started to write
496         # blocks. In the meantime, we need to know what to look for when
497         # writing, so that we can detect UncoordinatedWriteErrors.
498         self._checkstring = self.writers.values()[0][0].get_checkstring()
499
500         # Now, we start pushing shares.
501         self._status.timings["setup"] = time.time() - self._started
502         # First, we encrypt, encode, and publish the shares that we need
503         # to encrypt, encode, and publish.
504
505         # This will eventually hold the block hash chain for each share
506         # that we publish. We define it this way so that empty publishes
507         # will still have something to write to the remote slot.
508         self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
509         for i in xrange(self.total_shares):
510             blocks = self.blockhashes[i]
511             for j in xrange(self.num_segments):
512                 blocks.append(None)
513         self.sharehash_leaves = None # eventually [sharehashes]
514         self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
515                               # validate the share]
516
517         self.log("Starting push")
518
519         self._state = PUSHING_BLOCKS_STATE
520         self._push()
521
522         return self.done_deferred
523
524
525     def _update_status(self):
526         self._status.set_status("Sending Shares: %d placed out of %d, "
527                                 "%d messages outstanding" %
528                                 (len(self.placed),
529                                  len(self.goal),
530                                  self.num_outstanding))
531         self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
532
533
534     def setup_encoding_parameters(self, offset=0):
535         if self._version == MDMF_VERSION:
536             segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
537         else:
538             segment_size = self.datalength # SDMF is only one segment
539         # this must be a multiple of self.required_shares
540         segment_size = mathutil.next_multiple(segment_size,
541                                               self.required_shares)
542         self.segment_size = segment_size
543
544         # Calculate the starting segment for the upload.
545         if segment_size:
546             # We use div_ceil instead of integer division here because
547             # it is semantically correct.
548             # If datalength isn't an even multiple of segment_size, but
549             # is larger than segment_size, datalength // segment_size
550             # counts only the complete segments and silently drops the
551             # trailing partial segment. That's not what we want, because
552             # it ignores the extra data. div_ceil rounds up, and so gives
553             # us the right number of segments for the data that we're
554             # given.
555             self.num_segments = mathutil.div_ceil(self.datalength,
556                                                   segment_size)
557
558             self.starting_segment = offset // segment_size
559
560         else:
561             self.num_segments = 0
562             self.starting_segment = 0
563
564
565         self.log("building encoding parameters for file")
566         self.log("got segsize %d" % self.segment_size)
567         self.log("got %d segments" % self.num_segments)
568
569         if self._version == SDMF_VERSION:
570             assert self.num_segments in (0, 1) # SDMF
571         # calculate the tail segment size.
572
573         if segment_size and self.datalength:
574             self.tail_segment_size = self.datalength % segment_size
575             self.log("got tail segment size %d" % self.tail_segment_size)
576         else:
577             self.tail_segment_size = 0
578
579         if self.tail_segment_size == 0 and segment_size:
580             # The tail segment is the same size as the other segments.
581             self.tail_segment_size = segment_size
582
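        # Worked example with illustrative numbers (not taken from a test):
        # with required_shares=3, the MDMF segment size becomes
        # next_multiple(131072, 3) == 131073. For datalength=300000 that
        # gives div_ceil(300000, 131073) == 3 segments, a tail segment of
        # 300000 % 131073 == 37854 bytes, and an update at offset=140000
        # starts at segment 140000 // 131073 == 1.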
583         # Make FEC encoders
584         fec = codec.CRSEncoder()
585         fec.set_params(self.segment_size,
586                        self.required_shares, self.total_shares)
587         self.piece_size = fec.get_block_size()
588         self.fec = fec
589
590         if self.tail_segment_size == self.segment_size:
591             self.tail_fec = self.fec
592         else:
593             tail_fec = codec.CRSEncoder()
594             tail_fec.set_params(self.tail_segment_size,
595                                 self.required_shares,
596                                 self.total_shares)
597             self.tail_fec = tail_fec
598
599         self._current_segment = self.starting_segment
600         self.end_segment = self.num_segments - 1
601         # Now figure out where the last segment should be.
602         if self.data.get_size() != self.datalength:
603             # We're updating a few segments in the middle of a mutable
604             # file, so we don't want to republish the whole thing.
605             # (we don't have enough data to do that even if we wanted
606             # to)
607             end = self.data.get_size()
608             self.end_segment = end // segment_size
609             if end % segment_size == 0:
610                 self.end_segment -= 1
611
612         self.log("got start segment %d" % self.starting_segment)
613         self.log("got end segment %d" % self.end_segment)
614
615
616     def _push(self, ignored=None):
617         """
618         I manage state transitions. In particular, I see that we still
619         have a good enough number of writers to complete the upload
620         successfully.
621         """
622         # Can we still successfully publish this file?
623         # TODO: Keep track of outstanding queries before aborting the
624         #       process.
625         all_shnums = filter(lambda sh: len(self.writers[sh]) > 0,
626                             self.writers.iterkeys())
627         if len(all_shnums) < self.required_shares or self.surprised:
628             return self._failure()
629
630         # Figure out what we need to do next. Each of these needs to
631         # return a deferred so that we don't block execution when this
632         # is first called in the upload method.
633         if self._state == PUSHING_BLOCKS_STATE:
634             return self.push_segment(self._current_segment)
635
636         elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
637             return self.push_everything_else()
638
639         # If we make it to this point, we were successful in placing the
640         # file.
641         return self._done()
642
643
644     def push_segment(self, segnum):
645         if self.num_segments == 0 and self._version == SDMF_VERSION:
646             self._add_dummy_salts()
647
648         if segnum > self.end_segment:
649             # We don't have any more segments to push.
650             self._state = PUSHING_EVERYTHING_ELSE_STATE
651             return self._push()
652
653         d = self._encode_segment(segnum)
654         d.addCallback(self._push_segment, segnum)
655         def _increment_segnum(ign):
656             self._current_segment += 1
657         # XXX: I don't think we need to do addBoth here -- any errBacks
658         # should be handled within push_segment.
659         d.addCallback(_increment_segnum)
660         d.addCallback(self._turn_barrier)
661         d.addCallback(self._push)
662         d.addErrback(self._failure)
663
664
665     def _turn_barrier(self, result):
666         """
667         I help the publish process avoid the recursion limit issues
668         described in #237.
669         """
670         return fireEventually(result)
671
672
673     def _add_dummy_salts(self):
674         """
675         SDMF files need a salt even if they're empty, or the signature
676         won't make sense. This method adds a dummy salt to each of our
677         SDMF writers so that they can write the signature later.
678         """
679         salt = os.urandom(16)
680         assert self._version == SDMF_VERSION
681
682         for shnum, writers in self.writers.iteritems():
683             for writer in writers:
684                 writer.put_salt(salt)
685
686
687     def _encode_segment(self, segnum):
688         """
689         I encrypt and encode the segment segnum.
690         """
691         started = time.time()
692
693         if segnum + 1 == self.num_segments:
694             segsize = self.tail_segment_size
695         else:
696             segsize = self.segment_size
697
698
699         self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
700         data = self.data.read(segsize)
701         # XXX: This is dumb. Why return a list?
702         data = "".join(data)
703
704         assert len(data) == segsize, len(data)
705
706         salt = os.urandom(16)
707
708         key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
709         self._status.set_status("Encrypting")
710         enc = AES(key)
711         crypttext = enc.process(data)
712         assert len(crypttext) == len(data)
713
714         now = time.time()
715         self._status.accumulate_encrypt_time(now - started)
716         started = now
717
718         # now apply FEC
719         if segnum + 1 == self.num_segments:
720             fec = self.tail_fec
721         else:
722             fec = self.fec
723
724         self._status.set_status("Encoding")
725         crypttext_pieces = [None] * self.required_shares
726         piece_size = fec.get_block_size()
727         for i in range(len(crypttext_pieces)):
728             offset = i * piece_size
729             piece = crypttext[offset:offset+piece_size]
730             piece = piece + "\x00"*(piece_size - len(piece)) # padding
731             crypttext_pieces[i] = piece
732             assert len(piece) == piece_size
733         d = fec.encode(crypttext_pieces)
734         def _done_encoding(res):
735             elapsed = time.time() - started
736             self._status.accumulate_encode_time(elapsed)
737             return (res, salt)
738         d.addCallback(_done_encoding)
739         return d
740
741
742     def _push_segment(self, encoded_and_salt, segnum):
743         """
744         I push (data, salt) as segment number segnum.
745         """
746         results, salt = encoded_and_salt
747         shares, shareids = results
748         self._status.set_status("Pushing segment")
749         for i in xrange(len(shares)):
750             sharedata = shares[i]
751             shareid = shareids[i]
752             if self._version == MDMF_VERSION:
753                 hashed = salt + sharedata
754             else:
755                 hashed = sharedata
756             block_hash = hashutil.block_hash(hashed)
757             self.blockhashes[shareid][segnum] = block_hash
758             # find the writer for this share
759             writers = self.writers[shareid]
760             for writer in writers:
761                 writer.put_block(sharedata, segnum, salt)
762
763
764     def push_everything_else(self):
765         """
766         I put everything else associated with a share.
767         """
768         self._pack_started = time.time()
769         self.push_encprivkey()
770         self.push_blockhashes()
771         self.push_sharehashes()
772         self.push_toplevel_hashes_and_signature()
773         d = self.finish_publishing()
774         def _change_state(ignored):
775             self._state = DONE_STATE
776         d.addCallback(_change_state)
777         d.addCallback(self._push)
778         return d
779
780
781     def push_encprivkey(self):
782         encprivkey = self._encprivkey
783         self._status.set_status("Pushing encrypted private key")
784         for shnum, writers in self.writers.iteritems():
785             for writer in writers:
786                 writer.put_encprivkey(encprivkey)
787
788
789     def push_blockhashes(self):
790         self.sharehash_leaves = [None] * len(self.blockhashes)
791         self._status.set_status("Building and pushing block hash tree")
792         for shnum, blockhashes in self.blockhashes.iteritems():
793             t = hashtree.HashTree(blockhashes)
794             self.blockhashes[shnum] = list(t)
795             # set the leaf for future use.
796             self.sharehash_leaves[shnum] = t[0]
797
798             writers = self.writers[shnum]
799             for writer in writers:
800                 writer.put_blockhashes(self.blockhashes[shnum])
801
802
803     def push_sharehashes(self):
804         self._status.set_status("Building and pushing share hash chain")
805         share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
806         for shnum in xrange(len(self.sharehash_leaves)):
807             needed_indices = share_hash_tree.needed_hashes(shnum)
808             self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
809                                              for i in needed_indices] )
810             writers = self.writers[shnum]
811             for writer in writers:
812                 writer.put_sharehashes(self.sharehashes[shnum])
813         self.root_hash = share_hash_tree[0]
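        # At this point each share has a block hash tree over its own blocks,
        # plus the share-hash-tree nodes needed to verify its leaf, and
        # self.root_hash ties every share to the same version; it is part of
        # the prefix that gets signed in the next step.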
814
815
816     def push_toplevel_hashes_and_signature(self):
817         # We need to do three things here:
818         #   - Push the root hash and salt hash
819         #   - Get the checkstring of the resulting layout; sign that.
820         #   - Push the signature
821         self._status.set_status("Pushing root hashes and signature")
822         for shnum in xrange(self.total_shares):
823             writers = self.writers[shnum]
824             for writer in writers:
825                 writer.put_root_hash(self.root_hash)
826         self._update_checkstring()
827         self._make_and_place_signature()
828
829
830     def _update_checkstring(self):
831         """
832         After putting the root hash, MDMF files will have the
833         checkstring written to the storage server. This means that we
834         can update our copy of the checkstring so we can detect
835         uncoordinated writes. SDMF files will have the same checkstring,
836         so we need not do anything.
837         """
838         self._checkstring = self.writers.values()[0][0].get_checkstring()
839
840
841     def _make_and_place_signature(self):
842         """
843         I create and place the signature.
844         """
845         started = time.time()
846         self._status.set_status("Signing prefix")
847         signable = self.writers.values()[0][0].get_signable()
848         self.signature = self._privkey.sign(signable)
849
850         for (shnum, writers) in self.writers.iteritems():
851             for writer in writers:
852                 writer.put_signature(self.signature)
853         self._status.timings['sign'] = time.time() - started
854
855
856     def finish_publishing(self):
857         # We're almost done -- we just need to put the verification key
858         # and the offsets
859         started = time.time()
860         self._status.set_status("Pushing shares")
861         self._started_pushing = started
862         ds = []
863         verification_key = self._pubkey.serialize()
864
865         for (shnum, writers) in self.writers.copy().iteritems():
866             for writer in writers:
867                 writer.put_verification_key(verification_key)
868                 self.num_outstanding += 1
869                 def _no_longer_outstanding(res):
870                     self.num_outstanding -= 1
871                     return res
872
873                 d = writer.finish_publishing()
874                 d.addBoth(_no_longer_outstanding)
875                 d.addErrback(self._connection_problem, writer)
876                 d.addCallback(self._got_write_answer, writer, started)
877                 ds.append(d)
878         self._record_verinfo()
879         self._status.timings['pack'] = time.time() - started
880         return defer.DeferredList(ds)
881
882
883     def _record_verinfo(self):
884         self.versioninfo = self.writers.values()[0][0].get_verinfo()
885
886
887     def _connection_problem(self, f, writer):
888         """
889         We ran into a connection problem while working with writer, and
890         need to deal with that.
891         """
892         self.log("found problem: %s" % str(f))
893         self._last_failure = f
894         self.writers[writer.shnum].remove(writer)
895
896
897     def log_goal(self, goal, message=""):
898         logmsg = [message]
899         for (shnum, server) in sorted([(s,p) for (p,s) in goal]):
900             logmsg.append("sh%d to [%s]" % (shnum, server.get_name()))
901         self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
902         self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
903                  level=log.NOISY)
904
905     def update_goal(self):
906         # if log.recording_noisy
907         if True:
908             self.log_goal(self.goal, "before update: ")
909
910         # first, remove any bad servers from our goal
911         self.goal = set([ (server, shnum)
912                           for (server, shnum) in self.goal
913                           if server not in self.bad_servers ])
914
915         # find the homeless shares:
916         homefull_shares = set([shnum for (server, shnum) in self.goal])
917         homeless_shares = set(range(self.total_shares)) - homefull_shares
918         homeless_shares = sorted(list(homeless_shares))
919         # place them somewhere. We prefer unused servers at the beginning of
920         # the available server list.
921
922         if not homeless_shares:
923             return
924
925         # if an old share X is on a node, put the new share X there too.
926         # TODO: 1: redistribute shares to achieve one-per-server, by copying
927         #       shares from existing servers to new (less-crowded) ones. The
928         #       old shares must still be updated.
929         # TODO: 2: move those shares instead of copying them, to reduce future
930         #       update work
931
932         # this is a bit CPU intensive but easy to analyze. We create a sort
933         # order for each server. If a server is marked as bad, we don't
934         # even put it in the list. Then we care about the number of shares
935         # which have already been assigned to it. After that we care about
936         # its permutation order.
937         old_assignments = DictOfSets()
938         for (server, shnum) in self.goal:
939             old_assignments.add(server, shnum)
940
941         serverlist = []
942         for i, server in enumerate(self.full_serverlist):
943             serverid = server.get_serverid()
944             if server in self.bad_servers:
945                 continue
946             entry = (len(old_assignments.get(server, [])), i, serverid, server)
947             serverlist.append(entry)
948         serverlist.sort()
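        # Illustrative ordering: a server that holds none of our shares and
        # appears early in the permuted list sorts ahead of one that already
        # holds two shares, because entries compare as
        # (shares_already_assigned, permuted_index, serverid, server).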
949
950         if not serverlist:
951             raise NotEnoughServersError("Ran out of non-bad servers, "
952                                         "first_error=%s" %
953                                         str(self._first_write_error),
954                                         self._first_write_error)
955
956         # we then index this serverlist with an integer, because we may have
957         # to wrap. We update the goal as we go.
958         i = 0
959         for shnum in homeless_shares:
960             (ignored1, ignored2, ignored3, server) = serverlist[i]
961             # if we are forced to send a share to a server that already has
962             # one, we may have two write requests in flight, and the
963             # servermap (which was computed before either request was sent)
964             # won't reflect the new shares, so the second response will be
965             # surprising. There is code in _got_write_answer() to tolerate
966             # this, otherwise it would cause the publish to fail with an
967             # UncoordinatedWriteError. See #546 for details of the trouble
968             # this used to cause.
969             self.goal.add( (server, shnum) )
970             i += 1
971             if i >= len(serverlist):
972                 i = 0
973         if True:
974             self.log_goal(self.goal, "after update: ")
975
976
977     def _got_write_answer(self, answer, writer, started):
978         if not answer:
979             # SDMF writers only pretend to write when callers set their
980             # blocks, salts, and so on -- they actually just write once,
981             # at the end of the upload process. In fake writes, they
982             # return defer.succeed(None). If we see that, we shouldn't
983             # bother checking it.
984             return
985
986         server = writer.server
987         lp = self.log("_got_write_answer from %s, share %d" %
988                       (server.get_name(), writer.shnum))
989
990         now = time.time()
991         elapsed = now - started
992
993         self._status.add_per_server_time(server, elapsed)
994
995         wrote, read_data = answer
996
997         surprise_shares = set(read_data.keys()) - set([writer.shnum])
998
999         # We need to remove from surprise_shares any shares that we are
1000         # knowingly also writing to that server from other writers.
1001
1002         # TODO: Precompute this.
1003         shares = []
1004         for shnum, writers in self.writers.iteritems():
1005             shares.extend([x.shnum for x in writers if x.server == server])
1006         known_shnums = set(shares)
1007         surprise_shares -= known_shnums
1008         self.log("found the following surprise shares: %s" %
1009                  str(surprise_shares))
1010
1011         # Now surprise shares contains all of the shares that we did not
1012         # expect to be there.
1013
1014         surprised = False
1015         for shnum in surprise_shares:
1016             # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
1017             checkstring = read_data[shnum][0]
1018             # What we want to do here is to see if their (seqnum,
1019             # roothash, salt) is the same as our (seqnum, roothash,
1020             # salt), or the equivalent for MDMF. The best way to do this
1021             # is to store a packed representation of our checkstring
1022             # somewhere, then not bother unpacking the other
1023             # checkstring.
1024             if checkstring == self._checkstring:
1025                 # they have the right share, somehow
1026
1027                 if (server,shnum) in self.goal:
1028                     # and we want them to have it, so we probably sent them a
1029                     # copy in an earlier write. This is ok, and avoids the
1030                     # #546 problem.
1031                     continue
1032
1033                 # They aren't in our goal, but they are still for the right
1034                 # version. Somebody else wrote them, and it's a convergent
1035                 # uncoordinated write. Pretend this is ok (don't be
1036                 # surprised), since I suspect there's a decent chance that
1037                 # we'll hit this in normal operation.
1038                 continue
1039
1040             else:
1041                 # the new shares are of a different version
1042                 if server in self._servermap.get_reachable_servers():
1043                     # we asked them about their shares, so we had knowledge
1044                     # of what they used to have. Any surprising shares must
1045                     # have come from someone else, so UCW.
1046                     surprised = True
1047                 else:
1048                     # we didn't ask them, and now we've discovered that they
1049                     # have a share we didn't know about. This indicates that
1050                     # mapupdate should have worked harder and asked more
1051                     # servers before concluding that it knew about them all.
1052
1053                     # signal UCW, but make sure to ask this server next time,
1054                     # so we'll remember to update it if/when we retry.
1055                     surprised = True
1056                     # TODO: ask this server next time. I don't yet have a good
1057                     # way to do this. Two insufficient possibilities are:
1058                     #
1059                     # self._servermap.add_new_share(server, shnum, verinfo, now)
1060                     #  but that requires fetching/validating/parsing the whole
1061                     #  version string, and all we have is the checkstring
1062                     # self._servermap.mark_bad_share(server, shnum, checkstring)
1063                     #  that will make publish overwrite the share next time,
1064                     #  but it won't re-query the server, and it won't make
1065                     #  mapupdate search further
1066
1067                     # TODO later: when publish starts, do
1068                     # servermap.get_best_version(), extract the seqnum,
1069                     # subtract one, and store as highest-replaceable-seqnum.
1070                     # Then, if this surprise-because-we-didn't-ask share is
1071                     # of highest-replaceable-seqnum or lower, we're allowed
1072                     # to replace it: send out a new writev (or rather add it
1073                     # to self.goal and loop).
1074                     pass
1075
1076                 surprised = True
1077
1078         if surprised:
1079             self.log("they had shares %s that we didn't know about" %
1080                      (list(surprise_shares),),
1081                      parent=lp, level=log.WEIRD, umid="un9CSQ")
1082             self.surprised = True
1083
1084         if not wrote:
1085             # TODO: there are two possibilities. The first is that the server
1086             # is full (or just doesn't want to give us any room), which means
1087             # we shouldn't ask them again, but is *not* an indication of an
1088             # uncoordinated write. The second is that our testv failed, which
1089             # *does* indicate an uncoordinated write. We currently don't have
1090             # a way to tell these two apart (in fact, the storage server code
1091             # doesn't have the option of refusing our share).
1092             #
1093             # If the server is full, mark the server as bad (so we don't ask
1094             # them again), but don't set self.surprised. The loop() will find
1095             # a new server.
1096             #
1097             # If the testv failed, log it, set self.surprised, but don't
1098             # bother adding to self.bad_servers .
1099
1100             self.log("our testv failed, so the write did not happen",
1101                      parent=lp, level=log.WEIRD, umid="8sc26g")
1102             self.surprised = True
1103             self.bad_servers.add(server) # don't ask them again
1104             # use the checkstring to add information to the log message
1105             unknown_format = False
1106             for (shnum,readv) in read_data.items():
1107                 checkstring = readv[0]
1108                 version = get_version_from_checkstring(checkstring)
1109                 if version == MDMF_VERSION:
1110                     (other_seqnum,
1111                      other_roothash) = unpack_mdmf_checkstring(checkstring)
1112                 elif version == SDMF_VERSION:
1113                     (other_seqnum,
1114                      other_roothash,
1115                      other_IV) = unpack_sdmf_checkstring(checkstring)
1116                 else:
1117                     unknown_format = True
1118                 expected_version = self._servermap.version_on_server(server,
1119                                                                      shnum)
1120                 if expected_version:
1121                     (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1122                      offsets_tuple) = expected_version
1123                     msg = ("somebody modified the share on us:"
1124                            " shnum=%d: I thought they had #%d:R=%s," %
1125                            (shnum,
1126                             seqnum, base32.b2a(root_hash)[:4]))
1127                     if unknown_format:
1128                         msg += (" but I don't know how to read share"
1129                                 " format %d" % version)
1130                     else:
1131                         msg += " but testv reported #%d:R=%s" % \
1132                                (other_seqnum, other_roothash)
1133                     self.log(msg, parent=lp, level=log.NOISY)
1134                 # if expected_version==None, then we didn't expect to see a
1135                 # share on that server, and the 'surprise_shares' clause
1136                 # above will have logged it.
1137             return
1138
1139         # and update the servermap
1140         # self.versioninfo is set during the last phase of publishing.
1141         # If we get there, we know that responses correspond to placed
1142         # shares, and can safely execute these statements.
1143         if self.versioninfo:
1144             self.log("wrote successfully: adding new share to servermap")
1145             self._servermap.add_new_share(server, writer.shnum,
1146                                           self.versioninfo, started)
1147             self.placed.add( (server, writer.shnum) )
1148         self._update_status()
1149         # the next method in the deferred chain will check to see if
1150         # we're done and successful.
1151         return
1152
1153
1154     def _done(self):
1155         if not self._running:
1156             return
1157         self._running = False
1158         now = time.time()
1159         self._status.timings["total"] = now - self._started
1160
1161         elapsed = now - self._started_pushing
1162         self._status.timings['push'] = elapsed
1163
1164         self._status.set_active(False)
1165         self.log("Publish done, success")
1166         self._status.set_status("Finished")
1167         self._status.set_progress(1.0)
1168         # Get k and segsize, then give them to the caller.
1169         hints = {}
1170         hints['segsize'] = self.segment_size
1171         hints['k'] = self.required_shares
1172         self._node.set_downloader_hints(hints)
1173         eventually(self.done_deferred.callback, None)
1174
1175     def _failure(self, f=None):
1176         if f:
1177             self._last_failure = f
1178
1179         if not self.surprised:
1180             # We ran out of servers
1181             msg = "Publish ran out of good servers"
1182             if self._last_failure:
1183                 msg += ", last failure was: %s" % str(self._last_failure)
1184             self.log(msg)
1185             e = NotEnoughServersError(msg)
1186
1187         else:
1188             # We ran into shares that we didn't recognize, which means
1189             # that we need to return an UncoordinatedWriteError.
1190             self.log("Publish failed with UncoordinatedWriteError")
1191             e = UncoordinatedWriteError()
1192         f = failure.Failure(e)
1193         eventually(self.done_deferred.callback, f)
1194
1195
1196 class MutableFileHandle:
1197     """
1198     I am a mutable uploadable built around a filehandle-like object,
1199     usually either a StringIO instance or a handle to an actual file.
1200     """
1201     implements(IMutableUploadable)
1202
1203     def __init__(self, filehandle):
1204         # The filehandle is defined as a generally file-like object that
1205         # has these two methods. We don't care beyond that.
1206         assert hasattr(filehandle, "read")
1207         assert hasattr(filehandle, "close")
1208
1209         self._filehandle = filehandle
1210         # We must start reading at the beginning of the file, or we risk
1211         # encountering errors when the data read does not match the size
1212         # reported to the uploader.
1213         self._filehandle.seek(0)
1214
1215         # We have not yet read anything, so our position is 0.
1216         self._marker = 0
1217
1218
1219     def get_size(self):
1220         """
1221         I return the amount of data in my filehandle.
1222         """
1223         if not hasattr(self, "_size"):
1224             old_position = self._filehandle.tell()
1225             # Seek to the end of the file by seeking 0 bytes from the
1226             # file's end
1227             self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
1228             self._size = self._filehandle.tell()
1229             # Restore the previous position, in case this was called
1230             # after a read.
1231             self._filehandle.seek(old_position)
1232             assert self._filehandle.tell() == old_position
1233
1234         assert hasattr(self, "_size")
1235         return self._size
1236
1237
1238     def pos(self):
1239         """
1240         I return the position of my read marker -- i.e., how much data I
1241         have already read and returned to callers.
1242         """
1243         return self._marker
1244
1245
1246     def read(self, length):
1247         """
1248         I return some data (up to length bytes) from my filehandle.
1249
1250         In most cases, I return length bytes, but sometimes I won't --
1251         for example, if I am asked to read beyond the end of a file, or
1252         an error occurs.
1253         """
1254         results = self._filehandle.read(length)
1255         self._marker += len(results)
1256         return [results]
1257
1258
1259     def close(self):
1260         """
1261         I close the underlying filehandle. Any further operations on the
1262         filehandle fail at this point.
1263         """
1264         self._filehandle.close()
1265
1266
1267 class MutableData(MutableFileHandle):
1268     """
1269     I am a mutable uploadable built around a string, which I then cast
1270     into a StringIO and treat as a filehandle.
1271     """
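    # Example (illustrative):
    #   u = MutableData("hello world")
    #   u.get_size()   # -> 11
    #   u.read(5)      # -> ["hello"]  (read returns a list of strings)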
1272
1273     def __init__(self, s):
1274         # Take a string and return a file-like uploadable.
1275         assert isinstance(s, str)
1276
1277         MutableFileHandle.__init__(self, StringIO(s))
1278
1279
1280 class TransformingUploadable:
1281     """
1282     I am an IMutableUploadable that wraps another IMutableUploadable,
1283     and some segments that are already on the grid. When I am called to
1284     read, I handle merging of boundary segments.
1285     """
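    # Illustrative example (hypothetical numbers): with segment_size=100 and
    # offset=130, the replacement starts 30 bytes into its first segment, so
    # a read first returns bytes 0..29 of the old start segment, then the new
    # data, then enough of the old end segment to fill the requested length.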
1286     implements(IMutableUploadable)
1287
1288
1289     def __init__(self, data, offset, segment_size, start, end):
1290         assert IMutableUploadable.providedBy(data)
1291
1292         self._newdata = data
1293         self._offset = offset
1294         self._segment_size = segment_size
1295         self._start = start
1296         self._end = end
1297
1298         self._read_marker = 0
1299
1300         self._first_segment_offset = offset % segment_size
1301
1302         num = self.log("TransformingUploadable: starting", parent=None)
1303         self._log_number = num
1304         self.log("got fso: %d" % self._first_segment_offset)
1305         self.log("got offset: %d" % self._offset)
1306
1307
1308     def log(self, *args, **kwargs):
1309         if 'parent' not in kwargs:
1310             kwargs['parent'] = self._log_number
1311         if "facility" not in kwargs:
1312             kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1313         return log.msg(*args, **kwargs)
1314
1315
1316     def get_size(self):
1317         return self._offset + self._newdata.get_size()
1318
1319
1320     def read(self, length):
1321         # We can get data from 3 sources here. 
1322         #   1. The first of the segments provided to us.
1323         #   2. The data that we're replacing things with.
1324         #   3. The last of the segments provided to us.
1325
1326         # Are we still reading from source 1 (the old start segment)?
1327         self.log("reading %d bytes" % length)
1328
1329         old_start_data = ""
1330         old_data_length = self._first_segment_offset - self._read_marker
1331         if old_data_length > 0:
1332             if old_data_length > length:
1333                 old_data_length = length
1334             self.log("returning %d bytes of old start data" % old_data_length)
1335
1336             old_data_end = old_data_length + self._read_marker
1337             old_start_data = self._start[self._read_marker:old_data_end]
1338             length -= old_data_length
1339         else:
1340             # otherwise calculations later get screwed up.
1341             old_data_length = 0
1342
1343         # Is there enough new data to satisfy this read? If not, we need
1344         # to pad the end of the data with data from our last segment.
1345         old_end_length = length - \
1346             (self._newdata.get_size() - self._newdata.pos())
1347         old_end_data = ""
1348         if old_end_length > 0:
1349             self.log("reading %d bytes of old end data" % old_end_length)
1350
1351             # TODO: We're not explicitly checking for tail segment size
1352             # here. Is that a problem?
1353             old_data_offset = (length - old_end_length + \
1354                                old_data_length) % self._segment_size
1355             self.log("reading at offset %d" % old_data_offset)
1356             old_end = old_data_offset + old_end_length
1357             old_end_data = self._end[old_data_offset:old_end]
1358             length -= old_end_length
1359             assert length == self._newdata.get_size() - self._newdata.pos()
1360
1361         self.log("reading %d bytes of new data" % length)
1362         new_data = self._newdata.read(length)
1363         new_data = "".join(new_data)
1364
1365         self._read_marker += len(old_start_data + new_data + old_end_data)
1366
1367         return old_start_data + new_data + old_end_data
1368
1369     def close(self):
1370         pass