From: Ramakrishnan Muthukrishnan <ram@rkrishnan.org>
Date: Thu, 10 Mar 2016 14:09:11 +0000 (+0530)
Subject: tracker: refactor around Http and Udp (to be worked on) modules
X-Git-Url: https://git.rkrishnan.org/components/%22news.html/simplejson/%22file:/?a=commitdiff_plain;h=5b18521efb136dfa7d0676f195a2cdf38744d660;p=functorrent.git

tracker: refactor around Http and Udp (to be worked on) modules

Tracker, like the FileSystem module is also an independent thread
that responds to messages.
---

diff --git a/src/FuncTorrent/FileSystem.hs b/src/FuncTorrent/FileSystem.hs
index fdf89fa..3c625e6 100644
--- a/src/FuncTorrent/FileSystem.hs
+++ b/src/FuncTorrent/FileSystem.hs
@@ -6,14 +6,16 @@ module FuncTorrent.FileSystem
         createMsgChannel,
         writePieceToDisk,
         Piece(..),
-        pieceMapFromFile
+        pieceMapFromFile,
+        Stats(..),
+        getStats
        )
        where
 
 import           Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
-import           Control.Concurrent.MVar (MVar, putMVar)
+import           Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar)
 import           Control.Monad (forever)
-import           Control.Monad.State (StateT, liftIO, runStateT, modify)
+import           Control.Monad.State (StateT, liftIO, get, runStateT, modify)
 import qualified Data.ByteString as BS
 import           Data.Map (traverseWithKey, (!))
 import           System.IO (Handle, IOMode (ReadWriteMode), withFile)
@@ -28,6 +30,7 @@ data Piece = Piece PieceNum BS.ByteString
 data Msg = ReadPiece PieceNum Integer (MVar Piece)
          | WritePiece Piece
          | VerifyPiece PieceNum (MVar Bool)
+         | GetStats (MVar Stats)
 
 type MsgChannel = Chan Msg
 
@@ -47,38 +50,41 @@ run pieceMap c handle = forever $ do
 
 run' :: PieceMap -> MsgChannel -> Handle -> StateT Stats IO ()
 run' pieceMap c handle = do
+  stats <- get
   msg <- liftIO recvMsg
-  liftIO $ sendResponse msg
+  liftIO $ sendResponse msg stats
   updateStats msg
-  where
-    recvMsg = readChan c
-    sendResponse msg =
-      case msg of
-      ReadPiece n len' var -> do
-        bs <- readPiece n len'
-        putMVar var (Piece n bs)
-      WritePiece (Piece n bs) ->
-        writePiece n bs
-      VerifyPiece n var -> do
-        isHashValid <- verifyPiece n
-        putMVar var isHashValid
-    readPiece n len' = do
-      let offset = pieceNumToOffset pieceMap n
-      readFileAtOffset handle offset len'
-    writePiece n piece = do
-      let offset = pieceNumToOffset pieceMap n
-      writeFileAtOffset handle offset piece
-    verifyPiece n = do
-      let offset = pieceNumToOffset pieceMap n
-          hash'  = hash (pieceMap ! n)
-          len'   = len (pieceMap ! n)
-      bs' <- readFileAtOffset handle offset len'
-      return $ verifyHash bs' hash'
-    updateStats (ReadPiece _ l _) =
-      modify (\st -> st {bytesRead = bytesRead st + l})
-    updateStats (WritePiece (Piece _ bs)) =
-      modify (\st -> st {bytesWritten = bytesWritten st + fromIntegral (BS.length bs)})
-    updateStats _ = modify id
+    where
+      recvMsg = readChan c
+      sendResponse msg stats =
+        case msg of
+          ReadPiece n len' var -> do
+            bs <- readPiece n len'
+            putMVar var (Piece n bs)
+          WritePiece (Piece n bs) ->
+            writePiece n bs
+          VerifyPiece n var -> do
+            isHashValid <- verifyPiece n
+            putMVar var isHashValid
+          GetStats var -> do
+            putMVar var stats
+      readPiece n len' = do
+        let offset = pieceNumToOffset pieceMap n
+        readFileAtOffset handle offset len'
+      writePiece n piece = do
+        let offset = pieceNumToOffset pieceMap n
+        writeFileAtOffset handle offset piece
+      verifyPiece n = do
+        let offset = pieceNumToOffset pieceMap n
+            hash'  = hash (pieceMap ! n)
+            len'   = len (pieceMap ! n)
+        bs' <- readFileAtOffset handle offset len'
+        return $ verifyHash bs' hash'
+      updateStats (ReadPiece _ l _) =
+        modify (\st -> st {bytesRead = bytesRead st + l})
+      updateStats (WritePiece (Piece _ bs)) =
+        modify (\st -> st {bytesWritten = bytesWritten st + fromIntegral (BS.length bs)})
+      updateStats _ = modify id
 
 pieceMapFromFile :: FilePath -> Integer -> PieceMap -> IO PieceMap
 pieceMapFromFile filePath fileLen pieceMap = do
