src/allmydata/mutable/publish.py
1
2
3 import os, time
4 from StringIO import StringIO
5 from itertools import count
6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.python import failure
9 from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
10                                  IMutableUploadable
11 from allmydata.util import base32, hashutil, mathutil, log
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from foolscap.api import eventually, fireEventually
17
18 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, MODE_REPAIR, \
19      UncoordinatedWriteError, NotEnoughServersError
20 from allmydata.mutable.servermap import ServerMap
21 from allmydata.mutable.layout import get_version_from_checkstring,\
22                                      unpack_mdmf_checkstring, \
23                                      unpack_sdmf_checkstring, \
24                                      MDMFSlotWriteProxy, \
25                                      SDMFSlotWriteProxy
26
27 KiB = 1024
28 DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
29 PUSHING_BLOCKS_STATE = 0
30 PUSHING_EVERYTHING_ELSE_STATE = 1
31 DONE_STATE = 2
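# The publish process is a small state machine: PUSHING_BLOCKS_STATE
# encrypts, encodes, and pushes one segment at a time;
# PUSHING_EVERYTHING_ELSE_STATE pushes the encrypted private key, hash
# trees, and signature; DONE_STATE is reached once every remaining
# writer has answered (or been dropped). See Publish._push() below for
# the transitions.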
32
33 class PublishStatus:
34     implements(IPublishStatus)
35     statusid_counter = count(0)
36     def __init__(self):
37         self.timings = {}
38         self.timings["send_per_server"] = {}
39         self.timings["encrypt"] = 0.0
40         self.timings["encode"] = 0.0
41         self.servermap = None
42         self._problems = {}
43         self.active = True
44         self.storage_index = None
45         self.helper = False
46         self.encoding = ("?", "?")
47         self.size = None
48         self.status = "Not started"
49         self.progress = 0.0
50         self.counter = self.statusid_counter.next()
51         self.started = time.time()
52
53     def add_per_server_time(self, server, elapsed):
54         if server not in self.timings["send_per_server"]:
55             self.timings["send_per_server"][server] = []
56         self.timings["send_per_server"][server].append(elapsed)
57     def accumulate_encode_time(self, elapsed):
58         self.timings["encode"] += elapsed
59     def accumulate_encrypt_time(self, elapsed):
60         self.timings["encrypt"] += elapsed
61
62     def get_started(self):
63         return self.started
64     def get_storage_index(self):
65         return self.storage_index
66     def get_encoding(self):
67         return self.encoding
68     def using_helper(self):
69         return self.helper
70     def get_servermap(self):
71         return self.servermap
72     def get_size(self):
73         return self.size
74     def get_status(self):
75         return self.status
76     def get_progress(self):
77         return self.progress
78     def get_active(self):
79         return self.active
80     def get_counter(self):
81         return self.counter
82     def get_problems(self):
83         return self._problems
84
85     def set_storage_index(self, si):
86         self.storage_index = si
87     def set_helper(self, helper):
88         self.helper = helper
89     def set_servermap(self, servermap):
90         self.servermap = servermap
91     def set_encoding(self, k, n):
92         self.encoding = (k, n)
93     def set_size(self, size):
94         self.size = size
95     def set_status(self, status):
96         self.status = status
97     def set_progress(self, value):
98         self.progress = value
99     def set_active(self, value):
100         self.active = value
101
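# Illustrative sketch (not part of the original module): a status
# display might poll the PublishStatus defined above roughly like this.
# Only getters and timing keys that appear above are used; the caller
# and its output format are assumed.
#
#   st = publish.get_status()
#   print "status:", st.get_status()            # e.g. "Finished"
#   print "progress: %d%%" % int(st.get_progress() * 100)
#   k, n = st.get_encoding()
#   print "encoding: %s-of-%s" % (k, n)
#   for server, elapsed in st.timings["send_per_server"].items():
#       print server, "took", sum(elapsed), "seconds"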
102 class LoopLimitExceededError(Exception):
103     pass
104
105 class Publish:
106     """I represent a single act of publishing the mutable file to the grid. I
107     will only publish my data if the servermap I am using still represents
108     the current state of the world.
109
110     To make the initial publish, set servermap to None.
111     """
112
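    # Illustrative sketch (not part of the original file): roughly how
    # this class is driven. Only names defined in this module (Publish,
    # MutableData) and this constructor's signature are used; the
    # filenode, storage_broker, and servermap objects are assumed to
    # come from the surrounding mutable-file machinery.
    #
    #   p = Publish(filenode, storage_broker, servermap)
    #   d = p.publish(MutableData("new contents"))
    #   # or, for an in-place MDMF update:
    #   # d = p.update(uploadable, offset, blockhashes, old_version)
    #   d.addCallback(lambda ign: p.get_status())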
113     def __init__(self, filenode, storage_broker, servermap):
114         self._node = filenode
115         self._storage_broker = storage_broker
116         self._servermap = servermap
117         self._storage_index = self._node.get_storage_index()
118         self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
119         num = self.log("Publish(%s): starting" % prefix, parent=None)
120         self._log_number = num
121         self._running = True
122         self._first_write_error = None
123         self._last_failure = None
124
125         self._status = PublishStatus()
126         self._status.set_storage_index(self._storage_index)
127         self._status.set_helper(False)
128         self._status.set_progress(0.0)
129         self._status.set_active(True)
130         self._version = self._node.get_version()
131         assert self._version in (SDMF_VERSION, MDMF_VERSION)
132
133
134     def get_status(self):
135         return self._status
136
137     def log(self, *args, **kwargs):
138         if 'parent' not in kwargs:
139             kwargs['parent'] = self._log_number
140         if "facility" not in kwargs:
141             kwargs["facility"] = "tahoe.mutable.publish"
142         return log.msg(*args, **kwargs)
143
144
145     def update(self, data, offset, blockhashes, version):
146         """
147         I replace the contents of this file with the contents of data,
148         starting at offset. I return a Deferred that fires with None
149         when the replacement has been completed, or with an error if
150         something went wrong during the process.
151
152         Note that this process will not upload new shares. If the file
153         being updated is in need of repair, callers will have to repair
154         it on their own.
155         """
156         # How this works:
157         # 1. Make server assignments. We'll assign each share that we know
158         # about on the grid to the server that currently holds that
159         # share, and will not place any new shares.
160         # 2. Set up encoding parameters. Most of these will stay the same
161         # -- datalength will change, as will some of the offsets.
162         # 3. Upload the new segments.
163         # 4. Be done.
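        # Roughly, those steps map onto the code below: (1) the goal is
        # seeded from self._servermap.get_known_shares(), (2)
        # setup_encoding_parameters(offset=offset) picks segment sizes
        # and FEC parameters, (3) _push()/push_segment() walk the
        # affected segments, and (4) _done() fires self.done_deferred.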
164         assert IMutableUploadable.providedBy(data)
165
166         self.data = data
167
168         # XXX: Use the MutableFileVersion instead.
169         self.datalength = self._node.get_size()
170         if data.get_size() > self.datalength:
171             self.datalength = data.get_size()
172
173         self.log("starting update")
174         self.log("adding new data of length %d at offset %d" % \
175                     (data.get_size(), offset))
176         self.log("new data length is %d" % self.datalength)
177         self._status.set_size(self.datalength)
178         self._status.set_status("Started")
179         self._started = time.time()
180
181         self.done_deferred = defer.Deferred()
182
183         self._writekey = self._node.get_writekey()
184         assert self._writekey, "need write capability to publish"
185
186         # first, which servers will we publish to? We require that the
187         # servermap was updated in MODE_WRITE, so we can depend upon the
188         # serverlist computed by that process instead of computing our own.
189         assert self._servermap
190         assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
191         # we will push a version that is one larger than anything present
192         # in the grid, according to the servermap.
193         self._new_seqnum = self._servermap.highest_seqnum() + 1
194         self._status.set_servermap(self._servermap)
195
196         self.log(format="new seqnum will be %(seqnum)d",
197                  seqnum=self._new_seqnum, level=log.NOISY)
198
199         # We're updating an existing file, so all of the following
200         # should be available.
201         self.readkey = self._node.get_readkey()
202         self.required_shares = self._node.get_required_shares()
203         assert self.required_shares is not None
204         self.total_shares = self._node.get_total_shares()
205         assert self.total_shares is not None
206         self._status.set_encoding(self.required_shares, self.total_shares)
207
208         self._pubkey = self._node.get_pubkey()
209         assert self._pubkey
210         self._privkey = self._node.get_privkey()
211         assert self._privkey
212         self._encprivkey = self._node.get_encprivkey()
213
214         sb = self._storage_broker
215         full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
216         self.full_serverlist = full_serverlist # for use later, immutable
217         self.bad_servers = set() # servers who have errbacked/refused requests
218
219         # This will set self.segment_size, self.num_segments, and
220         # self.fec. TODO: Does it know how to do the offset? Probably
221         # not. So do that part next.
222         self.setup_encoding_parameters(offset=offset)
223
224         # if we experience any surprises (writes which were rejected because
225         # our test vector did not match, or shares which we didn't expect to
226         # see), we set this flag and report an UncoordinatedWriteError at the
227         # end of the publish process.
228         self.surprised = False
229
230         # we keep track of three tables. The first is our goal: which share
231         # we want to see on which servers. This is initially populated by the
232         # existing servermap.
233         self.goal = set() # pairs of (server, shnum) tuples
234
235         # the second is the number of outstanding queries: those that are in flight and
236         # may or may not be delivered, accepted, or acknowledged. This is
237         # incremented when a query is sent, and decremented when the response
238         # returns or errbacks.
239         self.num_outstanding = 0
240
241         # the third is a table of successes: shares which have actually been
242         # placed. These are populated when responses come back with success.
243         # When self.placed == self.goal, we're done.
244         self.placed = set() # (server, shnum) tuples
245
246         self.bad_share_checkstrings = {}
247
248         # This is set at the last step of the publishing process.
249         self.versioninfo = ""
250
251         # we use the servermap to populate the initial goal: this way we will
252         # try to update each existing share in place. Since we're
253         # updating, we ignore damaged and missing shares -- callers must
254         # run a repair to recreate these.
255         self.goal = set(self._servermap.get_known_shares())
256
257         # shnum -> set of IMutableSlotWriter
258         self.writers = DictOfSets()
259
260         # SDMF files are updated differently (via a full republish), so
            # an in-place update always uses the MDMF writer.
261         self._version = MDMF_VERSION
262         writer_class = MDMFSlotWriteProxy
263
264         # For each (server, shnum) in self.goal, we make a
265         # write proxy for that server. We'll use this to write
266         # shares to the server.
267         for (server,shnum) in self.goal:
268             write_enabler = self._node.get_write_enabler(server)
269             renew_secret = self._node.get_renewal_secret(server)
270             cancel_secret = self._node.get_cancel_secret(server)
271             secrets = (write_enabler, renew_secret, cancel_secret)
272
273             writer = writer_class(shnum,
274                                   server.get_rref(),
275                                   self._storage_index,
276                                   secrets,
277                                   self._new_seqnum,
278                                   self.required_shares,
279                                   self.total_shares,
280                                   self.segment_size,
281                                   self.datalength)
282
283             self.writers.add(shnum, writer)
284             writer.server = server
285             known_shares = self._servermap.get_known_shares()
286             assert (server, shnum) in known_shares
287             old_versionid, old_timestamp = known_shares[(server,shnum)]
288             (old_seqnum, old_root_hash, old_salt, old_segsize,
289              old_datalength, old_k, old_N, old_prefix,
290              old_offsets_tuple) = old_versionid
291             writer.set_checkstring(old_seqnum,
292                                    old_root_hash,
293                                    old_salt)
294
295         # Our remote shares will not have a complete checkstring until
296         # after we are done writing share data and have started to write
297         # blocks. In the meantime, we need to know what to look for when
298         # writing, so that we can detect UncoordinatedWriteErrors.
299         self._checkstring = self._get_some_writer().get_checkstring()
300
301         # Now, we start pushing shares.
302         self._status.timings["setup"] = time.time() - self._started
303         # First, we encrypt, encode, and publish the shares that we need
304         # to encrypt, encode, and publish.
305
306         # Our update process fetched these for us. We need to update
307         # them in place as publishing happens.
308         self.blockhashes = {} # shnum -> [blockhashes]
309         for (i, bht) in blockhashes.iteritems():
310             # We need to extract the leaves from our old hash tree.
311             old_segcount = mathutil.div_ceil(version[4],
312                                              version[3])
313             h = hashtree.IncompleteHashTree(old_segcount)
314             bht = dict(enumerate(bht))
315             h.set_hashes(bht)
316             leaves = h[h.get_leaf_index(0):]
317             for j in xrange(self.num_segments - len(leaves)):
318                 leaves.append(None)
319
320             assert len(leaves) >= self.num_segments
321             self.blockhashes[i] = leaves
322             # This list will now be the leaves that were set during the
323             # initial upload + enough empty hashes to make it a
324             # power-of-two. If we exceed a power of two boundary, we
325             # should be encoding the file over again, and should not be
326             # here. So, we have
327             #assert len(self.blockhashes[i]) == \
328             #    hashtree.roundup_pow2(self.num_segments), \
329             #        len(self.blockhashes[i])
330             # XXX: Except this doesn't work. Figure out why.
331
332         # These are filled in later, after we've modified the block hash
333         # tree suitably.
334         self.sharehash_leaves = None # eventually [sharehashes]
335         self.sharehashes = {} # shnum -> [sharehash leaves necessary to
336                               # validate the share]
337
338         self.log("Starting push")
339
340         self._state = PUSHING_BLOCKS_STATE
341         self._push()
342
343         return self.done_deferred
344
345
346     def publish(self, newdata):
347         """Publish the filenode's current contents.  Returns a Deferred that
348         fires (with None) when the publish has done as much work as it's ever
349         going to do, or errbacks with ConsistencyError if it detects a
350         simultaneous write.
351         """
352
353         # 0. Set up encoding parameters, encoder, and other such things.
354         # 1. Encrypt, encode, and publish segments.
355         assert IMutableUploadable.providedBy(newdata)
356
357         self.data = newdata
358         self.datalength = newdata.get_size()
359         #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
360         #    self._version = MDMF_VERSION
361         #else:
362         #    self._version = SDMF_VERSION
363
364         self.log("starting publish, datalen is %s" % self.datalength)
365         self._status.set_size(self.datalength)
366         self._status.set_status("Started")
367         self._started = time.time()
368
369         self.done_deferred = defer.Deferred()
370
371         self._writekey = self._node.get_writekey()
372         assert self._writekey, "need write capability to publish"
373
374         # first, which servers will we publish to? We require that the
375         # servermap was updated in MODE_WRITE, so we can depend upon the
376         # serverlist computed by that process instead of computing our own.
377         if self._servermap:
378             assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
379             # we will push a version that is one larger than anything present
380             # in the grid, according to the servermap.
381             self._new_seqnum = self._servermap.highest_seqnum() + 1
382         else:
383             # If we don't have a servermap, that's because we're doing the
384             # initial publish
385             self._new_seqnum = 1
386             self._servermap = ServerMap()
387         self._status.set_servermap(self._servermap)
388
389         self.log(format="new seqnum will be %(seqnum)d",
390                  seqnum=self._new_seqnum, level=log.NOISY)
391
392         # having an up-to-date servermap (or using a filenode that was just
393         # created for the first time) also guarantees that the following
394         # fields are available
395         self.readkey = self._node.get_readkey()
396         self.required_shares = self._node.get_required_shares()
397         assert self.required_shares is not None
398         self.total_shares = self._node.get_total_shares()
399         assert self.total_shares is not None
400         self._status.set_encoding(self.required_shares, self.total_shares)
401
402         self._pubkey = self._node.get_pubkey()
403         assert self._pubkey
404         self._privkey = self._node.get_privkey()
405         assert self._privkey
406         self._encprivkey = self._node.get_encprivkey()
407
408         sb = self._storage_broker
409         full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
410         self.full_serverlist = full_serverlist # for use later, immutable
411         self.bad_servers = set() # servers who have errbacked/refused requests
412
413         # This will set self.segment_size, self.num_segments, and
414         # self.fec.
415         self.setup_encoding_parameters()
416
417         # if we experience any surprises (writes which were rejected because
418         # our test vector did not match, or shares which we didn't expect to
419         # see), we set this flag and report an UncoordinatedWriteError at the
420         # end of the publish process.
421         self.surprised = False
422
423         # we keep track of three tables. The first is our goal: which share
424         # we want to see on which servers. This is initially populated by the
425         # existing servermap.
426         self.goal = set() # pairs of (server, shnum) tuples
427
428         # the second is the number of outstanding queries: those that are in flight and
429         # may or may not be delivered, accepted, or acknowledged. This is
430         # incremented when a query is sent, and decremented when the response
431         # returns or errbacks.
432         self.num_outstanding = 0
433
434         # the third is a table of successes: shares which have actually been
435         # placed. These are populated when responses come back with success.
436         # When self.placed == self.goal, we're done.
437         self.placed = set() # (server, shnum) tuples
438
439         self.bad_share_checkstrings = {}
440
441         # This is set at the last step of the publishing process.
442         self.versioninfo = ""
443
444         # we use the servermap to populate the initial goal: this way we will
445         # try to update each existing share in place.
446         self.goal = set(self._servermap.get_known_shares())
447
448         # then we add in all the shares that were bad (corrupted, bad
449         # signatures, etc). We want to replace these.
450         for key, old_checkstring in self._servermap.get_bad_shares().items():
451             (server, shnum) = key
452             self.goal.add( (server,shnum) )
453             self.bad_share_checkstrings[(server,shnum)] = old_checkstring
454
455         # TODO: Make this part do server selection.
456         self.update_goal()
457
458         # shnum -> set of IMutableSlotWriter
459         self.writers = DictOfSets()
460
461         if self._version == MDMF_VERSION:
462             writer_class = MDMFSlotWriteProxy
463         else:
464             writer_class = SDMFSlotWriteProxy
465
466         # For each (server, shnum) in self.goal, we make a
467         # write proxy for that server. We'll use this to write
468         # shares to the server.
469         for (server,shnum) in self.goal:
470             write_enabler = self._node.get_write_enabler(server)
471             renew_secret = self._node.get_renewal_secret(server)
472             cancel_secret = self._node.get_cancel_secret(server)
473             secrets = (write_enabler, renew_secret, cancel_secret)
474
475             writer =  writer_class(shnum,
476                                    server.get_rref(),
477                                    self._storage_index,
478                                    secrets,
479                                    self._new_seqnum,
480                                    self.required_shares,
481                                    self.total_shares,
482                                    self.segment_size,
483                                    self.datalength)
484             self.writers.add(shnum, writer)
485             writer.server = server
486             known_shares = self._servermap.get_known_shares()
487             if (server, shnum) in known_shares:
488                 old_versionid, old_timestamp = known_shares[(server,shnum)]
489                 (old_seqnum, old_root_hash, old_salt, old_segsize,
490                  old_datalength, old_k, old_N, old_prefix,
491                  old_offsets_tuple) = old_versionid
492                 writer.set_checkstring(old_seqnum,
493                                        old_root_hash,
494                                        old_salt)
495             elif (server, shnum) in self.bad_share_checkstrings:
496                 old_checkstring = self.bad_share_checkstrings[(server, shnum)]
497                 writer.set_checkstring(old_checkstring)
498
499         # Our remote shares will not have a complete checkstring until
500         # after we are done writing share data and have started to write
501         # blocks. In the meantime, we need to know what to look for when
502         # writing, so that we can detect UncoordinatedWriteErrors.
503         self._checkstring = self._get_some_writer().get_checkstring()
504
505         # Now, we start pushing shares.
506         self._status.timings["setup"] = time.time() - self._started
507         # First, we encrypt, encode, and publish the shares that we need
508         # to encrypt, encode, and publish.
509
510         # This will eventually hold the block hash chain for each share
511         # that we publish. We define it this way so that empty publishes
512         # will still have something to write to the remote slot.
513         self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
514         for i in xrange(self.total_shares):
515             blocks = self.blockhashes[i]
516             for j in xrange(self.num_segments):
517                 blocks.append(None)
518         self.sharehash_leaves = None # eventually [sharehashes]
519         self.sharehashes = {} # shnum -> [sharehash leaves necessary to
520                               # validate the share]
521
522         self.log("Starting push")
523
524         self._state = PUSHING_BLOCKS_STATE
525         self._push()
526
527         return self.done_deferred
528
529     def _get_some_writer(self):
530         return list(self.writers.values()[0])[0]
531
532     def _update_status(self):
533         self._status.set_status("Sending Shares: %d placed out of %d, "
534                                 "%d messages outstanding" %
535                                 (len(self.placed),
536                                  len(self.goal),
537                                  self.num_outstanding))
538         self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
539
540
541     def setup_encoding_parameters(self, offset=0):
542         if self._version == MDMF_VERSION:
543             segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
544         else:
545             segment_size = self.datalength # SDMF is only one segment
546         # this must be a multiple of self.required_shares
547         segment_size = mathutil.next_multiple(segment_size,
548                                               self.required_shares)
549         self.segment_size = segment_size
550
551         # Calculate the starting segment for the upload.
552         if segment_size:
553             # We use div_ceil instead of integer division here because
554             # it is semantically correct.
555             # If datalength isn't an even multiple of segment_size, but
556             # is larger than segment_size, plain integer division
557             # (datalength // segment_size) rounds down, leaving the
558             # trailing partial segment out of the count. That's not what
559             # we want, because it ignores the extra data. div_ceil
560             # rounds up, giving the right number of segments for the
561             # data that we're given.
562             self.num_segments = mathutil.div_ceil(self.datalength,
563                                                   segment_size)
564
565             self.starting_segment = offset // segment_size
566
567         else:
568             self.num_segments = 0
569             self.starting_segment = 0
570
571
572         self.log("building encoding parameters for file")
573         self.log("got segsize %d" % self.segment_size)
574         self.log("got %d segments" % self.num_segments)
575
576         if self._version == SDMF_VERSION:
577             assert self.num_segments in (0, 1) # SDMF
578         # calculate the tail segment size.
579
580         if segment_size and self.datalength:
581             self.tail_segment_size = self.datalength % segment_size
582             self.log("got tail segment size %d" % self.tail_segment_size)
583         else:
584             self.tail_segment_size = 0
585
586         if self.tail_segment_size == 0 and segment_size:
587             # The tail segment is the same size as the other segments.
588             self.tail_segment_size = segment_size
589
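        # Worked example (illustrative only, with assumed numbers): for
        # an MDMF file with datalength=300000 and required_shares=3,
        #   segment_size      = next_multiple(131072, 3)  = 131073
        #   num_segments      = div_ceil(300000, 131073)  = 3
        #   tail_segment_size = 300000 % 131073           = 37854
        # whereas plain 300000 // 131073 would give only 2 segments and
        # drop the tail data.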
590         # Make FEC encoders
591         fec = codec.CRSEncoder()
592         fec.set_params(self.segment_size,
593                        self.required_shares, self.total_shares)
594         self.piece_size = fec.get_block_size()
595         self.fec = fec
596
597         if self.tail_segment_size == self.segment_size:
598             self.tail_fec = self.fec
599         else:
600             tail_fec = codec.CRSEncoder()
601             tail_fec.set_params(self.tail_segment_size,
602                                 self.required_shares,
603                                 self.total_shares)
604             self.tail_fec = tail_fec
605
606         self._current_segment = self.starting_segment
607         self.end_segment = self.num_segments - 1
608         # Now figure out where the last segment should be.
609         if self.data.get_size() != self.datalength:
610             # We're updating a few segments in the middle of a mutable
611             # file, so we don't want to republish the whole thing.
612             # (we don't have enough data to do that even if we wanted
613             # to)
614             end = self.data.get_size()
615             self.end_segment = end // segment_size
616             if end % segment_size == 0:
617                 self.end_segment -= 1
618
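        # Worked example (illustrative, assumed numbers): writing 50000
        # new bytes at offset=200000 into a 400000-byte MDMF file with
        # segment_size=131073, and assuming the caller wrapped the new
        # data in a TransformingUploadable (defined at the bottom of
        # this file) so that self.data.get_size() == 200000 + 50000:
        #   starting_segment = 200000 // 131073 = 1
        #   end_segment      = 250000 // 131073 = 1
        # so only segment 1 is re-encoded and pushed.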
619         self.log("got start segment %d" % self.starting_segment)
620         self.log("got end segment %d" % self.end_segment)
621
622
623     def _push(self, ignored=None):
624         """
625         I manage state transitions. In particular, I see that we still
626         have a good enough number of writers to complete the upload
627         successfully.
628         """
629         # Can we still successfully publish this file?
630         # TODO: Keep track of outstanding queries before aborting the
631         #       process.
632         num_shnums = len(self.writers)
633         if num_shnums < self.required_shares or self.surprised:
634             return self._failure()
635
636         # Figure out what we need to do next. Each of these needs to
637         # return a deferred so that we don't block execution when this
638         # is first called in the upload method.
639         if self._state == PUSHING_BLOCKS_STATE:
640             return self.push_segment(self._current_segment)
641
642         elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
643             return self.push_everything_else()
644
645         # If we make it to this point, we were successful in placing the
646         # file.
647         return self._done()
648
649
650     def push_segment(self, segnum):
651         if self.num_segments == 0 and self._version == SDMF_VERSION:
652             self._add_dummy_salts()
653
654         if segnum > self.end_segment:
655             # We don't have any more segments to push.
656             self._state = PUSHING_EVERYTHING_ELSE_STATE
657             return self._push()
658
659         d = self._encode_segment(segnum)
660         d.addCallback(self._push_segment, segnum)
661         def _increment_segnum(ign):
662             self._current_segment += 1
663         # XXX: I don't think we need to do addBoth here -- any errbacks
664         # should be handled within push_segment.
665         d.addCallback(_increment_segnum)
666         d.addCallback(self._turn_barrier)
667         d.addCallback(self._push)
668         d.addErrback(self._failure)
669
670
671     def _turn_barrier(self, result):
672         """
673         I help the publish process avoid the recursion limit issues
674         described in #237.
675         """
676         return fireEventually(result)
677
678
679     def _add_dummy_salts(self):
680         """
681         SDMF files need a salt even if they're empty, or the signature
682         won't make sense. This method adds a dummy salt to each of our
683         SDMF writers so that they can write the signature later.
684         """
685         salt = os.urandom(16)
686         assert self._version == SDMF_VERSION
687
688         for shnum, writers in self.writers.iteritems():
689             for writer in writers:
690                 writer.put_salt(salt)
691
692
693     def _encode_segment(self, segnum):
694         """
695         I encrypt and encode the segment segnum.
696         """
697         started = time.time()
698
699         if segnum + 1 == self.num_segments:
700             segsize = self.tail_segment_size
701         else:
702             segsize = self.segment_size
703
704
705         self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
706         data = self.data.read(segsize)
707         # XXX: This is dumb. Why return a list?
708         data = "".join(data)
709
710         assert len(data) == segsize, len(data)
711
712         salt = os.urandom(16)
713
714         key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
715         self._status.set_status("Encrypting")
716         enc = AES(key)
717         crypttext = enc.process(data)
718         assert len(crypttext) == len(data)
719
720         now = time.time()
721         self._status.accumulate_encrypt_time(now - started)
722         started = now
723
724         # now apply FEC
725         if segnum + 1 == self.num_segments:
726             fec = self.tail_fec
727         else:
728             fec = self.fec
729
730         self._status.set_status("Encoding")
731         crypttext_pieces = [None] * self.required_shares
732         piece_size = fec.get_block_size()
733         for i in range(len(crypttext_pieces)):
734             offset = i * piece_size
735             piece = crypttext[offset:offset+piece_size]
736             piece = piece + "\x00"*(piece_size - len(piece)) # padding
737             crypttext_pieces[i] = piece
738             assert len(piece) == piece_size
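        # For illustration (assumed numbers): with required_shares=3 and
        # segsize=131073, fec.get_block_size() is 43691, so the
        # crypttext is cut into three 43691-byte pieces with no padding;
        # a shorter tail segment would get its last piece padded with
        # NULs up to piece_size before encoding.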
739         d = fec.encode(crypttext_pieces)
740         def _done_encoding(res):
741             elapsed = time.time() - started
742             self._status.accumulate_encode_time(elapsed)
743             return (res, salt)
744         d.addCallback(_done_encoding)
745         return d
746
747
748     def _push_segment(self, encoded_and_salt, segnum):
749         """
750         I push (data, salt) as segment number segnum.
751         """
752         results, salt = encoded_and_salt
753         shares, shareids = results
754         self._status.set_status("Pushing segment")
755         for i in xrange(len(shares)):
756             sharedata = shares[i]
757             shareid = shareids[i]
758             if self._version == MDMF_VERSION:
759                 hashed = salt + sharedata
760             else:
761                 hashed = sharedata
762             block_hash = hashutil.block_hash(hashed)
763             self.blockhashes[shareid][segnum] = block_hash
764             # find the writer for this share
765             writers = self.writers[shareid]
766             for writer in writers:
767                 writer.put_block(sharedata, segnum, salt)
768
769
770     def push_everything_else(self):
771         """
772         I put everything else associated with a share.
773         """
774         self._pack_started = time.time()
775         self.push_encprivkey()
776         self.push_blockhashes()
777         self.push_sharehashes()
778         self.push_toplevel_hashes_and_signature()
779         d = self.finish_publishing()
780         def _change_state(ignored):
781             self._state = DONE_STATE
782         d.addCallback(_change_state)
783         d.addCallback(self._push)
784         return d
785
786
787     def push_encprivkey(self):
788         encprivkey = self._encprivkey
789         self._status.set_status("Pushing encrypted private key")
790         for shnum, writers in self.writers.iteritems():
791             for writer in writers:
792                 writer.put_encprivkey(encprivkey)
793
794
795     def push_blockhashes(self):
796         self.sharehash_leaves = [None] * len(self.blockhashes)
797         self._status.set_status("Building and pushing block hash tree")
798         for shnum, blockhashes in self.blockhashes.iteritems():
799             t = hashtree.HashTree(blockhashes)
800             self.blockhashes[shnum] = list(t)
801             # set the leaf for future use.
802             self.sharehash_leaves[shnum] = t[0]
803
804             writers = self.writers[shnum]
805             for writer in writers:
806                 writer.put_blockhashes(self.blockhashes[shnum])
807
808
809     def push_sharehashes(self):
810         self._status.set_status("Building and pushing share hash chain")
811         share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
812         for shnum in xrange(len(self.sharehash_leaves)):
813             needed_indices = share_hash_tree.needed_hashes(shnum)
814             self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
815                                              for i in needed_indices] )
816             writers = self.writers[shnum]
817             for writer in writers:
818                 writer.put_sharehashes(self.sharehashes[shnum])
819         self.root_hash = share_hash_tree[0]
820
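    # Illustrative note (not in the original file): the share hash chain
    # built above uses the same allmydata.hashtree conventions as the
    # block hash trees: the tree is indexable like a list with the root
    # at index 0, and needed_hashes(shnum) names the hashes a reader
    # must fetch to validate share shnum's leaf against root_hash.
    # A rough sketch with four leaf hashes:
    #
    #   t = hashtree.HashTree([h0, h1, h2, h3])  # h* are 32-byte hashes
    #   root = t[0]             # stored on servers via put_root_hash()
    #   needed = t.needed_hashes(2)   # indices required to check leaf 2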
821
822     def push_toplevel_hashes_and_signature(self):
823         # We need to do three things here:
824         #   - Push the root hash and salt hash
825         #   - Get the checkstring of the resulting layout; sign that.
826         #   - Push the signature
827         self._status.set_status("Pushing root hashes and signature")
828         for shnum in xrange(self.total_shares):
829             writers = self.writers[shnum]
830             for writer in writers:
831                 writer.put_root_hash(self.root_hash)
832         self._update_checkstring()
833         self._make_and_place_signature()
834
835
836     def _update_checkstring(self):
837         """
838         After putting the root hash, MDMF files will have the
839         checkstring written to the storage server. This means that we
840         can update our copy of the checkstring so we can detect
841         uncoordinated writes. SDMF files will have the same checkstring,
842         so we need not do anything.
843         """
844         self._checkstring = self._get_some_writer().get_checkstring()
845
846
847     def _make_and_place_signature(self):
848         """
849         I create and place the signature.
850         """
851         started = time.time()
852         self._status.set_status("Signing prefix")
853         signable = self._get_some_writer().get_signable()
854         self.signature = self._privkey.sign(signable)
855
856         for (shnum, writers) in self.writers.iteritems():
857             for writer in writers:
858                 writer.put_signature(self.signature)
859         self._status.timings['sign'] = time.time() - started
860
861
862     def finish_publishing(self):
863         # We're almost done -- we just need to put the verification key
864         # and the offsets
865         started = time.time()
866         self._status.set_status("Pushing shares")
867         self._started_pushing = started
868         ds = []
869         verification_key = self._pubkey.serialize()
870
871         for (shnum, writers) in self.writers.copy().iteritems():
872             for writer in writers:
873                 writer.put_verification_key(verification_key)
874                 self.num_outstanding += 1
875                 def _no_longer_outstanding(res):
876                     self.num_outstanding -= 1
877                     return res
878
879                 d = writer.finish_publishing()
880                 d.addBoth(_no_longer_outstanding)
881                 d.addErrback(self._connection_problem, writer)
882                 d.addCallback(self._got_write_answer, writer, started)
883                 ds.append(d)
884         self._record_verinfo()
885         self._status.timings['pack'] = time.time() - started
886         return defer.DeferredList(ds)
887
888
889     def _record_verinfo(self):
890         self.versioninfo = self._get_some_writer().get_verinfo()
891
892
893     def _connection_problem(self, f, writer):
894         """
895         We ran into a connection problem while working with writer, and
896         need to deal with that.
897         """
898         self.log("found problem: %s" % str(f))
899         self._last_failure = f
900         self.writers.discard(writer.shnum, writer)
901
902
903     def log_goal(self, goal, message=""):
904         logmsg = [message]
905         for (shnum, server) in sorted([(s,p) for (p,s) in goal]):
906             logmsg.append("sh%d to [%s]" % (shnum, server.get_name()))
907         self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
908         self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
909                  level=log.NOISY)
910
911     def update_goal(self):
912         # if log.recording_noisy
913         if True:
914             self.log_goal(self.goal, "before update: ")
915
916         # first, remove any bad servers from our goal
917         self.goal = set([ (server, shnum)
918                           for (server, shnum) in self.goal
919                           if server not in self.bad_servers ])
920
921         # find the homeless shares:
922         homefull_shares = set([shnum for (server, shnum) in self.goal])
923         homeless_shares = set(range(self.total_shares)) - homefull_shares
924         homeless_shares = sorted(list(homeless_shares))
925         # place them somewhere. We prefer unused servers at the beginning of
926         # the available server list.
927
928         if not homeless_shares:
929             return
930
931         # if an old share X is on a node, put the new share X there too.
932         # TODO: 1: redistribute shares to achieve one-per-server, by copying
933         #       shares from existing servers to new (less-crowded) ones. The
934         #       old shares must still be updated.
935         # TODO: 2: move those shares instead of copying them, to reduce future
936         #       update work
937
938         # this is a bit CPU intensive but easy to analyze. We create a sort
939         # order for each server. If the server is marked as bad, we don't
940         # even put them in the list. Then we care about the number of shares
941         # which have already been assigned to them. After that we care about
942         # their permutation order.
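        # Small illustration (assumed values): with two usable servers,
        # where A already holds 2 shares and comes first in the permuted
        # list while B holds none, the entries sort as
        #   (0, 1, <B serverid>, B) < (2, 0, <A serverid>, A)
        # so homeless shares are handed to B before A.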
943         old_assignments = DictOfSets()
944         for (server, shnum) in self.goal:
945             old_assignments.add(server, shnum)
946
947         serverlist = []
948         for i, server in enumerate(self.full_serverlist):
949             serverid = server.get_serverid()
950             if server in self.bad_servers:
951                 continue
952             entry = (len(old_assignments.get(server, [])), i, serverid, server)
953             serverlist.append(entry)
954         serverlist.sort()
955
956         if not serverlist:
957             raise NotEnoughServersError("Ran out of non-bad servers, "
958                                         "first_error=%s" %
959                                         str(self._first_write_error),
960                                         self._first_write_error)
961
962         # we then index this serverlist with an integer, because we may have
963         # to wrap. We update the goal as we go.
964         i = 0
965         for shnum in homeless_shares:
966             (ignored1, ignored2, ignored3, server) = serverlist[i]
967             # if we are forced to send a share to a server that already has
968             # one, we may have two write requests in flight, and the
969             # servermap (which was computed before either request was sent)
970             # won't reflect the new shares, so the second response will be
971             # surprising. There is code in _got_write_answer() to tolerate
972             # this, otherwise it would cause the publish to fail with an
973             # UncoordinatedWriteError. See #546 for details of the trouble
974             # this used to cause.
975             self.goal.add( (server, shnum) )
976             i += 1
977             if i >= len(serverlist):
978                 i = 0
979         if True:
980             self.log_goal(self.goal, "after update: ")
981
982
983     def _got_write_answer(self, answer, writer, started):
984         if not answer:
985             # SDMF writers only pretend to write when callers set their
986             # blocks, salts, and so on -- they actually just write once,
987             # at the end of the upload process. In fake writes, they
988             # return defer.succeed(None). If we see that, we shouldn't
989             # bother checking it.
990             return
991
992         server = writer.server
993         lp = self.log("_got_write_answer from %s, share %d" %
994                       (server.get_name(), writer.shnum))
995
996         now = time.time()
997         elapsed = now - started
998
999         self._status.add_per_server_time(server, elapsed)
1000
1001         wrote, read_data = answer
1002
1003         surprise_shares = set(read_data.keys()) - set([writer.shnum])
1004
1005         # We need to remove from surprise_shares any shares that we are
1006         # knowingly also writing to that server from other writers.
1007
1008         # TODO: Precompute this.
1009         shares = []
1010         for shnum, writers in self.writers.iteritems():
1011             shares.extend([x.shnum for x in writers if x.server == server])
1012         known_shnums = set(shares)
1013         surprise_shares -= known_shnums
1014         self.log("found the following surprise shares: %s" %
1015                  str(surprise_shares))
1016
1017         # Now surprise shares contains all of the shares that we did not
1018         # Now surprise_shares contains all of the shares that we did not
1019
1020         surprised = False
1021         for shnum in surprise_shares:
1022             # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
1023             checkstring = read_data[shnum][0]
1024             # What we want to do here is to see if their (seqnum,
1025             # roothash, salt) is the same as our (seqnum, roothash,
1026             # salt), or the equivalent for MDMF. The best way to do this
1027             # is to store a packed representation of our checkstring
1028             # somewhere, then not bother unpacking the other
1029             # checkstring.
1030             if checkstring == self._checkstring:
1031                 # they have the right share, somehow
1032
1033                 if (server,shnum) in self.goal:
1034                     # and we want them to have it, so we probably sent them a
1035                     # copy in an earlier write. This is ok, and avoids the
1036                     # #546 problem.
1037                     continue
1038
1039                 # They aren't in our goal, but they are still for the right
1040                 # version. Somebody else wrote them, and it's a convergent
1041                 # uncoordinated write. Pretend this is ok (don't be
1042                 # surprised), since I suspect there's a decent chance that
1043                 # we'll hit this in normal operation.
1044                 continue
1045
1046             else:
1047                 # the new shares are of a different version
1048                 if server in self._servermap.get_reachable_servers():
1049                     # we asked them about their shares, so we had knowledge
1050                     # of what they used to have. Any surprising shares must
1051                     # have come from someone else, so UCW.
1052                     surprised = True
1053                 else:
1054                     # we didn't ask them, and now we've discovered that they
1055                     # have a share we didn't know about. This indicates that
1056                     # mapupdate should have worked harder and asked more
1057                     # servers before concluding that it knew about them all.
1058
1059                     # signal UCW, but make sure to ask this server next time,
1060                     # so we'll remember to update it if/when we retry.
1061                     surprised = True
1062                     # TODO: ask this server next time. I don't yet have a good
1063                     # way to do this. Two insufficient possibilities are:
1064                     #
1065                     # self._servermap.add_new_share(server, shnum, verinfo, now)
1066                     #  but that requires fetching/validating/parsing the whole
1067                     #  version string, and all we have is the checkstring
1068                     # self._servermap.mark_bad_share(server, shnum, checkstring)
1069                     #  that will make publish overwrite the share next time,
1070                     #  but it won't re-query the server, and it won't make
1071                     #  mapupdate search further
1072
1073                     # TODO later: when publish starts, do
1074                     # servermap.get_best_version(), extract the seqnum,
1075                     # subtract one, and store as highest-replaceable-seqnum.
1076                     # Then, if this surprise-because-we-didn't-ask share is
1077                     # of highest-replaceable-seqnum or lower, we're allowed
1078                     # to replace it: send out a new writev (or rather add it
1079                     # to self.goal and loop).
1080                     pass
1081
1082                 surprised = True
1083
1084         if surprised:
1085             self.log("they had shares %s that we didn't know about" %
1086                      (list(surprise_shares),),
1087                      parent=lp, level=log.WEIRD, umid="un9CSQ")
1088             self.surprised = True
1089
1090         if not wrote:
1091             # TODO: there are two possibilities. The first is that the server
1092             # is full (or just doesn't want to give us any room), which means
1093             # we shouldn't ask them again, but is *not* an indication of an
1094             # uncoordinated write. The second is that our testv failed, which
1095             # *does* indicate an uncoordinated write. We currently don't have
1096             # a way to tell these two apart (in fact, the storage server code
1097             # doesn't have the option of refusing our share).
1098             #
1099             # If the server is full, mark the server as bad (so we don't ask
1100             # them again), but don't set self.surprised. The loop() will find
1101             # a new server.
1102             #
1103             # If the testv failed, log it, set self.surprised, but don't
1104             # bother adding to self.bad_servers .
1105
1106             self.log("our testv failed, so the write did not happen",
1107                      parent=lp, level=log.WEIRD, umid="8sc26g")
1108             self.surprised = True
1109             self.bad_servers.add(server) # don't ask them again
1110             # use the checkstring to add information to the log message
1111             unknown_format = False
1112             for (shnum,readv) in read_data.items():
1113                 checkstring = readv[0]
1114                 version = get_version_from_checkstring(checkstring)
1115                 if version == MDMF_VERSION:
1116                     (other_seqnum,
1117                      other_roothash) = unpack_mdmf_checkstring(checkstring)
1118                 elif version == SDMF_VERSION:
1119                     (other_seqnum,
1120                      other_roothash,
1121                      other_IV) = unpack_sdmf_checkstring(checkstring)
1122                 else:
1123                     unknown_format = True
1124                 expected_version = self._servermap.version_on_server(server,
1125                                                                      shnum)
1126                 if expected_version:
1127                     (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1128                      offsets_tuple) = expected_version
1129                     msg = ("somebody modified the share on us:"
1130                            " shnum=%d: I thought they had #%d:R=%s," %
1131                            (shnum,
1132                             seqnum, base32.b2a(root_hash)[:4]))
1133                     if unknown_format:
1134                         msg += (" but I don't know how to read share"
1135                                 " format %d" % version)
1136                     else:
1137                         msg += " but testv reported #%d:R=%s" % \
1138                                (other_seqnum, other_roothash)
1139                     self.log(msg, parent=lp, level=log.NOISY)
1140                 # if expected_version==None, then we didn't expect to see a
1141                 # share on that server, and the 'surprise_shares' clause
1142                 # above will have logged it.
1143             return
1144
1145         # and update the servermap
1146         # self.versioninfo is set during the last phase of publishing.
1147         # If we get there, we know that responses correspond to placed
1148         # shares, and can safely execute these statements.
1149         if self.versioninfo:
1150             self.log("wrote successfully: adding new share to servermap")
1151             self._servermap.add_new_share(server, writer.shnum,
1152                                           self.versioninfo, started)
1153             self.placed.add( (server, writer.shnum) )
1154         self._update_status()
1155         # the next method in the deferred chain will check to see if
1156         # we're done and successful.
1157         return
1158
1159
1160     def _done(self):
1161         if not self._running:
1162             return
1163         self._running = False
1164         now = time.time()
1165         self._status.timings["total"] = now - self._started
1166
1167         elapsed = now - self._started_pushing
1168         self._status.timings['push'] = elapsed
1169
1170         self._status.set_active(False)
1171         self.log("Publish done, success")
1172         self._status.set_status("Finished")
1173         self._status.set_progress(1.0)
1174         # Get k and segsize, then give them to the caller.
1175         hints = {}
1176         hints['segsize'] = self.segment_size
1177         hints['k'] = self.required_shares
1178         self._node.set_downloader_hints(hints)
1179         eventually(self.done_deferred.callback, None)
1180
1181     def _failure(self, f=None):
1182         if f:
1183             self._last_failure = f
1184
1185         if not self.surprised:
1186             # We ran out of servers
1187             msg = "Publish ran out of good servers"
1188             if self._last_failure:
1189                 msg += ", last failure was: %s" % str(self._last_failure)
1190             self.log(msg)
1191             e = NotEnoughServersError(msg)
1192
1193         else:
1194             # We ran into shares that we didn't recognize, which means
1195             # that we need to return an UncoordinatedWriteError.
1196             self.log("Publish failed with UncoordinatedWriteError")
1197             e = UncoordinatedWriteError()
1198         f = failure.Failure(e)
1199         eventually(self.done_deferred.callback, f)
1200
1201
1202 class MutableFileHandle:
1203     """
1204     I am a mutable uploadable built around a filehandle-like object,
1205     usually either a StringIO instance or a handle to an actual file.
1206     """
1207     implements(IMutableUploadable)
1208
1209     def __init__(self, filehandle):
1210         # The filehandle is defined as a generally file-like object that
1211         # has these two methods. We don't care beyond that.
1212         assert hasattr(filehandle, "read")
1213         assert hasattr(filehandle, "close")
1214
1215         self._filehandle = filehandle
1216         # We must start reading at the beginning of the file, or we risk
1217         # encountering errors when the data read does not match the size
1218         # reported to the uploader.
1219         self._filehandle.seek(0)
1220
1221         # We have not yet read anything, so our position is 0.
1222         self._marker = 0
1223
1224
1225     def get_size(self):
1226         """
1227         I return the amount of data in my filehandle.
1228         """
1229         if not hasattr(self, "_size"):
1230             old_position = self._filehandle.tell()
1231             # Seek to the end of the file by seeking 0 bytes from the
1232             # file's end
1233             self._filehandle.seek(0, os.SEEK_END)
1234             self._size = self._filehandle.tell()
1235             # Restore the previous position, in case this was called
1236             # after a read.
1237             self._filehandle.seek(old_position)
1238             assert self._filehandle.tell() == old_position
1239
1240         assert hasattr(self, "_size")
1241         return self._size
1242
1243
1244     def pos(self):
1245         """
1246         I return the position of my read marker -- i.e., how much data I
1247         have already read and returned to callers.
1248         """
1249         return self._marker
1250
1251
1252     def read(self, length):
1253         """
1254         I return some data (up to length bytes) from my filehandle.
1255
1256         In most cases, I return length bytes, but sometimes I won't --
1257         for example, if I am asked to read beyond the end of a file, or
1258         an error occurs.
1259         """
1260         results = self._filehandle.read(length)
1261         self._marker += len(results)
1262         return [results]
1263
1264
1265     def close(self):
1266         """
1267         I close the underlying filehandle. Any further operations on the
1268         filehandle fail at this point.
1269         """
1270         self._filehandle.close()
1271
1272
1273 class MutableData(MutableFileHandle):
1274     """
1275     I am a mutable uploadable built around a string, which I then cast
1276     into a StringIO and treat as a filehandle.
1277     """
1278
1279     def __init__(self, s):
1280         # Take a string and return a file-like uploadable.
1281         assert isinstance(s, str)
1282
1283         MutableFileHandle.__init__(self, StringIO(s))
1284
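# Illustrative sketch (not part of the original file): MutableData wraps
# a string in the MutableFileHandle interface used by Publish, so
#
#   u = MutableData("hello world")
#   u.get_size()   # -> 11
#   u.read(5)      # -> ["hello"]  (read() always returns a list)
#   u.pos()        # -> 5
#
# which is roughly what the publish code above relies on when it does
# "".join(self.data.read(segsize)).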
1285
1286 class TransformingUploadable:
1287     """
1288     I am an IMutableUploadable that wraps another IMutableUploadable,
1289     and some segments that are already on the grid. When I am called to
1290     read, I handle merging of boundary segments.
1291     """
1292     implements(IMutableUploadable)
1293
1294
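    # Illustrative walk-through (assumed numbers, not part of the
    # original file): with segment_size=4, offset=6, new data "XY", and
    # the old first/last affected segments passed in as start/end,
    # _first_segment_offset is 6 % 4 = 2, so read(4) returns the first
    # two old bytes of start followed by "XY", and get_size() reports
    # offset + len("XY") = 8. Publish's update path reads this object
    # one segment at a time from _encode_segment().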
1295     def __init__(self, data, offset, segment_size, start, end):
1296         assert IMutableUploadable.providedBy(data)
1297
1298         self._newdata = data
1299         self._offset = offset
1300         self._segment_size = segment_size
1301         self._start = start
1302         self._end = end
1303
1304         self._read_marker = 0
1305
1306         self._first_segment_offset = offset % segment_size
1307
1308         num = self.log("TransformingUploadable: starting", parent=None)
1309         self._log_number = num
1310         self.log("got fso: %d" % self._first_segment_offset)
1311         self.log("got offset: %d" % self._offset)
1312
1313
1314     def log(self, *args, **kwargs):
1315         if 'parent' not in kwargs:
1316             kwargs['parent'] = self._log_number
1317         if "facility" not in kwargs:
1318             kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1319         return log.msg(*args, **kwargs)
1320
1321
1322     def get_size(self):
1323         return self._offset + self._newdata.get_size()
1324
1325
1326     def read(self, length):
1327         # We can get data from 3 sources here.
1328         #   1. The first of the segments provided to us.
1329         #   2. The data that we're replacing things with.
1330         #   3. The last of the segments provided to us.
1331
1332         # are we in state 0?
1333         self.log("reading %d bytes" % length)
1334
1335         old_start_data = ""
1336         old_data_length = self._first_segment_offset - self._read_marker
1337         if old_data_length > 0:
1338             if old_data_length > length:
1339                 old_data_length = length
1340             self.log("returning %d bytes of old start data" % old_data_length)
1341
1342             old_data_end = old_data_length + self._read_marker
1343             old_start_data = self._start[self._read_marker:old_data_end]
1344             length -= old_data_length
1345         else:
1346             # otherwise calculations later get screwed up.
1347             old_data_length = 0
1348
1349         # Is there enough new data to satisfy this read? If not, we need
1350         # to pad the end of the data with data from our last segment.
1351         old_end_length = length - \
1352             (self._newdata.get_size() - self._newdata.pos())
1353         old_end_data = ""
1354         if old_end_length > 0:
1355             self.log("reading %d bytes of old end data" % old_end_length)
1356
1357             # TODO: We're not explicitly checking for tail segment size
1358             # here. Is that a problem?
1359             old_data_offset = (length - old_end_length + \
1360                                old_data_length) % self._segment_size
1361             self.log("reading at offset %d" % old_data_offset)
1362             old_end = old_data_offset + old_end_length
1363             old_end_data = self._end[old_data_offset:old_end]
1364             length -= old_end_length
1365             assert length == self._newdata.get_size() - self._newdata.pos()
1366
1367         self.log("reading %d bytes of new data" % length)
1368         new_data = self._newdata.read(length)
1369         new_data = "".join(new_data)
1370
1371         self._read_marker += len(old_start_data + new_data + old_end_data)
1372
1373         return old_start_data + new_data + old_end_data
1374
1375     def close(self):
1376         pass