From: Ramakrishnan Muthukrishnan <ram@rkrishnan.org>
Date: Fri, 2 Oct 2015 09:03:59 +0000 (+0530)
Subject: Tracker is a separate thread now
X-Git-Url: https://git.rkrishnan.org/%5B/frontends/flags/%22file:/%22news.html/?a=commitdiff_plain;h=cfd13b8f2f31f0273331c8209d410748cfb39dc8;p=functorrent.git

Tracker is a separate thread now
---

diff --git a/src/FuncTorrent/Network.hs b/src/FuncTorrent/Network.hs
index eac69c9..1a569a5 100644
--- a/src/FuncTorrent/Network.hs
+++ b/src/FuncTorrent/Network.hs
@@ -1,7 +1,7 @@
 {-# LANGUAGE OverloadedStrings #-}
 module FuncTorrent.Network
     (
-     get,
+     httpget,
      mkParams
     ) where
 
@@ -17,8 +17,8 @@ import Network.URI (parseURI)
 mkParams :: [(String, ByteString)] -> ByteString
 mkParams params = BC.intercalate "&" [concat [pack f, "=", s] | (f,s) <- params]
 
-get :: String -> [(String, ByteString)] -> IO ByteString
-get url args = simpleHTTP (defaultGETRequest_ url') >>= getResponseBody
+httpget :: String -> [(String, ByteString)] -> IO ByteString
+httpget url args = simpleHTTP (defaultGETRequest_ url') >>= getResponseBody
     where url' = case parseURI $ unpack $ concat [pack url, "?", qstr] of
                    Just x -> x
                    _ -> error "Bad tracker URL"
diff --git a/src/FuncTorrent/Tracker.hs b/src/FuncTorrent/Tracker.hs
index ab14b09..cf1648e 100644
--- a/src/FuncTorrent/Tracker.hs
+++ b/src/FuncTorrent/Tracker.hs
@@ -1,11 +1,15 @@
 {-# LANGUAGE OverloadedStrings #-}
 module FuncTorrent.Tracker
-    (TrackerResponse(..),
-     getTrackerResponse
+    (TState(..),
+     initialTrackerState,
+     trackerLoop,
     ) where
 
 import Prelude hiding (lookup, splitAt)
 
+import Control.Concurrent (threadDelay)
+import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, readMVar, putMVar, takeMVar)
+import Control.Monad.State
 import Data.ByteString (ByteString)
 import Data.ByteString.Char8 as BC (pack, unpack, splitAt)
 import Data.Char (chr)
@@ -17,17 +21,41 @@ import qualified Data.ByteString.Base16 as B16 (encode)
 
 import FuncTorrent.Bencode (BVal(..), decode)
 import FuncTorrent.Metainfo (Info(..), Metainfo(..))
-import FuncTorrent.Network (get)
+import FuncTorrent.Network (httpget)
 import FuncTorrent.Peer (Peer(..))
 import FuncTorrent.Utils (splitN)
 
 -- | Tracker response
 data TrackerResponse = TrackerResponse {
-      interval :: Maybe Integer
-    , peers :: [Peer]
-    , complete :: Maybe Integer
-    , incomplete :: Maybe Integer
-    } deriving (Show, Eq)
+  interval :: Integer
+  , peers :: [Peer]
+  , complete :: Maybe Integer
+  , incomplete :: Maybe Integer
+  } deriving (Show, Eq)
+
+data TrackerEventState = Started
+                       | Stopped
+                       | Completed
+                       deriving (Show, Eq)
+
+data TState = TState {
+    uploaded :: MVar Integer
+  , downloaded :: MVar Integer
+  , left :: Integer
+  , currentState :: TrackerEventState
+  , connectedPeers :: MVar [Peer]
+  }
+
+initialTrackerState :: Integer -> IO TState
+initialTrackerState sz = do
+  ps <- newEmptyMVar
+  up <- newMVar 0
+  down <- newMVar 0
+  return $ TState { currentState = Started
+                  , connectedPeers = ps
+                  , uploaded = up
+                  , downloaded = down
+                  , left = sz }
 
 -- | Deserialize tracker response
 mkTrackerResponse :: BVal -> Either ByteString TrackerResponse
@@ -40,7 +68,7 @@ mkTrackerResponse resp =
               (Just (Bstr peersBS)) = lookup "peers" body
               pl = map makePeer (splitN 6 peersBS)
           in Right TrackerResponse {
-                   interval = Just i
+                   interval = i
                  , peers = pl
                  , complete = Nothing
                  , incomplete = Nothing
@@ -63,18 +91,6 @@ mkTrackerResponse resp =
       makePeer peer = Peer "" (toIP ip') (toPort port')
           where (ip', port') = splitAt 4 peer
 
--- | Connect to a tracker and get peer info
-tracker :: PortNumber -> String -> Metainfo -> IO ByteString
-tracker port peer_id m =
-  get (head . announceList $ m) $ mkArgs port peer_id m
-
-getTrackerResponse :: PortNumber -> String -> Metainfo -> IO (Either ByteString TrackerResponse)
-getTrackerResponse port peerId m = do
-  resp <- tracker port peerId m
-  case decode resp of
-   Right trackerInfo -> return $ mkTrackerResponse trackerInfo
-   Left e -> return $ Left (pack (show e))
-
 --- | URL encode hash as per RFC1738
 --- TODO: Add tests
 --- REVIEW: Why is this not written in terms of `Network.HTTP.Base.urlEncode` or
@@ -91,14 +107,32 @@ urlEncodeHash bs = concatMap (encode' . unpack) (splitN 2 bs)
 
 -- | Make arguments that should be posted to tracker.
 -- This is a separate pure function for testability.
-mkArgs :: PortNumber -> String -> Metainfo -> [(String, ByteString)]
-mkArgs port peer_id m = [("info_hash", pack . urlEncodeHash . B16.encode . infoHash $ m),
-                         ("peer_id", pack . urlEncode $ peer_id),
-                         ("port", pack $ show port),
-                         ("uploaded", "0"),
-                         ("downloaded", "0"),
-                         ("left", pack . show . lengthInBytes $ info m),
-                         ("compact", "1"),
-                         ("event", "started")]
-
+mkArgs :: PortNumber -> String -> Integer -> Integer -> Metainfo -> [(String, ByteString)]
+mkArgs port peer_id up down m =
+  let fileSize = lengthInBytes $ info m
+      bytesLeft = fileSize - down
+  in
+    [("info_hash", pack . urlEncodeHash . B16.encode . infoHash $ m),
+     ("peer_id", pack . urlEncode $ peer_id),
+     ("port", pack $ show port),
+     ("uploaded", pack $ show up),
+     ("downloaded", pack $ show down),
+     ("left", pack $ show bytesLeft),
+     ("compact", "1"),
+     ("event", "started")]
+
+trackerLoop :: PortNumber -> String -> Metainfo -> TState -> IO ByteString
+trackerLoop port peerId m st = do
+  up <- liftIO $ readMVar $ uploaded st
+  down <- liftIO $ readMVar $ downloaded st
+  resp <- liftIO $ httpget (head . announceList $ m) $ mkArgs port peerId up down m
+  case decode resp of
+    Left e -> return $ pack (show e)
+    Right trackerInfo ->
+      case mkTrackerResponse trackerInfo of
+        Left e -> return e
+        Right tresp -> do
+          _ <- threadDelay $ fromIntegral (interval tresp)
+          _ <- putMVar (connectedPeers st) (peers tresp)
+          trackerLoop port peerId m st
 
diff --git a/src/main/Main.hs b/src/main/Main.hs
index b67b6b2..4dd4c68 100644
--- a/src/main/Main.hs
+++ b/src/main/Main.hs
@@ -4,7 +4,8 @@ module Main where
 import Prelude hiding (log, length, readFile, getContents, replicate, writeFile)
 
 import Control.Concurrent (forkIO)
-import Data.ByteString.Char8 (ByteString, getContents, readFile, writeFile, unpack, replicate)
+import Control.Concurrent.MVar (readMVar)
+import Data.ByteString.Char8 (ByteString, getContents, readFile, writeFile, replicate)
 import Network (PortID (PortNumber))
 import System.Environment (getArgs)
 import System.Exit (exitSuccess)
@@ -15,7 +16,7 @@ import FuncTorrent.Logger (initLogger, logMessage, logStop)
 import FuncTorrent.Metainfo (Info(..), Metainfo(..), torrentToMetainfo)
 import FuncTorrent.Peer (initPieceMap, handlePeerMsgs, pieceMapFromFile)
 import qualified FuncTorrent.Server as Server
-import FuncTorrent.Tracker (peers, getTrackerResponse)
+import FuncTorrent.Tracker (connectedPeers, initialTrackerState, trackerLoop)
 
 logError :: String -> (String -> IO ()) -> IO ()
 logError e logMsg = logMsg $ "parse error: \n" ++ e
@@ -60,7 +61,7 @@ main = do
        -- if we had downloaded the file before (partly or completely)
        -- then we should check the current directory for the existence
        -- of the file and then update the map of each piece' availability.
-       -- This can be donw by reading each piece and verifying the checksum.
+       -- This can be done by reading each piece and verifying the checksum.
        -- If the checksum does not match, we don't have that piece.
        let filePath = name (info m) -- really this is just the file name, not file path
            fileLen = lengthInBytes (info m)
@@ -80,13 +81,13 @@ main = do
        (serverSock, (PortNumber portnum)) <- Server.start
        log $ "server started on " ++ show portnum
        log "Trying to fetch peers"
-       forkIO $ Server.run serverSock peerId m pieceMap
+       _ <- forkIO $ Server.run serverSock peerId m pieceMap
        log $ "Trackers: " ++ head (announceList m)
-       trackerResp <- getTrackerResponse portnum peerId m
-       case  trackerResp of
-        Left e -> log $ "Error" ++ unpack e
-        Right peerList -> do
-          log $ "Peers List : " ++ (show . peers $ peerList)
-          let p1 = head (peers peerList)
-          handlePeerMsgs p1 peerId m pieceMap True
-    logStop logR
+       -- (tstate, errstr) <- runTracker portnum peerId m
+       tstate <- initialTrackerState $ lengthInBytes $ info m
+       _ <- forkIO $ trackerLoop portnum peerId m tstate >> return ()
+       ps <- readMVar (connectedPeers tstate)
+       log $ "Peers List : " ++ (show ps)
+       let p1 = head ps
+       handlePeerMsgs p1 peerId m pieceMap True
+       logStop logR