]> git.rkrishnan.org Git - functorrent.git/blobdiff - src/FuncTorrent/Tracker/Udp.hs
Tracker/Udp: cleanup of warnings
[functorrent.git] / src / FuncTorrent / Tracker / Udp.hs
index 2437003ec47692f5667e47dba970613823849a8b..ae7ad1e72b56166a80616c592c1cf05270e1d002 100644 (file)
@@ -23,22 +23,26 @@ module FuncTorrent.Tracker.Udp
        ) where
 
 import Control.Applicative (liftA2)
-import Control.Monad (liftM)
-import Control.Concurrent.MVar (readMVar)
+import Control.Monad (forever, void)
+import Control.Concurrent (threadDelay)
+import Control.Concurrent.MVar (readMVar, putMVar, isEmptyMVar, swapMVar)
 import Control.Monad.Reader (ReaderT, runReaderT, ask, liftIO)
 import Data.Binary (Binary(..), encode, decode)
-import Data.Binary.Get (Get, isEmpty, getWord32be, getByteString)
+import Data.Binary.Get (Get, isEmpty, getWord32be, getWord64be, getByteString)
 import Data.Binary.Put (putWord16be, putWord64be, putWord32be, putByteString)
 import Data.ByteString (ByteString)
 import qualified Data.ByteString.Char8 as BC
 import Data.ByteString.Lazy (fromStrict, toStrict)
 import Data.Word (Word16, Word32, Word64)
+import Network (PortNumber)
 import Network.Socket (Socket, Family( AF_INET ), SocketType( Datagram ), defaultProtocol, SockAddr(..), socket, close, getAddrInfo, addrAddress, SockAddr(..))
 import Network.Socket.ByteString (sendTo, recvFrom)
 import System.Random (randomIO)
+import System.Timeout (timeout)
 
-import FuncTorrent.Tracker.Types (TrackerEventState(..), TState(..))
-import FuncTorrent.Utils (IP, Port, toIP, toPort, getHostname, getPort)
+import FuncTorrent.PeerMsgs (Peer(..))
+import FuncTorrent.Tracker.Types (TrackerEventState(..), TState(..), UdpTrackerResponse(..))
+import FuncTorrent.Utils (Port, toIP, toPort, getHostname, getPort)
 import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats)
 
 -- UDP tracker: http://bittorrent.org/beps/bep_0015.html
@@ -53,14 +57,13 @@ data UDPRequest = ConnectReq Word32
                 deriving (Show, Eq)
 
 data UDPResponse = ConnectResp Word32 Word64 -- transaction_id connection_id
-                 | AnnounceResp Word32 Word32 Word32 Word32 [(IP, Port)] -- transaction_id interval leechers seeders [(ip, port)]
+                 | AnnounceResp Word32 Word32 Word32 Word32 [Peer] -- transaction_id interval leechers seeders [(ip, port)]
                  | ScrapeResp Integer Integer Integer Integer
                  | ErrorResp Integer String
                  deriving (Show, Eq)
 
 data UDPTrackerHandle = UDPTrackerHandle { sock :: Socket
                                          , addr :: SockAddr
-                                         , tid  :: Word32
                                          }
 
 actionToInteger :: Action -> Integer
@@ -68,11 +71,6 @@ actionToInteger Connect  = 0
 actionToInteger Announce = 1
 actionToInteger Scrape   = 2
 
-intToAction :: Integer -> Action
-intToAction 0 = Connect
-intToAction 1 = Announce
-intToAction 2 = Scrape
-
 eventToInteger :: TrackerEventState -> Integer
 eventToInteger None = 0
 eventToInteger Completed = 1
@@ -84,19 +82,19 @@ instance Binary UDPRequest where
     putWord64be 0x41727101980
     putWord32be $ fromIntegral (actionToInteger Connect)
     putWord32be (fromIntegral transId)
