# src/allmydata/immutable/upload.py — remove plaintext-hashing code from the helper interface, to close #722
import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NotEnoughSharesError, InsufficientVersionError, NoServersError
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

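# Illustrative round-trip (a sketch with hypothetical values, not part of
# this module): a tracker asks a server to allocate shares 0 and 1; the
# reply says share 1 was already present and hands back a remote writer
# for share 0.
#
#   d = tracker.query(set([0, 1]))
#   # ...fires with (alreadygot, allocated) == (set([1]), set([0])),
#   # and tracker.buckets now maps 0 to a write-bucket proxy.
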
class Tahoe2PeerSelector:

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        sb = client.storage_broker
        peers = list(sb.get_servers(storage_index))
        if not peers:
            raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
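        # Illustrative count (assuming needed_hashes() returns the leaf's
        # uncle chain plus, with include_leaf=True, the leaf itself): this
        # is roughly ceil(log2(total_shares)) + 1, e.g. 5 hash-tree nodes
        # per share when total_shares is 10.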

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()

        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        if not peers:
            raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

    def _loop(self):
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise NotEnoughSharesError(msg, placed_shares,
                                           self.shares_of_happiness)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return self.use_peers

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


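# A synchronous miniature of the placement strategy implemented above (an
# illustrative sketch, not used by this module). 'ask(peer, shares)' is a
# hypothetical stand-in for PeerTracker.query() that returns the set of
# shares the peer accepted. First pass: offer one share per peer. Later
# passes: spread the remaining shares evenly over the peers that accepted
# everything they were asked for.
def _example_two_pass_placement(peers, total_shares, ask):
    homeless = range(total_shares)
    placements = {}          # peer -> set of accepted shnums
    worth_asking_again = []
    for peer in peers:
        if not homeless:
            break
        share = homeless.pop(0)
        accepted = ask(peer, set([share]))
        if accepted:
            placements.setdefault(peer, set()).update(accepted)
            worth_asking_again.append(peer)
        else:
            homeless.insert(0, share) # rejected shares get first priority
    while homeless and worth_asking_again:
        num = mathutil.div_ceil(len(homeless), len(worth_asking_again))
        peer = worth_asking_again.pop(0)
        batch = set(homeless[:num])
        homeless[:num] = []
        accepted = ask(peer, batch)
        placements.setdefault(peer, set()).update(accepted)
        homeless = list(batch - accepted) + homeless
        if batch == accepted:
            # they took everything we offered, so they may take even more
            worth_asking_again.append(peer)
    return placements, homeless
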
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment

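    # A minimal pure-Python analogue of the chunk-to-segment bookkeeping
    # above (an illustrative sketch only; it uses plain SHA-256 where the
    # real code uses tagged hashers from allmydata.util.hashutil): split an
    # arbitrary stream of chunks into fixed-size segments and hash each
    # segment separately, carrying a partially-filled hasher across chunk
    # boundaries. A trailing partial segment is simply dropped here.
    @staticmethod
    def _example_segment_digests(chunks, segment_size):
        import hashlib
        digests, h, filled = [], hashlib.sha256(), 0
        for chunk in chunks:
            offset = 0
            while offset < len(chunk):
                take = min(len(chunk) - offset, segment_size - filled)
                h.update(chunk[offset:offset+take])
                offset += take
                filled += take
                if filled == segment_size: # segment boundary reached
                    digests.append(h.digest())
                    h, filled = hashlib.sha256(), 0
        return digests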

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

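    # The same stack-safe pattern in isolation (an illustrative sketch, not
    # used by this module): the caller supplies the result Deferred, and
    # each iteration bounces through fireEventually() so a synchronous read
    # cannot recurse deeply enough to blow the stack. 'read_chunk' is a
    # hypothetical callable returning a (possibly already-fired) Deferred
    # that yields a string.
    @staticmethod
    def _example_chunked_read(read_chunk, remaining, results, fire_when_done):
        if not remaining:
            fire_when_done.callback(results)
            return
        d = defer.maybeDeferred(read_chunk, min(remaining, 1024))
        d.addCallback(fireEventually) # stall a tick; see the N.B. above
        def _got(chunk):
            if not chunk: # EOF: deliver what we have
                fire_when_done.callback(results)
                return
            results.append(chunk)
            EncryptAnUploadable._example_chunked_read(
                read_chunk, remaining - len(chunk), results, fire_when_done)
        d.addCallback(_got)
        d.addErrback(fire_when_done.errback)
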
    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

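# Illustrative usage (a sketch assuming the Data uploadable defined later
# in this file, with an empty convergence secret and the built-in default
# encoding parameters):
#
#   eu = EncryptAnUploadable(Data("example plaintext", convergence=""))
#   d = eu.get_storage_index()    # fires with the 16-byte storage index
#   d.addCallback(lambda si: eu.read_encrypted(17, hash_only=False))
#   # ...fires with a list of ciphertext strings, 17 bytes in total.
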
class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return self._client.log(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

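# Illustrative use of read_this_many_bytes (a sketch; 'uploadable' is any
# IUploadable whose read() may return fewer bytes than requested): collect
# exactly 1000 bytes even if they arrive in smaller pieces.
#
#   d = read_this_many_bytes(uploadable, 1000)
#   d.addCallback(lambda pieces: "".join(pieces)) # exactly 1000 bytes
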
class LiteralUploader:

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()


class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares,
                                                size=self._size).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

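# Worked example of the segment-size computation above (illustrative):
# with the defaults k=3 and max_segment_size=128 KiB, a 1000-byte file
# yields segsize = next_multiple(min(131072, 1000), 3) = 1002, so the
# single segment divides evenly into k blocks of 334 bytes each.
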
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

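    # Illustrative consequence of convergent encryption (following the
    # hashing above): two uploads of identical plaintext with the same
    # convergence secret and the same (k, n, segsize) derive the same
    # 16-byte AES key, hence the same storage index, so the second upload
    # can be deduplicated against the first. A different secret (or
    # convergence=None) deliberately breaks that link.
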
    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)

class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    URI_LIT_SIZE_THRESHOLD = 55
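    # Files at or below this size are not erasure-coded at all: upload()
    # below hands them to LiteralUploader, which packs the bytes directly
    # into a LIT URI. For example, Data("x"*55, convergence="") produces a
    # literal cap, while a 56-byte file goes through the full CHK path.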

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable, history=None):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    uploader = CHKUploader(self.parent)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if history:
                    history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d
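
# Illustrative client-side use of the Uploader service (a sketch; 'client'
# is a hypothetical running node that has this service attached under the
# name "uploader"):
#
#   u = client.getServiceNamed("uploader")
#   d = u.upload(Data("some file contents", convergence=""))
#   d.addCallback(lambda results: results.uri)
#   # ...fires with a LIT URI for tiny files, else a CHK read-cap.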