From: Ramakrishnan Muthukrishnan Date: Thu, 10 Mar 2016 14:09:11 +0000 (+0530) Subject: tracker: refactor around Http and Udp (to be worked on) modules X-Git-Url: https://git.rkrishnan.org/vdrive/components/statistics?a=commitdiff_plain;h=5b18521efb136dfa7d0676f195a2cdf38744d660;p=functorrent.git tracker: refactor around Http and Udp (to be worked on) modules Tracker, like the FileSystem module is also an independent thread that responds to messages. --- diff --git a/src/FuncTorrent/FileSystem.hs b/src/FuncTorrent/FileSystem.hs index fdf89fa..3c625e6 100644 --- a/src/FuncTorrent/FileSystem.hs +++ b/src/FuncTorrent/FileSystem.hs @@ -6,14 +6,16 @@ module FuncTorrent.FileSystem createMsgChannel, writePieceToDisk, Piece(..), - pieceMapFromFile + pieceMapFromFile, + Stats(..), + getStats ) where import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan) -import Control.Concurrent.MVar (MVar, putMVar) +import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar) import Control.Monad (forever) -import Control.Monad.State (StateT, liftIO, runStateT, modify) +import Control.Monad.State (StateT, liftIO, get, runStateT, modify) import qualified Data.ByteString as BS import Data.Map (traverseWithKey, (!)) import System.IO (Handle, IOMode (ReadWriteMode), withFile) @@ -28,6 +30,7 @@ data Piece = Piece PieceNum BS.ByteString data Msg = ReadPiece PieceNum Integer (MVar Piece) | WritePiece Piece | VerifyPiece PieceNum (MVar Bool) + | GetStats (MVar Stats) type MsgChannel = Chan Msg @@ -47,38 +50,41 @@ run pieceMap c handle = forever $ do run' :: PieceMap -> MsgChannel -> Handle -> StateT Stats IO () run' pieceMap c handle = do + stats <- get msg <- liftIO recvMsg - liftIO $ sendResponse msg + liftIO $ sendResponse msg stats updateStats msg - where - recvMsg = readChan c - sendResponse msg = - case msg of - ReadPiece n len' var -> do - bs <- readPiece n len' - putMVar var (Piece n bs) - WritePiece (Piece n bs) -> - writePiece n bs - VerifyPiece n var -> do - isHashValid <- verifyPiece n - putMVar var isHashValid - readPiece n len' = do - let offset = pieceNumToOffset pieceMap n - readFileAtOffset handle offset len' - writePiece n piece = do - let offset = pieceNumToOffset pieceMap n - writeFileAtOffset handle offset piece - verifyPiece n = do - let offset = pieceNumToOffset pieceMap n - hash' = hash (pieceMap ! n) - len' = len (pieceMap ! n) - bs' <- readFileAtOffset handle offset len' - return $ verifyHash bs' hash' - updateStats (ReadPiece _ l _) = - modify (\st -> st {bytesRead = bytesRead st + l}) - updateStats (WritePiece (Piece _ bs)) = - modify (\st -> st {bytesWritten = bytesWritten st + fromIntegral (BS.length bs)}) - updateStats _ = modify id + where + recvMsg = readChan c + sendResponse msg stats = + case msg of + ReadPiece n len' var -> do + bs <- readPiece n len' + putMVar var (Piece n bs) + WritePiece (Piece n bs) -> + writePiece n bs + VerifyPiece n var -> do + isHashValid <- verifyPiece n + putMVar var isHashValid + GetStats var -> do + putMVar var stats + readPiece n len' = do + let offset = pieceNumToOffset pieceMap n + readFileAtOffset handle offset len' + writePiece n piece = do + let offset = pieceNumToOffset pieceMap n + writeFileAtOffset handle offset piece + verifyPiece n = do + let offset = pieceNumToOffset pieceMap n + hash' = hash (pieceMap ! n) + len' = len (pieceMap ! n) + bs' <- readFileAtOffset handle offset len' + return $ verifyHash bs' hash' + updateStats (ReadPiece _ l _) = + modify (\st -> st {bytesRead = bytesRead st + l}) + updateStats (WritePiece (Piece _ bs)) = + modify (\st -> st {bytesWritten = bytesWritten st + fromIntegral (BS.length bs)}) + updateStats _ = modify id pieceMapFromFile :: FilePath -> Integer -> PieceMap -> IO PieceMap pieceMapFromFile filePath fileLen pieceMap = do @@ -98,3 +104,8 @@ writePieceToDisk :: MsgChannel -> PieceNum -> BS.ByteString -> IO () writePieceToDisk c pieceNum bs = writeChan c $ WritePiece (Piece pieceNum bs) +getStats :: MsgChannel -> IO (MVar Stats) +getStats c = do + v <- newEmptyMVar + writeChan c $ GetStats v + return v diff --git a/src/FuncTorrent/Peer.hs b/src/FuncTorrent/Peer.hs index 9dd307d..c855fa2 100644 --- a/src/FuncTorrent/Peer.hs +++ b/src/FuncTorrent/Peer.hs @@ -19,7 +19,7 @@ import FuncTorrent.Metainfo (Metainfo(..)) import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg) import FuncTorrent.Utils (splitNum, verifyHash) import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability) -import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk, Piece(..)) +import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk) data PState = PState { handle :: Handle , peer :: Peer diff --git a/src/FuncTorrent/Tracker.hs b/src/FuncTorrent/Tracker.hs index 64a2ce9..1578808 100644 --- a/src/FuncTorrent/Tracker.hs +++ b/src/FuncTorrent/Tracker.hs @@ -1,29 +1,69 @@ {-# LANGUAGE OverloadedStrings #-} module FuncTorrent.Tracker - (TState(..), - initialTrackerState, - trackerLoop - ) where + (runTracker + , getConnectedPeers + , newTracker + ) where -import Control.Concurrent.MVar (newEmptyMVar, newMVar) +import Control.Concurrent(forkIO) +import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan) +import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar) +import Control.Monad.State (StateT, liftIO, get, runStateT) +import Control.Monad (forever) +import Data.ByteString.Char8 (ByteString) import Data.List (isPrefixOf) +import Network (PortNumber) -import FuncTorrent.Tracker.Http(trackerLoop) -import FuncTorrent.Tracker.Types(TState(..), TrackerEventState(..), TrackerProtocol(..)) +import FuncTorrent.Tracker.Http (trackerLoop) +import FuncTorrent.Tracker.Types (TState(..), TrackerEventState(..), TrackerProtocol(..), TrackerMsg(..)) +import qualified FuncTorrent.FileSystem as FS (MsgChannel) +import FuncTorrent.Peer (Peer) -initialTrackerState :: Integer -> IO TState -initialTrackerState sz = do +type MsgChannel = Chan TrackerMsg + +newTracker :: IO MsgChannel +newTracker = newChan + +runTracker :: MsgChannel -> FS.MsgChannel -> ByteString -> PortNumber + -> String -> [String] -> Integer -> IO () +runTracker msgChannel fsChan infohash port peerId announceList sz = do ps <- newEmptyMVar - up <- newMVar 0 - down <- newMVar 0 - return $ TState { currentState = None - , connectedPeers = ps - , uploaded = up - , downloaded = down - , left = sz } + let initialTState = TState { currentState = None + , connectedPeers = ps + , left = sz } + turl = head announceList + case (getTrackerType turl) of + Http -> do + _ <- forkIO $ trackerLoop turl port peerId infohash fsChan initialTState + runStateT (msgHandler msgChannel) initialTState + return () + _ -> do + error "Tracker Protocol unimplemented" getTrackerType :: String -> TrackerProtocol getTrackerType url | isPrefixOf "http://" url = Http | isPrefixOf "udp://" url = Udp | otherwise = UnknownProtocol + +msgHandler :: MsgChannel -> StateT TState IO () +msgHandler c = forever $ do + st <- get + peers <- liftIO $ readMVar (connectedPeers st) + msg <- liftIO recvMsg + liftIO $ sendResponse msg peers + where + recvMsg = readChan c + sendResponse msg peers = + case msg of + GetConnectedPeersMsg var -> do + putMVar var peers + _ -> do + putStrLn "Unhandled Tracker Msg" + +getConnectedPeers :: MsgChannel -> IO [Peer] +getConnectedPeers c = do + v <- newEmptyMVar + writeChan c (GetConnectedPeersMsg v) + ps <- readMVar v + return ps diff --git a/src/FuncTorrent/Tracker/Http.hs b/src/FuncTorrent/Tracker/Http.hs index dacadbf..d19a013 100644 --- a/src/FuncTorrent/Tracker/Http.hs +++ b/src/FuncTorrent/Tracker/Http.hs @@ -1,12 +1,13 @@ {-# LANGUAGE OverloadedStrings #-} module FuncTorrent.Tracker.Http - ( trackerLoop + (trackerLoop ) where import Prelude hiding (lookup, splitAt) import Control.Concurrent (threadDelay) import Control.Concurrent.MVar (readMVar, putMVar) +import Control.Monad (forever) import qualified Data.ByteString.Base16 as B16 (encode) import Data.ByteString (ByteString) import Data.ByteString.Char8 as BC (pack, unpack, splitAt) @@ -18,7 +19,7 @@ import Network.HTTP.Base (urlEncode) import qualified FuncTorrent.Bencode as Benc import FuncTorrent.Bencode (BVal(..)) -import FuncTorrent.Metainfo (Info(..), Metainfo(..)) +import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats) import FuncTorrent.Network (sendGetRequest) import FuncTorrent.Peer (Peer(..)) import FuncTorrent.Utils (splitN) @@ -41,34 +42,33 @@ urlEncodeHash bs = concatMap (encode' . unpack) (splitN 2 bs) -- | Make arguments that should be posted to tracker. -- This is a separate pure function for testability. -mkArgs :: PortNumber -> String -> Integer -> Integer -> Metainfo -> [(String, ByteString)] -mkArgs port peer_id up down m = - let fileSize = lengthInBytes $ info m - bytesLeft = fileSize - down - in - [("info_hash", pack . urlEncodeHash . B16.encode . infoHash $ m), - ("peer_id", pack . urlEncode $ peer_id), - ("port", pack $ show port), - ("uploaded", pack $ show up), - ("downloaded", pack $ show down), - ("left", pack $ show bytesLeft), - ("compact", "1"), - ("event", "started")] +mkArgs :: PortNumber -> String -> Integer -> Integer -> Integer -> ByteString -> [(String, ByteString)] +mkArgs port peer_id up down left' infoHash = + [("info_hash", pack . urlEncodeHash . B16.encode $ infoHash), + ("peer_id", pack . urlEncode $ peer_id), + ("port", pack $ show port), + ("uploaded", pack $ show up), + ("downloaded", pack $ show down), + ("left", pack $ show left'), + ("compact", "1"), + ("event", "started")] -trackerLoop :: PortNumber -> String -> Metainfo -> TState -> IO ByteString -trackerLoop port peerId m st = do - up <- readMVar $ uploaded st - down <- readMVar $ downloaded st - resp <- sendGetRequest (head . announceList $ m) $ mkArgs port peerId up down m +trackerLoop :: String -> PortNumber -> String -> ByteString -> FS.MsgChannel -> TState -> IO () +trackerLoop url port peerId infohash fschan tstate = forever $ do + st' <- FS.getStats fschan + st <- readMVar st' + let up = FS.bytesRead st + down = FS.bytesWritten st + resp <- sendGetRequest url $ mkArgs port peerId up down (left tstate) infohash case Benc.decode resp of - Left e -> return $ pack (show e) + Left e -> return () -- $ pack (show e) Right trackerInfo -> case parseTrackerResponse trackerInfo of - Left e -> return e + Left e -> return () -- e Right tresp -> do _ <- threadDelay $ fromIntegral (interval tresp) - _ <- putMVar (connectedPeers st) (peers tresp) - trackerLoop port peerId m st + _ <- putMVar (connectedPeers tstate) (peers tresp) + return () -- trackerLoop port peerId st parseTrackerResponse :: BVal -> Either ByteString TrackerResponse parseTrackerResponse resp = diff --git a/src/FuncTorrent/Tracker/Types.hs b/src/FuncTorrent/Tracker/Types.hs index 6ca5ddb..49e63e3 100644 --- a/src/FuncTorrent/Tracker/Types.hs +++ b/src/FuncTorrent/Tracker/Types.hs @@ -4,6 +4,7 @@ module FuncTorrent.Tracker.Types , TrackerResponse(..) , TrackerEventState(..) , TState(..) + , TrackerMsg(..) , IP , Port ) where @@ -25,14 +26,13 @@ data TrackerEventState = None | Stopped | Completed deriving (Show, Eq) +data TrackerMsg = GetStatusMsg + | GetConnectedPeersMsg (MVar [Peer]) -data TState = TState { - uploaded :: MVar Integer - , downloaded :: MVar Integer - , left :: Integer - , currentState :: TrackerEventState - , connectedPeers :: MVar [Peer] - } +data TState = TState { left :: Integer + , currentState :: TrackerEventState + , connectedPeers :: MVar [Peer] + } -- | Tracker response data TrackerResponse = TrackerResponse { diff --git a/src/main/Main.hs b/src/main/Main.hs index 268e9a4..97c4351 100644 --- a/src/main/Main.hs +++ b/src/main/Main.hs @@ -12,7 +12,7 @@ import FuncTorrent.Metainfo (Info(..), Metainfo(..), torrentToMetainfo import FuncTorrent.Peer (handlePeerMsgs) import FuncTorrent.PieceManager (initPieceMap) import qualified FuncTorrent.Server as Server -import FuncTorrent.Tracker (connectedPeers, initialTrackerState, trackerLoop) +import FuncTorrent.Tracker (runTracker, getConnectedPeers, newTracker) import Network (PortID (PortNumber)) import System.IO (withFile, IOMode (ReadWriteMode)) import System.Directory (doesFileExist) @@ -69,6 +69,7 @@ main = do fileLen = lengthInBytes (info m) pieceHash = pieces (info m) pLen = pieceLength (info m) + infohash = infoHash m defaultPieceMap = initPieceMap pieceHash fileLen pLen log $ "create FS msg channel" fsMsgChannel <- FS.createMsgChannel @@ -82,10 +83,9 @@ main = do log "Trying to fetch peers" _ <- forkIO $ Server.run serverSock peerId m pieceMap fsMsgChannel log $ "Trackers: " ++ head (announceList m) - -- (tstate, errstr) <- runTracker portnum peerId m - tstate <- initialTrackerState $ lengthInBytes $ info m - _ <- forkIO $ trackerLoop portnum peerId m tstate >> return () - ps <- readMVar (connectedPeers tstate) + trackerMsgChan <- newTracker + _ <- forkIO $ runTracker trackerMsgChan fsMsgChannel infohash portnum peerId (announceList m) fileLen + ps <- getConnectedPeers trackerMsgChan log $ "Peers List : " ++ (show ps) let p1 = head ps handlePeerMsgs p1 peerId m pieceMap True fsMsgChannel