import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath


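# Linux inotify flag value for IN_EXCL_UNLINK (don't report events for
# children after they have been unlinked), defined here apparently because
# the Twisted inotify module does not expose it.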
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
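    """
    Return the module to use for filesystem notification: the allmydata
    Windows shim on win32, otherwise Twisted's inotify support.
    Raises NotImplementedError on platforms that support neither.
    """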
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


def is_new_file(pathinfo, db_entry):
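    """
    Return True if the file should be treated as new or changed: there is no
    db entry, or the local (size, ctime, mtime) differs from the db entry.
    A path that no longer exists and whose db entry records no size is not new.
    """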
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
            (db_entry.size, db_entry.ctime, db_entry.mtime))


class MagicFolder(service.MultiService):
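    """
    Top-level service for one magic folder: it owns the Uploader and
    Downloader and the shared magic-folder db.

    Typical lifecycle: construct, startService() to begin filesystem
    monitoring, ready() to start the upload/download scans, and finish()
    to shut down.
    """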
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download queues.
        """
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

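        # _deque holds the work items; _lazy_tail chains each queue turn onto
        # the previous one, so items are processed strictly one at a time.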
        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
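        # One turn of the queue: pop a single item and process it, then
        # schedule the next turn (or _when_queue_is_empty once drained),
        # always chaining onto _lazy_tail so turns never overlap.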
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))


class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending,))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending,))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
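        """
        List the local directory at reldir_u, add each non-ignored child to
        the pending set, and process it.  Note that children are processed
        directly here rather than going through the deque.
        """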
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def is_pending(self, relpath_u):
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("ignoring event for %r (already pending)" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._log("appending %r to deque" % (relpath_u,))
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
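        # Decide what (if anything) to upload for relpath_u.  There are four
        # cases: the path has disappeared (upload a deletion marker), it is a
        # symlink (log a warning and skip), it is a directory (create the
        # remote subdirectory and scan it), or it is a regular file (upload a
        # new version if is_new_file() says it changed).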
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        # FUDGE_SECONDS is used to determine if another process
        # has written to the same file concurrently. This is described
        # in the Earth Dragon section of our design document:
        # docs/proposed/magic-folder/remote-to-local-sync.rst
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class Downloader(QueueMixin, WriteFileMixin):
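    """
    Watches the collective directory for remote changes, decides which
    versions to fetch, and writes them into the local magic folder,
    detecting conflicts against the local magic-folder db on the way.
    """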
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_scanning(self):
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path and, if the file
        exists locally and has an entry in our magic-folder db, returns the
        version number from that entry; otherwise it returns None.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with a 2-tuple of the
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            return defer.DeferredList(list_of_deferreds, consumeErrors=True)
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if 'version' not in result[1]:
                        self._log("invalid remote metadata detected")
                        continue
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
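        """
        List one participant's DMD (identified by nickname) and, for each
        entry whose remote version is newer than our local version, append
        the (file_node, metadata) pair to scan_batch under its relative path.
        """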
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                if 'version' not in metadata:
                    self._log("invalid remote metadata detected for %r" % (relpath_u,))
                    continue
                remote_version = metadata['version']
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
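        """
        Scan every participant DMD in the collective (skipping our own unless
        scan_self is True), batch the candidate versions per relative path,
        and enqueue the highest version of each path that _should_download
        approves.
        """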
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
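            # Conflict detection (see the design notes in
            # docs/proposed/magic-folder/remote-to-local-sync.rst): if the
            # DMD's last-downloaded (or last-uploaded) URI disagrees with our
            # db entry, or our own upload of this path is still pending,
            # write the incoming file as a .conflict copy instead of
            # overwriting.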
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d