]> git.rkrishnan.org Git - functorrent.git/commitdiff
tracker: refactor around Http and Udp (to be worked on) modules
authorRamakrishnan Muthukrishnan <ram@rkrishnan.org>
Thu, 10 Mar 2016 14:09:11 +0000 (19:39 +0530)
committerRamakrishnan Muthukrishnan <ram@rkrishnan.org>
Thu, 10 Mar 2016 14:09:17 +0000 (19:39 +0530)
Tracker, like the FileSystem module is also an independent thread
that responds to messages.

src/FuncTorrent/FileSystem.hs
src/FuncTorrent/Peer.hs
src/FuncTorrent/Tracker.hs
src/FuncTorrent/Tracker/Http.hs
src/FuncTorrent/Tracker/Types.hs
src/main/Main.hs

index fdf89fa94bae2415d9fa12d572cade889823eebf..3c625e69e0825af4f660ee783498fcaba1d8f782 100644 (file)
@@ -6,14 +6,16 @@ module FuncTorrent.FileSystem
         createMsgChannel,
         writePieceToDisk,
         Piece(..),
         createMsgChannel,
         writePieceToDisk,
         Piece(..),
-        pieceMapFromFile
+        pieceMapFromFile,
+        Stats(..),
+        getStats
        )
        where
 
 import           Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
        )
        where
 
 import           Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
-import           Control.Concurrent.MVar (MVar, putMVar)
+import           Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar)
 import           Control.Monad (forever)
 import           Control.Monad (forever)
-import           Control.Monad.State (StateT, liftIO, runStateT, modify)
+import           Control.Monad.State (StateT, liftIO, get, runStateT, modify)
 import qualified Data.ByteString as BS
 import           Data.Map (traverseWithKey, (!))
 import           System.IO (Handle, IOMode (ReadWriteMode), withFile)
 import qualified Data.ByteString as BS
 import           Data.Map (traverseWithKey, (!))
 import           System.IO (Handle, IOMode (ReadWriteMode), withFile)
@@ -28,6 +30,7 @@ data Piece = Piece PieceNum BS.ByteString
 data Msg = ReadPiece PieceNum Integer (MVar Piece)
          | WritePiece Piece
          | VerifyPiece PieceNum (MVar Bool)
 data Msg = ReadPiece PieceNum Integer (MVar Piece)
          | WritePiece Piece
          | VerifyPiece PieceNum (MVar Bool)
+         | GetStats (MVar Stats)
 
 type MsgChannel = Chan Msg
 
 
 type MsgChannel = Chan Msg
 
@@ -47,38 +50,41 @@ run pieceMap c handle = forever $ do
 
 run' :: PieceMap -> MsgChannel -> Handle -> StateT Stats IO ()
 run' pieceMap c handle = do
 
 run' :: PieceMap -> MsgChannel -> Handle -> StateT Stats IO ()
 run' pieceMap c handle = do
+  stats <- get
   msg <- liftIO recvMsg
   msg <- liftIO recvMsg
-  liftIO $ sendResponse msg
+  liftIO $ sendResponse msg stats
   updateStats msg
   updateStats msg
