X-Git-Url: https://git.rkrishnan.org/?p=functorrent.git;a=blobdiff_plain;f=src%2FFuncTorrent%2FTracker%2FUdp.hs;h=aaa99472b44c3a9d72f955afd0f35a5e8c05d534;hp=7db020281403304f6e7cc92ea7dcefc673f1c481;hb=272216c101f5f411726898f90355956ab9a105b7;hpb=5d445371de19f2018b7f93ac8e87ac07b8848003 diff --git a/src/FuncTorrent/Tracker/Udp.hs b/src/FuncTorrent/Tracker/Udp.hs index 7db0202..aaa9947 100644 --- a/src/FuncTorrent/Tracker/Udp.hs +++ b/src/FuncTorrent/Tracker/Udp.hs @@ -23,11 +23,12 @@ module FuncTorrent.Tracker.Udp ) where import Control.Applicative (liftA2) -import Control.Monad (liftM) -import Control.Concurrent.MVar (readMVar) +import Control.Monad (liftM, forever, void) +import Control.Concurrent (threadDelay) +import Control.Concurrent.MVar (readMVar, putMVar, isEmptyMVar, swapMVar) import Control.Monad.Reader (ReaderT, runReaderT, ask, liftIO) import Data.Binary (Binary(..), encode, decode) -import Data.Binary.Get (Get, isEmpty, getWord32be, getByteString) +import Data.Binary.Get (Get, isEmpty, getWord32be, getWord64be, getByteString) import Data.Binary.Put (putWord16be, putWord64be, putWord32be, putByteString) import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as BC @@ -36,8 +37,10 @@ import Data.Word (Word16, Word32, Word64) import Network.Socket (Socket, Family( AF_INET ), SocketType( Datagram ), defaultProtocol, SockAddr(..), socket, close, getAddrInfo, addrAddress, SockAddr(..)) import Network.Socket.ByteString (sendTo, recvFrom) import System.Random (randomIO) +import System.Timeout (timeout) -import FuncTorrent.Tracker.Types (TrackerEventState(..), TState(..)) +import FuncTorrent.PeerMsgs (Peer(..)) +import FuncTorrent.Tracker.Types (TrackerEventState(..), TState(..), UdpTrackerResponse(..)) import FuncTorrent.Utils (IP, Port, toIP, toPort, getHostname, getPort) import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats) @@ -53,7 +56,7 @@ data UDPRequest = ConnectReq Word32 deriving (Show, Eq) data UDPResponse = ConnectResp Word32 Word64 -- transaction_id connection_id - | AnnounceResp Word32 Word32 Word32 Word32 [(IP, Port)] -- transaction_id interval leechers seeders [(ip, port)] + | AnnounceResp Word32 Word32 Word32 Word32 [Peer] -- transaction_id interval leechers seeders [(ip, port)] | ScrapeResp Integer Integer Integer Integer | ErrorResp Integer String deriving (Show, Eq) @@ -93,10 +96,10 @@ instance Binary UDPRequest where putWord64be (fromIntegral down) putWord64be (fromIntegral left) putWord64be (fromIntegral up) - putWord32be $ fromIntegral (eventToInteger None) + putWord32be $ fromIntegral (eventToInteger event) putWord32be 0 putWord32be 0 - putWord32be $ fromIntegral (-1) + putWord32be 10 putWord16be $ fromIntegral port put (ScrapeReq _ _ _) = undefined get = undefined @@ -106,7 +109,7 @@ instance Binary UDPResponse where get = do a <- getWord32be -- action case a of - 0 -> liftA2 ConnectResp (fromIntegral <$> getWord32be) (fromIntegral <$> getWord32be) + 0 -> liftA2 ConnectResp (fromIntegral <$> getWord32be) (fromIntegral <$> getWord64be) 1 -> do tid <- fromIntegral <$> getWord32be interval' <- fromIntegral <$> getWord32be @@ -129,7 +132,6 @@ instance Binary UDPResponse where sendRequest :: UDPTrackerHandle -> ByteString -> IO () sendRequest h req = do n <- sendTo (sock h) req (addr h) - print $ BC.length req -- sanity check with n? return () @@ -150,7 +152,6 @@ connectResponse :: Word32 -> ReaderT UDPTrackerHandle IO Word64 connectResponse tid = do h <- ask resp <- liftIO $ recvResponse h - liftIO $ print resp -- check if nbytes is at least 16 bytes long case resp of (ConnectResp tidr cid) -> @@ -166,17 +167,11 @@ announceRequest :: Word64 -> ByteString -> String -> Word64 -> Word64 -> Word64 announceRequest cid infohash peerId up down left port = do h <- ask tidi <- liftIO randomIO - -- connId transId infohash peerId down left up event port) let pkt = encode $ AnnounceReq cid tidi infohash peerId down left up None port liftIO $ sendRequest h (toStrict pkt) return tidi -data PeerStats = PeerStats { leechers :: Word32 - , seeders :: Word32 - , peers :: [(IP, Port)] - } deriving (Show) - -announceResponse :: Word32 -> ReaderT UDPTrackerHandle IO PeerStats +announceResponse :: Word32 -> ReaderT UDPTrackerHandle IO UdpTrackerResponse announceResponse tid = do h <- ask resp <- liftIO $ recvResponse h @@ -185,21 +180,21 @@ announceResponse tid = do if tidr == tid then do liftIO $ putStrLn "announce succeeded" - return $ PeerStats ls ss xs + return $ UdpTrackerResponse ls ss interval xs else - return $ PeerStats 0 0 [] - _ -> return $ PeerStats 0 0 [] + return $ UdpTrackerResponse 0 0 0 [] + _ -> return $ UdpTrackerResponse 0 0 0 [] -getIPPortPairs :: Get [(IP, Port)] +getIPPortPairs :: Get [Peer] getIPPortPairs = do empty <- isEmpty if empty then return [] else do - ip <- toIP <$> getByteString 6 + ip <- toIP <$> getByteString 4 port <- toPort <$> getByteString 2 ipportpairs <- getIPPortPairs - return $ (ip, port) : ipportpairs + return $ (Peer ip port) : ipportpairs startSession :: String -> Port -> IO UDPTrackerHandle startSession host port = do @@ -214,22 +209,29 @@ closeSession :: UDPTrackerHandle -> IO () closeSession (UDPTrackerHandle s _ _) = close s trackerLoop :: String -> Port -> String -> ByteString -> FS.MsgChannel -> TState -> IO () -trackerLoop url sport peerId infohash fschan tstate = do - st' <- FS.getStats fschan - st <- readMVar st' - let up = FS.bytesRead st - down = FS.bytesWritten st - port = getPort url - host = getHostname url - putStrLn $ "host = " ++ (show host) ++ " port= " ++ (show port) +trackerLoop url sport peerId infohash fschan tstate = forever $ do + st <- readMVar <$> FS.getStats fschan + up <- fmap FS.bytesRead st + down <- fmap FS.bytesWritten st handle <- startSession host port - flip runReaderT handle $ do - t1 <- connectRequest - cid <- connectResponse t1 - liftIO $ print "connect response" - liftIO $ print cid - t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport) - liftIO $ print "announce request" - liftIO $ print t2 - stats <- announceResponse t2 - liftIO $ print stats + stats <- timeout (15*(10^6)) $ worker handle up down + case stats of + Nothing -> closeSession handle + Just stats' -> do + ps <- isEmptyMVar $ connectedPeers tstate + if ps + then + putMVar (connectedPeers tstate) (peers stats') + else + void $ swapMVar (connectedPeers tstate) (peers stats') + threadDelay $ fromIntegral (interval stats') * (10^6) + return () + where + port = getPort url + host = getHostname url + worker handle up down = flip runReaderT handle $ do + t1 <- connectRequest + cid <- connectResponse t1 + t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport) + stats <- announceResponse t2 + return stats