]> git.rkrishnan.org Git - functorrent.git/blobdiff - src/FuncTorrent/Peer.hs
WIP: msg loop to send/recv infodict
[functorrent.git] / src / FuncTorrent / Peer.hs
index bd66f99a43ef5c0f0357a56fd32ec4cc00c3b781..1c6bf7f77472c00d94c2197e3dd3fa3c0d002b1d 100644 (file)
@@ -1,25 +1,45 @@
+{-
+ - Copyright (C) 2015-2016 Ramakrishnan Muthukrishnan <ram@rkrishnan.org>
+ -
+ - This file is part of FuncTorrent.
+ -
+ - FuncTorrent is free software; you can redistribute it and/or modify
+ - it under the terms of the GNU General Public License as published by
+ - the Free Software Foundation; either version 3 of the License, or
+ - (at your option) any later version.
+ -
+ - FuncTorrent is distributed in the hope that it will be useful,
+ - but WITHOUT ANY WARRANTY; without even the implied warranty of
+ - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ - GNU General Public License for more details.
+ -
+ - You should have received a copy of the GNU General Public License
+ - along with FuncTorrent; if not,  see <http://www.gnu.org/licenses/>
+ -}
+
 {-# LANGUAGE OverloadedStrings #-}
 {-# LANGUAGE OverloadedStrings #-}
+
 module FuncTorrent.Peer
 module FuncTorrent.Peer
-    (Peer(..),
-     PieceMap,
-     handlePeerMsgs
+    (handlePeerMsgs
     ) where
 
 import Prelude hiding (lookup, concat, replicate, splitAt, take, drop)
 
     ) where
 
 import Prelude hiding (lookup, concat, replicate, splitAt, take, drop)
 
+import Control.Concurrent.MVar (MVar, newEmptyMVar, readMVar, putMVar, takeMVar)
 import Control.Monad.State
 import Control.Monad.State
-import Data.ByteString (ByteString, unpack, concat, hGet, hPut, take, drop, empty)
+import Data.ByteString (ByteString, unpack, concat, hGet, hPut, take, drop, empty, singleton)
 import Data.Bits
 import Data.Word (Word8)
 import Data.Bits
 import Data.Word (Word8)
-import Data.Map ((!), adjust)
+import Data.Map (Map, (!), adjust, fromList, insert)
 import Network (connectTo, PortID(..))
 import System.IO (Handle, BufferMode(..), hSetBuffering, hClose)
 
 import Network (connectTo, PortID(..))
 import System.IO (Handle, BufferMode(..), hSetBuffering, hClose)
 
+import FuncTorrent.Bencode(BVal(..), encode, decode, decodeWithLeftOvers)
 import FuncTorrent.Metainfo (Metainfo(..))
 import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg)
 import FuncTorrent.Utils (splitNum, verifyHash)
 import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability)
 import FuncTorrent.Metainfo (Metainfo(..))
 import FuncTorrent.PeerMsgs (Peer(..), PeerMsg(..), sendMsg, getMsg, genHandshakeMsg)
 import FuncTorrent.Utils (splitNum, verifyHash)
 import FuncTorrent.PieceManager (PieceDlState(..), PieceData(..), PieceMap, pickPiece, updatePieceAvailability)
