createMsgChannel,
writePieceToDisk,
Piece(..),
- pieceMapFromFile
+ pieceMapFromFile,
+ Stats(..),
+ getStats
)
where
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
-import Control.Concurrent.MVar (MVar, putMVar)
+import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar)
import Control.Monad (forever)
-import Control.Monad.State (StateT, liftIO, runStateT, modify)
+import Control.Monad.State (StateT, liftIO, get, runStateT, modify)
import qualified Data.ByteString as BS
import Data.Map (traverseWithKey, (!))
import System.IO (Handle, IOMode (ReadWriteMode), withFile)
data Msg = ReadPiece PieceNum Integer (MVar Piece)
| WritePiece Piece
| VerifyPiece PieceNum (MVar Bool)
+ | GetStats (MVar Stats)
type MsgChannel = Chan Msg
run' :: PieceMap -> MsgChannel -> Handle -> StateT Stats IO ()
run' pieceMap c handle = do
+ stats <- get
msg <- liftIO recvMsg
- liftIO $ sendResponse msg
+ liftIO $ sendResponse msg stats
updateStats msg
- where
- recvMsg = readChan c
- sendResponse msg =
- case msg of
- ReadPiece n len' var -> do
- bs <- readPiece n len'
- putMVar var (Piece n bs)
- WritePiece (Piece n bs) ->
- writePiece n bs
- VerifyPiece n var -> do
- isHashValid <- verifyPiece n
- putMVar var isHashValid
- readPiece n len' = do
- let offset = pieceNumToOffset pieceMap n
- readFileAtOffset handle offset len'
- writePiece n piece = do
- let offset = pieceNumToOffset pieceMap n
- writeFileAtOffset handle offset piece
- verifyPiece n = do
- let offset = pieceNumToOffset pieceMap n
- hash' = hash (pieceMap ! n)
- len' = len (pieceMap ! n)
- bs' <- readFileAtOffset handle offset len'
- return $ verifyHash bs' hash'
- updateStats (ReadPiece _ l _) =
- modify (\st -> st {bytesRead = bytesRead st + l})
- updateStats (WritePiece (Piece _ bs)) =
- modify (\st -> st {bytesWritten = bytesWritten st + fromIntegral (BS.length bs)})
- updateStats _ = modify id
+ where
+ recvMsg = readChan c
+ sendResponse msg stats =
+ case msg of
+ ReadPiece n len' var -> do
+ bs <- readPiece n len'
+ putMVar var (Piece n bs)
+ WritePiece (Piece n bs) ->
+ writePiece n bs
+ VerifyPiece n var -> do
+ isHashValid <- verifyPiece n
+ putMVar var isHashValid
+ GetStats var -> do
+ putMVar var stats
+ readPiece n len' = do
+ let offset = pieceNumToOffset pieceMap n
+ readFileAtOffset handle offset len'
+ writePiece n piece = do
+ let offset = pieceNumToOffset pieceMap n
+ writeFileAtOffset handle offset piece
+ verifyPiece n = do
+ let offset = pieceNumToOffset pieceMap n
+ hash' = hash (pieceMap ! n)
+ len' = len (pieceMap ! n)
+ bs' <- readFileAtOffset handle offset len'
+ return $ verifyHash bs' hash'
+ updateStats (ReadPiece _ l _) =
+ modify (\st -> st {bytesRead = bytesRead st + l})
+ updateStats (WritePiece (Piece _ bs)) =
+ modify (\st -> st {bytesWritten = bytesWritten st + fromIntegral (BS.length bs)})
+ updateStats _ = modify id
pieceMapFromFile :: FilePath -> Integer -> PieceMap -> IO PieceMap
pieceMapFromFile filePath fileLen pieceMap = do
writePieceToDisk c pieceNum bs =
writeChan c $ WritePiece (Piece pieceNum bs)
+getStats :: MsgChannel -> IO (MVar Stats)
+getStats c = do
+ v <- newEmptyMVar
+ writeChan c $ GetStats v
+ return v
import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg)
import FuncTorrent.Utils (splitNum, verifyHash)
import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability)
-import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk, Piece(..))
+import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk)
data PState = PState { handle :: Handle
, peer :: Peer
{-# LANGUAGE OverloadedStrings #-}
module FuncTorrent.Tracker
- (TState(..),
- initialTrackerState,
- trackerLoop
- ) where
+ (runTracker
+ , getConnectedPeers
+ , newTracker
+ ) where
-import Control.Concurrent.MVar (newEmptyMVar, newMVar)
+import Control.Concurrent(forkIO)
+import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
+import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)
+import Control.Monad.State (StateT, liftIO, get, runStateT)
+import Control.Monad (forever)
+import Data.ByteString.Char8 (ByteString)
import Data.List (isPrefixOf)
+import Network (PortNumber)
-import FuncTorrent.Tracker.Http(trackerLoop)
-import FuncTorrent.Tracker.Types(TState(..), TrackerEventState(..), TrackerProtocol(..))
+import FuncTorrent.Tracker.Http (trackerLoop)
+import FuncTorrent.Tracker.Types (TState(..), TrackerEventState(..), TrackerProtocol(..), TrackerMsg(..))
+import qualified FuncTorrent.FileSystem as FS (MsgChannel)
+import FuncTorrent.Peer (Peer)
-initialTrackerState :: Integer -> IO TState
-initialTrackerState sz = do
+type MsgChannel = Chan TrackerMsg
+
+newTracker :: IO MsgChannel
+newTracker = newChan
+
+runTracker :: MsgChannel -> FS.MsgChannel -> ByteString -> PortNumber
+ -> String -> [String] -> Integer -> IO ()
+runTracker msgChannel fsChan infohash port peerId announceList sz = do
ps <- newEmptyMVar
- up <- newMVar 0
- down <- newMVar 0
- return $ TState { currentState = None
- , connectedPeers = ps
- , uploaded = up
- , downloaded = down
- , left = sz }
+ let initialTState = TState { currentState = None
+ , connectedPeers = ps
+ , left = sz }
+ turl = head announceList
+ case (getTrackerType turl) of
+ Http -> do
+ _ <- forkIO $ trackerLoop turl port peerId infohash fsChan initialTState
+ runStateT (msgHandler msgChannel) initialTState
+ return ()
+ _ -> do
+ error "Tracker Protocol unimplemented"
getTrackerType :: String -> TrackerProtocol
getTrackerType url | isPrefixOf "http://" url = Http
| isPrefixOf "udp://" url = Udp
| otherwise = UnknownProtocol
+
+msgHandler :: MsgChannel -> StateT TState IO ()
+msgHandler c = forever $ do
+ st <- get
+ peers <- liftIO $ readMVar (connectedPeers st)
+ msg <- liftIO recvMsg
+ liftIO $ sendResponse msg peers
+ where
+ recvMsg = readChan c
+ sendResponse msg peers =
+ case msg of
+ GetConnectedPeersMsg var -> do
+ putMVar var peers
+ _ -> do
+ putStrLn "Unhandled Tracker Msg"
+
+getConnectedPeers :: MsgChannel -> IO [Peer]
+getConnectedPeers c = do
+ v <- newEmptyMVar
+ writeChan c (GetConnectedPeersMsg v)
+ ps <- readMVar v
+ return ps
{-# LANGUAGE OverloadedStrings #-}
module FuncTorrent.Tracker.Http
- ( trackerLoop
+ (trackerLoop
) where
import Prelude hiding (lookup, splitAt)
import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar (readMVar, putMVar)
+import Control.Monad (forever)
import qualified Data.ByteString.Base16 as B16 (encode)
import Data.ByteString (ByteString)
import Data.ByteString.Char8 as BC (pack, unpack, splitAt)
import qualified FuncTorrent.Bencode as Benc
import FuncTorrent.Bencode (BVal(..))
-import FuncTorrent.Metainfo (Info(..), Metainfo(..))
+import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats)
import FuncTorrent.Network (sendGetRequest)
import FuncTorrent.Peer (Peer(..))
import FuncTorrent.Utils (splitN)
-- | Make arguments that should be posted to tracker.
-- This is a separate pure function for testability.
-mkArgs :: PortNumber -> String -> Integer -> Integer -> Metainfo -> [(String, ByteString)]
-mkArgs port peer_id up down m =
- let fileSize = lengthInBytes $ info m
- bytesLeft = fileSize - down
- in
- [("info_hash", pack . urlEncodeHash . B16.encode . infoHash $ m),
- ("peer_id", pack . urlEncode $ peer_id),
- ("port", pack $ show port),
- ("uploaded", pack $ show up),
- ("downloaded", pack $ show down),
- ("left", pack $ show bytesLeft),
- ("compact", "1"),
- ("event", "started")]
+mkArgs :: PortNumber -> String -> Integer -> Integer -> Integer -> ByteString -> [(String, ByteString)]
+mkArgs port peer_id up down left' infoHash =
+ [("info_hash", pack . urlEncodeHash . B16.encode $ infoHash),
+ ("peer_id", pack . urlEncode $ peer_id),
+ ("port", pack $ show port),
+ ("uploaded", pack $ show up),
+ ("downloaded", pack $ show down),
+ ("left", pack $ show left'),
+ ("compact", "1"),
+ ("event", "started")]
-trackerLoop :: PortNumber -> String -> Metainfo -> TState -> IO ByteString
-trackerLoop port peerId m st = do
- up <- readMVar $ uploaded st
- down <- readMVar $ downloaded st
- resp <- sendGetRequest (head . announceList $ m) $ mkArgs port peerId up down m
+trackerLoop :: String -> PortNumber -> String -> ByteString -> FS.MsgChannel -> TState -> IO ()
+trackerLoop url port peerId infohash fschan tstate = forever $ do
+ st' <- FS.getStats fschan
+ st <- readMVar st'
+ let up = FS.bytesRead st
+ down = FS.bytesWritten st
+ resp <- sendGetRequest url $ mkArgs port peerId up down (left tstate) infohash
case Benc.decode resp of
- Left e -> return $ pack (show e)
+ Left e -> return () -- $ pack (show e)
Right trackerInfo ->
case parseTrackerResponse trackerInfo of
- Left e -> return e
+ Left e -> return () -- e
Right tresp -> do
_ <- threadDelay $ fromIntegral (interval tresp)
- _ <- putMVar (connectedPeers st) (peers tresp)
- trackerLoop port peerId m st
+ _ <- putMVar (connectedPeers tstate) (peers tresp)
+ return () -- trackerLoop port peerId st
parseTrackerResponse :: BVal -> Either ByteString TrackerResponse
parseTrackerResponse resp =
, TrackerResponse(..)
, TrackerEventState(..)
, TState(..)
+ , TrackerMsg(..)
, IP
, Port
) where
| Stopped
| Completed
deriving (Show, Eq)
+data TrackerMsg = GetStatusMsg
+ | GetConnectedPeersMsg (MVar [Peer])
-data TState = TState {
- uploaded :: MVar Integer
- , downloaded :: MVar Integer
- , left :: Integer
- , currentState :: TrackerEventState
- , connectedPeers :: MVar [Peer]
- }
+data TState = TState { left :: Integer
+ , currentState :: TrackerEventState
+ , connectedPeers :: MVar [Peer]
+ }
-- | Tracker response
data TrackerResponse = TrackerResponse {
import FuncTorrent.Peer (handlePeerMsgs)
import FuncTorrent.PieceManager (initPieceMap)
import qualified FuncTorrent.Server as Server
-import FuncTorrent.Tracker (connectedPeers, initialTrackerState, trackerLoop)
+import FuncTorrent.Tracker (runTracker, getConnectedPeers, newTracker)
import Network (PortID (PortNumber))
import System.IO (withFile, IOMode (ReadWriteMode))
import System.Directory (doesFileExist)
fileLen = lengthInBytes (info m)
pieceHash = pieces (info m)
pLen = pieceLength (info m)
+ infohash = infoHash m
defaultPieceMap = initPieceMap pieceHash fileLen pLen
log $ "create FS msg channel"
fsMsgChannel <- FS.createMsgChannel
log "Trying to fetch peers"
_ <- forkIO $ Server.run serverSock peerId m pieceMap fsMsgChannel
log $ "Trackers: " ++ head (announceList m)
- -- (tstate, errstr) <- runTracker portnum peerId m
- tstate <- initialTrackerState $ lengthInBytes $ info m
- _ <- forkIO $ trackerLoop portnum peerId m tstate >> return ()
- ps <- readMVar (connectedPeers tstate)
+ trackerMsgChan <- newTracker
+ _ <- forkIO $ runTracker trackerMsgChan fsMsgChannel infohash portnum peerId (announceList m) fileLen
+ ps <- getConnectedPeers trackerMsgChan
log $ "Peers List : " ++ (show ps)
let p1 = head ps
handlePeerMsgs p1 peerId m pieceMap True fsMsgChannel