2 - Copyright (C) 2015-2016 Ramakrishnan Muthukrishnan <ram@rkrishnan.org>
4 - This file is part of FuncTorrent.
6 - FuncTorrent is free software; you can redistribute it and/or modify
7 - it under the terms of the GNU General Public License as published by
8 - the Free Software Foundation; either version 3 of the License, or
9 - (at your option) any later version.
11 - FuncTorrent is distributed in the hope that it will be useful,
12 - but WITHOUT ANY WARRANTY; without even the implied warranty of
13 - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 - GNU General Public License for more details.
16 - You should have received a copy of the GNU General Public License
17 - along with FuncTorrent; if not, see <http://www.gnu.org/licenses/>
20 {-# LANGUAGE OverloadedStrings #-}
22 module FuncTorrent.Peer
26 import Prelude hiding (lookup, concat, replicate, splitAt, take, drop)
28 import Control.Concurrent.MVar (MVar, newEmptyMVar, readMVar, putMVar, takeMVar)
29 import Control.Monad.State
30 import Data.ByteString (ByteString, unpack, concat, hGet, hPut, take, drop, empty, singleton)
32 import Data.Word (Word8)
33 import Data.Map (Map, (!), adjust, fromList, insert)
34 import Network (connectTo, PortID(..))
35 import System.IO (Handle, BufferMode(..), hSetBuffering, hClose)
37 import FuncTorrent.Bencode(BVal(..), encode, decode, decodeWithLeftOvers)
38 import FuncTorrent.Metainfo (Metainfo(..))
39 import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg)
40 import FuncTorrent.Utils (splitNum, verifyHash)
41 import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability)
42 import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk)
-- | Per-connection state threaded through the peer message loop.
--
-- The "me"/"he" prefixes presumably follow the usual BitTorrent convention
-- (our side vs. the remote peer's side of the choke/interest flags).
data PState = PState
  { handle       :: Handle  -- ^ open connection to the peer
  , peer         :: Peer    -- ^ the peer this state belongs to
  , meChoking    :: Bool    -- ^ choke flag for our side
  , meInterested :: Bool    -- ^ interest flag for our side
  , heChoking    :: Bool    -- ^ choke flag for the peer's side
  , heInterested :: Bool    -- ^ interest flag for the peer's side
  }
-- | Bookkeeping for the metadata pieces fetched through the extension
-- protocol.
data InfoPieceMap = InfoPieceMap
  { -- | Length of the metadata, as read by 'metadataMsgLoop'.
    infoLength :: Integer
    -- | Payload stored per piece index; 'Nothing' presumably marks a piece
    -- that has not been received yet.
  , infoMap :: Map Integer (Maybe ByteString)
  }
-- | Shared, mutable handle on the 'InfoPieceMap', kept in an 'MVar'.
newtype InfoState
  = InfoState (MVar InfoPieceMap)
-- | Tell whether the piece at the given index is marked 'Have' in the
-- piece map.
--
-- The lookup uses '(!)', so the index must be a key of the map.
havePiece :: PieceMap -> Integer -> Bool
havePiece pieces idx = Have == dlstate entry
  where
    entry = pieces ! idx
-- | Open a connection to the given peer and return the handle, with the
-- handle switched to line buffering.
--
-- NOTE(review): the peer protocol is binary; confirm that 'LineBuffering'
-- (rather than 'NoBuffering') is really what is wanted here.
connectToPeer :: Peer -> IO Handle
connectToPeer (Peer ip port) = do
  let portId = PortNumber (fromIntegral port)
  peerHandle <- connectTo ip portId
  hSetBuffering peerHandle LineBuffering
  return peerHandle
-- | Exchange the BitTorrent handshake with a peer over an already open handle.
--
-- The 'Bool' selects the role. The remaining arguments are the handle, the
-- peer (used only for log output in the visible code), our info hash and our
-- peer id; the last two are fed to 'genHandshakeMsg'.
--
-- NOTE(review): this listing has gaps. The statements that actually write the
-- handshake to the handle, the @then@/@else@ keywords and the binding of
-- @hsMsg@ in the second equation are not visible here.
67 doHandshake :: Bool -> Handle -> Peer -> ByteString -> String -> IO ()
-- 'True' case: log the outgoing handshake, then read back as many bytes as
-- our own handshake is long.
68 doHandshake True h p infohash peerid = do
69 let hs = genHandshakeMsg infohash peerid
71 putStrLn $ "--> handhake to peer: " ++ show p
72 hsMsg <- hGet h (length (unpack hs))
73 putStrLn $ "<-- handshake from peer: " ++ show p
-- NOTE(review): the MVar is created empty and handed straight to
-- metadataMsgLoop, which starts with a readMVar. Unless something fills the
-- MVar in between, that read blocks. Confirm.
74 infoPieceMap <- newEmptyMVar
75 metadataMsgLoop h $ InfoState infoPieceMap
77 -- if doesPeerSupportExtendedMsg hsMsg
79 -- return doExtendedHandshake h
-- 'False' case: wait for the peer's handshake and compare the 20 bytes at
-- offset 28 with our info hash before logging the reply.
82 doHandshake False h p infohash peerid = do
83 let hs = genHandshakeMsg infohash peerid
84 putStrLn "waiting for a handshake"
85 -- read 28 bytes. '19' ++ 'BitTorrent Protocol' ++ 8 reserved bytes
87 putStrLn $ "<-- handshake from peer: " ++ show p
-- Skip the 28-byte prefix described above; the next 20 bytes are the info hash.
88 let rxInfoHash = take 20 $ drop 28 hsMsg
89 if rxInfoHash /= infohash
91 putStrLn "infoHashes does not match"
96 putStrLn $ "--> handhake to peer: " ++ show p
97 -- if doesPeerSupportExtendedMsg hsMsg
99 -- doExtendedHandshake h
-- | Convert the payload of a bitfield message into the list of piece
-- indices whose bit is set, in ascending order.
--
-- BitTorrent bitfields are big-endian within each byte: the high bit of the
-- first byte stands for piece 0, the next bit for piece 1, and so on. Bit
-- position @b@ (counted from the least significant bit, as 'testBit' does)
-- of byte number @n@ therefore maps to piece @n * 8 + (7 - b)@.
bitfieldToList :: [Word8] -> [Integer]
bitfieldToList bytes =
  [ byteIdx * 8 + toInteger (7 - bitPos)
  | (byteIdx, byte) <- zip [0 ..] bytes
  , bitPos <- [7, 6 .. 0]  -- walk from the high bit down so indices ascend
  , testBit byte bitPos
  ]
112 -- helper functions to manipulate PeerState
-- | Build a 'PState' from its individual components.
toPeerState :: Handle
            -> Peer
            -> Bool -- ^ meChoking
            -> Bool -- ^ meInterested
            -> Bool -- ^ heChoking
            -> Bool -- ^ heInterested
            -> PState
toPeerState h p meCh meIn heCh heIn = PState
  { handle       = h
  , peer         = p
  , meChoking    = meCh
  , meInterested = meIn
  , heChoking    = heCh
  , heInterested = heIn
  }
-- | Connect to a peer, perform the handshake and then run the message loop
-- against it.
--
-- The initial peer state is: not choking, not interested on our side;
-- choking and interested on the peer's side.
--
-- NOTE(review): the handle opened here is not closed in this function;
-- confirm who owns its lifetime.
handlePeerMsgs :: Peer -> String -> Metainfo -> PieceMap -> Bool -> FS.MsgChannel -> IO ()
handlePeerMsgs p peerId m pieceMap isClient c = do
  h <- connectToPeer p
  doHandshake isClient h p (infoHash m) peerId
  let initialState = toPeerState h p False False True True
  -- Only the effects of the loop matter; the final state is dropped.
  evalStateT (msgLoop pieceMap c) initialState
-- | Main per-peer loop. It runs in 'StateT' over 'PState' and calls itself
-- with the (possibly updated) piece map after each step.
--
-- Three situations are visible, chosen by the current 'PState':
--
--   * we are not yet interested and the peer is choking us: send
--     'InterestedMsg' and record that we are interested;
--   * we are interested and the peer is not choking us: pick a piece,
--     download it, check its hash and hand it to the file system channel;
--   * otherwise: read one message from the peer and react to it.
--
-- NOTE(review): this listing has gaps. The binding of @h@, the head of the
-- outer @case@ and several message patterns are not visible here.
136 msgLoop :: PieceMap -> FS.MsgChannel -> StateT PState IO ()
137 msgLoop pieceStatus msgchannel = do
141 PState { meInterested = False, heChoking = True } -> do
142 liftIO $ sendMsg h InterestedMsg
143 gets peer >>= (\p -> liftIO $ putStrLn $ "--> InterestedMsg to peer: " ++ show p)
144 modify (\st' -> st' { meInterested = True })
145 msgLoop pieceStatus msgchannel
146 PState { meInterested = True, heChoking = False } ->
147 case pickPiece pieceStatus of
-- No recursive call is visible on this branch, so the loop appears to end
-- here when there is nothing left to pick.
148 Nothing -> liftIO $ putStrLn "Nothing to download"
150 let pLen = len (pieceStatus ! workPiece)
151 liftIO $ putStrLn $ "piece length = " ++ show pLen
152 pBS <- liftIO $ downloadPiece h workPiece pLen
-- Only pieces whose hash matches the expected one are written out.
153 if not $ verifyHash pBS (hash (pieceStatus ! workPiece))
155 liftIO $ putStrLn "Hash mismatch"
157 liftIO $ putStrLn $ "Write piece: " ++ show workPiece
158 liftIO $ FS.writePieceToDisk msgchannel workPiece pBS
-- Continue with the piece marked as 'Have' in the piece map.
159 msgLoop (adjust (\pieceData -> pieceData { dlstate = Have }) workPiece pieceStatus) msgchannel
161 msg <- liftIO $ getMsg h
162 gets peer >>= (\p -> liftIO $ putStrLn $ "<-- " ++ show msg ++ " from peer: " ++ show p)
-- Reply with a keep-alive of our own (the pattern for this branch is not
-- visible; presumably KeepAliveMsg).
165 liftIO $ sendMsg h KeepAliveMsg
166 gets peer >>= (\p -> liftIO $ putStrLn $ "--> " ++ "KeepAliveMsg to peer: " ++ show p)
167 msgLoop pieceStatus msgchannel
168 BitFieldMsg bss -> do
170 let pieceList = bitfieldToList (unpack bss)
171 pieceStatus' = updatePieceAvailability pieceStatus p pieceList
172 liftIO $ putStrLn $ show (length pieceList) ++ " Pieces"
173 -- for each pieceIndex in pieceList, make an entry in the pieceStatus
174 -- map with pieceIndex as the key and modify the value to add the peer.
175 -- download each of the piece in order
176 msgLoop pieceStatus' msgchannel
-- The next four updates track the peer's choke and interest flags.
178 modify (\st' -> st' {heChoking = False })
179 msgLoop pieceStatus msgchannel
181 modify (\st' -> st' {heChoking = True })
182 msgLoop pieceStatus msgchannel
184 modify (\st' -> st' {heInterested = True})
185 msgLoop pieceStatus msgchannel
186 NotInterestedMsg -> do
187 modify (\st' -> st' {heInterested = False})
188 msgLoop pieceStatus msgchannel
189 CancelMsg {} -> -- check if valid index, begin, length
190 msgLoop pieceStatus msgchannel
192 msgLoop pieceStatus msgchannel
-- A single announced piece index is recorded the same way as a bitfield.
195 let pieceStatus' = updatePieceAvailability pieceStatus p [idx]
196 msgLoop pieceStatus' msgchannel
198 liftIO $ putStrLn ".. not doing anything with the msg"
199 msgLoop pieceStatus msgchannel
200 -- No need to handle PieceMsg and RequestMsg here.
-- | Download one piece from a peer by requesting it chunk by chunk and
-- concatenating the per-chunk results.
--
-- Arguments are the peer handle, the piece index and the piece length. The
-- length is passed to 'splitNum' together with 16384 to obtain the chunk
-- sizes (presumably 16 KiB chunks with a shorter last one; confirm against
-- 'splitNum').
--
-- NOTE(review): this listing has gaps. Reading the reply and the values
-- returned from each branch are not visible here.
203 downloadPiece :: Handle -> Integer -> Integer -> IO ByteString
204 downloadPiece h index pieceLength = do
205 let chunks = splitNum pieceLength 16384
206 concat `liftM` forM (zip [0..] chunks) (\(i, pLen) -> do
-- NOTE(review): the begin offset is computed as i*pLen. If the last chunk
-- is shorter than the others, this differs from i*16384. Confirm.
207 sendMsg h (RequestMsg index (i*pLen) pLen)
208 putStrLn $ "--> " ++ "RequestMsg for Piece "
209 ++ show index ++ ", part: " ++ show i ++ " of length: "
213 PieceMsg index begin block -> do
214 putStrLn $ " <-- PieceMsg for Piece: "
220 putStrLn $ "ignoring irrelevant msg: " ++ show msg
225 -- Extension messages support (BEP-0010) --
228 In the regular peer handshake, advertise support for the extension protocol. Protocol
229 extensions are signalled via the reserved bytes (8 of them) in the handshake message
230 as detailed in BEP-0003. For this particular "Extension Protocol" extension, the
231 20th bit (counted from the right, starting at 0) is set to 1.
233 Once support for the extension protocol is established by the peer, the Peer is supposed
234 to support one message with the ID 20. This is sent like a regular message with 4-byte
235 length prefix and the msg id (20) in this case.
237 First byte of the payload of this message is either 0, which means it is a handshake
240 The rest of the payload is a dictionary with various keys. All of them are optional. The
241 one of interest at the moment for me is the one with key 'm' whose value is another
242 dictionary of all supported extensions.
244 Here is where it gets interesting for us (to support magnet URIs). When the torrent client
245 has only got a magnet URI to look at, it has only got the list of trackers with it (we
246 are not looking at the DHT case for the time being). So, it somehow needs to get the info
247 dictionary. It gets this by talking to another peer in the network. To do that, the client
248 needs to talk tracker protocol, get the list of peers and talk to peers using the above
249 extension protocol to get the infodict as payload. Let us see how we can do that now.
251 If a peer already has the full infodict, then, the handshake message sent by that peer
252 is something like this:
254 {'m': {'ut_metadata': 3}, 'metadata_size': 31235}
256 Note that the 'metadata_size' is not part of the value of the key 'm'.
257 If we are a new client and are requesting the handshake to a peer, then we don't have
258 the infodict yet, in which case, we only send the first part:
260 {'m': {'ut_metadata': 3}}
262 This is bencoded and sent across the wire. The value "3" (integer) against the key
263 'ut_metadata' is an integer, local to a client, that identifies the extension.
264 No two extensions supported by the same client share the same value. If the value is
265 '0', then the extension is unsupported.
267 Here we use the BEP-0009, the metadata extension protocol. The metadata in this case
268 is the infodict. The infodict itself is divided into 16KB sized pieces.
270 Here is a possible interaction between two peers:
272 1. Peer Pn comes up, gets the ip/ports of other peers, P0, P1.... Pn does not have the
273 size of the infodict. Pn has advertised itself as supporting the extension protocol.
274 It sends the handshake msg to other peers with this bit on in the reserved bytes.
275 2. Let us say, P1 replied with a handshake. We check if it also supports the extension
277 3. Now we get into the extension message passing so that we have the info dict.
278 To do that, we send the extension handshake (ut_metadata) m dict without the
279 metadata_size. We get back the extension handshake with metadata_size. We take
281 4. We calculate the number of 16384 chunks in the total size of the metadata. That
282 gives us the number of pieces the metadata has.
283 5. We send a "request" extension msg:
284 {'msg_type': 0, 'piece': 0}
285 6. We receive the "data" message:
286 {'msg_type': 1, 'piece': 0, 'total_size': 3425} in bencoded format, followed by
287 the bytes of the piece. total_size is the size of the whole metadata; a piece is 16KiB except perhaps for the last one.
288 7. If the peer does not have the requested piece, it sends the "reject" message.
289 {'msg_type': 2, 'piece': 0}
290 8. Repeat 5, 6/7 for every piece.
292 At this point, we have the infodict.
297 data InfoPieceMap = { infoLength :: Integer
298 , infoMap :: Map Integer (Maybe ByteString)
301 newtype InfoState = InfoState (MVar InfoPieceMap)
-- | Extension-protocol exchange used to fetch the metadata from a peer.
--
-- Visible steps: read the current 'InfoPieceMap' from the 'MVar', send an
-- extended handshake (message id 0) built by @metadataMsg@, decode the
-- peer's extended handshake to learn its @ut_metadata@ message id and the
-- @metadata_size@, then send request messages and store the replies in the
-- piece map.
--
-- NOTE(review): this listing has gaps. Receiving the handshake reply, the
-- condition guarding the request branch, the loop over piece numbers and the
-- fallback branches are not visible here.
306 metadataMsgLoop :: Handle -> InfoState -> IO ()
307 metadataMsgLoop h (InfoState st) = do
-- NOTE(review): readMVar blocks while the MVar is empty. The visible caller
-- (doHandshake) passes a freshly created empty MVar. Confirm something
-- fills it first.
308 infoState <- readMVar st
309 let metadataLen = infoLength infoState
310 -- send the handshake msg
311 metadata = encode (metadataMsg metadataLen)
312 sendMsg h (ExtendedMsg 0 metadata)
313 -- recv return msg from the peer. Will have 'metadata_size'
316 ExtendedMsg 0 rBs -> do
-- These bindings are irrefutable patterns and map lookups with (!). They
-- fail at the point of use if the reply is malformed or a key is absent.
318 let (Right (Bdict msgMap)) = decode rBs
319 (Bdict mVal) = msgMap ! "m" -- which is another dict
320 (Bint metadata_msgID) = mVal ! "ut_metadata"
321 (Bint metadata_size) = msgMap ! "metadata_size"
322 -- divide metadata_size into 16384 sized pieces, find number of pieces
323 (q, r) = metadata_size `divMod` 16384
324 -- pNumLengthPairs = zip [0..q-1] (take q (repeat 16384)) ++ (q, r)
325 -- TODO: corner case where infodict size is a multiple of 16384
326 -- and start sending request msg for each.
328 then -- We don't have any piece. Send request msg for all pieces.
-- Requests go out under the message id the peer announced for ut_metadata.
330 sendMsg h (ExtendedMsg metadata_msgID (encode (requestMsg n)))
331 dataOrRejectMsg <- getMsg h
332 case dataOrRejectMsg of
-- NOTE(review): 3 is the id we advertise for ut_metadata in metadataMsg
-- below, so replies are presumably expected under that id. Confirm.
333 ExtendedMsg 3 payload -> do
334 -- bencoded dict followed by XXXXXX
335 infoState <- takeMVar st
336 let (Right (Bdict bval, pieceData)) = decodeWithLeftOvers payload
337 (Bint pieceIndex) = bval ! "piece"
338 payloadLen = length (unpack pieceData)
339 infoMapVal = infoMap infoState
-- NOTE(review): the whole payload (dict header included) is stored, not
-- the pieceData left over after the dict. Confirm this is intended.
340 putMVar st infoState {
341 infoMap = insert pieceIndex (Just payload) infoMapVal }
345 return () -- TODO: reject for now
-- Extended handshake dictionary: metadata_size is only included when the
-- given length is non-zero.
347 metadataMsg 0 = Bdict (fromList [("m", Bdict (fromList [("ut_metadata", (Bint 3))]))])
348 metadataMsg l = Bdict (fromList [("m", Bdict (fromList [("ut_metadata", (Bint 3))])),
349 ("metadata_size", (Bint l))])
-- Request (msg_type 0) and reject (msg_type 2) dictionaries for a piece.
350 requestMsg i = Bdict (fromList [("msg_type", (Bint 0)), ("piece", (Bint i))])
351 rejectmsg i = Bdict (fromList [("msg_type", (Bint 2)), ("piece", (Bint i))])
-- | Check whether a peer's handshake advertises the extension protocol
-- (BEP-0010).
--
-- The argument is the complete handshake message as read from the peer:
-- one length byte, the 19-byte protocol string, 8 reserved bytes, then the
-- info hash and peer id. Support is signalled by bit 0x10 of reserved
-- byte 5, i.e. the byte at offset 25 of the handshake.
--
-- The bit is tested on its own rather than comparing the whole byte, so
-- peers that set other flags in the same byte are still recognised. Input
-- too short to contain that byte yields 'False'.
doesPeerSupportExtendedMsg :: ByteString -> Bool
doesPeerSupportExtendedMsg bs =
  case unpack (take 1 (drop extensionByteOffset bs)) of
    [flags] -> testBit flags 4  -- bit 4 is the 0x10 flag
    _       -> False
  where
    -- 1 length byte + 19 bytes of protocol string + 5 reserved bytes.
    extensionByteOffset = 25