]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/magic_folder.py
55eb47011740356fef7f8930380eedf930eb5a91
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# Linux inotify(7) IN_EXCL_UNLINK flag.  twisted.internet.inotify does not
# export this constant, so it is defined here and OR'd into the watch mask
# below; it asks the kernel to stop reporting events for files after they
# have been unlinked from a watched directory.
IN_EXCL_UNLINK = 0x04000000L
26
def get_inotify_module():
    """
    Return the filesystem-notification module appropriate for this platform:
    the allmydata Windows shim on win32, or twisted's inotify wrapper where
    the platform supports inotify.

    Raises NotImplementedError when no implementation is available, and logs
    then re-raises ImportError/AttributeError from a failed import.
    """
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
            return inotify
        if runtime.platform.supportsINotify():
            from twisted.internet import inotify
            return inotify
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform != "win32":
            raise
        # On Windows the import can fail on pre-Vista systems; translate the
        # low-level error into an explanation for the user.
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "Windows support requires at least Vista, and has only been tested on Windows 7.")
43
44
def is_new_file(pathinfo, db_entry):
    """
    Decide whether the local file described by *pathinfo* differs from the
    recorded *db_entry* and therefore needs uploading.

    Returns True when there is no db entry at all, False when both the db
    and the filesystem agree the file is absent, and otherwise compares
    (size, ctime, mtime) between the two.
    """
    # Nothing recorded yet: treat whatever is on disk as new.
    if db_entry is None:
        return True

    # db says "deleted" (size is None) and the file is indeed gone: no change.
    if db_entry.size is None and not pathinfo.exists:
        return False

    on_disk = (pathinfo.size, pathinfo.ctime, pathinfo.mtime)
    recorded = (db_entry.size, db_entry.ctime, db_entry.mtime)
    return on_disk != recorded
