]> git.rkrishnan.org Git - functorrent.git/blob - src/FuncTorrent/Peer.hs
WIP: msg loop to send/recv infodict
[functorrent.git] / src / FuncTorrent / Peer.hs
1 {-
2  - Copyright (C) 2015-2016 Ramakrishnan Muthukrishnan <ram@rkrishnan.org>
3  -
4  - This file is part of FuncTorrent.
5  -
6  - FuncTorrent is free software; you can redistribute it and/or modify
7  - it under the terms of the GNU General Public License as published by
8  - the Free Software Foundation; either version 3 of the License, or
9  - (at your option) any later version.
10  -
11  - FuncTorrent is distributed in the hope that it will be useful,
12  - but WITHOUT ANY WARRANTY; without even the implied warranty of
13  - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  - GNU General Public License for more details.
15  -
16  - You should have received a copy of the GNU General Public License
17  - along with FuncTorrent; if not,  see <http://www.gnu.org/licenses/>
18  -}
19
20 {-# LANGUAGE OverloadedStrings #-}
21
22 module FuncTorrent.Peer
23     (handlePeerMsgs
24     ) where
25
26 import Prelude hiding (lookup, concat, replicate, splitAt, take, drop)
27
28 import Control.Concurrent.MVar (MVar, newEmptyMVar, readMVar, putMVar, takeMVar)
29 import Control.Monad.State
30 import Data.ByteString (ByteString, unpack, concat, hGet, hPut, take, drop, empty, singleton)
31 import Data.Bits
32 import Data.Word (Word8)
33 import Data.Map (Map, (!), adjust, fromList, insert)
34 import Network (connectTo, PortID(..))
35 import System.IO (Handle, BufferMode(..), hSetBuffering, hClose)
36
37 import FuncTorrent.Bencode(BVal(..), encode, decode, decodeWithLeftOvers)
38 import FuncTorrent.Metainfo (Metainfo(..))
39 import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg)
40 import FuncTorrent.Utils (splitNum, verifyHash)
41 import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability)
42 import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk)
43
44 data PState = PState { handle :: Handle
45                      , peer :: Peer
46                      , meChoking :: Bool
47                      , meInterested :: Bool
48                      , heChoking :: Bool
49                      , heInterested :: Bool}
50
51 data InfoPieceMap = InfoPieceMap { infoLength :: Integer
52                                  , infoMap :: Map Integer (Maybe ByteString) }
53
54 newtype InfoState = InfoState (MVar InfoPieceMap)
55
56 havePiece :: PieceMap -> Integer -> Bool
57 havePiece pm index =
58   dlstate (pm ! index) == Have
59
60 connectToPeer :: Peer -> IO Handle
61 connectToPeer (Peer ip port) = do
62   h <- connectTo ip (PortNumber (fromIntegral port))
63   hSetBuffering h LineBuffering
64   return h
65
66
67 doHandshake :: Bool -> Handle -> Peer -> ByteString -> String -> IO ()
68 doHandshake True h p infohash peerid = do
69   let hs = genHandshakeMsg infohash peerid
70   hPut h hs
71   putStrLn $ "--> handhake to peer: " ++ show p
72   hsMsg <- hGet h (length (unpack hs))
73   putStrLn $ "<-- handshake from peer: " ++ show p
74   infoPieceMap <- newEmptyMVar
75   metadataMsgLoop h $ InfoState infoPieceMap
76   return ()
77   -- if doesPeerSupportExtendedMsg hsMsg
78   --   then
79   --   return doExtendedHandshake h
80   --   else
81   --   return Nothing
82 doHandshake False h p infohash peerid = do
83   let hs = genHandshakeMsg infohash peerid
84   putStrLn "waiting for a handshake"
85   -- read 28 bytes. '19' ++ 'BitTorrent Protocol' ++ 8 reserved bytes
86   hsMsg <- hGet h 28
87   putStrLn $ "<-- handshake from peer: " ++ show p
88   let rxInfoHash = take 20 $ drop 28 hsMsg
89   if rxInfoHash /= infohash
90     then do
91     putStrLn "infoHashes does not match"
92     hClose h
93     return ()
94     else do
95     _ <- hPut h hs
96     putStrLn $ "--> handhake to peer: " ++ show p
97     -- if doesPeerSupportExtendedMsg hsMsg
98     --   then do
99     --   doExtendedHandshake h
100     --   else
101     --   return Nothing
102
103
104 bitfieldToList :: [Word8] -> [Integer]
105 bitfieldToList bs = go bs 0
106   where go [] _ = []
107         go (b:bs') pos =
108           let setBits = [pos*8 + toInteger i | i <- [0..8], testBit b i]
109           in
110            setBits ++ go bs' (pos + 1)
111
112 -- helper functions to manipulate PeerState
113 toPeerState :: Handle
114             -> Peer
115             -> Bool  -- ^ meChoking
116             -> Bool  -- ^ meInterested
117             -> Bool  -- ^ heChoking
118             -> Bool  -- ^ heInterested
119             -> PState
120 toPeerState h p meCh meIn heCh heIn =
121   PState { handle = h
122          , peer = p
123          , heChoking = heCh
124          , heInterested = heIn
125          , meChoking = meCh
126          , meInterested = meIn }
127
128 handlePeerMsgs :: Peer -> String -> Metainfo -> PieceMap -> Bool -> FS.MsgChannel -> IO ()
129 handlePeerMsgs p peerId m pieceMap isClient c = do
130   h <- connectToPeer p
131   doHandshake isClient h p (infoHash m) peerId
132   let pstate = toPeerState h p False False True True
133   _ <- runStateT (msgLoop pieceMap c) pstate
134   return ()
135
136 msgLoop :: PieceMap -> FS.MsgChannel -> StateT PState IO ()
137 msgLoop pieceStatus msgchannel = do
138   h <- gets handle
139   st <- get
140   case st of
141     PState { meInterested = False, heChoking = True } -> do
142       liftIO $ sendMsg h InterestedMsg
143       gets peer >>= (\p -> liftIO $ putStrLn $ "--> InterestedMsg to peer: " ++ show p)
144       modify (\st' -> st' { meInterested = True })
145       msgLoop pieceStatus msgchannel
146     PState { meInterested = True, heChoking = False } ->
147       case pickPiece pieceStatus of
148         Nothing -> liftIO $ putStrLn "Nothing to download"
149         Just workPiece -> do
150           let pLen = len (pieceStatus ! workPiece)
151           liftIO $ putStrLn $ "piece length = " ++ show pLen
152           pBS <- liftIO $ downloadPiece h workPiece pLen
153           if not $ verifyHash pBS (hash (pieceStatus ! workPiece))
154             then
155             liftIO $ putStrLn "Hash mismatch"
156             else do
157             liftIO $ putStrLn $ "Write piece: " ++ show workPiece
158             liftIO $ FS.writePieceToDisk msgchannel workPiece pBS
159             msgLoop (adjust (\pieceData -> pieceData { dlstate = Have }) workPiece pieceStatus) msgchannel
160     _ -> do
161       msg <- liftIO $ getMsg h
162       gets peer >>= (\p -> liftIO $ putStrLn $ "<-- " ++ show msg ++ " from peer: " ++ show p)
163       case msg of
164         KeepAliveMsg -> do
165           liftIO $ sendMsg h KeepAliveMsg
166           gets peer >>= (\p -> liftIO $ putStrLn $ "--> " ++ "KeepAliveMsg to peer: " ++ show p)
167           msgLoop pieceStatus msgchannel
168         BitFieldMsg bss -> do
169           p <- gets peer
170           let pieceList = bitfieldToList (unpack bss)
171               pieceStatus' = updatePieceAvailability pieceStatus p pieceList
172           liftIO $ putStrLn $ show (length pieceList) ++ " Pieces"
173           -- for each pieceIndex in pieceList, make an entry in the pieceStatus
174           -- map with pieceIndex as the key and modify the value to add the peer.
175           -- download each of the piece in order
176           msgLoop pieceStatus' msgchannel
177         UnChokeMsg -> do
178           modify (\st' -> st' {heChoking = False })
179           msgLoop pieceStatus msgchannel
180         ChokeMsg -> do
181           modify (\st' -> st' {heChoking = True })
182           msgLoop pieceStatus msgchannel
183         InterestedMsg -> do
184           modify (\st' -> st' {heInterested = True})
185           msgLoop pieceStatus msgchannel
186         NotInterestedMsg -> do
187           modify (\st' -> st' {heInterested = False})
188           msgLoop pieceStatus msgchannel
189         CancelMsg {} -> -- check if valid index, begin, length
190           msgLoop pieceStatus msgchannel
191         PortMsg _ ->
192           msgLoop pieceStatus msgchannel
193         HaveMsg idx -> do
194           p <- gets peer
195           let pieceStatus' = updatePieceAvailability pieceStatus p [idx]
196           msgLoop pieceStatus' msgchannel
197         _ -> do
198           liftIO $ putStrLn ".. not doing anything with the msg"
199           msgLoop pieceStatus msgchannel
200         -- No need to handle PieceMsg and RequestMsg here.
201
202
203 downloadPiece :: Handle -> Integer -> Integer -> IO ByteString
204 downloadPiece h index pieceLength = do
205   let chunks = splitNum pieceLength 16384
206   concat `liftM` forM (zip [0..] chunks) (\(i, pLen) -> do
207                                               sendMsg h (RequestMsg index (i*pLen) pLen)
208                                               putStrLn $ "--> " ++ "RequestMsg for Piece "
209                                                 ++ show index ++ ", part: " ++ show i ++ " of length: "
210                                                 ++ show pLen
211                                               msg <- getMsg h
212                                               case msg of
213                                                 PieceMsg index begin block -> do
214                                                   putStrLn $ " <-- PieceMsg for Piece: "
215                                                     ++ show index
216                                                     ++ ", offset: "
217                                                     ++ show begin
218                                                   return block
219                                                 _ -> do
220                                                   putStrLn $ "ignoring irrelevant msg: " ++ show msg
221                                                   return empty)
222
223
224 {-
225  -- Extension messages support (BEP-0010) --
226
227
228    In the regular peer handshake, adventise support for extension protocol. Protocol
229    extensions are done via the reserved bytes (8 of them) in the handshake message
230    as detailed in BEP-0003. For this particular "Extension Protocol" extension, we use
231    20th bit (counted from the right, from 0) is set to 1.
232
233    Once support for the extension protocol is established by the peer, the Peer is supposed
234    to support one message with the ID 20. This is sent like a regular message with 4-byte
235    length prefix and the msg id (20) in this case.
236
237    First byte of the payload of this message is either 0, which means it is a handshake
238    msg.
239
240    The rest of the payload is a dictionary with various keys. All of them are optional. The
241    one of interest at the moment for me is the one with key 'm' whose value is another
242    dictionary of all supported extensions.
243
244    Here is where it gets interesting for us (to support magneturi. When the torrent client
245    has only got a magneturi to look at, it has only got the list of trackers with it (we
246    are not looking at the DHT case for the time being). So, it somehow needs to get the info
247    dictionary. It gets this by talking to another peer in the network. To do that, the client
248    needs to talk tracker protocol, get the list of peers and talk to peers using the above
249    extension protocol to get the infodict as payload. Let us see how we can do that now.
250
251    If a peer already has the full infodict, then, the handshake message sent by that peer
252    is something like this:
253
254      {'m': {'ut_metadata', 3}, 'metadata_size': 31235}
255
256    Note that the 'metadata_size' is not part of the value of the key 'm'.
257    If we are a new client and are requesting the handshake to a peer, then we don't have
258    the infodict yet, in which case, we only send the first part:
259
260      {'m': {'ut_metadata', 3}}
261
262    This is bencoded and sent across the wire. The value "3" (integer) against the key
263    'ut_metadata" is an ordered integer within a client that identifies the extention.
264    No two extension supported by the same client shares the same value. If the value is
265    '0', then the extension is unsupported.
266
267    Here we use the BEP-0009, the metadata extension protocol. The metadata in this case
268    is the infodict. The infodict itself is divided into 16KB sized pieces.
269
270    Here is a possible interaction between two peers:
271
272    1. Peer Pn comes up, gets the ip/ports of other peers, P0, P1.... Pn does not have the
273    size of the infodict. Pn has advertised itself as supporting the extension protocol.
274    It sends the handshake msg to other peers with this bit on in the reserved bytes.
275    2. Let us say, P1 replied with a handshake. We check if it also supports the extension
276    mechanism.
277    3. Now we get into the extension message passing so that we have the info dict.
278    To do that, we send the extension handshake (ut_metadata) m dict without the
279    metadata_size. We get back the extension handshake with metadata_size. We take
280    note of the size.
281    4. We calculate the number of 16384 chunks in the total size of the metadata. That
282    gives us the number of pieces the metadata has.
283    5. We send a "request" extension msg:
284      {'msg_type': 0, 'piece': 0}
285    6. We recieve the "data" message.
286      {'msg_type': 1, 'piece': 0, 'total_size': 3425} in bencoded format, followed by
287      total_size bytes. total_size is 16KiB except perhaps for the last piece.
288    7. If the peer does not have the requested piece, it sends the "reject" message.
289      {'msg_type': 2, 'piece': 0}
290    8. Repeat 5, 6/7 for every piece.
291
292    At this point, we have the infodict.
293
294 -}
295
296 {-
297 data InfoPieceMap = { infoLength :: Integer
298                     , infoMap :: Map Integer (Maybe ByteString)
299                     }
300
301 newtype InfoState = InfoState (MVar InfoPieceMap)
302
303 -}
304
305
306 metadataMsgLoop :: Handle -> InfoState -> IO ()
307 metadataMsgLoop h (InfoState st) = do
308     infoState <- readMVar st
309     let metadataLen = infoLength infoState
310         -- send the handshake msg
311         metadata = encode (metadataMsg metadataLen)
312     sendMsg h (ExtendedMsg 0 metadata)
313     -- recv return msg from the peer. Will have 'metadata_size'
314     msg <- getMsg h
315     case msg of
316       ExtendedMsg 0 rBs -> do
317         -- decode rBs
318         let (Right (Bdict msgMap)) = decode rBs
319             (Bdict mVal) = msgMap ! "m" -- which is another dict
320             (Bint metadata_msgID) = mVal ! "ut_metadata"
321             (Bint metadata_size) = msgMap ! "metadata_size"
322             -- divide metadata_size into 16384 sized pieces, find number of pieces
323             (q, r) = metadata_size `divMod` 16384
324             -- pNumLengthPairs = zip [0..q-1] (take q (repeat 16384)) ++ (q, r)
325             -- TODO: corner case where infodict size is a multiple of 16384
326             -- and start sending request msg for each.
327         if metadataLen == 0
328           then -- We don't have any piece. Send request msg for all pieces.
329           mapM_ (\n -> do
330                     sendMsg h (ExtendedMsg metadata_msgID (encode (requestMsg n)))
331                     dataOrRejectMsg <- getMsg h
332                     case dataOrRejectMsg of
333                       ExtendedMsg 3 payload -> do
334                         -- bencoded dict followed by XXXXXX
335                         infoState <- takeMVar st
336                         let (Right (Bdict bval, pieceData)) = decodeWithLeftOvers payload
337                             (Bint pieceIndex) = bval ! "piece"
338                             payloadLen = length (unpack pieceData)
339                             infoMapVal = infoMap infoState
340                         putMVar st infoState {
341                           infoMap = insert pieceIndex (Just payload) infoMapVal }
342                 )
343           [0..q]
344           else
345           return () -- TODO: reject for now
346       where
347         metadataMsg 0 = Bdict (fromList [("m", Bdict (fromList [("ut_metadata", (Bint 3))]))])
348         metadataMsg l = Bdict (fromList [("m", Bdict (fromList [("ut_metadata", (Bint 3))])),
349                                          ("metadata_size", (Bint l))])
350         requestMsg i = Bdict (fromList [("msg_type", (Bint 0)), ("piece", (Bint i))])
351         rejectmsg i = Bdict (fromList [("msg_type", (Bint 2)), ("piece", (Bint i))])
352
353 doesPeerSupportExtendedMsg :: ByteString -> Bool
354 doesPeerSupportExtendedMsg bs = take 1 (drop 5 bs) == singleton 0x10