X-Git-Url: https://git.rkrishnan.org/?a=blobdiff_plain;f=src%2FFuncTorrent%2FPeer.hs;h=1c6bf7f77472c00d94c2197e3dd3fa3c0d002b1d;hb=10c9204caa36be49d5ce9694f92bf13aa09ba2e7;hp=99906b074293bd260ec748cb5b027fdbf15c2aa9;hpb=144c253adea364bdd52027f484020eeafafa20f6;p=functorrent.git diff --git a/src/FuncTorrent/Peer.hs b/src/FuncTorrent/Peer.hs index 99906b0..1c6bf7f 100644 --- a/src/FuncTorrent/Peer.hs +++ b/src/FuncTorrent/Peer.hs @@ -1,25 +1,45 @@ +{- + - Copyright (C) 2015-2016 Ramakrishnan Muthukrishnan + - + - This file is part of FuncTorrent. + - + - FuncTorrent is free software; you can redistribute it and/or modify + - it under the terms of the GNU General Public License as published by + - the Free Software Foundation; either version 3 of the License, or + - (at your option) any later version. + - + - FuncTorrent is distributed in the hope that it will be useful, + - but WITHOUT ANY WARRANTY; without even the implied warranty of + - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + - GNU General Public License for more details. + - + - You should have received a copy of the GNU General Public License + - along with FuncTorrent; if not, see + -} + {-# LANGUAGE OverloadedStrings #-} + module FuncTorrent.Peer - (Peer(..), - handlePeerMsgs + (handlePeerMsgs ) where -import Prelude hiding (lookup, concat, replicate, splitAt, take) +import Prelude hiding (lookup, concat, replicate, splitAt, take, drop) -import System.IO (Handle, BufferMode(..), hSetBuffering) -import Data.ByteString (ByteString, unpack, concat, hGet, hPut, take, empty) -import qualified Data.ByteString.Char8 as BC (length) -import Network (connectTo, PortID(..)) +import Control.Concurrent.MVar (MVar, newEmptyMVar, readMVar, putMVar, takeMVar) import Control.Monad.State +import Data.ByteString (ByteString, unpack, concat, hGet, hPut, take, drop, empty, singleton) import Data.Bits import Data.Word (Word8) -import Data.Map (Map, fromList, toList, (!), mapWithKey, adjust) -import qualified Crypto.Hash.SHA1 as SHA1 (hash) +import Data.Map (Map, (!), adjust, fromList, insert) +import Network (connectTo, PortID(..)) +import System.IO (Handle, BufferMode(..), hSetBuffering, hClose) -import FuncTorrent.Metainfo (Info(..), Metainfo(..)) -import FuncTorrent.Utils (splitN, splitNum) -import FuncTorrent.Fileops (createDummyFile, writeFileAtOffset) +import FuncTorrent.Bencode(BVal(..), encode, decode, decodeWithLeftOvers) +import FuncTorrent.Metainfo (Metainfo(..)) import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg) +import FuncTorrent.Utils (splitNum, verifyHash) +import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability) +import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk) data PState = PState { handle :: Handle , peer :: Peer @@ -28,52 +48,58 @@ data PState = PState { handle :: Handle , heChoking :: Bool , heInterested :: Bool} -type PeerState = State PState - -data PieceDlState = Pending - | InProgress - | Have - deriving (Show, Eq) +data InfoPieceMap = InfoPieceMap { infoLength :: Integer + , infoMap :: Map Integer (Maybe ByteString) } --- todo - map with index to a new data structure (peers who have that piece amd state) -data PieceData = PieceData { peers :: [Peer] -- ^ list of peers who have this piece - , dlstate :: PieceDlState -- ^ state of the piece from download perspective. - , hash :: ByteString -- ^ piece hash - , len :: Integer } -- ^ piece length - --- which piece is with which peers -type PieceMap = Map Integer PieceData - - --- Make the initial Piece map, with the assumption that no peer has the --- piece and that every piece is pending download. -mkPieceMap :: Integer -> ByteString -> [Integer] -> PieceMap -mkPieceMap numPieces pieceHash pLengths = fromList kvs - where kvs = [(i, PieceData { peers = [] - , dlstate = Pending - , hash = h - , len = pLen }) - | (i, h, pLen) <- zip3 [0..numPieces] hashes pLengths] - hashes = splitN 20 pieceHash +newtype InfoState = InfoState (MVar InfoPieceMap) havePiece :: PieceMap -> Integer -> Bool havePiece pm index = dlstate (pm ! index) == Have connectToPeer :: Peer -> IO Handle -connectToPeer (Peer _ ip port) = do +connectToPeer (Peer ip port) = do h <- connectTo ip (PortNumber (fromIntegral port)) hSetBuffering h LineBuffering return h -doHandshake :: Handle -> Peer -> ByteString -> String -> IO () -doHandshake h peer infoHash peerid = do - let hs = genHandshakeMsg infoHash peerid + +doHandshake :: Bool -> Handle -> Peer -> ByteString -> String -> IO () +doHandshake True h p infohash peerid = do + let hs = genHandshakeMsg infohash peerid hPut h hs - putStrLn $ "--> handhake to peer: " ++ show peer - _ <- hGet h (length (unpack hs)) - putStrLn $ "<-- handshake from peer: " ++ show peer + putStrLn $ "--> handhake to peer: " ++ show p + hsMsg <- hGet h (length (unpack hs)) + putStrLn $ "<-- handshake from peer: " ++ show p + infoPieceMap <- newEmptyMVar + metadataMsgLoop h $ InfoState infoPieceMap return () + -- if doesPeerSupportExtendedMsg hsMsg + -- then + -- return doExtendedHandshake h + -- else + -- return Nothing +doHandshake False h p infohash peerid = do + let hs = genHandshakeMsg infohash peerid + putStrLn "waiting for a handshake" + -- read 28 bytes. '19' ++ 'BitTorrent Protocol' ++ 8 reserved bytes + hsMsg <- hGet h 28 + putStrLn $ "<-- handshake from peer: " ++ show p + let rxInfoHash = take 20 $ drop 28 hsMsg + if rxInfoHash /= infohash + then do + putStrLn "infoHashes does not match" + hClose h + return () + else do + _ <- hPut h hs + putStrLn $ "--> handhake to peer: " ++ show p + -- if doesPeerSupportExtendedMsg hsMsg + -- then do + -- doExtendedHandshake h + -- else + -- return Nothing + bitfieldToList :: [Word8] -> [Integer] bitfieldToList bs = go bs 0 @@ -99,116 +125,230 @@ toPeerState h p meCh meIn heCh heIn = , meChoking = meCh , meInterested = meIn } --- -- recvMsg :: Peer -> Handle -> Msg --- msgLoop :: PeerState -> PieceMap -> FilePath -> IO () --- msgLoop pState@(PeerState { meInterested = False, heChoking = True }) pieceStatus file = do --- -- if me NOT Interested and she is Choking, tell her that --- -- I am interested. --- let h = handle pState --- sendMsg h InterestedMsg --- putStrLn $ "--> InterestedMsg to peer: " ++ show (peer pState) --- msgLoop (pState { meInterested = True }) pieceStatus file --- msgLoop pState@(PeerState { meInterested = True, heChoking = False }) pieceStatus file = --- -- if me Interested and she not Choking, send her a request --- -- for a piece. --- case pickPiece pieceStatus of --- Nothing -> putStrLn "Nothing to download" --- Just workPiece -> do --- let pLen = len (pieceStatus ! workPiece) --- putStrLn $ "piece length = " ++ show pLen --- pBS <- downloadPiece (handle pState) workPiece pLen --- if not $ verifyHash pBS (hash (pieceStatus ! workPiece)) --- then --- putStrLn $ "Hash mismatch: " ++ show (hash (pieceStatus ! workPiece)) ++ " vs " ++ show (take 20 (SHA1.hash pBS)) --- else do --- let fileOffset = if workPiece == 0 then 0 else workPiece * len (pieceStatus ! (workPiece - 1)) --- putStrLn $ "Write into file at offset: " ++ show fileOffset --- writeFileAtOffset file fileOffset pBS --- msgLoop pState (adjust (\pieceData -> pieceData { state = Have }) workPiece pieceStatus) file --- msgLoop pState pieceStatus file = do --- msg <- getMsg (handle pState) --- putStrLn $ "<-- " ++ show msg ++ "from peer: " ++ show (peer pState) --- case msg of --- KeepAliveMsg -> do --- sendMsg (handle pState) KeepAliveMsg --- putStrLn $ "--> " ++ "KeepAliveMsg to peer: " ++ show (peer pState) --- msgLoop pState pieceStatus file --- BitFieldMsg bss -> do --- let pieceList = bitfieldToList (unpack bss) --- pieceStatus' = updatePieceAvailability pieceStatus (peer pState) pieceList --- putStrLn $ show (length pieceList) ++ " Pieces" --- -- for each pieceIndex in pieceList, make an entry in the pieceStatus --- -- map with pieceIndex as the key and modify the value to add the peer. --- -- download each of the piece in order --- msgLoop pState pieceStatus' file --- UnChokeMsg -> --- msgLoop (pState { heChoking = False }) pieceStatus file --- _ -> --- msgLoop pState pieceStatus file - --- simple algorithm to pick piece. --- pick the first piece from 0 that is not downloaded yet. -pickPiece :: PieceMap -> Maybe Integer -pickPiece m = - let pieceList = toList m - allPending = filter (\(_, v) -> dlstate v == Pending) pieceList - in - case allPending of - [] -> Nothing - ((i, _):_) -> Just i - -updatePieceAvailability :: PieceMap -> Peer -> [Integer] -> PieceMap -updatePieceAvailability pieceStatus p pieceList = - mapWithKey (\k pd -> if k `elem` pieceList - then (pd { peers = p : peers pd }) - else pd) pieceStatus - -handlePeerMsgs :: Peer -> Metainfo -> String -> IO () -handlePeerMsgs p m peerId = do +handlePeerMsgs :: Peer -> String -> Metainfo -> PieceMap -> Bool -> FS.MsgChannel -> IO () +handlePeerMsgs p peerId m pieceMap isClient c = do h <- connectToPeer p - doHandshake h p (infoHash m) peerId - let pstate = toPeerState h p False True False True - pieceHash = pieces (info m) - numPieces = (toInteger . (`quot` 20) . BC.length) pieceHash - pLen = pieceLength (info m) - fileLen = lengthInBytes (info m) - fileName = name (info m) - pieceStatus = mkPieceMap numPieces pieceHash (splitNum fileLen pLen) - createDummyFile fileName (fromIntegral fileLen) - (r, _) <- runStateT (msgLoop pieceStatus fileName) pstate + doHandshake isClient h p (infoHash m) peerId + let pstate = toPeerState h p False False True True + _ <- runStateT (msgLoop pieceMap c) pstate return () -msgLoop :: PieceMap -> FilePath -> StateT PState IO () -msgLoop pieceStatus file = do +msgLoop :: PieceMap -> FS.MsgChannel -> StateT PState IO () +msgLoop pieceStatus msgchannel = do h <- gets handle - msg <- liftIO $ getMsg h - gets peer >>= (\p -> liftIO $ putStrLn $ "<-- " ++ show msg ++ "from peer: " ++ show p) - case msg of - KeepAliveMsg -> do - liftIO $ sendMsg h KeepAliveMsg - gets peer >>= (\p -> liftIO $ putStrLn $ "--> " ++ "KeepAliveMsg to peer: " ++ show p) - msgLoop pieceStatus file + st <- get + case st of + PState { meInterested = False, heChoking = True } -> do + liftIO $ sendMsg h InterestedMsg + gets peer >>= (\p -> liftIO $ putStrLn $ "--> InterestedMsg to peer: " ++ show p) + modify (\st' -> st' { meInterested = True }) + msgLoop pieceStatus msgchannel + PState { meInterested = True, heChoking = False } -> + case pickPiece pieceStatus of + Nothing -> liftIO $ putStrLn "Nothing to download" + Just workPiece -> do + let pLen = len (pieceStatus ! workPiece) + liftIO $ putStrLn $ "piece length = " ++ show pLen + pBS <- liftIO $ downloadPiece h workPiece pLen + if not $ verifyHash pBS (hash (pieceStatus ! workPiece)) + then + liftIO $ putStrLn "Hash mismatch" + else do + liftIO $ putStrLn $ "Write piece: " ++ show workPiece + liftIO $ FS.writePieceToDisk msgchannel workPiece pBS + msgLoop (adjust (\pieceData -> pieceData { dlstate = Have }) workPiece pieceStatus) msgchannel + _ -> do + msg <- liftIO $ getMsg h + gets peer >>= (\p -> liftIO $ putStrLn $ "<-- " ++ show msg ++ " from peer: " ++ show p) + case msg of + KeepAliveMsg -> do + liftIO $ sendMsg h KeepAliveMsg + gets peer >>= (\p -> liftIO $ putStrLn $ "--> " ++ "KeepAliveMsg to peer: " ++ show p) + msgLoop pieceStatus msgchannel + BitFieldMsg bss -> do + p <- gets peer + let pieceList = bitfieldToList (unpack bss) + pieceStatus' = updatePieceAvailability pieceStatus p pieceList + liftIO $ putStrLn $ show (length pieceList) ++ " Pieces" + -- for each pieceIndex in pieceList, make an entry in the pieceStatus + -- map with pieceIndex as the key and modify the value to add the peer. + -- download each of the piece in order + msgLoop pieceStatus' msgchannel + UnChokeMsg -> do + modify (\st' -> st' {heChoking = False }) + msgLoop pieceStatus msgchannel + ChokeMsg -> do + modify (\st' -> st' {heChoking = True }) + msgLoop pieceStatus msgchannel + InterestedMsg -> do + modify (\st' -> st' {heInterested = True}) + msgLoop pieceStatus msgchannel + NotInterestedMsg -> do + modify (\st' -> st' {heInterested = False}) + msgLoop pieceStatus msgchannel + CancelMsg {} -> -- check if valid index, begin, length + msgLoop pieceStatus msgchannel + PortMsg _ -> + msgLoop pieceStatus msgchannel + HaveMsg idx -> do + p <- gets peer + let pieceStatus' = updatePieceAvailability pieceStatus p [idx] + msgLoop pieceStatus' msgchannel + _ -> do + liftIO $ putStrLn ".. not doing anything with the msg" + msgLoop pieceStatus msgchannel + -- No need to handle PieceMsg and RequestMsg here. + downloadPiece :: Handle -> Integer -> Integer -> IO ByteString downloadPiece h index pieceLength = do let chunks = splitNum pieceLength 16384 - liftM concat $ forM (zip [0..] chunks) (\(i, pLen) -> do - sendMsg h (RequestMsg index (i*pLen) pLen) - putStrLn $ "--> " ++ "RequestMsg for Piece " - ++ show index ++ ", part: " ++ show i ++ " of length: " - ++ show pLen - msg <- getMsg h - case msg of - PieceMsg index begin block -> do - putStrLn $ " <-- PieceMsg for Piece: " - ++ show index - ++ ", offset: " - ++ show begin - return block - _ -> do - putStrLn "ignoring irrelevant msg" - return empty) - -verifyHash :: ByteString -> ByteString -> Bool -verifyHash bs pieceHash = - take 20 (SHA1.hash bs) == pieceHash + concat `liftM` forM (zip [0..] chunks) (\(i, pLen) -> do + sendMsg h (RequestMsg index (i*pLen) pLen) + putStrLn $ "--> " ++ "RequestMsg for Piece " + ++ show index ++ ", part: " ++ show i ++ " of length: " + ++ show pLen + msg <- getMsg h + case msg of + PieceMsg index begin block -> do + putStrLn $ " <-- PieceMsg for Piece: " + ++ show index + ++ ", offset: " + ++ show begin + return block + _ -> do + putStrLn $ "ignoring irrelevant msg: " ++ show msg + return empty) + + +{- + -- Extension messages support (BEP-0010) -- + + + In the regular peer handshake, adventise support for extension protocol. Protocol + extensions are done via the reserved bytes (8 of them) in the handshake message + as detailed in BEP-0003. For this particular "Extension Protocol" extension, we use + 20th bit (counted from the right, from 0) is set to 1. + + Once support for the extension protocol is established by the peer, the Peer is supposed + to support one message with the ID 20. This is sent like a regular message with 4-byte + length prefix and the msg id (20) in this case. + + First byte of the payload of this message is either 0, which means it is a handshake + msg. + + The rest of the payload is a dictionary with various keys. All of them are optional. The + one of interest at the moment for me is the one with key 'm' whose value is another + dictionary of all supported extensions. + + Here is where it gets interesting for us (to support magneturi. When the torrent client + has only got a magneturi to look at, it has only got the list of trackers with it (we + are not looking at the DHT case for the time being). So, it somehow needs to get the info + dictionary. It gets this by talking to another peer in the network. To do that, the client + needs to talk tracker protocol, get the list of peers and talk to peers using the above + extension protocol to get the infodict as payload. Let us see how we can do that now. + + If a peer already has the full infodict, then, the handshake message sent by that peer + is something like this: + + {'m': {'ut_metadata', 3}, 'metadata_size': 31235} + + Note that the 'metadata_size' is not part of the value of the key 'm'. + If we are a new client and are requesting the handshake to a peer, then we don't have + the infodict yet, in which case, we only send the first part: + + {'m': {'ut_metadata', 3}} + + This is bencoded and sent across the wire. The value "3" (integer) against the key + 'ut_metadata" is an ordered integer within a client that identifies the extention. + No two extension supported by the same client shares the same value. If the value is + '0', then the extension is unsupported. + + Here we use the BEP-0009, the metadata extension protocol. The metadata in this case + is the infodict. The infodict itself is divided into 16KB sized pieces. + + Here is a possible interaction between two peers: + + 1. Peer Pn comes up, gets the ip/ports of other peers, P0, P1.... Pn does not have the + size of the infodict. Pn has advertised itself as supporting the extension protocol. + It sends the handshake msg to other peers with this bit on in the reserved bytes. + 2. Let us say, P1 replied with a handshake. We check if it also supports the extension + mechanism. + 3. Now we get into the extension message passing so that we have the info dict. + To do that, we send the extension handshake (ut_metadata) m dict without the + metadata_size. We get back the extension handshake with metadata_size. We take + note of the size. + 4. We calculate the number of 16384 chunks in the total size of the metadata. That + gives us the number of pieces the metadata has. + 5. We send a "request" extension msg: + {'msg_type': 0, 'piece': 0} + 6. We recieve the "data" message. + {'msg_type': 1, 'piece': 0, 'total_size': 3425} in bencoded format, followed by + total_size bytes. total_size is 16KiB except perhaps for the last piece. + 7. If the peer does not have the requested piece, it sends the "reject" message. + {'msg_type': 2, 'piece': 0} + 8. Repeat 5, 6/7 for every piece. + + At this point, we have the infodict. + +-} + +{- +data InfoPieceMap = { infoLength :: Integer + , infoMap :: Map Integer (Maybe ByteString) + } + +newtype InfoState = InfoState (MVar InfoPieceMap) + +-} + + +metadataMsgLoop :: Handle -> InfoState -> IO () +metadataMsgLoop h (InfoState st) = do + infoState <- readMVar st + let metadataLen = infoLength infoState + -- send the handshake msg + metadata = encode (metadataMsg metadataLen) + sendMsg h (ExtendedMsg 0 metadata) + -- recv return msg from the peer. Will have 'metadata_size' + msg <- getMsg h + case msg of + ExtendedMsg 0 rBs -> do + -- decode rBs + let (Right (Bdict msgMap)) = decode rBs + (Bdict mVal) = msgMap ! "m" -- which is another dict + (Bint metadata_msgID) = mVal ! "ut_metadata" + (Bint metadata_size) = msgMap ! "metadata_size" + -- divide metadata_size into 16384 sized pieces, find number of pieces + (q, r) = metadata_size `divMod` 16384 + -- pNumLengthPairs = zip [0..q-1] (take q (repeat 16384)) ++ (q, r) + -- TODO: corner case where infodict size is a multiple of 16384 + -- and start sending request msg for each. + if metadataLen == 0 + then -- We don't have any piece. Send request msg for all pieces. + mapM_ (\n -> do + sendMsg h (ExtendedMsg metadata_msgID (encode (requestMsg n))) + dataOrRejectMsg <- getMsg h + case dataOrRejectMsg of + ExtendedMsg 3 payload -> do + -- bencoded dict followed by XXXXXX + infoState <- takeMVar st + let (Right (Bdict bval, pieceData)) = decodeWithLeftOvers payload + (Bint pieceIndex) = bval ! "piece" + payloadLen = length (unpack pieceData) + infoMapVal = infoMap infoState + putMVar st infoState { + infoMap = insert pieceIndex (Just payload) infoMapVal } + ) + [0..q] + else + return () -- TODO: reject for now + where + metadataMsg 0 = Bdict (fromList [("m", Bdict (fromList [("ut_metadata", (Bint 3))]))]) + metadataMsg l = Bdict (fromList [("m", Bdict (fromList [("ut_metadata", (Bint 3))])), + ("metadata_size", (Bint l))]) + requestMsg i = Bdict (fromList [("msg_type", (Bint 0)), ("piece", (Bint i))]) + rejectmsg i = Bdict (fromList [("msg_type", (Bint 2)), ("piece", (Bint i))]) + +doesPeerSupportExtendedMsg :: ByteString -> Bool +doesPeerSupportExtendedMsg bs = take 1 (drop 5 bs) == singleton 0x10