import sys
+from collections import deque
from twisted.internet import defer
from twisted.python.filepath import FilePath
from twisted.application import service
-from foolscap.api import eventually
from allmydata.interfaces import IDirectoryNode
class DropUploader(service.MultiService):
name = 'drop-upload'
- def __init__(self, client, upload_dircap, local_dir_utf8, inotify=None):
+ def __init__(self, client, upload_dircap, local_dir_utf8, inotify=None, deque_max_len=100):
service.MultiService.__init__(self)
try:
"could not be represented in the filesystem encoding."
% quote_output(local_dir_utf8))
+ self._pending = set()
self._client = client
self._stats_provider = client.stats_provider
self._convergence = client.convergence
self._local_path = FilePath(local_dir)
+ self._upload_deque = deque(maxlen=deque_max_len)
self.is_upload_ready = False
if inotify is None:
processing the upload items...
"""
self.is_upload_ready = True
+ self._process_deque()
+
+ def _append_to_deque(self, func, path, event_mask):
+ thunk = (func, path, event_mask)
+ self._upload_deque.append(thunk)
+ self._pending.add(path)
+ if self.is_upload_ready:
+ self._process_deque()
+
+ def _process_deque(self):
+ while True:
+ try:
+ fields = self._upload_deque.pop()
+ func = fields[0]
+ func(*fields[1:])
+ except IndexError:
+ break
def _notify(self, opaque, path, events_mask):
self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
-
self._stats_provider.count('drop_upload.files_queued', 1)
- eventually(self._process, opaque, path, events_mask)
+ if path not in self._pending:
+ self._append_to_deque(self._process, path, events_mask)
+ self._pending.add(path)
- def _process(self, opaque, path, events_mask):
+ def _process(self, path, events_mask):
d = defer.succeed(None)
# FIXME: if this already exists as a mutable file, we replace the directory entry,
name = name.decode(get_filesystem_encoding())
u = FileName(path.path, self._convergence)
+ self._pending.remove(path)
return self._parent.add_file(name, u)
d.addCallback(_add_file)