# src/allmydata/frontends/magic_folder.py

import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath


# Linux inotify flag: don't generate events for children after they have
# been unlinked from the watched directory. Not exported by
# twisted.internet.inotify, so it is defined here.
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
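
# Whichever module is returned, the code below assumes it provides INotify,
# humanReadableMask(), and the IN_* constants used by the Uploader.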


class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # A value returned from __init__ (such as a Failure) would be
            # silently discarded, so we must raise here.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode, upload_dirnode.get_readonly_uri(), clock)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start processing the upload and
        download queues.
        """
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)
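
# A minimal wiring sketch (hypothetical client and caps; the real setup is
# done by allmydata.client when [magic_folder] is enabled in tahoe.cfg):
#
#   mf = MagicFolder(client, upload_dircap, collective_dircap,
#                    u"/home/alice/magic", dbfile)
#   mf.setServiceParent(client)   # startService() begins monitoring
#   d = mf.ready()                # start scanning/processing both queues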


class QueueMixin(HookMixin):
    """Common machinery for the Uploader and Downloader: a deque of pending
    work items, drained one item per 'turn' on the supplied clock, plus
    'started'/'processed' hooks for tests."""

    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        """Pop one item off the deque and chain its processing onto
        self._lazy_tail; reschedule ourselves after self._turn_delay, or
        hand off to _when_queue_is_empty() once the deque is drained."""
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
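
# Uploader event flow, roughly: an inotify event fires _notify(), which
# appends a relative path to the deque; _turn_deque() then feeds each path
# to _process(), which uploads the file (or directory, or a deletion
# record) into this client's upload DMD.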


class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending,))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds the files that were in the db but were not found by
            # the scan above (normally because they have been deleted on
            # disk); _process will record them as deleted.
            self._log("adding %r" % (self._pending,))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("ignoring event for %r (already pending)" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._log("appending %r to deque" % (relpath_u,))
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

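        # Versioning sketch (illustrative): a path with no db entry is
        # uploaded as version 0; each detected local change uploads
        # current_version + 1; a deletion uploads an empty file marked
        # 'deleted': True at a bumped version rather than removing the
        # remote entry.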
        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d
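
    # Metadata stored with each uploaded entry looks like (illustrative
    # values):
    #   { 'version': 3,
    #     'last_downloaded_timestamp': 1443731624.0,
    #     'last_downloaded_uri': 'URI:CHK:...',  # omitted if never downloaded
    #     'deleted': True }  # present only for deletion records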

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.
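        #
        # For example (illustrative names): downloading "foo" writes
        # "foo.tmp" and backdates its mtime by FUDGE_SECONDS; then
        # fileutil.replace_file() swaps it into place with backup
        # "foo.backup", and on ConflictError "foo.tmp" is renamed to
        # "foo.conflict" instead.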

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except IOError:
            # XXX is this the correct error?
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class Downloader(QueueMixin, WriteFileMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode, upload_readonly_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap

        self._turn_delay = self.REMOTE_SCAN_INTERVAL

    def start_scanning(self):
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)
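    # Example (illustrative): no local db entry (v is None), or local v=1
    # vs. remote version=2 -> download; remote version <= local -> skip.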

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path, and returns None if
        there is no file at that path locally; otherwise it returns the
        version number recorded for that path in our magic-folder db.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with a (file node, metadata)
        two-tuple for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Start from None so that a version-0 entry can still be
            # selected; starting from 0 with a strict '>' never would.
            max_version = None
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if max_version is None or result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d
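
    # The collective directory maps each participant's name to its read-only
    # upload DMD; listing it yields something like (names illustrative)
    # {u'alice': (dirnode, metadata), u'bob': (dirnode, metadata)}. The two
    # scanners below walk that map.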

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self):
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    # Bind the loop variables as default arguments; otherwise
                    # these callbacks would see whichever values dir_name and
                    # dirnode held when they eventually ran.
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._count('objects_excluded')
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        """Poll: once the download queue drains, wait self._turn_delay
        (REMOTE_SCAN_INTERVAL) seconds, rescan the remote collective, and
        start turning the deque again."""
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            if self._db.check_file_db_exists(relpath_u):
                dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                print "metadata %r" % (metadata,)
                print "<<<<--- if %r != %r" % (dmd_last_downloaded_uri, local_last_downloaded_uri)
                if dmd_last_downloaded_uri is not None and local_last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != local_last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')

                #dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
                #local_last_uploaded_uri = ...

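            # Conflict rule (as implemented above): the download is a
            # conflict when the remote entry's last_downloaded_uri differs
            # from the one recorded in our local db, i.e. the remote writer
            # did not base its change on the version we last saw.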
            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d