-import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePiece, Piece(..))
+import qualified FuncTorrent.FileSystem as FS (MsgChannel, writePieceToDisk)
 
 data PState = PState { handle :: Handle
                      , peer :: Peer
 
 data PState = PState { handle :: Handle
                      , peer :: Peer
@@ -28,28 +48,42 @@ data PState = PState { handle :: Handle
                      , heChoking :: Bool
                      , heInterested :: Bool}
 
                      , heChoking :: Bool
                      , heInterested :: Bool}
 
+data InfoPieceMap = InfoPieceMap { infoLength :: Integer
+                                 , infoMap :: Map Integer (Maybe ByteString) }
+
+newtype InfoState = InfoState (MVar InfoPieceMap)
+
 havePiece :: PieceMap -> Integer -> Bool
 havePiece pm index =
   dlstate (pm ! index) == Have
 
 connectToPeer :: Peer -> IO Handle
 havePiece :: PieceMap -> Integer -> Bool
 havePiece pm index =
   dlstate (pm ! index) == Have
 
 connectToPeer :: Peer -> IO Handle
-connectToPeer (Peer ip port) = do
+connectToPeer (Peer ip port) = do
   h <- connectTo ip (PortNumber (fromIntegral port))
   hSetBuffering h LineBuffering
   return h
 
   h <- connectTo ip (PortNumber (fromIntegral port))
   hSetBuffering h LineBuffering
   return h
 
+
 doHandshake :: Bool -> Handle -> Peer -> ByteString -> String -> IO ()
 doHandshake True h p infohash peerid = do
   let hs = genHandshakeMsg infohash peerid
   hPut h hs
   putStrLn $ "--> handhake to peer: " ++ show p
 doHandshake :: Bool -> Handle -> Peer -> ByteString -> String -> IO ()
 doHandshake True h p infohash peerid = do
   let hs = genHandshakeMsg infohash peerid
   hPut h hs
   putStrLn $ "--> handhake to peer: " ++ show p
-  _ <- hGet h (length (unpack hs))
+  hsMsg <- hGet h (length (unpack hs))
   putStrLn $ "<-- handshake from peer: " ++ show p
   putStrLn $ "<-- handshake from peer: " ++ show p
+  infoPieceMap <- newEmptyMVar
+  metadataMsgLoop h $ InfoState infoPieceMap
   return ()
   return ()
+  -- if doesPeerSupportExtendedMsg hsMsg
+  --   then
+  --   return doExtendedHandshake h
+  --   else
+  --   return Nothing
 doHandshake False h p infohash peerid = do
   let hs = genHandshakeMsg infohash peerid
   putStrLn "waiting for a handshake"
 doHandshake False h p infohash peerid = do
   let hs = genHandshakeMsg infohash peerid
   putStrLn "waiting for a handshake"
-  hsMsg <- hGet h (length (unpack hs))
+  -- read 28 bytes. '19' ++ 'BitTorrent Protocol' ++ 8 reserved bytes
+  hsMsg <- hGet h 28
   putStrLn $ "<-- handshake from peer: " ++ show p
   let rxInfoHash = take 20 $ drop 28 hsMsg
   if rxInfoHash /= infohash
   putStrLn $ "<-- handshake from peer: " ++ show p
   let rxInfoHash = take 20 $ drop 28 hsMsg
   if rxInfoHash /= infohash
@@ -60,7 +94,12 @@ doHandshake False h p infohash peerid = do
     else do
     _ <- hPut h hs
     putStrLn $ "--> handhake to peer: " ++ show p
     else do
     _ <- hPut h hs
     putStrLn $ "--> handhake to peer: " ++ show p
-    return ()
+    -- if doesPeerSupportExtendedMsg hsMsg
+    --   then do
+    --   doExtendedHandshake h
+    --   else
+    --   return Nothing
+
 
 bitfieldToList :: [Word8] -> [Integer]
 bitfieldToList bs = go bs 0
 
 bitfieldToList :: [Word8] -> [Integer]
 bitfieldToList bs = go bs 0
@@ -116,11 +155,11 @@ msgLoop pieceStatus msgchannel = do
             liftIO $ putStrLn "Hash mismatch"
             else do
             liftIO $ putStrLn $ "Write piece: " ++ show workPiece
             liftIO $ putStrLn "Hash mismatch"
             else do
             liftIO $ putStrLn $ "Write piece: " ++ show workPiece
-            liftIO $ FS.writePiece msgchannel workPiece pBS
+            liftIO $ FS.writePieceToDisk msgchannel workPiece pBS
             msgLoop (adjust (\pieceData -> pieceData { dlstate = Have }) workPiece pieceStatus) msgchannel
     _ -> do
       msg <- liftIO $ getMsg h
             msgLoop (adjust (\pieceData -> pieceData { dlstate = Have }) workPiece pieceStatus) msgchannel
     _ -> do
       msg <- liftIO $ getMsg h
-      gets peer >>= (\p -> liftIO $ putStrLn $ "<-- " ++ show msg ++ "from peer: " ++ show p)
+      gets peer >>= (\p -> liftIO $ putStrLn $ "<-- " ++ show msg ++ " from peer: " ++ show p)
       case msg of
         KeepAliveMsg -> do
           liftIO $ sendMsg h KeepAliveMsg
       case msg of
         KeepAliveMsg -> do
           liftIO $ sendMsg h KeepAliveMsg
@@ -147,12 +186,18 @@ msgLoop pieceStatus msgchannel = do
         NotInterestedMsg -> do
           modify (\st' -> st' {heInterested = False})
           msgLoop pieceStatus msgchannel
         NotInterestedMsg -> do
           modify (\st' -> st' {heInterested = False})
           msgLoop pieceStatus msgchannel
-        CancelMsg _ _ _ -> -- check if valid index, begin, length
+        CancelMsg {} -> -- check if valid index, begin, length
           msgLoop pieceStatus msgchannel
         PortMsg _ ->
           msgLoop pieceStatus msgchannel
           msgLoop pieceStatus msgchannel
         PortMsg _ ->
           msgLoop pieceStatus msgchannel
-        -- handle RequestMsg, HaveMsg. No need to handle PieceMsg here.
-        -- also BitFieldMsg
+        HaveMsg idx -> do
+          p <- gets peer
+          let pieceStatus' = updatePieceAvailability pieceStatus p [idx]
+          msgLoop pieceStatus' msgchannel
+        _ -> do
+          liftIO $ putStrLn ".. not doing anything with the msg"
+          msgLoop pieceStatus msgchannel
+        -- No need to handle PieceMsg and RequestMsg here.
 
 
 downloadPiece :: Handle -> Integer -> Integer -> IO ByteString
 
 
 downloadPiece :: Handle -> Integer -> Integer -> IO ByteString
@@ -175,3 +220,135 @@ downloadPiece h index pieceLength = do
                                                   putStrLn $ "ignoring irrelevant msg: " ++ show msg
                                                   return empty)
 
                                                   putStrLn $ "ignoring irrelevant msg: " ++ show msg
                                                   return empty)
 
