From: Ramakrishnan Muthukrishnan Date: Tue, 15 Dec 2015 16:55:49 +0000 (+0530) Subject: FileSystem: add read/write stats X-Git-Url: https://git.rkrishnan.org/specifications/components/com_hotproperty/css/reliability?a=commitdiff_plain;h=d30178161347397d4f736735fec838f984be996a;p=functorrent.git FileSystem: add read/write stats --- diff --git a/src/FuncTorrent/FileSystem.hs b/src/FuncTorrent/FileSystem.hs index df203b0..bfaf674 100644 --- a/src/FuncTorrent/FileSystem.hs +++ b/src/FuncTorrent/FileSystem.hs @@ -1,20 +1,21 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE FlexibleContexts #-} module FuncTorrent.FileSystem - (startThread, + (run, MsgChannel, createMsgChannel, - writePiece, + writePieceToDisk, Piece(..), pieceMapFromFile ) where -import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan) import Control.Concurrent.MVar (MVar, putMVar) import Control.Monad (forever) -import Data.Map (traverseWithKey) +import Control.Monad.State (StateT, liftIO, runStateT, modify) import qualified Data.ByteString as BS -import Data.Map ((!)) +import Data.Map (traverseWithKey, (!)) import System.IO (Handle, IOMode (ReadWriteMode), withFile) import System.Directory (doesFileExist) @@ -30,24 +31,37 @@ data Msg = ReadPiece PieceNum Integer (MVar Piece) type MsgChannel = Chan Msg +data Stats = Stats { bytesRead :: Integer + , bytesWritten :: Integer + } + createMsgChannel :: IO (Chan Msg) createMsgChannel = newChan -startThread :: PieceMap -> MsgChannel -> Handle -> IO ThreadId -startThread pieceMap c handle = do - forkIO $ forever $ recvMsg >>= sendResponse +run :: PieceMap -> MsgChannel -> Handle -> IO () +run pieceMap c handle = do + _ <- runStateT (run' pieceMap c handle) initialStats + return () + where initialStats = Stats { bytesRead = 0 + , bytesWritten = 0 } + +run' :: PieceMap -> MsgChannel -> Handle -> StateT Stats IO () +run' pieceMap c handle = do + msg <- liftIO recvMsg + liftIO $ sendResponse msg + updateState msg where recvMsg = readChan c sendResponse msg = case msg of - ReadPiece n len' var -> do - bs <- readPiece n len' - putMVar var (Piece n bs) - WritePiece (Piece n bs) -> do - writePiece n bs - VerifyPiece n var -> do - isHashValid <- verifyPiece n - putMVar var isHashValid + ReadPiece n len' var -> do + bs <- readPiece n len' + putMVar var (Piece n bs) + WritePiece (Piece n bs) -> + writePiece n bs + VerifyPiece n var -> do + isHashValid <- verifyPiece n + putMVar var isHashValid readPiece n len' = do let offset = pieceNumToOffset pieceMap n readFileAtOffset handle offset len' @@ -60,6 +74,11 @@ startThread pieceMap c handle = do len' = len (pieceMap ! n) bs' <- readFileAtOffset handle offset len' return $ verifyHash bs' hash' + updateState (ReadPiece _ l _) = + modify (\st -> st {bytesRead = bytesRead st + l}) + updateState (WritePiece (Piece _ bs)) = + modify (\st -> st {bytesWritten = bytesWritten st + fromIntegral (BS.length bs)}) + updateState _ = modify id pieceMapFromFile :: FilePath -> Integer -> PieceMap -> IO PieceMap pieceMapFromFile filePath fileLen pieceMap = do @@ -75,7 +94,7 @@ pieceMapFromFile filePath fileLen pieceMap = do then return $ v { dlstate = Have } else return v -writePiece :: MsgChannel -> PieceNum -> BS.ByteString -> IO () -writePiece c pieceNum bs = do +writePieceToDisk :: MsgChannel -> PieceNum -> BS.ByteString -> IO () +writePieceToDisk c pieceNum bs = writeChan c $ WritePiece (Piece pieceNum bs) diff --git a/src/FuncTorrent/Peer.hs b/src/FuncTorrent/Peer.hs index bd66f99..05ecc3c 100644 --- a/src/FuncTorrent/Peer.hs +++ b/src/FuncTorrent/Peer.hs @@ -19,7 +19,7 @@ import FuncTorrent.Metainfo (Metainfo(..)) import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg) import FuncTorrent.Utils (splitNum, verifyHash) import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability) -import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePiece, Piece(..)) +import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk, Piece(..)) data PState = PState { handle :: Handle , peer :: Peer @@ -116,7 +116,7 @@ msgLoop pieceStatus msgchannel = do liftIO $ putStrLn "Hash mismatch" else do liftIO $ putStrLn $ "Write piece: " ++ show workPiece - liftIO $ FS.writePiece msgchannel workPiece pBS + liftIO $ FS.writePieceToDisk msgchannel workPiece pBS msgLoop (adjust (\pieceData -> pieceData { dlstate = Have }) workPiece pieceStatus) msgchannel _ -> do msg <- liftIO $ getMsg h diff --git a/src/main/Main.hs b/src/main/Main.hs index 4efd36f..c641fa0 100644 --- a/src/main/Main.hs +++ b/src/main/Main.hs @@ -4,9 +4,10 @@ module Main where import Prelude hiding (log, length, readFile, getContents) import Control.Concurrent (forkIO, killThread) +import Control.Monad.State (liftIO) import Control.Concurrent.MVar (readMVar) import Data.ByteString.Char8 (ByteString, getContents, readFile) -import qualified FuncTorrent.FileSystem as FS (createMsgChannel, pieceMapFromFile, startThread) +import qualified FuncTorrent.FileSystem as FS (createMsgChannel, pieceMapFromFile, run) import FuncTorrent.Logger (initLogger, logMessage, logStop) import FuncTorrent.Metainfo (Info(..), Metainfo(..), torrentToMetainfo) import FuncTorrent.Peer (handlePeerMsgs) @@ -75,7 +76,7 @@ main = do log $ "Downloading file : " ++ filePath pieceMap <- FS.pieceMapFromFile filePath fileLen defaultPieceMap log $ "start filesystem manager thread" - fsTid <- withFile filePath ReadWriteMode (FS.startThread pieceMap fsMsgChannel) + fsTid <- forkIO $ withFile filePath ReadWriteMode (FS.run pieceMap fsMsgChannel) log $ "starting server" (serverSock, (PortNumber portnum)) <- Server.start log $ "server started on " ++ show portnum