-  put (AnnounceReq connId transId infohash peerId down left up event port) = do
+  put (AnnounceReq connId transId infohash peerId down left' up event port) = do
     putWord64be $ fromIntegral connId
     putWord32be $ fromIntegral (actionToInteger Announce)
     putWord32be $ fromIntegral transId
     putByteString infohash
     putByteString (BC.pack peerId)
     putWord64be (fromIntegral down)
-    putWord64be (fromIntegral left)
+    putWord64be (fromIntegral left')
     putWord64be (fromIntegral up)
-    putWord32be $ fromIntegral (eventToInteger None)
+    putWord32be $ fromIntegral (eventToInteger event)
+    putWord32be 0
     putWord32be 0
-    -- key is optional, we will not send it for now
-    putWord32be $ fromIntegral (-1)
+    putWord32be 10
     putWord16be $ fromIntegral port
   put (ScrapeReq _ _ _) = undefined
   get = undefined
@@ -106,24 +104,24 @@ instance Binary UDPResponse where
   get = do
     a <- getWord32be -- action
     case a of
-      0 -> liftA2 ConnectResp (fromIntegral <$> getWord32be) (fromIntegral <$> getWord32be)
+      0 -> liftA2 ConnectResp (fromIntegral <$> getWord32be) (fromIntegral <$> getWord64be)
       1 -> do
-        tid <- fromIntegral <$> getWord32be
+        tid' <- fromIntegral <$> getWord32be
         interval' <- fromIntegral <$> getWord32be
         l <- getWord32be -- leechers
         s <- getWord32be -- seeders
         ipportpairs <- getIPPortPairs -- [(ip, port)]
-        return $ AnnounceResp tid interval' l s ipportpairs
+        return $ AnnounceResp tid' interval' l s ipportpairs
       2 -> do
-        tid <- fromIntegral <$> getWord32be
+        tid' <- fromIntegral <$> getWord32be
         _ <- getWord32be
         _ <- getWord32be
         _ <- getWord32be
-        return $ ScrapeResp tid 0 0 0
+        return $ ScrapeResp tid' 0 0 0
       3 -> do -- error response
-        tid <- fromIntegral <$> getWord32be
+        tid' <- fromIntegral <$> getWord32be
         bs  <- getByteString 4
-        return $ ErrorResp tid $ BC.unpack bs
+        return $ ErrorResp tid' $ BC.unpack bs
       _ -> error ("unknown response action type: " ++ show a)
 
 sendRequest :: UDPTrackerHandle -> ByteString -> IO ()
@@ -149,7 +147,6 @@ connectResponse :: Word32 -> ReaderT UDPTrackerHandle IO Word64
 connectResponse tid = do
   h <- ask
   resp <- liftIO $ recvResponse h
-  liftIO $ print resp
   -- check if nbytes is at least 16 bytes long
   case resp of
     (ConnectResp tidr cid) ->
@@ -162,43 +159,37 @@ connectResponse tid = do
     _                      -> return 0
 
 announceRequest :: Word64 -> ByteString -> String -> Word64 -> Word64 -> Word64 -> Word16 -> ReaderT UDPTrackerHandle IO Word32
-announceRequest cid infohash peerId up down left port = do
+announceRequest cid infohash peerId up down left' port = do
   h <- ask
   tidi <- liftIO randomIO
-  -- connId transId infohash peerId down left up event port)
-  let pkt = encode $ AnnounceReq cid tidi infohash peerId down left up None port
+  let pkt = encode $ AnnounceReq cid tidi infohash peerId down left' up None port
   liftIO $ sendRequest h (toStrict pkt)
   return tidi
 
-data PeerStats = PeerStats { leechers :: Word32
-                           , seeders :: Word32
-                           , peers :: [(IP, Port)]
-                           } deriving (Show)
-
-announceResponse :: Word32 -> ReaderT UDPTrackerHandle IO PeerStats
+announceResponse :: Word32 -> ReaderT UDPTrackerHandle IO UdpTrackerResponse
 announceResponse tid = do
   h <- ask
   resp <- liftIO $ recvResponse h
   case resp of
-    (AnnounceResp tidr interval ss ls xs) ->
+    (AnnounceResp tidr interval' ss ls xs) ->
       if tidr == tid
       then do
         liftIO $ putStrLn "announce succeeded"
-        return $ PeerStats ls ss xs
+        return $ UdpTrackerResponse ls ss interval' xs
       else
-        return $ PeerStats 0 0 []
-    _ -> return $ PeerStats 0 0 []
+        return $ UdpTrackerResponse 0 0 0 []
+    _ -> return $ UdpTrackerResponse 0 0 0 []
 
-getIPPortPairs :: Get [(IP, Port)]
+getIPPortPairs :: Get [Peer]
 getIPPortPairs = do
   empty <- isEmpty
   if empty
     then return []
     else do
-    ip <- toIP <$> getByteString 6
+    ip <- toIP <$> getByteString 4
     port <- toPort <$> getByteString 2
     ipportpairs <- getIPPortPairs
-    return $ (ip, port) : ipportpairs
+    return $ (Peer ip port) : ipportpairs
 
 startSession :: String -> Port -> IO UDPTrackerHandle
 startSession host port = do
@@ -206,26 +197,37 @@ startSession host port = do
   addrinfos <- getAddrInfo Nothing (Just host) (Just (show port))
   let (SockAddrInet p ip) = addrAddress $ head addrinfos
   putStrLn "connected to tracker"
-  return UDPTrackerHandle { sock = s
-                            , addr = (SockAddrInet (fromIntegral port) ip) }
+  return UDPTrackerHandle { sock = s
+                          , addr = (SockAddrInet (fromIntegral port) ip) }
   
 closeSession :: UDPTrackerHandle -> IO ()
-closeSession (UDPTrackerHandle s _ _) = close s
-
-trackerLoop :: String -> Port -> String -> ByteString -> FS.MsgChannel -> TState -> IO ()
-trackerLoop url sport peerId infohash fschan tstate = do
-  st' <- FS.getStats fschan
-  st <- readMVar st'
-  let up = FS.bytesRead st
-      down = FS.bytesWritten st
-      port = getPort url
-      host = getHostname url
-  putStrLn $ "host = " ++ (show host) ++ " port= " ++ (show port)
+closeSession (UDPTrackerHandle s _) = close s
+
+trackerLoop :: String -> PortNumber -> String -> ByteString -> FS.MsgChannel -> TState -> IO ()
+trackerLoop url sport peerId infohash fschan tstate = forever $ do
+  st <- readMVar <$> FS.getStats fschan
+  up <- fmap FS.bytesRead st
+  down <- fmap FS.bytesWritten st
   handle <- startSession host port
-  flip runReaderT handle $ do
-    t1 <- connectRequest
-    cid <- connectResponse t1
-    liftIO $ print cid
-    t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport)
-    stats <- announceResponse t2
-    liftIO $ print stats
+  stats <- timeout (15*oneSec) $ worker handle up down
+  case stats of
+    Nothing -> closeSession handle
+    Just stats' -> do
+      ps <- isEmptyMVar $ connectedPeers tstate
+      if ps
+        then
+        putMVar (connectedPeers tstate) (peers stats')
+        else
+        void $ swapMVar (connectedPeers tstate) (peers stats')
+      threadDelay $ fromIntegral (interval stats') * oneSec
+      return ()
+  where
+    oneSec = 1000000
+    port = getPort url
+    host = getHostname url
+    worker handle up down = flip runReaderT handle $ do
+      t1 <- connectRequest
+      cid <- connectResponse t1
+      t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport)
+      stats <- announceResponse t2
+      return stats