import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath


IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("The filesystem notification support needed by Magic Folder "
                                      "is not available on this platform.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("The filesystem notification support needed by Magic Folder "
                                      "is not available on this platform.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise

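# A minimal usage sketch (illustrative; mirrors what Uploader does below).
# Whichever backend is selected, the returned module exposes an INotify
# class and the IN_* event-mask constants. 'my_callback' is a hypothetical
# callable taking (opaque, filepath, events_mask):
#
#   inotify = get_inotify_module()
#   notifier = inotify.INotify()
#   notifier.watch(to_filepath(u"/path/to/magic/folder"),
#                  mask=inotify.IN_CLOSE_WRITE | inotify.IN_DELETE,
#                  callbacks=[my_callback], recursive=True)
#   notifier.startReading()
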
class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # A value returned from __init__ is discarded, so returning a
            # Failure here would silently produce a half-initialized service;
            # raise instead so that construction fails loudly.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start processing
        the upload and download items.
        """
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)

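# Lifecycle sketch (illustrative; the client object and dircaps come from
# the node's configuration and are not shown here):
#
#   mf = MagicFolder(client, upload_dircap, collective_dircap,
#                    local_path_u, dbfile)
#   mf.setServiceParent(client)   # Twisted service hookup; runs startService()
#   d = mf.ready()                # begin the upload and download scans
#   ...
#   d2 = mf.finish()              # stop both queues on shutdown
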
class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))

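# How the queue turns (illustrative): _turn_deque() pops one item and chains
# its processing onto self._lazy_tail, so items are always handled strictly
# one at a time, with self._turn_delay between turns. Roughly:
#
#   self._deque.append(item)   # _turn_deque() -> _process(item), reschedule
#   # ... deque empty ...      # _turn_deque() -> _when_queue_is_empty()
#
# _when_queue_is_empty() is a no-op for Uploader; Downloader uses it to
# schedule the next remote scan.
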
class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending,))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending,))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def is_pending(self, relpath_u):
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.
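        # For example (standard Linux inotify semantics): mkdir() emits
        # IN_CREATE|IN_ISDIR, which passes this filter, while creating a
        # regular file emits a bare IN_CREATE (dropped here) followed by
        # an IN_CLOSE_WRITE once the writer closes it, which we act on.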

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("ignoring event for %r (already pending)" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._log("appending %r to deque" % (relpath_u,))
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if self._db.is_new_file(pathinfo, relpath_u):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                # self.warn is not defined on this class; log the warning instead.
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # self.warn is not defined on this class; log the warning instead.
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d

class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u

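# Filename conventions used by WriteFileMixin (illustrative, for a downloaded
# file "foo"): the new contents are first written to "foo.tmp"; a clean
# overwrite keeps the previous contents as "foo.backup"; if the replacement
# fails, or the download was already classified as a conflict, the new
# contents are surfaced as "foo.conflict" and "foo" itself is left untouched.
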

class Downloader(QueueMixin, WriteFileMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending

        self._turn_delay = self.REMOTE_SCAN_INTERVAL

    def start_scanning(self):
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

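    # Example (illustrative): if our db records version 2 for u"a/b.txt" and
    # the remote DMD advertises version 3, _should_download returns True;
    # an equal or older remote version returns False, and ignorable paths
    # are always skipped.
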
    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path and returns the
        version number recorded for it in our magic-folder db, or None if
        either the local file or the db entry does not exist.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with a two-tuple containing
        the file node and metadata for the latest version of that file in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # max_version starts as None (rather than 0) so that a remote
            # entry at version 0 can still be selected.
            max_version = None
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if max_version is None or result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

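    # Note on the DeferredList above (illustrative): with consumeErrors=True
    # it fires with a list of (success, result) pairs, e.g.
    #   [(True, (filenode, metadata)), (False, Failure(...)), ...]
    # so a participant that does not hold the file simply contributes a
    # failed entry, which highest_version() skips.
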
    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._count('objects_excluded')
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            # Declare a conflict if the remote entry's history disagrees with
            # our db (its last_downloaded_uri or last_uploaded_uri differs
            # from what we recorded), or if we have our own upload of this
            # path still pending.
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d