+
+{-
+ -- Extension messages support (BEP-0010) --
+
+
+   In the regular peer handshake, adventise support for extension protocol. Protocol
+   extensions are done via the reserved bytes (8 of them) in the handshake message
+   as detailed in BEP-0003. For this particular "Extension Protocol" extension, we use
+   20th bit (counted from the right, from 0) is set to 1.
+
+   Once support for the extension protocol is established by the peer, the Peer is supposed
+   to support one message with the ID 20. This is sent like a regular message with 4-byte
+   length prefix and the msg id (20) in this case.
+
+   First byte of the payload of this message is either 0, which means it is a handshake
+   msg.
+
+   The rest of the payload is a dictionary with various keys. All of them are optional. The
+   one of interest at the moment for me is the one with key 'm' whose value is another
+   dictionary of all supported extensions.
+
+   Here is where it gets interesting for us (to support magneturi. When the torrent client
+   has only got a magneturi to look at, it has only got the list of trackers with it (we
+   are not looking at the DHT case for the time being). So, it somehow needs to get the info
+   dictionary. It gets this by talking to another peer in the network. To do that, the client
+   needs to talk tracker protocol, get the list of peers and talk to peers using the above
+   extension protocol to get the infodict as payload. Let us see how we can do that now.
+
+   If a peer already has the full infodict, then, the handshake message sent by that peer
+   is something like this:
+
+     {'m': {'ut_metadata', 3}, 'metadata_size': 31235}
+
+   Note that the 'metadata_size' is not part of the value of the key 'm'.
+   If we are a new client and are requesting the handshake to a peer, then we don't have
+   the infodict yet, in which case, we only send the first part:
+
+     {'m': {'ut_metadata', 3}}
+
+   This is bencoded and sent across the wire. The value "3" (integer) against the key
+   'ut_metadata" is an ordered integer within a client that identifies the extention.
+   No two extension supported by the same client shares the same value. If the value is
+   '0', then the extension is unsupported.
+
+   Here we use the BEP-0009, the metadata extension protocol. The metadata in this case
+   is the infodict. The infodict itself is divided into 16KB sized pieces.
+
+   Here is a possible interaction between two peers:
+
+   1. Peer Pn comes up, gets the ip/ports of other peers, P0, P1.... Pn does not have the
+   size of the infodict. Pn has advertised itself as supporting the extension protocol.
+   It sends the handshake msg to other peers with this bit on in the reserved bytes.
+   2. Let us say, P1 replied with a handshake. We check if it also supports the extension
+   mechanism.
+   3. Now we get into the extension message passing so that we have the info dict.
+   To do that, we send the extension handshake (ut_metadata) m dict without the
+   metadata_size. We get back the extension handshake with metadata_size. We take
+   note of the size.
+   4. We calculate the number of 16384 chunks in the total size of the metadata. That
+   gives us the number of pieces the metadata has.
+   5. We send a "request" extension msg:
+     {'msg_type': 0, 'piece': 0}
+   6. We recieve the "data" message.
+     {'msg_type': 1, 'piece': 0, 'total_size': 3425} in bencoded format, followed by
+     total_size bytes. total_size is 16KiB except perhaps for the last piece.
+   7. If the peer does not have the requested piece, it sends the "reject" message.
+     {'msg_type': 2, 'piece': 0}
+   8. Repeat 5, 6/7 for every piece.
+
+   At this point, we have the infodict.
+
+-}
+
+{-
+data InfoPieceMap = { infoLength :: Integer
+                    , infoMap :: Map Integer (Maybe ByteString)
+                    }
+
+newtype InfoState = InfoState (MVar InfoPieceMap)
+
+-}
+
+
+metadataMsgLoop :: Handle -> InfoState -> IO ()
+metadataMsgLoop h (InfoState st) = do
+    infoState <- readMVar st
+    let metadataLen = infoLength infoState
+        -- send the handshake msg
+        metadata = encode (metadataMsg metadataLen)
+    sendMsg h (ExtendedMsg 0 metadata)
+    -- recv return msg from the peer. Will have 'metadata_size'
+    msg <- getMsg h
+    case msg of
+      ExtendedMsg 0 rBs -> do
+        -- decode rBs
+        let (Right (Bdict msgMap)) = decode rBs
+            (Bdict mVal) = msgMap ! "m" -- which is another dict
+            (Bint metadata_msgID) = mVal ! "ut_metadata"
+            (Bint metadata_size) = msgMap ! "metadata_size"
+            -- divide metadata_size into 16384 sized pieces, find number of pieces
+            (q, r) = metadata_size `divMod` 16384
+            -- pNumLengthPairs = zip [0..q-1] (take q (repeat 16384)) ++ (q, r)
+            -- TODO: corner case where infodict size is a multiple of 16384
+            -- and start sending request msg for each.
+        if metadataLen == 0
+          then -- We don't have any piece. Send request msg for all pieces.
+          mapM_ (\n -> do
+                    sendMsg h (ExtendedMsg metadata_msgID (encode (requestMsg n)))
+                    dataOrRejectMsg <- getMsg h
+                    case dataOrRejectMsg of
+                      ExtendedMsg 3 payload -> do
+                        -- bencoded dict followed by XXXXXX
+                        infoState <- takeMVar st
+                        let (Right (Bdict bval, pieceData)) = decodeWithLeftOvers payload
+                            (Bint pieceIndex) = bval ! "piece"
+                            payloadLen = length (unpack pieceData)
+                            infoMapVal = infoMap infoState
+                        putMVar st infoState {
+                          infoMap = insert pieceIndex (Just payload) infoMapVal }
+                )
+          [0..q]
+          else
+          return () -- TODO: reject for now
+      where
+        metadataMsg 0 = Bdict (fromList [("m", Bdict (fromList [("ut_metadata", (Bint 3))]))])
+        metadataMsg l = Bdict (fromList [("m", Bdict (fromList [("ut_metadata", (Bint 3))])),
+                                         ("metadata_size", (Bint l))])
+        requestMsg i = Bdict (fromList [("msg_type", (Bint 0)), ("piece", (Bint i))])
+        rejectmsg i = Bdict (fromList [("msg_type", (Bint 2)), ("piece", (Bint i))])
+
+doesPeerSupportExtendedMsg :: ByteString -> Bool
+doesPeerSupportExtendedMsg bs = take 1 (drop 5 bs) == singleton 0x10