2 - Copyright (C) 2015-2016 Ramakrishnan Muthukrishnan <ram@rkrishnan.org>
4 - This file is part of FuncTorrent.
6 - FuncTorrent is free software; you can redistribute it and/or modify
7 - it under the terms of the GNU General Public License as published by
8 - the Free Software Foundation; either version 3 of the License, or
9 - (at your option) any later version.
11 - FuncTorrent is distributed in the hope that it will be useful,
12 - but WITHOUT ANY WARRANTY; without even the implied warranty of
13 - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 - GNU General Public License for more details.
16 - You should have received a copy of the GNU General Public License
17 - along with FuncTorrent; if not, see <http://www.gnu.org/licenses/>
20 {-# LANGUAGE OverloadedStrings #-}
22 module FuncTorrent.Peer
26 import Prelude hiding (lookup, concat, replicate, splitAt, take, drop)
28 import Control.Concurrent.MVar (MVar, readMVar, putMVar, takeMVar)
29 import Control.Monad.State
30 import Data.ByteString (ByteString, unpack, concat, hGet, hPut, take, drop, empty, singleton)
32 import Data.Word (Word8)
33 import Data.Map (Map, (!), adjust, fromList, insert)
34 import Network (connectTo, PortID(..))
35 import System.IO (Handle, BufferMode(..), hSetBuffering, hClose)
37 import FuncTorrent.Bencode(BVal(..), encode, decode, decodeWithLeftOvers)
38 import FuncTorrent.Metainfo (Metainfo(..))
39 import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg)
40 import FuncTorrent.Utils (splitNum, verifyHash)
41 import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability)
42 import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk)
44 data PState = PState { handle :: Handle
47 , meInterested :: Bool
49 , heInterested :: Bool}
51 data InfoPieceMap = InfoPieceMap { infoLength :: Integer
52 , infoMap :: Map Integer (Maybe ByteString) }
54 newtype InfoState = InfoState (MVar InfoPieceMap)
56 havePiece :: PieceMap -> Integer -> Bool
58 dlstate (pm ! index) == Have
60 connectToPeer :: Peer -> IO Handle
61 connectToPeer (Peer ip port) = do
62 h <- connectTo ip (PortNumber (fromIntegral port))
63 hSetBuffering h LineBuffering
67 doHandshake :: Bool -> Handle -> Peer -> ByteString -> String -> IO ()
68 doHandshake True h p infohash peerid = do
69 let hs = genHandshakeMsg infohash peerid
71 putStrLn $ "--> handhake to peer: " ++ show p
72 hsMsg <- hGet h (length (unpack hs))
73 putStrLn $ "<-- handshake from peer: " ++ show p
75 -- if doesPeerSupportExtendedMsg hsMsg
77 -- return doExtendedHandshake h
80 doHandshake False h p infohash peerid = do
81 let hs = genHandshakeMsg infohash peerid
82 putStrLn "waiting for a handshake"
83 -- read 28 bytes. '19' ++ 'BitTorrent Protocol' ++ 8 reserved bytes
85 putStrLn $ "<-- handshake from peer: " ++ show p
86 let rxInfoHash = take 20 $ drop 28 hsMsg
87 if rxInfoHash /= infohash
89 putStrLn "infoHashes does not match"
94 putStrLn $ "--> handhake to peer: " ++ show p
95 -- if doesPeerSupportExtendedMsg hsMsg
97 -- doExtendedHandshake h
102 bitfieldToList :: [Word8] -> [Integer]
103 bitfieldToList bs = go bs 0
106 let setBits = [pos*8 + toInteger i | i <- [0..8], testBit b i]
108 setBits ++ go bs' (pos + 1)
110 -- helper functions to manipulate PeerState
111 toPeerState :: Handle
113 -> Bool -- ^ meChoking
114 -> Bool -- ^ meInterested
115 -> Bool -- ^ heChoking
116 -> Bool -- ^ heInterested
118 toPeerState h p meCh meIn heCh heIn =
122 , heInterested = heIn
124 , meInterested = meIn }
126 handlePeerMsgs :: Peer -> String -> Metainfo -> PieceMap -> Bool -> FS.MsgChannel -> IO ()
127 handlePeerMsgs p peerId m pieceMap isClient c = do
129 doHandshake isClient h p (infoHash m) peerId
130 let pstate = toPeerState h p False False True True
131 _ <- runStateT (msgLoop pieceMap c) pstate
134 msgLoop :: PieceMap -> FS.MsgChannel -> StateT PState IO ()
135 msgLoop pieceStatus msgchannel = do
139 PState { meInterested = False, heChoking = True } -> do
140 liftIO $ sendMsg h InterestedMsg
141 gets peer >>= (\p -> liftIO $ putStrLn $ "--> InterestedMsg to peer: " ++ show p)
142 modify (\st' -> st' { meInterested = True })
143 msgLoop pieceStatus msgchannel
144 PState { meInterested = True, heChoking = False } ->
145 case pickPiece pieceStatus of
146 Nothing -> liftIO $ putStrLn "Nothing to download"
148 let pLen = len (pieceStatus ! workPiece)
149 liftIO $ putStrLn $ "piece length = " ++ show pLen
150 pBS <- liftIO $ downloadPiece h workPiece pLen
151 if not $ verifyHash pBS (hash (pieceStatus ! workPiece))
153 liftIO $ putStrLn "Hash mismatch"
155 liftIO $ putStrLn $ "Write piece: " ++ show workPiece
156 liftIO $ FS.writePieceToDisk msgchannel workPiece pBS
157 msgLoop (adjust (\pieceData -> pieceData { dlstate = Have }) workPiece pieceStatus) msgchannel
159 msg <- liftIO $ getMsg h
160 gets peer >>= (\p -> liftIO $ putStrLn $ "<-- " ++ show msg ++ " from peer: " ++ show p)
163 liftIO $ sendMsg h KeepAliveMsg
164 gets peer >>= (\p -> liftIO $ putStrLn $ "--> " ++ "KeepAliveMsg to peer: " ++ show p)
165 msgLoop pieceStatus msgchannel
166 BitFieldMsg bss -> do
168 let pieceList = bitfieldToList (unpack bss)
169 pieceStatus' = updatePieceAvailability pieceStatus p pieceList
170 liftIO $ putStrLn $ show (length pieceList) ++ " Pieces"
171 -- for each pieceIndex in pieceList, make an entry in the pieceStatus
172 -- map with pieceIndex as the key and modify the value to add the peer.
173 -- download each of the piece in order
174 msgLoop pieceStatus' msgchannel
176 modify (\st' -> st' {heChoking = False })
177 msgLoop pieceStatus msgchannel
179 modify (\st' -> st' {heChoking = True })
180 msgLoop pieceStatus msgchannel
182 modify (\st' -> st' {heInterested = True})
183 msgLoop pieceStatus msgchannel
184 NotInterestedMsg -> do
185 modify (\st' -> st' {heInterested = False})
186 msgLoop pieceStatus msgchannel
187 CancelMsg {} -> -- check if valid index, begin, length
188 msgLoop pieceStatus msgchannel
190 msgLoop pieceStatus msgchannel
193 let pieceStatus' = updatePieceAvailability pieceStatus p [idx]
194 msgLoop pieceStatus' msgchannel
196 liftIO $ putStrLn ".. not doing anything with the msg"
197 msgLoop pieceStatus msgchannel
198 -- No need to handle PieceMsg and RequestMsg here.
201 downloadPiece :: Handle -> Integer -> Integer -> IO ByteString
202 downloadPiece h index pieceLength = do
203 let chunks = splitNum pieceLength 16384
204 concat `liftM` forM (zip [0..] chunks) (\(i, pLen) -> do
205 sendMsg h (RequestMsg index (i*pLen) pLen)
206 putStrLn $ "--> " ++ "RequestMsg for Piece "
207 ++ show index ++ ", part: " ++ show i ++ " of length: "
211 PieceMsg index begin block -> do
212 putStrLn $ " <-- PieceMsg for Piece: "
218 putStrLn $ "ignoring irrelevant msg: " ++ show msg
223 -- Extension messages support (BEP-0010) --
226 In the regular peer handshake, adventise support for extension protocol. Protocol
227 extensions are done via the reserved bytes (8 of them) in the handshake message
228 as detailed in BEP-0003. For this particular "Extension Protocol" extension, we use
229 20th bit (counted from the right, from 0) is set to 1.
231 Once support for the extension protocol is established by the peer, the Peer is supposed
232 to support one message with the ID 20. This is sent like a regular message with 4-byte
233 length prefix and the msg id (20) in this case.
235 First byte of the payload of this message is either 0, which means it is a handshake
238 The rest of the payload is a dictionary with various keys. All of them are optional. The
239 one of interest at the moment for me is the one with key 'm' whose value is another
240 dictionary of all supported extensions.
242 Here is where it gets interesting for us (to support magneturi. When the torrent client
243 has only got a magneturi to look at, it has only got the list of trackers with it (we
244 are not looking at the DHT case for the time being). So, it somehow needs to get the info
245 dictionary. It gets this by talking to another peer in the network. To do that, the client
246 needs to talk tracker protocol, get the list of peers and talk to peers using the above
247 extension protocol to get the infodict as payload. Let us see how we can do that now.
249 If a peer already has the full infodict, then, the handshake message sent by that peer
250 is something like this:
252 {'m': {'ut_metadata', 3}, 'metadata_size': 31235}
254 Note that the 'metadata_size' is not part of the value of the key 'm'.
255 If we are a new client and are requesting the handshake to a peer, then we don't have
256 the infodict yet, in which case, we only send the first part:
258 {'m': {'ut_metadata', 3}}
260 This is bencoded and sent across the wire. The value "3" (integer) against the key
261 'ut_metadata" is an ordered integer within a client that identifies the extention.
262 No two extension supported by the same client shares the same value. If the value is
263 '0', then the extension is unsupported.
265 Here we use the BEP-0009, the metadata extension protocol. The metadata in this case
266 is the infodict. The infodict itself is divided into 16KB sized pieces.
268 Here is a possible interaction between two peers:
270 1. Peer Pn comes up, gets the ip/ports of other peers, P0, P1.... Pn does not have the
271 size of the infodict. Pn has advertised itself as supporting the extension protocol.
272 It sends the handshake msg to other peers with this bit on in the reserved bytes.
273 2. Let us say, P1 replied with a handshake. We check if it also supports the extension
275 3. Now we get into the extension message passing so that we have the info dict.
276 To do that, we send the extension handshake (ut_metadata) m dict without the
277 metadata_size. We get back the extension handshake with metadata_size. We take
279 4. We calculate the number of 16384 chunks in the total size of the metadata. That
280 gives us the number of pieces the metadata has.
281 5. We send a "request" extension msg:
282 {'msg_type': 0, 'piece': 0}
283 6. We recieve the "data" message.
284 {'msg_type': 1, 'piece': 0, 'total_size': 3425} in bencoded format, followed by
285 total_size bytes. total_size is 16KiB except perhaps for the last piece.
286 7. If the peer does not have the requested piece, it sends the "reject" message.
287 {'msg_type': 2, 'piece': 0}
288 8. Repeat 5, 6/7 for every piece.
290 At this point, we have the infodict.
295 data InfoPieceMap = { infoLength :: Integer
296 , infoMap :: Map Integer (Maybe ByteString)
299 newtype InfoState = InfoState (MVar InfoPieceMap)
304 metadataMsgLoop :: Handle -> InfoState -> IO ()
305 metadataMsgLoop h (InfoState st) = do
306 infoState <- readMVar st
307 let metadataLen = infoLength infoState
308 -- send the handshake msg
309 metadata = encode (metadataMsg metadataLen)
310 sendMsg h (ExtendedMsg 0 metadata)
311 -- recv return msg from the peer. Will have 'metadata_size'
314 ExtendedMsg 0 rBs -> do
316 let (Right (Bdict msgMap)) = decode rBs
317 (Bdict mVal) = msgMap ! "m" -- which is another dict
318 (Bint metadata_msgID) = mVal ! "ut_metadata"
319 (Bint metadata_size) = msgMap ! "metadata_size"
320 -- divide metadata_size into 16384 sized pieces, find number of pieces
321 (q, r) = metadata_size `divMod` 16384
322 -- pNumLengthPairs = zip [0..q-1] (take q (repeat 16384)) ++ (q, r)
323 -- TODO: corner case where infodict size is a multiple of 16384
324 -- and start sending request msg for each.
326 then -- We don't have any piece. Send request msg for all pieces.
328 sendMsg h (ExtendedMsg metadata_msgID (encode (requestMsg n)))
329 dataOrRejectMsg <- getMsg h
330 case dataOrRejectMsg of
331 ExtendedMsg 3 payload -> do
332 -- bencoded dict followed by XXXXXX
333 infoState <- takeMVar st
334 let (Right (Bdict bval, pieceData)) = decodeWithLeftOvers payload
335 (Bint pieceIndex) = bval ! "piece"
336 payloadLen = length (unpack pieceData)
337 infoMapVal = infoMap infoState
338 putMVar st infoState {
339 infoMap = insert pieceIndex (Just payload) infoMapVal }
343 return () -- TODO: reject for now
345 metadataMsg 0 = Bdict (fromList [("m", Bdict (fromList [("ut_metadata", (Bint 3))]))])
346 metadataMsg l = Bdict (fromList [("m", Bdict (fromList [("ut_metadata", (Bint 3))])),
347 ("metadata_size", (Bint l))])
348 requestMsg i = Bdict (fromList [("msg_type", (Bint 0)), ("piece", (Bint i))])
349 rejectmsg i = Bdict (fromList [("msg_type", (Bint 2)), ("piece", (Bint i))])
351 doesPeerSupportExtendedMsg :: ByteString -> Bool
352 doesPeerSupportExtendedMsg bs = take 1 (drop 5 bs) == singleton 0x10