54
55
56 class MagicFolder(service.MultiService):
57     name = 'magic-folder'
58
59     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
60                  pending_delay=1.0, clock=None):
61         precondition_abspath(local_path_u)
62
63         service.MultiService.__init__(self)
64
65         immediate = clock is not None
66         clock = clock or reactor
67         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
68         if db is None:
69             return Failure(Exception('ERROR: Unable to load magic folder db.'))
70
71         # for tests
72         self._client = client
73         self._db = db
74
75         upload_dirnode = self._client.create_node_from_uri(upload_dircap)
76         collective_dirnode = self._client.create_node_from_uri(collective_dircap)
77
78         self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
79         self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
80                                      upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending)
81
82     def startService(self):
83         # TODO: why is this being called more than once?
84         if self.running:
85             return defer.succeed(None)
86         print "%r.startService" % (self,)
87         service.MultiService.startService(self)
88         return self.uploader.start_monitoring()
89
90     def ready(self):
91         """ready is used to signal us to start
92         processing the upload and download items...
93         """
94         d = self.uploader.start_scanning()
95         d2 = self.downloader.start_scanning()
96         d.addCallback(lambda ign: d2)
97         return d
98
99     def finish(self):
100         print "finish"
101         d = self.uploader.stop()
102         d2 = self.downloader.stop()
103         d.addCallback(lambda ign: d2)
104         return d
105
106     def remove_service(self):
107         return service.MultiService.disownServiceParent(self)
108
109
class QueueMixin(HookMixin):
    """
    Shared machinery for Uploader and Downloader: a deque of pending work
    items drained one item per "turn" through a chain of Deferreds, plus
    path-mapping, stats, and logging helpers.

    Subclasses must provide _process(item) and _when_queue_is_empty().
    """
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        # Queue name ('uploader' or 'downloader'); used in log lines and
        # stats counter keys.
        self._name = name
        self._clock = clock
        # Hooks for tests: 'processed' fires after each queue item,
        # 'started' when monitoring begins.
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # All item processing is chained onto this Deferred, serializing the
        # queue: at most one item is being processed at any time.
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        # Delay (in seconds, on self._clock) between queue turns; subclasses
        # may override (Downloader sets it to its remote scan interval).
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        """Map a u'/'-separated folder-relative path to a FilePath under the local directory."""
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath -> u'/'-joined relative unicode path."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Adjust the stats counter 'magic_folder.<queue name>.<counter_name>' by delta."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Pass-through Deferred callback: log msg with the result, return it unchanged.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        # Logs both to the client's log and to stdout (debugging aid).
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        """
        Take one item off the deque and chain its processing onto _lazy_tail.
        When the deque is empty, defer to the subclass's _when_queue_is_empty.
        """
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            # pop() takes the most recently appended item (LIFO order).
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            # The 'processed' hook fires whether _process succeeded or failed.
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            # Schedule the next turn after _turn_delay seconds on our clock.
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
177
178
class Uploader(QueueMixin):
    """
    Watches the local magic folder with a platform notifier (inotify or the
    Windows shim) and uploads created, modified, and deleted files and
    directories into the client's upload DMD, recording each version in the
    magic-folder database.
    """
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        # Set True by start_scanning(); until then inotify events are queued
        # but the queue is not turned.
        self.is_ready = False
        # When True (tests), _notify turns the deque synchronously instead of
        # via callLater(0, ...).
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        # relpaths that are queued or currently being processed; consulted by
        # the Downloader (via is_pending) for conflict detection.
        self._pending = set()

        # Not every notifier implementation has this knob — hence the
        # hasattr check.
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Start the notifier reading filesystem events; fires the 'started' hook."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop the notifier; the returned Deferred fires once the current
        _lazy_tail processing chain has completed."""
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        # wait_until_stopped is only provided by some notifier
        # implementations — presumably the Windows one; TODO confirm.
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """
        Perform the initial full scan of the local directory, then queue any
        db-known paths that the scan did not process (typically files deleted
        while we were not running), and start turning the queue.
        """
        self._log("start_scanning")
        self.is_ready = True
        # Note: _pending is replaced by the set of all relpaths known to the
        # db; _scan and _process then add/remove entries from it.
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """
        Recursively (via _process on subdirectories) scan the local directory
        at relative path reldir_u, processing each non-ignored child.
        """
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            # child=child binds the loop variable now (avoids the
            # late-binding closure pitfall).
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    # Returning None makes the subsequent _process a no-op.
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def is_pending(self, relpath_u):
        """Return True if relpath_u is queued for (or undergoing) upload."""
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        """
        Notifier callback: filter the event and, if relevant, queue the path
        for upload.
        """
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("ignoring event for %r (already pending)" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._log("appending %r to deque" % (relpath_u,))
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _when_queue_is_empty(self):
        # The uploader is purely event-driven; nothing to do on empty queue.
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        """
        Upload one queued path.  Handles four cases: the object disappeared
        (upload a 'deleted' marker), a symlink (warn and skip), a directory
        (create the remote subdirectory and scan into it), or a regular file
        (upload a new version if it changed).  relpath_u of None is a no-op
        (used by _scan for ignored paths).
        """
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            # Remove from pending before we decide what to do; new inotify
            # events for this path may now queue it again.
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # Case 1: the local object disappeared — record a deletion.
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    # Never uploaded, nothing to delete remotely.
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                # A deletion is represented remotely as an empty file with
                # 'deleted': True in its metadata.
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                # Case 2: symlinks are not supported.
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                # Case 3: a directory — watch it (if the notifier does not
                # already follow new subdirectories) and create it remotely.
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                # Remote directories are represented as empty files whose
                # encoded name ends with the encoding of u"/".
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                # Recurse into the new directory.
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                # Case 4: a regular file — upload a new version if it changed.
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # Case 5: sockets, devices, etc.
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Return a Deferred for the remote metadata of encoded_path_u, or a
        Failure if the path is unknown."""
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Return a Deferred for the remote filenode of encoded_path_u, or a
        Failure if the path is unknown."""
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
457
458
class WriteFileMixin(object):
    """
    Helpers for writing downloaded content to the local filesystem with
    backup/conflict handling, shared by the Downloader.
    """
    # Downloaded files get an mtime this many seconds in the past, so a
    # subsequent local modification is distinguishable by timestamp.
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        """Return the '.conflict' sibling name for abspath_u."""
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        """
        Atomically-ish install downloaded file_contents at abspath_u via a
        temporary file, handling overwrite vs. conflict; returns the path
        actually written (abspath_u, or its .conflict name).
        """
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        # atime = now, mtime = now - FUDGE_SECONDS (see class comment).
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                # The replacement dance failed; reclassify as a conflict.
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        """Move the replacement file to abspath_u's '.conflict' name and return that name."""
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        """Rename a remotely-deleted file to its '.backup' name; tolerate it
        already being gone.  Returns the original abspath_u."""
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u
522
523
class Downloader(QueueMixin, WriteFileMixin):
    """
    Periodically scans every DMD in the collective directory for objects
    newer than our local versions, queues them, and downloads each into the
    local directory with conflict detection.
    """
    # Seconds between remote collective scans; small to facilitate tests.
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        # Our own DMD's readonly cap; used to skip our own DMD during scans.
        self._upload_readonly_dircap = upload_readonly_dircap
        # Callable (Uploader.is_pending) used for conflict detection.
        self._is_upload_pending = is_upload_pending

        self._turn_delay = self.REMOTE_SCAN_INTERVAL

    def start_scanning(self):
        """Kick off the first remote collective scan (including our own DMD)
        and start turning the download queue."""
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        # Note: the queue starts turning without waiting for the scan to
        # complete.
        self._turn_deque()
        return d

    def stop(self):
        """Stop turning the queue; fires when the in-flight chain completes."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                # dir_name=dir_name binds the loop variable at definition time.
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) pair with the highest 'version';
            # (None, None) if no DMD had the file.
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        """
        List one participant's DMD and accumulate into scan_batch (a dict of
        relpath -> list of (filenode, metadata)) every entry that looks newer
        than our local version.
        """
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if scan_batch.has_key(relpath_u):
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        """
        Scan every DMD in the collective (skipping our own unless scan_self),
        then queue for download the highest-version candidate for each path
        that _should_download approves.
        """
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                # Skip our own DMD, identified by its readonly cap, unless
                # scan_self was requested (initial scan).
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                # Of all DMDs offering this path, take the highest version.
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._count('objects_excluded')
                    # Fire the 'processed' hook so tests still see a turn.
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # Idle behavior: wait one scan interval, rescan the collective, then
        # turn the queue again.
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        """
        Download one queued (relpath_u, file_node, metadata) item: detect
        conflicts against the db and the uploader's pending set, create/
        delete/write the local object accordingly, and record the result in
        the db.  ConflictError is trapped so a conflict does not poison the
        queue's Deferred chain.
        """
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            # NOTE: did_upload_version is reused here to record a download.
            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # A previous conflict for this path is still unresolved; refuse
            # to download over it.
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            # Conflict detection: compare the DMD's metadata against our db
            # entry, and check whether an upload of this path is in flight.
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            # A trailing u"/" marks a remote directory entry.
            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # Conflicts are handled (file written as .conflict); swallow the
            # error so the queue keeps turning.
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d