# Tahoe-LAFS -- src/allmydata/frontends/magic_folder.py
# (gitweb blob-view header removed; commit message was:
#  "Daira's excellent fixes to the uploader from our pairing session")
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# Linux inotify flag IN_EXCL_UNLINK: don't deliver events for children after
# they have been unlinked.  Hard-coded numeric value, OR'd into the watch mask
# in Uploader.__init__ -- presumably because the inotify module we use does
# not export this name; TODO confirm against twisted.internet.inotify.
IN_EXCL_UNLINK = 0x04000000L
26
def get_inotify_module():
    """
    Return the filesystem-notification module for this platform.

    On Windows this is allmydata's own implementation; elsewhere, Twisted's
    inotify support is used when the platform provides it.  Raises
    NotImplementedError on unsupported platforms, or when the import fails
    on Windows (pre-Vista); any other import failure is logged and re-raised.
    """
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
            return inotify
        if runtime.platform.supportsINotify():
            from twisted.internet import inotify
            return inotify
        # Note: this NotImplementedError is not an ImportError/AttributeError,
        # so it propagates straight through the except clause below.
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
43
44
def is_new_file(pathinfo, db_entry):
    """
    Return True if the local file state differs from what the magic-folder
    db last recorded for it.

    A path with no db entry is always new.  A path that no longer exists is
    not new when the db also recorded it as absent (size None); otherwise
    the (size, ctime, mtime) triples are compared.
    """
    if db_entry is None:
        return True
    if not pathinfo.exists and db_entry.size is None:
        return False
    observed = (pathinfo.size, pathinfo.ctime, pathinfo.mtime)
    recorded = (db_entry.size, db_entry.ctime, db_entry.mtime)
    return observed != recorded
