
import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath

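# The Linux inotify IN_EXCL_UNLINK flag (from <sys/inotify.h>); Twisted's
# inotify module does not expose it, so we define the raw mask value here.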
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
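    """
    Return the platform-appropriate filesystem-notification module: the
    allmydata Windows emulation on win32, or Twisted's inotify support where
    the reactor provides it. Raise NotImplementedError on other platforms.
    """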
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


class MagicFolder(service.MultiService):
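    """
    Top-level service for one magic folder: ties together an Uploader (which
    watches the local directory and pushes changes to the upload DMD) and a
    Downloader (which polls the collective directory and pulls changes down).
    """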
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # we cannot return a Failure from __init__; raise so the caller sees the error
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        self.is_ready = False

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode, upload_dirnode.get_readonly_uri(), clock)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """
        Signal that we should start processing upload and download items:
        begin scanning on both the uploader and the downloader, and return a
        Deferred that fires when both initial scans are complete.
        """
        self.is_ready = True
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
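    """
    Shared machinery for the Uploader and Downloader: a deque of pending
    work items, a 'lazy tail' Deferred that serializes their processing, and
    helpers for path conversion, stats counters, and logging.
    """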
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

    def _turn_deque(self):
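        # Pop one item and chain its processing onto self._lazy_tail, so that
        # items are handled strictly one at a time; when the deque is empty,
        # defer to the subclass's _when_queue_is_empty().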
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))


class Uploader(QueueMixin):
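    """
    Watches the local magic folder with inotify (or the Windows emulation)
    and uploads created, changed, and deleted files to the client's upload
    DMD.
    """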
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending,))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending,))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
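        # Recursively walk the local directory rooted at reldir_u, adding each
        # non-ignored child to self._pending and processing it immediately.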
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))
            return

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
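        # Upload one relpath: depending on what is now on disk, this records a
        # deletion, creates a remote subdirectory (and scans it), or uploads
        # the file's current contents, bumping the version number in the db.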
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
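    """
    Helpers for writing downloaded file contents to disk atomically, and for
    renaming files aside on conflict or deletion.
    """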
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except IOError:
            # XXX is this the correct error?
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class Downloader(QueueMixin, WriteFileMixin):
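    """
    Periodically scans the read-only collective directory for versions newer
    than those recorded in the local magic-folder db, and downloads them into
    the local directory (detecting conflicts along the way).
    """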
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode, upload_readonly_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap

        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        the latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path and returns None if the
        corresponding file does not exist locally; otherwise it returns the
        version number recorded for that file in our magic-folder db.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a magic-folder-managed filename and
        returns a Deferred that fires with a (file node, metadata) pair for
        the latest version of that file found in the magic-folder collective
        directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # max_version starts as None so that even a version-0 entry can be selected
            max_version = None
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if max_version is None or result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        if name in self._download_scan_batch:
            self._download_scan_batch[name] += [(file_node, metadata)]
        else:
            self._download_scan_batch[name] = [(file_node, metadata)]

    def _scan_remote(self, nickname, dirnode):
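        # List one participant's DMD and batch up any entries whose remote
        # version is newer than (or absent from) our local record.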
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    self._append_to_batch(relpath_u, file_node, metadata)
        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote")
        return d

    def _scan_remote_collective(self):
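        # Scan every other participant's DMD (skipping our own upload DMD),
        # then filter the batch and queue whatever should be downloaded.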
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    # bind dir_name and dirnode as default arguments, since the loop
                    # variables will have changed by the time the callbacks run
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote(dir_name, dirnode))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)
            return d2
        d.addCallback(scan_collective)
        d.addCallback(self._filter_scan_batch)
        d.addCallback(self._add_batch_to_download_queue)
        return d

    def _add_batch_to_download_queue(self, result):
        self._log("result = %r" % (result,))
        self._log("deque = %r" % (self._deque,))
        self._deque.extend(result)
        self._log("deque after = %r" % (self._deque,))
        self._count('objects_queued', len(result))
        self._log("pending = %r" % (self._pending,))
        self._pending.update(map(lambda x: x[0], result))
        self._log("pending after = %r" % (self._pending,))

    def _filter_scan_batch(self, result):
        self._log("_filter_scan_batch")
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension += [(relpath_u, file_node, metadata)]
            else:
                self._log("Excluding %r" % (relpath_u,))
                self._count('objects_excluded')
                self._call_hook(None, 'processed')
        return extension

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
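        # Download one queued item: write the new contents (or apply the
        # deletion/mkdir), detect conflicts against the local db record, and
        # update the magic-folder db on success.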
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)
        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            if self._db.check_file_db_exists(relpath_u):
                dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                print "metadata %r" % (metadata,)
                print "<<<<--- if %r != %r" % (dmd_last_downloaded_uri, local_last_downloaded_uri)
                if dmd_last_downloaded_uri is not None and local_last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != local_last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')

                #dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
                #local_last_uploaded_uri = ...

            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def remove_from_pending(res):
            self._pending.remove(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d