-  where
-    recvMsg = readChan c
-    sendResponse msg =
-      case msg of
-      ReadPiece n len' var -> do
-        bs <- readPiece n len'
-        putMVar var (Piece n bs)
-      WritePiece (Piece n bs) ->
-        writePiece n bs
-      VerifyPiece n var -> do
-        isHashValid <- verifyPiece n
-        putMVar var isHashValid
-    readPiece n len' = do
-      let offset = pieceNumToOffset pieceMap n
-      readFileAtOffset handle offset len'
-    writePiece n piece = do
-      let offset = pieceNumToOffset pieceMap n
-      writeFileAtOffset handle offset piece
-    verifyPiece n = do
-      let offset = pieceNumToOffset pieceMap n
-          hash'  = hash (pieceMap ! n)
-          len'   = len (pieceMap ! n)
-      bs' <- readFileAtOffset handle offset len'
-      return $ verifyHash bs' hash'
-    updateStats (ReadPiece _ l _) =
-      modify (\st -> st {bytesRead = bytesRead st + l})
-    updateStats (WritePiece (Piece _ bs)) =
-      modify (\st -> st {bytesWritten = bytesWritten st + fromIntegral (BS.length bs)})
-    updateStats _ = modify id
+    where
+      recvMsg = readChan c
+      sendResponse msg stats =
+        case msg of
+          ReadPiece n len' var -> do
+            bs <- readPiece n len'
+            putMVar var (Piece n bs)
+          WritePiece (Piece n bs) ->
+            writePiece n bs
+          VerifyPiece n var -> do
+            isHashValid <- verifyPiece n
+            putMVar var isHashValid
+          GetStats var -> do
+            putMVar var stats
+      readPiece n len' = do
+        let offset = pieceNumToOffset pieceMap n
+        readFileAtOffset handle offset len'
+      writePiece n piece = do
+        let offset = pieceNumToOffset pieceMap n
+        writeFileAtOffset handle offset piece
+      verifyPiece n = do
+        let offset = pieceNumToOffset pieceMap n
+            hash'  = hash (pieceMap ! n)
+            len'   = len (pieceMap ! n)
+        bs' <- readFileAtOffset handle offset len'
+        return $ verifyHash bs' hash'
+      updateStats (ReadPiece _ l _) =
+        modify (\st -> st {bytesRead = bytesRead st + l})
+      updateStats (WritePiece (Piece _ bs)) =
+        modify (\st -> st {bytesWritten = bytesWritten st + fromIntegral (BS.length bs)})
+      updateStats _ = modify id
 
 pieceMapFromFile :: FilePath -> Integer -> PieceMap -> IO PieceMap
 pieceMapFromFile filePath fileLen pieceMap = do
 
 pieceMapFromFile :: FilePath -> Integer -> PieceMap -> IO PieceMap
 pieceMapFromFile filePath fileLen pieceMap = do
@@ -98,3 +104,8 @@ writePieceToDisk :: MsgChannel -> PieceNum -> BS.ByteString -> IO ()
 writePieceToDisk c pieceNum bs =
   writeChan c $ WritePiece (Piece pieceNum bs)
 
 writePieceToDisk c pieceNum bs =
   writeChan c $ WritePiece (Piece pieceNum bs)
 
+getStats :: MsgChannel -> IO (MVar Stats)
+getStats c = do
+  v <- newEmptyMVar
+  writeChan c $ GetStats v
+  return v
index 9dd307d45c7a68898768d5b0c320440ed6409f6e..c855fa25a68906b6de8c279d5f09ee7ae0a4c795 100644 (file)
@@ -19,7 +19,7 @@ import FuncTorrent.Metainfo (Metainfo(..))
 import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg)
 import FuncTorrent.Utils (splitNum, verifyHash)
 import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability)
 import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg)
 import FuncTorrent.Utils (splitNum, verifyHash)
 import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability)