@@ -98,3 +104,8 @@ writePieceToDisk :: MsgChannel -> PieceNum -> BS.ByteString -> IO ()
 writePieceToDisk c pieceNum bs =
   writeChan c $ WritePiece (Piece pieceNum bs)
 
+getStats :: MsgChannel -> IO (MVar Stats)
+getStats c = do
+  v <- newEmptyMVar
+  writeChan c $ GetStats v
+  return v
diff --git a/src/FuncTorrent/Peer.hs b/src/FuncTorrent/Peer.hs
index 9dd307d..c855fa2 100644
--- a/src/FuncTorrent/Peer.hs
+++ b/src/FuncTorrent/Peer.hs
@@ -19,7 +19,7 @@ import FuncTorrent.Metainfo (Metainfo(..))
 import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg)
 import FuncTorrent.Utils (splitNum, verifyHash)
 import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability)
-import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk, Piece(..))
+import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk)
 
 data PState = PState { handle :: Handle
                      , peer :: Peer
diff --git a/src/FuncTorrent/Tracker.hs b/src/FuncTorrent/Tracker.hs
index 64a2ce9..1578808 100644
--- a/src/FuncTorrent/Tracker.hs
+++ b/src/FuncTorrent/Tracker.hs
@@ -1,29 +1,69 @@
 {-# LANGUAGE OverloadedStrings #-}
 module FuncTorrent.Tracker
-    (TState(..),
-     initialTrackerState,
-     trackerLoop
-    ) where
+       (runTracker
+       , getConnectedPeers
+       , newTracker
+       ) where
 
-import Control.Concurrent.MVar (newEmptyMVar, newMVar)
+import Control.Concurrent(forkIO)
+import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
+import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)
+import Control.Monad.State (StateT, liftIO, get, runStateT)
+import Control.Monad (forever)
+import Data.ByteString.Char8 (ByteString)
 import Data.List (isPrefixOf)
+import Network (PortNumber)
 
-import FuncTorrent.Tracker.Http(trackerLoop)
-import FuncTorrent.Tracker.Types(TState(..), TrackerEventState(..), TrackerProtocol(..))
+import FuncTorrent.Tracker.Http (trackerLoop)
+import FuncTorrent.Tracker.Types (TState(..), TrackerEventState(..), TrackerProtocol(..), TrackerMsg(..))
+import qualified FuncTorrent.FileSystem as FS (MsgChannel)
+import FuncTorrent.Peer (Peer)
 
-initialTrackerState :: Integer -> IO TState
-initialTrackerState sz = do
+type MsgChannel = Chan TrackerMsg
+
+newTracker :: IO MsgChannel
+newTracker = newChan
+
+runTracker :: MsgChannel -> FS.MsgChannel -> ByteString -> PortNumber
+           -> String -> [String] -> Integer -> IO ()
+runTracker msgChannel fsChan infohash port peerId announceList sz = do
   ps <- newEmptyMVar
-  up <- newMVar 0
-  down <- newMVar 0
-  return $ TState { currentState = None
-                  , connectedPeers = ps
-                  , uploaded = up
-                  , downloaded = down
-                  , left = sz }
+  let initialTState = TState { currentState = None
+                             , connectedPeers = ps
+                             , left = sz }
+      turl = head announceList
+  case (getTrackerType turl) of
+    Http -> do
+      _ <- forkIO $ trackerLoop turl port peerId infohash fsChan initialTState
+      runStateT (msgHandler msgChannel) initialTState
+      return ()
+    _ -> do
+      error "Tracker Protocol unimplemented"
 
 getTrackerType :: String -> TrackerProtocol
 getTrackerType url | isPrefixOf "http://" url = Http
                    | isPrefixOf "udp://" url  = Udp
                    | otherwise                = UnknownProtocol
 
+
+msgHandler :: MsgChannel -> StateT TState IO ()
+msgHandler c = forever $ do
+  st <- get
+  peers <- liftIO $ readMVar (connectedPeers st)
+  msg <- liftIO recvMsg
+  liftIO $ sendResponse msg peers
+    where
+      recvMsg = readChan c
+      sendResponse msg peers =
+        case msg of
+          GetConnectedPeersMsg var -> do
+            putMVar var peers
+          _ -> do
+            putStrLn "Unhandled Tracker Msg"
+
+getConnectedPeers :: MsgChannel -> IO [Peer]
+getConnectedPeers c = do
+  v <- newEmptyMVar
+  writeChan c (GetConnectedPeersMsg v)
+  ps <- readMVar v
+  return ps
diff --git a/src/FuncTorrent/Tracker/Http.hs b/src/FuncTorrent/Tracker/Http.hs
index dacadbf..d19a013 100644
--- a/src/FuncTorrent/Tracker/Http.hs
+++ b/src/FuncTorrent/Tracker/Http.hs
@@ -1,12 +1,13 @@
 {-# LANGUAGE OverloadedStrings #-}
 module FuncTorrent.Tracker.Http
-       ( trackerLoop
+       (trackerLoop
        ) where
 
 import Prelude hiding (lookup, splitAt)
 
 import Control.Concurrent (threadDelay)
 import Control.Concurrent.MVar (readMVar, putMVar)
+import Control.Monad (forever)
 import qualified Data.ByteString.Base16 as B16 (encode)
 import Data.ByteString (ByteString)
 import Data.ByteString.Char8 as BC (pack, unpack, splitAt)
@@ -18,7 +19,7 @@ import Network.HTTP.Base (urlEncode)
 
 import qualified FuncTorrent.Bencode as Benc
 import FuncTorrent.Bencode (BVal(..))
-import FuncTorrent.Metainfo (Info(..), Metainfo(..))
+import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats)
 import FuncTorrent.Network (sendGetRequest)
 import FuncTorrent.Peer (Peer(..))
 import FuncTorrent.Utils (splitN)
@@ -41,34 +42,33 @@ urlEncodeHash bs = concatMap (encode' . unpack) (splitN 2 bs)
 
 -- | Make arguments that should be posted to tracker.
 -- This is a separate pure function for testability.
-mkArgs :: PortNumber -> String -> Integer -> Integer -> Metainfo -> [(String, ByteString)]
-mkArgs port peer_id up down m =
-  let fileSize = lengthInBytes $ info m
-      bytesLeft = fileSize - down
-  in
-    [("info_hash", pack . urlEncodeHash . B16.encode . infoHash $ m),
-     ("peer_id", pack . urlEncode $ peer_id),
-     ("port", pack $ show port),
-     ("uploaded", pack $ show up),
-     ("downloaded", pack $ show down),
-     ("left", pack $ show bytesLeft),
-     ("compact", "1"),
-     ("event", "started")]
+mkArgs :: PortNumber -> String -> Integer -> Integer -> Integer -> ByteString -> [(String, ByteString)]
+mkArgs port peer_id up down left' infoHash =
+  [("info_hash", pack . urlEncodeHash . B16.encode $ infoHash),
+   ("peer_id", pack . urlEncode $ peer_id),
+   ("port", pack $ show port),
+   ("uploaded", pack $ show up),
+   ("downloaded", pack $ show down),
+   ("left", pack $ show left'),
+   ("compact", "1"),
+   ("event", "started")]
 
-trackerLoop :: PortNumber -> String -> Metainfo -> TState -> IO ByteString
-trackerLoop port peerId m st = do
-  up <- readMVar $ uploaded st
-  down <- readMVar $ downloaded st
-  resp <- sendGetRequest (head . announceList $ m) $ mkArgs port peerId up down m
+trackerLoop :: String -> PortNumber -> String -> ByteString -> FS.MsgChannel -> TState -> IO ()
+trackerLoop url port peerId infohash fschan tstate = forever $ do
+  st' <- FS.getStats fschan
+  st <- readMVar st'
+  let up = FS.bytesRead st
+      down = FS.bytesWritten st
+  resp <- sendGetRequest url $ mkArgs port peerId up down (left tstate) infohash
   case Benc.decode resp of
-    Left e -> return $ pack (show e)
+    Left e -> return () -- $ pack (show e)
     Right trackerInfo ->
       case parseTrackerResponse trackerInfo of
-        Left e -> return e
+        Left e -> return () -- e
         Right tresp -> do
           _ <- threadDelay $ fromIntegral (interval tresp)
-          _ <- putMVar (connectedPeers st) (peers tresp)
-          trackerLoop port peerId m st
+          _ <- putMVar (connectedPeers tstate) (peers tresp)
+          return () -- trackerLoop port peerId st
 
 parseTrackerResponse :: BVal -> Either ByteString TrackerResponse
 parseTrackerResponse resp =
diff --git a/src/FuncTorrent/Tracker/Types.hs b/src/FuncTorrent/Tracker/Types.hs
index 6ca5ddb..49e63e3 100644
--- a/src/FuncTorrent/Tracker/Types.hs
+++ b/src/FuncTorrent/Tracker/Types.hs
@@ -4,6 +4,7 @@ module FuncTorrent.Tracker.Types
        , TrackerResponse(..)
        , TrackerEventState(..)
        , TState(..)
+       , TrackerMsg(..)
        , IP
        , Port
        ) where
@@ -25,14 +26,13 @@ data TrackerEventState = None
                        | Stopped
                        | Completed
                        deriving (Show, Eq)
+data TrackerMsg = GetStatusMsg
+                | GetConnectedPeersMsg (MVar [Peer])
 
-data TState = TState {
-    uploaded :: MVar Integer
-  , downloaded :: MVar Integer
-  , left :: Integer
-  , currentState :: TrackerEventState
-  , connectedPeers :: MVar [Peer]
-  }
+data TState = TState { left :: Integer
+                     , currentState :: TrackerEventState
+                     , connectedPeers :: MVar [Peer]
+                     }
 
 -- | Tracker response
 data TrackerResponse = TrackerResponse {
diff --git a/src/main/Main.hs b/src/main/Main.hs
index 268e9a4..97c4351 100644
--- a/src/main/Main.hs
+++ b/src/main/Main.hs
@@ -12,7 +12,7 @@ import           FuncTorrent.Metainfo (Info(..), Metainfo(..), torrentToMetainfo
 import           FuncTorrent.Peer (handlePeerMsgs)
 import           FuncTorrent.PieceManager (initPieceMap)
 import qualified FuncTorrent.Server as Server
-import           FuncTorrent.Tracker (connectedPeers, initialTrackerState, trackerLoop)
+import           FuncTorrent.Tracker (runTracker, getConnectedPeers, newTracker)
 import           Network (PortID (PortNumber))
 import           System.IO (withFile, IOMode (ReadWriteMode))
 import           System.Directory (doesFileExist)
@@ -69,6 +69,7 @@ main = do
            fileLen = lengthInBytes (info m)
            pieceHash = pieces (info m)
            pLen = pieceLength (info m)
+           infohash = infoHash m
            defaultPieceMap = initPieceMap pieceHash fileLen pLen
        log $ "create FS msg channel"
        fsMsgChannel <- FS.createMsgChannel
@@ -82,10 +83,9 @@ main = do
        log "Trying to fetch peers"
        _ <- forkIO $ Server.run serverSock peerId m pieceMap fsMsgChannel
        log $ "Trackers: " ++ head (announceList m)
-       -- (tstate, errstr) <- runTracker portnum peerId m
-       tstate <- initialTrackerState $ lengthInBytes $ info m
-       _ <- forkIO $ trackerLoop portnum peerId m tstate >> return ()
-       ps <- readMVar (connectedPeers tstate)
+       trackerMsgChan <- newTracker
+       _ <- forkIO $ runTracker trackerMsgChan fsMsgChannel infohash portnum peerId (announceList m) fileLen
+       ps <- getConnectedPeers trackerMsgChan
        log $ "Peers List : " ++ (show ps)
        let p1 = head ps
        handlePeerMsgs p1 peerId m pieceMap True fsMsgChannel