From: Ramakrishnan Muthukrishnan Date: Fri, 2 Oct 2015 09:03:59 +0000 (+0530) Subject: Tracker is a separate thread now X-Git-Url: https://git.rkrishnan.org/specifications/components/com_hotproperty/reliability?a=commitdiff_plain;h=cfd13b8f2f31f0273331c8209d410748cfb39dc8;p=functorrent.git Tracker is a separate thread now --- diff --git a/src/FuncTorrent/Network.hs b/src/FuncTorrent/Network.hs index eac69c9..1a569a5 100644 --- a/src/FuncTorrent/Network.hs +++ b/src/FuncTorrent/Network.hs @@ -1,7 +1,7 @@ {-# LANGUAGE OverloadedStrings #-} module FuncTorrent.Network ( - get, + httpget, mkParams ) where @@ -17,8 +17,8 @@ import Network.URI (parseURI) mkParams :: [(String, ByteString)] -> ByteString mkParams params = BC.intercalate "&" [concat [pack f, "=", s] | (f,s) <- params] -get :: String -> [(String, ByteString)] -> IO ByteString -get url args = simpleHTTP (defaultGETRequest_ url') >>= getResponseBody +httpget :: String -> [(String, ByteString)] -> IO ByteString +httpget url args = simpleHTTP (defaultGETRequest_ url') >>= getResponseBody where url' = case parseURI $ unpack $ concat [pack url, "?", qstr] of Just x -> x _ -> error "Bad tracker URL" diff --git a/src/FuncTorrent/Tracker.hs b/src/FuncTorrent/Tracker.hs index ab14b09..cf1648e 100644 --- a/src/FuncTorrent/Tracker.hs +++ b/src/FuncTorrent/Tracker.hs @@ -1,11 +1,15 @@ {-# LANGUAGE OverloadedStrings #-} module FuncTorrent.Tracker - (TrackerResponse(..), - getTrackerResponse + (TState(..), + initialTrackerState, + trackerLoop, ) where import Prelude hiding (lookup, splitAt) +import Control.Concurrent (threadDelay) +import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, readMVar, putMVar, takeMVar) +import Control.Monad.State import Data.ByteString (ByteString) import Data.ByteString.Char8 as BC (pack, unpack, splitAt) import Data.Char (chr) @@ -17,17 +21,41 @@ import qualified Data.ByteString.Base16 as B16 (encode) import FuncTorrent.Bencode (BVal(..), decode) import FuncTorrent.Metainfo (Info(..), Metainfo(..)) -import FuncTorrent.Network (get) +import FuncTorrent.Network (httpget) import FuncTorrent.Peer (Peer(..)) import FuncTorrent.Utils (splitN) -- | Tracker response data TrackerResponse = TrackerResponse { - interval :: Maybe Integer - , peers :: [Peer] - , complete :: Maybe Integer - , incomplete :: Maybe Integer - } deriving (Show, Eq) + interval :: Integer + , peers :: [Peer] + , complete :: Maybe Integer + , incomplete :: Maybe Integer + } deriving (Show, Eq) + +data TrackerEventState = Started + | Stopped + | Completed + deriving (Show, Eq) + +data TState = TState { + uploaded :: MVar Integer + , downloaded :: MVar Integer + , left :: Integer + , currentState :: TrackerEventState + , connectedPeers :: MVar [Peer] + } + +initialTrackerState :: Integer -> IO TState +initialTrackerState sz = do + ps <- newEmptyMVar + up <- newMVar 0 + down <- newMVar 0 + return $ TState { currentState = Started + , connectedPeers = ps + , uploaded = up + , downloaded = down + , left = sz } -- | Deserialize tracker response mkTrackerResponse :: BVal -> Either ByteString TrackerResponse @@ -40,7 +68,7 @@ mkTrackerResponse resp = (Just (Bstr peersBS)) = lookup "peers" body pl = map makePeer (splitN 6 peersBS) in Right TrackerResponse { - interval = Just i + interval = i , peers = pl , complete = Nothing , incomplete = Nothing @@ -63,18 +91,6 @@ mkTrackerResponse resp = makePeer peer = Peer "" (toIP ip') (toPort port') where (ip', port') = splitAt 4 peer --- | Connect to a tracker and get peer info -tracker :: PortNumber -> String -> Metainfo -> IO ByteString -tracker port peer_id m = - get (head . announceList $ m) $ mkArgs port peer_id m - -getTrackerResponse :: PortNumber -> String -> Metainfo -> IO (Either ByteString TrackerResponse) -getTrackerResponse port peerId m = do - resp <- tracker port peerId m - case decode resp of - Right trackerInfo -> return $ mkTrackerResponse trackerInfo - Left e -> return $ Left (pack (show e)) - --- | URL encode hash as per RFC1738 --- TODO: Add tests --- REVIEW: Why is this not written in terms of `Network.HTTP.Base.urlEncode` or @@ -91,14 +107,32 @@ urlEncodeHash bs = concatMap (encode' . unpack) (splitN 2 bs) -- | Make arguments that should be posted to tracker. -- This is a separate pure function for testability. -mkArgs :: PortNumber -> String -> Metainfo -> [(String, ByteString)] -mkArgs port peer_id m = [("info_hash", pack . urlEncodeHash . B16.encode . infoHash $ m), - ("peer_id", pack . urlEncode $ peer_id), - ("port", pack $ show port), - ("uploaded", "0"), - ("downloaded", "0"), - ("left", pack . show . lengthInBytes $ info m), - ("compact", "1"), - ("event", "started")] - +mkArgs :: PortNumber -> String -> Integer -> Integer -> Metainfo -> [(String, ByteString)] +mkArgs port peer_id up down m = + let fileSize = lengthInBytes $ info m + bytesLeft = fileSize - down + in + [("info_hash", pack . urlEncodeHash . B16.encode . infoHash $ m), + ("peer_id", pack . urlEncode $ peer_id), + ("port", pack $ show port), + ("uploaded", pack $ show up), + ("downloaded", pack $ show down), + ("left", pack $ show bytesLeft), + ("compact", "1"), + ("event", "started")] + +trackerLoop :: PortNumber -> String -> Metainfo -> TState -> IO ByteString +trackerLoop port peerId m st = do + up <- liftIO $ readMVar $ uploaded st + down <- liftIO $ readMVar $ downloaded st + resp <- liftIO $ httpget (head . announceList $ m) $ mkArgs port peerId up down m + case decode resp of + Left e -> return $ pack (show e) + Right trackerInfo -> + case mkTrackerResponse trackerInfo of + Left e -> return e + Right tresp -> do + _ <- threadDelay $ fromIntegral (interval tresp) + _ <- putMVar (connectedPeers st) (peers tresp) + trackerLoop port peerId m st diff --git a/src/main/Main.hs b/src/main/Main.hs index b67b6b2..4dd4c68 100644 --- a/src/main/Main.hs +++ b/src/main/Main.hs @@ -4,7 +4,8 @@ module Main where import Prelude hiding (log, length, readFile, getContents, replicate, writeFile) import Control.Concurrent (forkIO) -import Data.ByteString.Char8 (ByteString, getContents, readFile, writeFile, unpack, replicate) +import Control.Concurrent.MVar (readMVar) +import Data.ByteString.Char8 (ByteString, getContents, readFile, writeFile, replicate) import Network (PortID (PortNumber)) import System.Environment (getArgs) import System.Exit (exitSuccess) @@ -15,7 +16,7 @@ import FuncTorrent.Logger (initLogger, logMessage, logStop) import FuncTorrent.Metainfo (Info(..), Metainfo(..), torrentToMetainfo) import FuncTorrent.Peer (initPieceMap, handlePeerMsgs, pieceMapFromFile) import qualified FuncTorrent.Server as Server -import FuncTorrent.Tracker (peers, getTrackerResponse) +import FuncTorrent.Tracker (connectedPeers, initialTrackerState, trackerLoop) logError :: String -> (String -> IO ()) -> IO () logError e logMsg = logMsg $ "parse error: \n" ++ e @@ -60,7 +61,7 @@ main = do -- if we had downloaded the file before (partly or completely) -- then we should check the current directory for the existence -- of the file and then update the map of each piece' availability. - -- This can be donw by reading each piece and verifying the checksum. + -- This can be done by reading each piece and verifying the checksum. -- If the checksum does not match, we don't have that piece. let filePath = name (info m) -- really this is just the file name, not file path fileLen = lengthInBytes (info m) @@ -80,13 +81,13 @@ main = do (serverSock, (PortNumber portnum)) <- Server.start log $ "server started on " ++ show portnum log "Trying to fetch peers" - forkIO $ Server.run serverSock peerId m pieceMap + _ <- forkIO $ Server.run serverSock peerId m pieceMap log $ "Trackers: " ++ head (announceList m) - trackerResp <- getTrackerResponse portnum peerId m - case trackerResp of - Left e -> log $ "Error" ++ unpack e - Right peerList -> do - log $ "Peers List : " ++ (show . peers $ peerList) - let p1 = head (peers peerList) - handlePeerMsgs p1 peerId m pieceMap True - logStop logR + -- (tstate, errstr) <- runTracker portnum peerId m + tstate <- initialTrackerState $ lengthInBytes $ info m + _ <- forkIO $ trackerLoop portnum peerId m tstate >> return () + ps <- readMVar (connectedPeers tstate) + log $ "Peers List : " ++ (show ps) + let p1 = head ps + handlePeerMsgs p1 peerId m pieceMap True + logStop logR