X-Git-Url: https://git.rkrishnan.org/?a=blobdiff_plain;f=src%2FFuncTorrent%2FPeer.hs;h=63aaa5afe9ede340862383318370616a8c9813e8;hb=9beb0fb9814b33725f6adfa5adabb3225a54277b;hp=845c14aa0d51f675a9ee00184c761ebeae0db486;hpb=6902cd2d7856b416a793dd95efdb4fd329c63e06;p=functorrent.git diff --git a/src/FuncTorrent/Peer.hs b/src/FuncTorrent/Peer.hs index 845c14a..63aaa5a 100644 --- a/src/FuncTorrent/Peer.hs +++ b/src/FuncTorrent/Peer.hs @@ -1,181 +1,85 @@ +{- + - Copyright (C) 2015-2016 Ramakrishnan Muthukrishnan + - + - This file is part of FuncTorrent. + - + - FuncTorrent is free software; you can redistribute it and/or modify + - it under the terms of the GNU General Public License as published by + - the Free Software Foundation; either version 3 of the License, or + - (at your option) any later version. + - + - FuncTorrent is distributed in the hope that it will be useful, + - but WITHOUT ANY WARRANTY; without even the implied warranty of + - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + - GNU General Public License for more details. + - + - You should have received a copy of the GNU General Public License + - along with FuncTorrent; if not, see + -} + {-# LANGUAGE OverloadedStrings #-} + module FuncTorrent.Peer - (Peer(..), + (PieceMap, handlePeerMsgs ) where -import Prelude hiding (lookup, concat, replicate, splitAt, writeFile, take) +import Prelude hiding (lookup, concat, replicate, splitAt, take, drop) -import System.IO (Handle, BufferMode(..), IOMode(..), SeekMode(..), withFile, hSeek, hSetBuffering) -import Data.ByteString (ByteString, pack, unpack, concat, hGet, hPut, singleton, writeFile, take, empty) -import Data.ByteString.Lazy (fromStrict, fromChunks, toStrict) -import qualified Data.ByteString.Char8 as BC (replicate, pack, length) -import Network (connectTo, PortID(..)) -import Data.Binary (Binary(..), decode, encode) -import Data.Binary.Put (putWord32be, putWord16be, putWord8) -import Data.Binary.Get (getWord32be, getWord16be, getWord8, runGet) -import Control.Monad (replicateM, liftM, forM) -import Control.Applicative ((<$>), liftA3) +import Control.Monad.State +import Data.ByteString (ByteString, unpack, concat, hGet, hPut, take, drop, empty) import Data.Bits import Data.Word (Word8) -import Data.Map (Map, fromList, toList, (!), mapWithKey, adjust) -import qualified Crypto.Hash.SHA1 as SHA1 (hash) - -import FuncTorrent.Metainfo (Info(..), Metainfo(..)) -import FuncTorrent.Utils (splitN, splitNum) - -type ID = String -type IP = String -type Port = Integer - --- PeerState is a misnomer -data PeerState = PeerState { handle :: Handle - , peer :: Peer - , meChoking :: Bool - , meInterested :: Bool - , heChoking :: Bool - , heInterested :: Bool} - -data PieceDlState = Pending - | InProgress - | Have - deriving (Show, Eq) - --- todo - map with index to a new data structure (peers who have that piece amd state) -data PieceData = PieceData { peers :: [Peer] -- ^ list of peers who have this piece - , state :: PieceDlState -- ^ state of the piece from download perspective. - , hash :: ByteString -- ^ piece hash - , len :: Integer } -- ^ piece length - --- which piece is with which peers -type PieceMap = Map Integer PieceData - --- | Peer is a PeerID, IP address, port tuple -data Peer = Peer ID IP Port - deriving (Show, Eq) +import Data.Map ((!), adjust) +import Network (connectTo, PortID(..)) +import System.IO (Handle, BufferMode(..), hSetBuffering, hClose) -data PeerMsg = KeepAliveMsg - | ChokeMsg - | UnChokeMsg - | InterestedMsg - | NotInterestedMsg - | HaveMsg Integer - | BitFieldMsg ByteString - | RequestMsg Integer Integer Integer - | PieceMsg Integer Integer ByteString - | CancelMsg Integer Integer Integer - | PortMsg Port - deriving (Show) +import FuncTorrent.Metainfo (Metainfo(..)) +import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg) +import FuncTorrent.Utils (splitNum, verifyHash) +import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability) +import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk) --- Make the initial Piece map, with the assumption that no peer has the --- piece and that every piece is pending download. -mkPieceMap :: Integer -> ByteString -> [Integer] -> PieceMap -mkPieceMap numPieces pieceHash pLengths = fromList kvs - where kvs = [(i, PieceData { peers = [] - , state = Pending - , hash = h - , len = pLen }) - | (i, h, pLen) <- zip3 [0..numPieces] hashes pLengths] - hashes = splitN 20 pieceHash +data PState = PState { handle :: Handle + , peer :: Peer + , meChoking :: Bool + , meInterested :: Bool + , heChoking :: Bool + , heInterested :: Bool} havePiece :: PieceMap -> Integer -> Bool havePiece pm index = - state (pm ! index) == Have + dlstate (pm ! index) == Have -genHandShakeMsg :: ByteString -> String -> ByteString -genHandShakeMsg infoHash peer_id = concat [pstrlen, pstr, reserved, infoHash, peerID] - where pstrlen = singleton 19 - pstr = BC.pack "BitTorrent protocol" - reserved = BC.replicate 8 '\0' - peerID = BC.pack peer_id - -handShake :: Peer -> ByteString -> String -> IO Handle -handShake peer@(Peer _ ip port) infoHash peerid = do - let hs = genHandShakeMsg infoHash peerid +connectToPeer :: Peer -> IO Handle +connectToPeer (Peer ip port) = do h <- connectTo ip (PortNumber (fromIntegral port)) hSetBuffering h LineBuffering - hPut h hs - putStrLn $ "--> handhake to peer: " ++ show peer - _ <- hGet h (length (unpack hs)) - putStrLn $ "<-- handshake from peer: " ++ show peer return h -instance Binary PeerMsg where - put msg = case msg of - KeepAliveMsg -> putWord32be 0 - ChokeMsg -> do putWord32be 1 - putWord8 0 - UnChokeMsg -> do putWord32be 1 - putWord8 1 - InterestedMsg -> do putWord32be 1 - putWord8 2 - NotInterestedMsg -> do putWord32be 1 - putWord8 3 - HaveMsg i -> do putWord32be 5 - putWord8 4 - putWord32be (fromIntegral i) - BitFieldMsg bf -> do putWord32be $ fromIntegral (1 + bfListLen) - putWord8 5 - mapM_ putWord8 bfList - where bfList = unpack bf - bfListLen = length bfList - RequestMsg i o l -> do putWord32be 13 - putWord8 6 - putWord32be (fromIntegral i) - putWord32be (fromIntegral o) - putWord32be (fromIntegral l) - PieceMsg i o b -> do putWord32be $ fromIntegral (9 + blocklen) - putWord8 7 - putWord32be (fromIntegral i) - putWord32be (fromIntegral o) - mapM_ putWord8 blockList - where blockList = unpack b - blocklen = length blockList - CancelMsg i o l -> do putWord32be 13 - putWord8 8 - putWord32be (fromIntegral i) - putWord32be (fromIntegral o) - putWord32be (fromIntegral l) - PortMsg p -> do putWord32be 3 - putWord8 9 - putWord16be (fromIntegral p) - get = do - l <- getWord32be - msgid <- getWord8 - case msgid of - 0 -> return ChokeMsg - 1 -> return UnChokeMsg - 2 -> return InterestedMsg - 3 -> return NotInterestedMsg - 4 -> liftM (HaveMsg . fromIntegral) getWord32be - 5 -> liftM (BitFieldMsg . pack) (replicateM (fromIntegral l - 1) getWord8) - 6 -> liftA3 RequestMsg getInteger getInteger getInteger - where getInteger = fromIntegral <$> getWord32be - 7 -> liftA3 PieceMsg getInteger getInteger (pack <$> replicateM (fromIntegral l - 9) getWord8) - where getInteger = fromIntegral <$> getWord32be - 8 -> liftA3 CancelMsg getInteger getInteger getInteger - where getInteger = fromIntegral <$> getWord32be - 9 -> liftM (PortMsg . fromIntegral) getWord16be - _ -> error ("unknown message ID: " ++ show msgid) - -getMsg :: Handle -> IO PeerMsg -getMsg h = do - lBS <- hGet h 4 - let l = bsToInt lBS - if l == 0 - then return KeepAliveMsg +doHandshake :: Bool -> Handle -> Peer -> ByteString -> String -> IO () +doHandshake True h p infohash peerid = do + let hs = genHandshakeMsg infohash peerid + hPut h hs + putStrLn $ "--> handhake to peer: " ++ show p + _ <- hGet h (length (unpack hs)) + putStrLn $ "<-- handshake from peer: " ++ show p + return () +doHandshake False h p infohash peerid = do + let hs = genHandshakeMsg infohash peerid + putStrLn "waiting for a handshake" + hsMsg <- hGet h (length (unpack hs)) + putStrLn $ "<-- handshake from peer: " ++ show p + let rxInfoHash = take 20 $ drop 28 hsMsg + if rxInfoHash /= infohash + then do + putStrLn "infoHashes does not match" + hClose h + return () else do - msg <- hGet h l - return $ decode $ fromStrict $ concat [lBS, msg] - -sendMsg :: Handle -> PeerMsg -> IO () -sendMsg h msg = - let bsMsg = toStrict $ encode msg - in - hPut h bsMsg - -bsToInt :: ByteString -> Int -bsToInt x = fromIntegral (runGet getWord32be (fromChunks (return x))) + _ <- hPut h hs + putStrLn $ "--> handhake to peer: " ++ show p + return () bitfieldToList :: [Word8] -> [Integer] bitfieldToList bs = go bs 0 @@ -185,117 +89,114 @@ bitfieldToList bs = go bs 0 in setBits ++ go bs' (pos + 1) -createDummyFile :: FilePath -> Int -> IO () -createDummyFile path size = - writeFile path (BC.replicate size '\0') - --- write into a file at a specific offet -writeFileAtOffset :: FilePath -> Integer -> ByteString -> IO () -writeFileAtOffset path offset block = - withFile path ReadWriteMode (\h -> do - _ <- hSeek h AbsoluteSeek offset - hPut h block) - --- recvMsg :: Peer -> Handle -> Msg -msgLoop :: PeerState -> PieceMap -> IO () -msgLoop pState pieceStatus | not (meInterested pState) && heChoking pState = do - -- if me NOT Interested and she is Choking, tell her that - -- I am interested. - let h = handle pState - sendMsg h InterestedMsg - putStrLn $ "--> InterestedMsg to peer: " ++ show (peer pState) - msgLoop (pState { meInterested = True }) pieceStatus - | meInterested pState && not (heChoking pState) = - -- if me Interested and she not Choking, send her a request - -- for a piece. - case pickPiece pieceStatus of - Nothing -> putStrLn "Nothing to download" - Just workPiece -> do - let pLen = len (pieceStatus ! workPiece) - putStrLn $ "piece length = " ++ show pLen - pBS <- downloadPiece (handle pState) workPiece pLen - if not $ verifyHash pBS (hash (pieceStatus ! workPiece)) - then - putStrLn $ "Hash mismatch: " ++ show (hash (pieceStatus ! workPiece)) ++ " vs " ++ show (take 20 (SHA1.hash pBS)) - else do - let fileOffset = if workPiece == 0 then 0 else (len (pieceStatus ! (workPiece - 1))) - writeFileAtOffset "/tmp/download.file" fileOffset pBS - msgLoop pState (adjust (\pieceData -> pieceData { state = Have }) workPiece pieceStatus) - | otherwise = do - msg <- getMsg (handle pState) - putStrLn $ "<-- " ++ show msg ++ "from peer: " ++ show (peer pState) - case msg of - KeepAliveMsg -> do - sendMsg (handle pState) KeepAliveMsg - putStrLn $ "--> " ++ "KeepAliveMsg to peer: " ++ show (peer pState) - msgLoop pState pieceStatus - BitFieldMsg bss -> do - let pieceList = bitfieldToList (unpack bss) - pieceStatus' = updatePieceAvailability pieceStatus (peer pState) pieceList - putStrLn $ show (length pieceList) ++ " Pieces" - -- for each pieceIndex in pieceList, make an entry in the pieceStatus - -- map with pieceIndex as the key and modify the value to add the peer. - -- download each of the piece in order - msgLoop pState pieceStatus' - UnChokeMsg -> - msgLoop (pState { heChoking = False }) pieceStatus - _ -> - msgLoop pState pieceStatus - --- simple algorithm to pick piece. --- pick the first piece from 0 that is not downloaded yet. -pickPiece :: PieceMap -> Maybe Integer -pickPiece m = - let pieceList = toList m - allPending = filter (\(_, v) -> state v == Pending) pieceList - in - case allPending of - [] -> Nothing - ((i, _):_) -> Just i +-- helper functions to manipulate PeerState +toPeerState :: Handle + -> Peer + -> Bool -- ^ meChoking + -> Bool -- ^ meInterested + -> Bool -- ^ heChoking + -> Bool -- ^ heInterested + -> PState +toPeerState h p meCh meIn heCh heIn = + PState { handle = h + , peer = p + , heChoking = heCh + , heInterested = heIn + , meChoking = meCh + , meInterested = meIn } + +handlePeerMsgs :: Peer -> String -> Metainfo -> PieceMap -> Bool -> FS.MsgChannel -> IO () +handlePeerMsgs p peerId m pieceMap isClient c = do + h <- connectToPeer p + doHandshake isClient h p (infoHash m) peerId + let pstate = toPeerState h p False False True True + _ <- runStateT (msgLoop pieceMap c) pstate + return () + +msgLoop :: PieceMap -> FS.MsgChannel -> StateT PState IO () +msgLoop pieceStatus msgchannel = do + h <- gets handle + st <- get + case st of + PState { meInterested = False, heChoking = True } -> do + liftIO $ sendMsg h InterestedMsg + gets peer >>= (\p -> liftIO $ putStrLn $ "--> InterestedMsg to peer: " ++ show p) + modify (\st' -> st' { meInterested = True }) + msgLoop pieceStatus msgchannel + PState { meInterested = True, heChoking = False } -> + case pickPiece pieceStatus of + Nothing -> liftIO $ putStrLn "Nothing to download" + Just workPiece -> do + let pLen = len (pieceStatus ! workPiece) + liftIO $ putStrLn $ "piece length = " ++ show pLen + pBS <- liftIO $ downloadPiece h workPiece pLen + if not $ verifyHash pBS (hash (pieceStatus ! workPiece)) + then + liftIO $ putStrLn "Hash mismatch" + else do + liftIO $ putStrLn $ "Write piece: " ++ show workPiece + liftIO $ FS.writePieceToDisk msgchannel workPiece pBS + msgLoop (adjust (\pieceData -> pieceData { dlstate = Have }) workPiece pieceStatus) msgchannel + _ -> do + msg <- liftIO $ getMsg h + gets peer >>= (\p -> liftIO $ putStrLn $ "<-- " ++ show msg ++ " from peer: " ++ show p) + case msg of + KeepAliveMsg -> do + liftIO $ sendMsg h KeepAliveMsg + gets peer >>= (\p -> liftIO $ putStrLn $ "--> " ++ "KeepAliveMsg to peer: " ++ show p) + msgLoop pieceStatus msgchannel + BitFieldMsg bss -> do + p <- gets peer + let pieceList = bitfieldToList (unpack bss) + pieceStatus' = updatePieceAvailability pieceStatus p pieceList + liftIO $ putStrLn $ show (length pieceList) ++ " Pieces" + -- for each pieceIndex in pieceList, make an entry in the pieceStatus + -- map with pieceIndex as the key and modify the value to add the peer. + -- download each of the piece in order + msgLoop pieceStatus' msgchannel + UnChokeMsg -> do + modify (\st' -> st' {heChoking = False }) + msgLoop pieceStatus msgchannel + ChokeMsg -> do + modify (\st' -> st' {heChoking = True }) + msgLoop pieceStatus msgchannel + InterestedMsg -> do + modify (\st' -> st' {heInterested = True}) + msgLoop pieceStatus msgchannel + NotInterestedMsg -> do + modify (\st' -> st' {heInterested = False}) + msgLoop pieceStatus msgchannel + CancelMsg _ _ _ -> -- check if valid index, begin, length + msgLoop pieceStatus msgchannel + PortMsg _ -> + msgLoop pieceStatus msgchannel + HaveMsg idx -> do + p <- gets peer + let pieceStatus' = updatePieceAvailability pieceStatus p [idx] + msgLoop pieceStatus' msgchannel + _ -> do + liftIO $ putStrLn $ ".. not doing anything with the msg" + msgLoop pieceStatus msgchannel + -- No need to handle PieceMsg and RequestMsg here. -updatePieceAvailability :: PieceMap -> Peer -> [Integer] -> PieceMap -updatePieceAvailability pieceStatus p pieceList = - mapWithKey (\k pd -> if k `elem` pieceList - then (pd { peers = p : peers pd }) - else pd) pieceStatus -handlePeerMsgs :: Peer -> Metainfo -> String -> IO () -handlePeerMsgs p m peerId = do - h <- handShake p (infoHash m) peerId - let state = PeerState { handle = h - , peer = p - , heInterested = False - , heChoking = True - , meInterested = False - , meChoking = True } - pieceHash = pieces (info m) - numPieces = (toInteger . (`quot` 20) . BC.length) pieceHash - pLen = pieceLength (info m) - fileLen = lengthInBytes (info m) - pieceStatus = mkPieceMap numPieces pieceHash (splitNum fileLen pLen) - createDummyFile "/tmp/download.file" (fromIntegral fileLen) - msgLoop state pieceStatus - downloadPiece :: Handle -> Integer -> Integer -> IO ByteString downloadPiece h index pieceLength = do let chunks = splitNum pieceLength 16384 - liftM concat $ forM (zip [0..] chunks) (\(i, pLen) -> do - sendMsg h (RequestMsg index (i*pLen) pLen) - putStrLn $ "--> " ++ "RequestMsg for Piece " - ++ show index ++ ", part: " ++ show i ++ " of length: " - ++ show pLen - msg <- getMsg h - case msg of - PieceMsg index begin block -> do - putStrLn $ " <-- PieceMsg for Piece: " - ++ show index - ++ ", offset: " - ++ show begin - return block - _ -> do - putStrLn "ignoring irrelevant msg" - return empty) + concat `liftM` forM (zip [0..] chunks) (\(i, pLen) -> do + sendMsg h (RequestMsg index (i*pLen) pLen) + putStrLn $ "--> " ++ "RequestMsg for Piece " + ++ show index ++ ", part: " ++ show i ++ " of length: " + ++ show pLen + msg <- getMsg h + case msg of + PieceMsg index begin block -> do + putStrLn $ " <-- PieceMsg for Piece: " + ++ show index + ++ ", offset: " + ++ show begin + return block + _ -> do + putStrLn $ "ignoring irrelevant msg: " ++ show msg + return empty) -verifyHash :: ByteString -> ByteString -> Bool -verifyHash bs pieceHash = - take 20 (SHA1.hash bs) == pieceHash