-import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk, Piece(..))
+import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk)
 
 data PState = PState { handle :: Handle
                      , peer :: Peer
 
 data PState = PState { handle :: Handle
                      , peer :: Peer
index 64a2ce9d3e8c96a5a29c5176bd770be0ba05bb52..157880842cc23951f909dda0e7a489d57bdd1c5f 100644 (file)
@@ -1,29 +1,69 @@
 {-# LANGUAGE OverloadedStrings #-}
 module FuncTorrent.Tracker
 {-# LANGUAGE OverloadedStrings #-}
 module FuncTorrent.Tracker
-    (TState(..),
-     initialTrackerState,
-     trackerLoop
-    ) where
+       (runTracker
+       , getConnectedPeers
+       , newTracker
+       ) where
 
 
-import Control.Concurrent.MVar (newEmptyMVar, newMVar)
+import Control.Concurrent(forkIO)
+import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
+import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)
+import Control.Monad.State (StateT, liftIO, get, runStateT)
+import Control.Monad (forever)
+import Data.ByteString.Char8 (ByteString)
 import Data.List (isPrefixOf)
 import Data.List (isPrefixOf)
+import Network (PortNumber)
 
 
-import FuncTorrent.Tracker.Http(trackerLoop)
-import FuncTorrent.Tracker.Types(TState(..), TrackerEventState(..), TrackerProtocol(..))
+import FuncTorrent.Tracker.Http (trackerLoop)
+import FuncTorrent.Tracker.Types (TState(..), TrackerEventState(..), TrackerProtocol(..), TrackerMsg(..))
+import qualified FuncTorrent.FileSystem as FS (MsgChannel)
+import FuncTorrent.Peer (Peer)
 
 
-initialTrackerState :: Integer -> IO TState
-initialTrackerState sz = do
+type MsgChannel = Chan TrackerMsg
+
+newTracker :: IO MsgChannel
+newTracker = newChan
+
+runTracker :: MsgChannel -> FS.MsgChannel -> ByteString -> PortNumber
+           -> String -> [String] -> Integer -> IO ()
+runTracker msgChannel fsChan infohash port peerId announceList sz = do
   ps <- newEmptyMVar
   ps <- newEmptyMVar
-  up <- newMVar 0
-  down <- newMVar 0
-  return $ TState { currentState = None
-                  , connectedPeers = ps
-                  , uploaded = up
-                  , downloaded = down
-                  , left = sz }
+  let initialTState = TState { currentState = None
+                             , connectedPeers = ps
+                             , left = sz }
+      turl = head announceList
+  case (getTrackerType turl) of
+    Http -> do
+      _ <- forkIO $ trackerLoop turl port peerId infohash fsChan initialTState
+      runStateT (msgHandler msgChannel) initialTState
+      return ()
+    _ -> do
+      error "Tracker Protocol unimplemented"
 
 getTrackerType :: String -> TrackerProtocol
 getTrackerType url | isPrefixOf "http://" url = Http
                    | isPrefixOf "udp://" url  = Udp
                    | otherwise                = UnknownProtocol
 
 
 getTrackerType :: String -> TrackerProtocol
 getTrackerType url | isPrefixOf "http://" url = Http
                    | isPrefixOf "udp://" url  = Udp
                    | otherwise                = UnknownProtocol
 
+
+msgHandler :: MsgChannel -> StateT TState IO ()
+msgHandler c = forever $ do
+  st <- get
+  peers <- liftIO $ readMVar (connectedPeers st)
+  msg <- liftIO recvMsg
+  liftIO $ sendResponse msg peers
+    where
+      recvMsg = readChan c
+      sendResponse msg peers =
+        case msg of
+          GetConnectedPeersMsg var -> do
+            putMVar var peers
+          _ -> do
+            putStrLn "Unhandled Tracker Msg"
+
+getConnectedPeers :: MsgChannel -> IO [Peer]
+getConnectedPeers c = do
+  v <- newEmptyMVar
+  writeChan c (GetConnectedPeersMsg v)
+  ps <- readMVar v
+  return ps
index dacadbf278534c3b509af2c7fa44d1d0c9a9ca1e..d19a013b081489ec2716132a70162705c5dcefa8 100644 (file)
@@ -1,12 +1,13 @@
 {-# LANGUAGE OverloadedStrings #-}
 module FuncTorrent.Tracker.Http
 {-# LANGUAGE OverloadedStrings #-}
 module FuncTorrent.Tracker.Http
-       ( trackerLoop
+       (trackerLoop
        ) where
 
 import Prelude hiding (lookup, splitAt)
 
 import Control.Concurrent (threadDelay)
 import Control.Concurrent.MVar (readMVar, putMVar)
        ) where
 
 import Prelude hiding (lookup, splitAt)
 
 import Control.Concurrent (threadDelay)
 import Control.Concurrent.MVar (readMVar, putMVar)
+import Control.Monad (forever)
 import qualified Data.ByteString.Base16 as B16 (encode)
 import Data.ByteString (ByteString)
 import Data.ByteString.Char8 as BC (pack, unpack, splitAt)
 import qualified Data.ByteString.Base16 as B16 (encode)
 import Data.ByteString (ByteString)
 import Data.ByteString.Char8 as BC (pack, unpack, splitAt)
@@ -18,7 +19,7 @@ import Network.HTTP.Base (urlEncode)
 
 import qualified FuncTorrent.Bencode as Benc
 import FuncTorrent.Bencode (BVal(..))
 
 import qualified FuncTorrent.Bencode as Benc
 import FuncTorrent.Bencode (BVal(..))
-import FuncTorrent.Metainfo (Info(..), Metainfo(..))
+import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats)
 import FuncTorrent.Network (sendGetRequest)
 import FuncTorrent.Peer (Peer(..))
 import FuncTorrent.Utils (splitN)
 import FuncTorrent.Network (sendGetRequest)
 import FuncTorrent.Peer (Peer(..))
 import FuncTorrent.Utils (splitN)
@@ -41,34 +42,33 @@ urlEncodeHash bs = concatMap (encode' . unpack) (splitN 2 bs)
 
 -- | Make arguments that should be posted to tracker.
 -- This is a separate pure function for testability.
 
 -- | Make arguments that should be posted to tracker.
 -- This is a separate pure function for testability.
-mkArgs :: PortNumber -> String -> Integer -> Integer -> Metainfo -> [(String, ByteString)]
-mkArgs port peer_id up down m =
-  let fileSize = lengthInBytes $ info m
-      bytesLeft = fileSize - down
-  in
-    [("info_hash", pack . urlEncodeHash . B16.encode . infoHash $ m),
-     ("peer_id", pack . urlEncode $ peer_id),
-     ("port", pack $ show port),
-     ("uploaded", pack $ show up),
-     ("downloaded", pack $ show down),
-     ("left", pack $ show bytesLeft),
-     ("compact", "1"),
-     ("event", "started")]
+mkArgs :: PortNumber -> String -> Integer -> Integer -> Integer -> ByteString -> [(String, ByteString)]
+mkArgs port peer_id up down left' infoHash =
+  [("info_hash", pack . urlEncodeHash . B16.encode $ infoHash),
+   ("peer_id", pack . urlEncode $ peer_id),
+   ("port", pack $ show port),
+   ("uploaded", pack $ show up),
+   ("downloaded", pack $ show down),
+   ("left", pack $ show left'),
+   ("compact", "1"),
+   ("event", "started")]
 
 
-trackerLoop :: PortNumber -> String -> Metainfo -> TState -> IO ByteString
-trackerLoop port peerId m st = do
-  up <- readMVar $ uploaded st
-  down <- readMVar $ downloaded st
-  resp <- sendGetRequest (head . announceList $ m) $ mkArgs port peerId up down m
+trackerLoop :: String -> PortNumber -> String -> ByteString -> FS.MsgChannel -> TState -> IO ()
+trackerLoop url port peerId infohash fschan tstate = forever $ do
+  st' <- FS.getStats fschan
+  st <- readMVar st'
+  let up = FS.bytesRead st
+      down = FS.bytesWritten st
+  resp <- sendGetRequest url $ mkArgs port peerId up down (left tstate) infohash
   case Benc.decode resp of
   case Benc.decode resp of
-    Left e -> return $ pack (show e)
+    Left e -> return () -- $ pack (show e)
     Right trackerInfo ->
       case parseTrackerResponse trackerInfo of
     Right trackerInfo ->
       case parseTrackerResponse trackerInfo of
-        Left e -> return e
+        Left e -> return () -- e
         Right tresp -> do
           _ <- threadDelay $ fromIntegral (interval tresp)
         Right tresp -> do
           _ <- threadDelay $ fromIntegral (interval tresp)
-          _ <- putMVar (connectedPeers st) (peers tresp)
-          trackerLoop port peerId m st
+          _ <- putMVar (connectedPeers tstate) (peers tresp)
+          return () -- trackerLoop port peerId st
 
 parseTrackerResponse :: BVal -> Either ByteString TrackerResponse
 parseTrackerResponse resp =
 
 parseTrackerResponse :: BVal -> Either ByteString TrackerResponse
 parseTrackerResponse resp =
index 6ca5ddb849e21d4691e1f836d5c83b93ca3347a4..49e63e32c86f80f1081b718a931c2493b88ebc29 100644 (file)
@@ -4,6 +4,7 @@ module FuncTorrent.Tracker.Types
        , TrackerResponse(..)
        , TrackerEventState(..)
        , TState(..)
        , TrackerResponse(..)
        , TrackerEventState(..)
        , TState(..)
+       , TrackerMsg(..)
        , IP
        , Port
        ) where
        , IP
        , Port
        ) where
@@ -25,14 +26,13 @@ data TrackerEventState = None
                        | Stopped
                        | Completed
                        deriving (Show, Eq)
                        | Stopped
                        | Completed
                        deriving (Show, Eq)
+data TrackerMsg = GetStatusMsg
+                | GetConnectedPeersMsg (MVar [Peer])
 
 
-data TState = TState {
-    uploaded :: MVar Integer
-  , downloaded :: MVar Integer
-  , left :: Integer
-  , currentState :: TrackerEventState
-  , connectedPeers :: MVar [Peer]
-  }
+data TState = TState { left :: Integer
+                     , currentState :: TrackerEventState
+                     , connectedPeers :: MVar [Peer]
+                     }
 
 -- | Tracker response
 data TrackerResponse = TrackerResponse {
 
 -- | Tracker response
 data TrackerResponse = TrackerResponse {
index 268e9a4e7c509ffc0c2edbbdcab1fce95d54101a..97c43513cc1f40569916be147362c4ce36bb1c57 100644 (file)
@@ -12,7 +12,7 @@ import           FuncTorrent.Metainfo (Info(..), Metainfo(..), torrentToMetainfo
 import           FuncTorrent.Peer (handlePeerMsgs)
 import           FuncTorrent.PieceManager (initPieceMap)
 import qualified FuncTorrent.Server as Server
 import           FuncTorrent.Peer (handlePeerMsgs)
 import           FuncTorrent.PieceManager (initPieceMap)
 import qualified FuncTorrent.Server as Server
-import           FuncTorrent.Tracker (connectedPeers, initialTrackerState, trackerLoop)
+import           FuncTorrent.Tracker (runTracker, getConnectedPeers, newTracker)
 import           Network (PortID (PortNumber))
 import           System.IO (withFile, IOMode (ReadWriteMode))
 import           System.Directory (doesFileExist)
 import           Network (PortID (PortNumber))
 import           System.IO (withFile, IOMode (ReadWriteMode))
 import           System.Directory (doesFileExist)
@@ -69,6 +69,7 @@ main = do
            fileLen = lengthInBytes (info m)
            pieceHash = pieces (info m)
            pLen = pieceLength (info m)
            fileLen = lengthInBytes (info m)
            pieceHash = pieces (info m)
            pLen = pieceLength (info m)
+           infohash = infoHash m
            defaultPieceMap = initPieceMap pieceHash fileLen pLen
        log $ "create FS msg channel"
        fsMsgChannel <- FS.createMsgChannel
            defaultPieceMap = initPieceMap pieceHash fileLen pLen
        log $ "create FS msg channel"
        fsMsgChannel <- FS.createMsgChannel
@@ -82,10 +83,9 @@ main = do
        log "Trying to fetch peers"
        _ <- forkIO $ Server.run serverSock peerId m pieceMap fsMsgChannel
        log $ "Trackers: " ++ head (announceList m)
        log "Trying to fetch peers"
        _ <- forkIO $ Server.run serverSock peerId m pieceMap fsMsgChannel
        log $ "Trackers: " ++ head (announceList m)
-       -- (tstate, errstr) <- runTracker portnum peerId m
-       tstate <- initialTrackerState $ lengthInBytes $ info m
-       _ <- forkIO $ trackerLoop portnum peerId m tstate >> return ()
-       ps <- readMVar (connectedPeers tstate)
+       trackerMsgChan <- newTracker
+       _ <- forkIO $ runTracker trackerMsgChan fsMsgChannel infohash portnum peerId (announceList m) fileLen
+       ps <- getConnectedPeers trackerMsgChan
        log $ "Peers List : " ++ (show ps)
        let p1 = head ps
        handlePeerMsgs p1 peerId m pieceMap True fsMsgChannel
        log $ "Peers List : " ++ (show ps)
        let p1 = head ps
        handlePeerMsgs p1 peerId m pieceMap True fsMsgChannel