From: Ramakrishnan Muthukrishnan Date: Sun, 19 Jun 2016 06:30:19 +0000 (+0530) Subject: Tracker/Udp: refactor the trackerloop, add timeouts X-Git-Url: https://git.rkrishnan.org/pf/content/en/service/module-simplejson._speedups.html?a=commitdiff_plain;h=2ddd7237e3615d4a55460ca86de22a669c90232c;p=functorrent.git Tracker/Udp: refactor the trackerloop, add timeouts --- diff --git a/src/FuncTorrent/PeerMsgs.hs b/src/FuncTorrent/PeerMsgs.hs index 79c41c2..cfefce1 100644 --- a/src/FuncTorrent/PeerMsgs.hs +++ b/src/FuncTorrent/PeerMsgs.hs @@ -24,6 +24,7 @@ module FuncTorrent.PeerMsgs sendMsg, getMsg, Peer(..), + makePeer, PeerMsg(..) ) where @@ -32,6 +33,7 @@ import Prelude hiding (lookup, concat, replicate, splitAt, take) import System.IO (Handle) import Data.ByteString (ByteString, pack, unpack, concat, hGet, hPut, singleton) import Data.ByteString.Lazy (fromStrict, fromChunks, toStrict) +import Data.ByteString.Char8 as BC (splitAt) import qualified Data.ByteString.Char8 as BC (replicate, pack) import Control.Monad (replicateM, liftM) import Control.Applicative (liftA3) @@ -40,6 +42,8 @@ import Data.Binary (Binary(..), decode, encode) import Data.Binary.Put (putWord32be, putWord16be, putWord8) import Data.Binary.Get (getWord32be, getWord16be, getWord8, runGet) +import FuncTorrent.Utils (toIP, toPort) + -- | Peer is a PeerID, IP address, port tuple data Peer = Peer ID IP Port deriving (Show, Eq) @@ -142,3 +146,7 @@ genHandshakeMsg infoHash peer_id = concat [pstrlen, pstr, reserved, infoHash, pe bsToInt :: ByteString -> Int bsToInt x = fromIntegral (runGet getWord32be (fromChunks (return x))) + +makePeer :: ByteString -> Peer +makePeer peer = Peer "" (toIP ip') (toPort port') + where (ip', port') = splitAt 4 peer diff --git a/src/FuncTorrent/Tracker.hs b/src/FuncTorrent/Tracker.hs index 9873fe1..8f6a5cc 100644 --- a/src/FuncTorrent/Tracker.hs +++ b/src/FuncTorrent/Tracker.hs @@ -61,6 +61,7 @@ runTracker msgChannel fsChan infohash port peerId announceList sz = do return () Udp -> do _ <- forkIO $ UT.trackerLoop turl (fromIntegral port) peerId infohash fsChan initialTState + runStateT (msgHandler msgChannel) initialTState return () _ -> error "Tracker Protocol unimplemented" diff --git a/src/FuncTorrent/Tracker/Http.hs b/src/FuncTorrent/Tracker/Http.hs index abb4b32..f911b2f 100644 --- a/src/FuncTorrent/Tracker/Http.hs +++ b/src/FuncTorrent/Tracker/Http.hs @@ -41,8 +41,8 @@ import qualified FuncTorrent.Bencode as Benc import FuncTorrent.Bencode (BVal(..)) import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats) import FuncTorrent.Network (sendGetRequest) -import FuncTorrent.Peer (Peer(..)) -import FuncTorrent.Utils (splitN, toIP, toPort, IP, Port) +import FuncTorrent.PeerMsgs (Peer(..), makePeer) +import FuncTorrent.Utils (splitN, IP, Port) import FuncTorrent.Tracker.Types(TState(..), TrackerResponse(..)) @@ -74,12 +74,12 @@ mkArgs port peer_id up down left' infoHash = ("event", "started")] trackerLoop :: String -> PortNumber -> String -> ByteString -> FS.MsgChannel -> TState -> IO () -trackerLoop url port peerId infohash fschan tstate = forever $ do +trackerLoop url sport peerId infohash fschan tstate = forever $ do st' <- FS.getStats fschan st <- readMVar st' let up = FS.bytesRead st down = FS.bytesWritten st - resp <- sendGetRequest url $ mkArgs port peerId up down (left tstate) infohash + resp <- sendGetRequest url $ mkArgs sport peerId up down (left tstate) infohash case Benc.decode resp of Left e -> return () -- $ pack (show e) @@ -113,6 +113,3 @@ parseTrackerResponse resp = where (Bdict body) = resp -makePeer :: ByteString -> Peer -makePeer peer = Peer "" (toIP ip') (toPort port') - where (ip', port') = splitAt 4 peer diff --git a/src/FuncTorrent/Tracker/Udp.hs b/src/FuncTorrent/Tracker/Udp.hs index 37979c4..fe4d9e0 100644 --- a/src/FuncTorrent/Tracker/Udp.hs +++ b/src/FuncTorrent/Tracker/Udp.hs @@ -23,8 +23,9 @@ module FuncTorrent.Tracker.Udp ) where import Control.Applicative (liftA2) -import Control.Monad (liftM) -import Control.Concurrent.MVar (readMVar) +import Control.Monad (liftM, forever, void) +import Control.Concurrent (threadDelay) +import Control.Concurrent.MVar (readMVar, putMVar, isEmptyMVar, swapMVar) import Control.Monad.Reader (ReaderT, runReaderT, ask, liftIO) import Data.Binary (Binary(..), encode, decode) import Data.Binary.Get (Get, isEmpty, getWord32be, getWord64be, getByteString) @@ -36,7 +37,9 @@ import Data.Word (Word16, Word32, Word64) import Network.Socket (Socket, Family( AF_INET ), SocketType( Datagram ), defaultProtocol, SockAddr(..), socket, close, getAddrInfo, addrAddress, SockAddr(..)) import Network.Socket.ByteString (sendTo, recvFrom) import System.Random (randomIO) +import System.Timeout (timeout) +import FuncTorrent.Peer (Peer(..)) import FuncTorrent.Tracker.Types (TrackerEventState(..), TState(..)) import FuncTorrent.Utils (IP, Port, toIP, toPort, getHostname, getPort) import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats) @@ -53,7 +56,7 @@ data UDPRequest = ConnectReq Word32 deriving (Show, Eq) data UDPResponse = ConnectResp Word32 Word64 -- transaction_id connection_id - | AnnounceResp Word32 Word32 Word32 Word32 [(IP, Port)] -- transaction_id interval leechers seeders [(ip, port)] + | AnnounceResp Word32 Word32 Word32 Word32 [Peer] -- transaction_id interval leechers seeders [(ip, port)] | ScrapeResp Integer Integer Integer Integer | ErrorResp Integer String deriving (Show, Eq) @@ -170,7 +173,8 @@ announceRequest cid infohash peerId up down left port = do data PeerStats = PeerStats { leechers :: Word32 , seeders :: Word32 - , peers :: [(IP, Port)] + , interval :: Word32 + , peers :: [Peer] } deriving (Show) announceResponse :: Word32 -> ReaderT UDPTrackerHandle IO PeerStats @@ -182,12 +186,12 @@ announceResponse tid = do if tidr == tid then do liftIO $ putStrLn "announce succeeded" - return $ PeerStats ls ss xs + return $ PeerStats ls ss interval xs else - return $ PeerStats 0 0 [] - _ -> return $ PeerStats 0 0 [] + return $ PeerStats 0 0 0 [] + _ -> return $ PeerStats 0 0 0 [] -getIPPortPairs :: Get [(IP, Port)] +getIPPortPairs :: Get [Peer] getIPPortPairs = do empty <- isEmpty if empty @@ -196,7 +200,7 @@ getIPPortPairs = do ip <- toIP <$> getByteString 4 port <- toPort <$> getByteString 2 ipportpairs <- getIPPortPairs - return $ (ip, port) : ipportpairs + return $ (Peer "" ip port) : ipportpairs startSession :: String -> Port -> IO UDPTrackerHandle startSession host port = do @@ -211,19 +215,29 @@ closeSession :: UDPTrackerHandle -> IO () closeSession (UDPTrackerHandle s _ _) = close s trackerLoop :: String -> Port -> String -> ByteString -> FS.MsgChannel -> TState -> IO () -trackerLoop url sport peerId infohash fschan tstate = do - st' <- FS.getStats fschan - st <- readMVar st' - let up = FS.bytesRead st - down = FS.bytesWritten st - port = getPort url - host = getHostname url - putStrLn $ "host = " ++ (show host) ++ " port= " ++ (show port) +trackerLoop url sport peerId infohash fschan tstate = forever $ do + st <- fmap readMVar $ FS.getStats fschan + up <- fmap FS.bytesRead st + down <- fmap FS.bytesWritten st handle <- startSession host port - flip runReaderT handle $ do - t1 <- connectRequest - cid <- connectResponse t1 - t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport) - stats <- announceResponse t2 - liftIO $ print stats --- _ <- threadDelay $ + stats <- timeout (15*(10^6)) $ worker handle up down + case stats of + Nothing -> closeSession handle + Just stats' -> do + ps <- isEmptyMVar $ connectedPeers tstate + if ps + then + putMVar (connectedPeers tstate) (peers stats') + else + void $ swapMVar (connectedPeers tstate) (peers stats') + threadDelay $ fromIntegral (interval stats') * (10^6) + return () + where + port = getPort url + host = getHostname url + worker handle up down = flip runReaderT handle $ do + t1 <- connectRequest + cid <- connectResponse t1 + t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport) + stats <- announceResponse t2 + return stats