import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from zope.interface import Interface, Attribute, implementer

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.progress import PercentProgress
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath

defer.setDebugging(True)
# The Linux inotify IN_EXCL_UNLINK flag (0x04000000); defined here because
# twisted.internet.inotify does not export it.
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


def is_new_file(pathinfo, db_entry):
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
            (db_entry.size, db_entry.ctime, db_entry.mtime))

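# A worked example (hypothetical values): if the db entry records the same
# size/ctime/mtime that get_pathinfo() now reports for the file, the tuples
# compare equal and is_new_file() returns False, so nothing is re-uploaded;
# a change to any one of the three fields makes it return True.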

class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # Raise rather than return a Failure: returning a non-None value
            # from __init__ is itself a TypeError.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready signals us to start processing the upload and download queues.
        """
        self.uploader.start_uploading()  # synchronous
        return self.downloader.start_downloading()

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # do we also want to bound on "maximum age"?
        self._process_history = deque(maxlen=10)
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def get_status(self):
        """
        Returns an iterable of instances that implement IQueuedItem
        """
        for item in self._deque:
            yield item
        for item in self._process_history:
            yield item

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)
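    # For example (hypothetical layout): with local_path_u == u"/home/alice/magic",
    # the FilePath for /home/alice/magic/sub/file.txt yields the segments
    # [u"sub", u"file.txt"], so _get_relpath() returns u"sub/file.txt".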

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        try:
            self._log("_turn_deque")
            if self._stopped:
                self._log("stopped")
                return
            try:
                item = IQueuedItem(self._deque.pop())
                self._process_history.append(item)

                self._log("popped %r, now have %d" % (item, len(self._deque)))
                self._count('objects_queued', -1)
            except IndexError:
                self._log("deque is now empty")
                self._lazy_tail.addBoth(self._logcb, "deque empty")
                self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
                self._lazy_tail.addBoth(self._logcb, "got past _when_queue_is_empty")
            else:
                self._log("_turn_deque else clause")
                self._lazy_tail.addBoth(self._logcb, "processing %r" % (item,))
                self._lazy_tail.addCallback(lambda ign: self._process(item))
                self._lazy_tail.addBoth(self._logcb, "got past _process")
                self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
                self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
                self._lazy_tail.addErrback(log.err)
                self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
                self._lazy_tail.addBoth(self._logcb, "got past deferLater")
        except Exception as e:
            self._log("---- turn deque exception %s" % (e,))
            raise


# this isn't in interfaces.py because it's very specific to QueueMixin
class IQueuedItem(Interface):
    relpath_u = Attribute("The path this item represents")
    progress = Attribute("A PercentProgress instance")

    def set_status(self, status, current_time=None):
        """
        Record that this item reached 'status' at 'current_time'
        (defaulting to now).
        """

    def status_time(self, state):
        """
        Get the time of a particular state change, or None
        """

    def status_history(self):
        """
        All status changes, sorted oldest -> latest
        """

@implementer(IQueuedItem)
class QueuedItem(object):
    def __init__(self, relpath_u, progress):
        self.relpath_u = relpath_u
        self.progress = progress
        self._status_history = dict()

    def set_status(self, status, current_time=None):
        if current_time is None:
            current_time = time.time()
        self._status_history[status] = current_time

    def status_time(self, state):
        """
        Returns None if there's no status-update for 'state', else returns
        the timestamp when that state was reached.
        """
        return self._status_history.get(state, None)

    def status_history(self):
        """
        Returns a list of 2-tuples of (state, timestamp) sorted by timestamp
        """
        hist = self._status_history.items()
        hist.sort(key=lambda s: s[1])
        return hist
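    # For example, an item that was queued, started, and then succeeded yields
    # [('queued', t0), ('started', t1), ('success', t2)] with t0 <= t1 <= t2.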


class UploadItem(QueuedItem):
    pass


class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()  # of unicode relpaths

        self._periodic_full_scan_duration = 10 * 60  # perform a full scan every 10 minutes

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        # periodic_callid is only set once start_uploading() has done its
        # first full scan, so guard against stopping before that.
        if hasattr(self, 'periodic_callid'):
            self.periodic_callid.cancel()
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_uploading(self):
        self._log("start_uploading")
        self.is_ready = True

        all_relpaths = self._db.get_all_relpaths()
        self._log("all relpaths: %r" % (all_relpaths,))

        for relpath_u in all_relpaths:
            self._add_pending(relpath_u)

        self._full_scan()

    def _extend_queue_and_keep_going(self, relpaths_u):
        self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
        for relpath_u in relpaths_u:
            progress = PercentProgress()
            item = UploadItem(relpath_u, progress)
            item.set_status('queued', self._clock.seconds())
            self._deque.append(item)

        self._count('objects_queued', len(relpaths_u))

        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _full_scan(self):
        self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
        print "FULL SCAN"
        self._log("_pending %r" % (self._pending,))
        self._scan(u"")
        self._extend_queue_and_keep_going(self._pending)

    def _add_pending(self, relpath_u):
        self._log("add pending %r" % (relpath_u,))
        if not magicpath.should_ignore_file(relpath_u):
            self._pending.add(relpath_u)

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that this doesn't add them to the deque -- that happens in
        # _extend_queue_and_keep_going(), which callers invoke afterwards.

        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        for child in children:
            _assert(isinstance(child, unicode), child=child)
            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)

    def is_pending(self, relpath_u):
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("not queueing %r because it is already pending" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._pending.add(relpath_u)
        self._extend_queue_and_keep_going([relpath_u])

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, item):
        # Uploader
        relpath_u = item.relpath_u
        self._log("_process(%r)" % (relpath_u,))
        item.set_status('started', self._clock.seconds())

        if relpath_u is None:
            item.set_status('invalid_path', self._clock.seconds())
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(ign, now=None):
            self._log("_maybe_upload: relpath_u=%r, now=%r" % (relpath_u, now))
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(
                    encoded_path_u, empty_uploadable,
                    metadata=metadata,
                    overwrite=True,
                    progress=item.progress,
                )

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                # use _log: QueueMixin defines no warn() method
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                print "ISDIR "
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(
                    encoded_path_u, uploadable,
                    metadata={"version": 0},
                    overwrite=True,
                    progress=item.progress,
                )
                def _dir_succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _dir_failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(
                    encoded_path_u, uploadable,
                    metadata=metadata,
                    overwrite=True,
                    progress=item.progress,
                )

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # use _log: QueueMixin defines no warn() method
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            item.set_status('success', self._clock.seconds())
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            item.set_status('failure', self._clock.seconds())
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"
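    # For example (hypothetical path): a conflicted download of
    # /home/alice/magic/notes.txt gets written to
    # /home/alice/magic/notes.txt.conflict, leaving the original in place.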

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u
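    # For example (hypothetical path): a remote deletion of
    # /home/alice/magic/notes.txt renames the local copy to
    # /home/alice/magic/notes.txt.backup; if the file is already gone, the
    # rename is skipped and the original path is still returned.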


class DownloadItem(QueuedItem):
    def __init__(self, relpath_u, progress, filenode, metadata):
        super(DownloadItem, self).__init__(relpath_u, progress)
        self.file_node = filenode
        self.metadata = metadata


class Downloader(QueueMixin, WriteFileMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_downloading(self):
        self._log("start_downloading")
        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._log("stop")
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)
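    # For example (hypothetical versions): if our db records version 2 for a
    # file and the remote metadata says version 3, _should_download() returns
    # True; an equal or older remote version returns False.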

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string and checks whether that
        file exists locally; if not it returns None, otherwise it returns the
        version number from our magic-folder db entry (or None if there is no
        such entry).
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a Deferred that fires with a 2-tuple of the
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    to_dl = DownloadItem(
                        relpath_u,
                        PercentProgress(file_node.get_size()),
                        file_node,
                        metadata,
                    )
                    to_dl.set_status('queued', self._clock.seconds())
                    self._deque.append(to_dl)
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed', async=True)

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:  # XXX why can we pass in now?
            now = time.time()  # self._clock.seconds()

        self._log("started! %s" % (now,))
        item.set_status('started', now)
        fp = self._get_filepath(item.relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = item.file_node.get_uri()
            last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not item.metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(
                item.relpath_u, item.metadata['version'], last_uploaded_uri,
                last_downloaded_uri, last_downloaded_timestamp, written_pathinfo,
            )
            self._count('objects_downloaded')
            item.set_status('success', self._clock.seconds())

        def failed(f):
            item.set_status('failure', self._clock.seconds())
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            db_entry = self._db.get_db_entry(item.relpath_u)
            dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(item.relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if item.relpath_u.endswith(u"/"):
                if item.metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if item.metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d