54
55
56 class MagicFolder(service.MultiService):
57     name = 'magic-folder'
58
59     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
60                  pending_delay=1.0, clock=None):
61         precondition_abspath(local_path_u)
62
63         service.MultiService.__init__(self)
64
65         immediate = clock is not None
66         clock = clock or reactor
67         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
68         if db is None:
69             return Failure(Exception('ERROR: Unable to load magic folder db.'))
70
71         # for tests
72         self._client = client
73         self._db = db
74
75         upload_dirnode = self._client.create_node_from_uri(upload_dircap)
76         collective_dirnode = self._client.create_node_from_uri(collective_dircap)
77
78         self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
79         self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
80                                      upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)
81
82     def startService(self):
83         # TODO: why is this being called more than once?
84         if self.running:
85             return defer.succeed(None)
86         print "%r.startService" % (self,)
87         service.MultiService.startService(self)
88         return self.uploader.start_monitoring()
89
90     def ready(self):
91         """ready is used to signal us to start
92         processing the upload and download items...
93         """
94         self.uploader.start_uploading()  # synchronous
95         return self.downloader.start_downloading()
96
97     def finish(self):
98         print "finish"
99         d = self.uploader.stop()
100         d2 = self.downloader.stop()
101         d.addCallback(lambda ign: d2)
102         return d
103
104     def remove_service(self):
105         return service.MultiService.disownServiceParent(self)
106
107
class QueueMixin(HookMixin):
    """
    Common queue machinery shared by Uploader and Downloader: a work deque,
    a chain of Deferreds (``_lazy_tail``) that serializes item processing,
    stats counters, and logging.

    Subclasses must provide ``_process(item)`` and ``_when_queue_is_empty()``.
    HookMixin supplies ``set_hook``/``_call_hook`` for the 'started' and
    'processed' hooks used by tests.
    """
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        # short role name ('uploader'/'downloader'), used in counters and logs
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        # work items; _turn_deque pops one at a time
        self._deque = deque()
        # tail of the Deferred chain that serializes all processing
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        # delay (seconds) between queue turns; 0 here, subclasses may rely on it
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        """Map a u'/'-separated relative path to a FilePath under the local folder."""
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath under the local folder -> u'/'-joined relpath."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Bump the stats counter 'magic_folder.<name>.<counter_name>' by delta."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        """Deferred-callback-shaped logger: log msg with the result, pass the result through."""
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        """
        Take one item off the deque (``pop()`` -- the right end, where new
        work is appended) and chain its processing onto ``_lazy_tail``;
        if the deque is empty, chain the subclass's ``_when_queue_is_empty``
        instead.  After processing, the 'processed' hook fires and another
        turn is scheduled after ``_turn_delay`` seconds.
        """
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            # fires even on failure, so tests waiting on 'processed' don't hang
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
175
176
class Uploader(QueueMixin):
    """
    Producer half of a magic folder: watches the local directory with the
    platform's filesystem-notification module and pushes local changes
    (new/modified files, new subdirectories, deletions) into this client's
    upload DMD.  Queue mechanics come from QueueMixin.
    """
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        # flipped by start_uploading(); until then queued work is not turned
        self.is_ready = False
        # immediate=True (tests): turn the deque synchronously, not via callLater
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()  # of unicode relpaths

        # not every notifier implementation has this -- guard with hasattr
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Start the filesystem notifier and fire the 'started' hook.  Returns a Deferred."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """
        Stop watching the filesystem.  Returns a Deferred that fires once the
        notifier has stopped (where supported) and the in-flight processing
        chain (``_lazy_tail``) has drained.
        """
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_uploading(self):
        """
        Mark the uploader ready, re-add every relpath known to the db to the
        pending set (so changes made while we were not running are noticed),
        scan the whole local tree, then queue everything pending.
        """
        self._log("start_uploading")
        self.is_ready = True

        all_relpaths = self._db.get_all_relpaths()
        self._log("all relpaths: %r" % (all_relpaths,))

        for relpath_u in all_relpaths:
            self._add_pending(relpath_u)

        self._full_scan()
        self._extend_queue_and_keep_going(self._pending)

    def _extend_queue_and_keep_going(self, relpaths_u):
        """Append relpaths_u to the work deque and, if ready, schedule a queue turn."""
        self._log("queueing %r" % (relpaths_u,))
        self._deque.extend(relpaths_u)
        self._count('objects_queued', len(relpaths_u))

        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _full_scan(self):
        # scan from the root of the magic folder (empty relative path)
        print "FULL SCAN"
        self._log("all_files %r" % (self._pending))
        self._scan(u"")

    def _add_pending(self, relpath_u):
        # ignorable paths (per magicpath.should_ignore_file) never enter the pending set
        if not magicpath.should_ignore_file(relpath_u):
            self._pending.add(relpath_u)

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that this doesn't add them to the deque -- queueing happens
        # separately (via _extend_queue_and_keep_going).

        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        for child in children:
            _assert(isinstance(child, unicode), child=child)
            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)

    def is_pending(self, relpath_u):
        """Return True if relpath_u is awaiting upload (used by the Downloader for conflict detection)."""
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        """Filesystem-notification callback: filter the event and queue the relpath for upload."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("not queueing %r because it is already pending" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._pending.add(relpath_u)
        self._extend_queue_and_keep_going([relpath_u])

    def _when_queue_is_empty(self):
        # the Uploader has nothing to do when idle; new work arrives via _notify
        return defer.succeed(None)

    def _process(self, relpath_u):
        """
        Upload one queued relpath.  Four cases: the path disappeared
        (record a deletion remotely), it is a symlink (warn and skip),
        it is a directory (create the remote subdirectory, then scan it),
        or it is a regular file (upload a new version).  Returns a Deferred.
        """
        # Uploader
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            # `now` is a parameter only so tests can pin the timestamp
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    # never uploaded, so there is no deletion to record
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                # a deletion is represented remotely as an empty file with 'deleted': True
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                # watch the new subdirectory ourselves unless the notifier
                # already follows newly created subdirectories
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                # a directory is represented remotely as an empty file whose
                # encoded name carries a trailing (encoded) u"/"
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                # add the new directory's children to the pending set
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # not a file, dir, or symlink (device node, fifo, ...)
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Return a Deferred for encoded_path_u's metadata in the upload DMD (Failure on KeyError)."""
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Return a Deferred for encoded_path_u's filenode in the upload DMD (Failure on KeyError)."""
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
454
455
class WriteFileMixin(object):
    """
    Helpers for safely writing downloaded content to the local filesystem:
    write to a .tmp sibling first, then replace the target, diverting to a
    .conflict name when a conflict is detected.  Expects the mixing-in class
    to provide ``self._log`` and ``self._umask``.
    """
    # downloaded files get their mtime set this many seconds in the past
    # (see step 3 in _write_downloaded_file)
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        """Return the .conflict sibling name for abspath_u."""
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        """
        Write downloaded file_contents to abspath_u (see numbered steps
        below) and return the path actually written -- abspath_u on a clean
        overwrite, or the .conflict path when is_conflict is set or the
        replacement step detects a conflict.
        """
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        # apply the configured umask while creating the directory and file,
        # restoring the process umask afterwards
        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                # the replacement step itself detected a conflict; reclassify
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        """Move the replacement file to abspath_u's .conflict name and return that name."""
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        """Rename a remotely-deleted file to its .backup name; tolerate it already being gone."""
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u
523
524
class Downloader(QueueMixin, WriteFileMixin):
    """
    Consumer half of a magic folder: periodically scans the DMDs in the
    collective, queues remote objects newer than our local db state,
    downloads them, and writes them into the local directory (classifying
    conflicts along the way).  Queue mechanics come from QueueMixin; file
    writing from WriteFileMixin.
    """
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        # our own DMD's readcap -- used to skip our own DMD during scans
        self._upload_readonly_dircap = upload_readonly_dircap
        # callable (Uploader.is_pending) used for conflict detection
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_downloading(self):
        """Scan the whole collective (including our own DMD) and start turning the queue.  Returns a Deferred for the scan."""
        self._log("start_downloading")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        """Stop processing; returns a Deferred that fires when the current queue turn (``_lazy_tail``) finishes."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.  Ignorable paths are never downloaded.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        Return the locally-recorded version number for relpath_u, or None if
        the file does not exist locally or has no entry in our magic-folder db.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.  Fires with (None, None) if no DMD
        in the collective has an entry with version > 0 for that name.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                # bind dir_name as a default to avoid late-binding closure bugs
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # pick the (node, metadata) pair with the largest 'version'
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        """
        List one participant's DMD and accumulate (file_node, metadata) pairs
        into scan_batch for every entry that looks newer than our local state.
        Returns a Deferred.
        """
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if scan_batch.has_key(relpath_u):
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        """
        Scan every DMD in the collective (skipping our own unless scan_self),
        then queue, for each path seen, the candidate with the highest
        version number -- if _should_download agrees.  Returns a Deferred.
        """
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    # bind loop variables as defaults to avoid late-binding bugs
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                # keep only the candidate with the highest remote version
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    # fire the 'processed' hook so tests don't wait forever
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # idle downloader: rescan the collective after REMOTE_SCAN_INTERVAL,
        # then turn the deque again
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        """
        Download one queued (relpath_u, file_node, metadata) item, classify
        it as overwrite or conflict, write it locally (or rename/mkdir for
        deletions/directories), and record the result in the db.  ConflictError
        is trapped so a conflicted item does not break the queue.  Returns a
        Deferred.  ``now`` exists so tests can pin the timestamp.
        """
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # an unresolved .conflict file already exists locally; refuse
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            # conflict classification: mismatched download lineage, mismatched
            # upload lineage, or a local upload still pending for the same path
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                # trailing slash marks a directory entry
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d