]> git.rkrishnan.org Git - functorrent.git/blob - src/FuncTorrent/Peer.hs
refactor msgloop a bit
[functorrent.git] / src / FuncTorrent / Peer.hs
1 {-# LANGUAGE OverloadedStrings #-}
2 module FuncTorrent.Peer
3     (Peer(..),
4      handlePeerMsgs
5     ) where
6
7 import Prelude hiding (lookup, concat, replicate, splitAt, take)
8
9 import System.IO (Handle, BufferMode(..), hSetBuffering)
10 import Data.ByteString (ByteString, pack, unpack, concat, hGet, hPut, singleton, take, empty)
11 import Data.ByteString.Lazy (fromStrict, fromChunks, toStrict)
12 import qualified Data.ByteString.Char8 as BC (replicate, pack, length)
13 import Network (connectTo, PortID(..))
14 import Data.Binary (Binary(..), decode, encode)
15 import Data.Binary.Put (putWord32be, putWord16be, putWord8)
16 import Data.Binary.Get (getWord32be, getWord16be, getWord8, runGet)
17 import Control.Monad (replicateM, liftM, forM)
18 import Control.Applicative ((<$>), liftA3)
19 import Data.Bits
20 import Data.Word (Word8)
21 import Data.Map (Map, fromList, toList, (!), mapWithKey, adjust)
22 import qualified Crypto.Hash.SHA1 as SHA1 (hash)
23
24 import FuncTorrent.Metainfo (Info(..), Metainfo(..))
25 import FuncTorrent.Utils (splitN, splitNum)
26 import FuncTorrent.Fileops (createDummyFile, writeFileAtOffset)
27
28 type ID = String
29 type IP = String
30 type Port = Integer
31
32 -- PeerState is a misnomer
33 data PeerState = PeerState { handle :: Handle
34                            , peer :: Peer
35                            , meChoking :: Bool
36                            , meInterested :: Bool
37                            , heChoking :: Bool
38                            , heInterested :: Bool}
39
40 data PieceDlState = Pending
41                   | InProgress
42                   | Have
43                   deriving (Show, Eq)
44
45 -- todo - map with index to a new data structure (peers who have that piece amd state)
46 data PieceData = PieceData { peers :: [Peer]        -- ^ list of peers who have this piece
47                            , state :: PieceDlState  -- ^ state of the piece from download perspective.
48                            , hash  :: ByteString    -- ^ piece hash
49                            , len :: Integer }       -- ^ piece length
50
51 -- which piece is with which peers
52 type PieceMap = Map Integer PieceData
53
54 -- | Peer is a PeerID, IP address, port tuple
55 data Peer = Peer ID IP Port
56           deriving (Show, Eq)
57
58 data PeerMsg = KeepAliveMsg
59              | ChokeMsg
60              | UnChokeMsg
61              | InterestedMsg
62              | NotInterestedMsg
63              | HaveMsg Integer
64              | BitFieldMsg ByteString
65              | RequestMsg Integer Integer Integer
66              | PieceMsg Integer Integer ByteString
67              | CancelMsg Integer Integer Integer
68              | PortMsg Port
69              deriving (Show)
70
71 -- Make the initial Piece map, with the assumption that no peer has the
72 -- piece and that every piece is pending download.
73 mkPieceMap :: Integer -> ByteString -> [Integer] -> PieceMap
74 mkPieceMap numPieces pieceHash pLengths = fromList kvs
75   where kvs = [(i, PieceData { peers = []
76                              , state = Pending
77                              , hash = h
78                              , len = pLen })
79               | (i, h, pLen) <- zip3 [0..numPieces] hashes pLengths]
80         hashes = splitN 20 pieceHash
81
82 havePiece :: PieceMap -> Integer -> Bool
83 havePiece pm index =
84   state (pm ! index) == Have
85
86 genHandShakeMsg :: ByteString -> String -> ByteString
87 genHandShakeMsg infoHash peer_id = concat [pstrlen, pstr, reserved, infoHash, peerID]
88   where pstrlen = singleton 19
89         pstr = BC.pack "BitTorrent protocol"
90         reserved = BC.replicate 8 '\0'
91         peerID = BC.pack peer_id
92
93 connectToPeer :: Peer -> IO Handle
94 connectToPeer peer@(Peer _ ip port) = do
95   h <- connectTo ip (PortNumber (fromIntegral port))
96   hSetBuffering h LineBuffering
97   return h
98
99 doHandShake :: Handle -> Peer -> ByteString -> String -> IO ()
100 doHandShake h peer infoHash peerid = do
101   let hs = genHandShakeMsg infoHash peerid
102   hPut h hs
103   putStrLn $ "--> handhake to peer: " ++ show peer
104   _ <- hGet h (length (unpack hs))
105   putStrLn $ "<-- handshake from peer: " ++ show peer
106   return ()
107
108 instance Binary PeerMsg where
109   put msg = case msg of
110              KeepAliveMsg -> putWord32be 0
111              ChokeMsg -> do putWord32be 1
112                             putWord8 0
113              UnChokeMsg -> do putWord32be 1
114                               putWord8 1
115              InterestedMsg -> do putWord32be 1
116                                  putWord8 2
117              NotInterestedMsg -> do putWord32be 1
118                                     putWord8 3
119              HaveMsg i -> do putWord32be 5
120                              putWord8 4
121                              putWord32be (fromIntegral i)
122              BitFieldMsg bf -> do putWord32be $ fromIntegral (1 + bfListLen)
123                                   putWord8 5
124                                   mapM_ putWord8 bfList
125                                     where bfList = unpack bf
126                                           bfListLen = length bfList
127              RequestMsg i o l -> do putWord32be 13
128                                     putWord8 6
129                                     putWord32be (fromIntegral i)
130                                     putWord32be (fromIntegral o)
131                                     putWord32be (fromIntegral l)
132              PieceMsg i o b -> do putWord32be $ fromIntegral (9 + blocklen)
133                                   putWord8 7
134                                   putWord32be (fromIntegral i)
135                                   putWord32be (fromIntegral o)
136                                   mapM_ putWord8 blockList
137                                     where blockList = unpack b
138                                           blocklen = length blockList
139              CancelMsg i o l -> do putWord32be 13
140                                    putWord8 8
141                                    putWord32be (fromIntegral i)
142                                    putWord32be (fromIntegral o)
143                                    putWord32be (fromIntegral l)
144              PortMsg p -> do putWord32be 3
145                              putWord8 9
146                              putWord16be (fromIntegral p)
147   get = do
148     l <- getWord32be
149     msgid <- getWord8
150     case msgid of
151      0 -> return ChokeMsg
152      1 -> return UnChokeMsg
153      2 -> return InterestedMsg
154      3 -> return NotInterestedMsg
155      4 -> liftM (HaveMsg . fromIntegral) getWord32be
156      5 -> liftM (BitFieldMsg . pack) (replicateM (fromIntegral l - 1) getWord8)
157      6 -> liftA3 RequestMsg getInteger getInteger getInteger
158        where getInteger = fromIntegral <$> getWord32be
159      7 -> liftA3 PieceMsg getInteger getInteger (pack  <$> replicateM (fromIntegral l - 9) getWord8)
160        where getInteger = fromIntegral <$> getWord32be
161      8 -> liftA3 CancelMsg getInteger getInteger getInteger
162        where getInteger = fromIntegral <$> getWord32be
163      9 -> liftM (PortMsg . fromIntegral) getWord16be
164      _ -> error ("unknown message ID: " ++ show msgid)
165
166 getMsg :: Handle -> IO PeerMsg
167 getMsg h = do
168   lBS <- hGet h 4
169   let l = bsToInt lBS
170   if l == 0
171     then return KeepAliveMsg
172     else do
173     msg <- hGet h l
174     return $ decode $ fromStrict $ concat [lBS, msg]
175
176 sendMsg :: Handle -> PeerMsg -> IO ()
177 sendMsg h msg = hPut h bsMsg
178   where bsMsg = toStrict $ encode msg
179
180 bsToInt :: ByteString -> Int
181 bsToInt x = fromIntegral (runGet getWord32be (fromChunks (return x)))
182
183 bitfieldToList :: [Word8] -> [Integer]
184 bitfieldToList bs = go bs 0
185   where go [] _ = []
186         go (b:bs') pos =
187           let setBits = [pos*8 + toInteger i | i <- [0..8], testBit b i]
188           in
189            setBits ++ go bs' (pos + 1)
190
191 -- recvMsg :: Peer -> Handle -> Msg
192 msgLoop :: PeerState -> PieceMap -> IO ()
193 msgLoop pState@(PeerState { meInterested = False , heChoking = True }) pieceStatus =
194   do
195     -- if me NOT Interested and she is Choking, tell her that
196     -- I am interested.
197     let h = handle pState
198     sendMsg h InterestedMsg
199     putStrLn $ "--> InterestedMsg to peer: " ++ show (peer pState)
200     msgLoop (pState { meInterested = True }) pieceStatus
201 msgLoop pState@(PeerState { meInterested = True, heChoking = False }) pieceStatus =
202   -- if me Interested and she not Choking, send her a request
203   -- for a piece.
204   case pickPiece pieceStatus of
205    Nothing -> putStrLn "Nothing to download"
206    Just workPiece -> do
207      let pLen = len (pieceStatus ! workPiece)
208      putStrLn $ "piece length = " ++ show pLen
209      pBS <- downloadPiece (handle pState) workPiece pLen
210      if not $ verifyHash pBS (hash (pieceStatus ! workPiece))
211        then
212        putStrLn $ "Hash mismatch: " ++ show (hash (pieceStatus ! workPiece)) ++ " vs " ++ show (take 20 (SHA1.hash pBS))
213        else do
214        let fileOffset = if workPiece == 0 then 0 else workPiece * len (pieceStatus ! (workPiece - 1))
215        putStrLn $ "Write into file at offset: " ++ show fileOffset
216        writeFileAtOffset "/tmp/download.file" fileOffset pBS
217        msgLoop pState (adjust (\pieceData -> pieceData { state = Have }) workPiece pieceStatus)
218 msgLoop pState pieceStatus = do
219   msg <- getMsg (handle pState)
220   putStrLn $ "<-- " ++ show msg ++ "from peer: " ++ show (peer pState)
221   case msg of
222    KeepAliveMsg -> do
223      sendMsg (handle pState) KeepAliveMsg
224      putStrLn $ "--> " ++ "KeepAliveMsg to peer: " ++ show (peer pState)
225      msgLoop pState pieceStatus
226    BitFieldMsg bss -> do
227      let pieceList = bitfieldToList (unpack bss)
228          pieceStatus' = updatePieceAvailability pieceStatus (peer pState) pieceList
229      putStrLn $ show (length pieceList) ++ " Pieces"
230      -- for each pieceIndex in pieceList, make an entry in the pieceStatus
231      -- map with pieceIndex as the key and modify the value to add the peer.
232      -- download each of the piece in order
233      msgLoop pState pieceStatus'
234    UnChokeMsg ->
235      msgLoop (pState { heChoking = False }) pieceStatus
236    _ ->
237      msgLoop pState pieceStatus
238
239 -- simple algorithm to pick piece.
240 -- pick the first piece from 0 that is not downloaded yet.
241 pickPiece :: PieceMap -> Maybe Integer
242 pickPiece m =
243   let pieceList = toList m
244       allPending = filter (\(_, v) -> state v == Pending) pieceList
245   in
246    case allPending of
247     [] -> Nothing
248     ((i, _):_) -> Just i
249
250 updatePieceAvailability :: PieceMap -> Peer -> [Integer] -> PieceMap
251 updatePieceAvailability pieceStatus p pieceList =
252   mapWithKey (\k pd -> if k `elem` pieceList
253                        then (pd { peers = p : peers pd })
254                        else pd) pieceStatus
255
256 handlePeerMsgs :: Peer -> Metainfo -> String -> IO ()
257 handlePeerMsgs p m peerId = do
258   h <- connectToPeer p
259   doHandShake h p (infoHash m) peerId
260   let state = PeerState { handle = h
261                         , peer = p
262                         , heInterested = False
263                         , heChoking = True
264                         , meInterested = False
265                         , meChoking = True }
266       pieceHash = pieces (info m)
267       numPieces = (toInteger . (`quot` 20) . BC.length) pieceHash
268       pLen = pieceLength (info m)
269       fileLen = lengthInBytes (info m)
270       pieceStatus = mkPieceMap numPieces pieceHash (splitNum fileLen pLen)
271   createDummyFile "/tmp/download.file" (fromIntegral fileLen)
272   msgLoop state pieceStatus
273   
274 downloadPiece :: Handle -> Integer -> Integer -> IO ByteString
275 downloadPiece h index pieceLength = do
276   let chunks = splitNum pieceLength 16384
277   liftM concat $ forM (zip [0..] chunks) (\(i, pLen) -> do
278                                              sendMsg h (RequestMsg index (i*pLen) pLen)
279                                              putStrLn $ "--> " ++ "RequestMsg for Piece "
280                                                ++ show index ++ ", part: " ++ show i ++ " of length: "
281                                                ++ show pLen
282                                              msg <- getMsg h
283                                              case msg of
284                                               PieceMsg index begin block -> do
285                                                 putStrLn $ " <-- PieceMsg for Piece: "
286                                                   ++ show index
287                                                   ++ ", offset: "
288                                                   ++ show begin
289                                                 return block
290                                               _ -> do
291                                                 putStrLn "ignoring irrelevant msg"
292                                                 return empty)
293
294 verifyHash :: ByteString -> ByteString -> Bool
295 verifyHash bs pieceHash =
296   take 20 (SHA1.hash bs) == pieceHash