X-Git-Url: https://git.rkrishnan.org/?p=functorrent.git;a=blobdiff_plain;f=src%2FFuncTorrent%2FTracker%2FUdp.hs;h=aaa99472b44c3a9d72f955afd0f35a5e8c05d534;hp=aa7bfd5a14551e3bb0a669082e21920a0c40d758;hb=272216c101f5f411726898f90355956ab9a105b7;hpb=1333f3f066c8f3666b8493dd05a2ad26395f3fa3 diff --git a/src/FuncTorrent/Tracker/Udp.hs b/src/FuncTorrent/Tracker/Udp.hs index aa7bfd5..aaa9947 100644 --- a/src/FuncTorrent/Tracker/Udp.hs +++ b/src/FuncTorrent/Tracker/Udp.hs @@ -18,23 +18,31 @@ -} {-# LANGUAGE OverloadedStrings #-} -module Functorrent.Tracker.Udp - ( +module FuncTorrent.Tracker.Udp + (trackerLoop ) where import Control.Applicative (liftA2) -import Control.Monad.Error (ErrorT) -import Control.Monad.Reader (ReaderT, runReaderT, ask) +import Control.Monad (liftM, forever, void) +import Control.Concurrent (threadDelay) +import Control.Concurrent.MVar (readMVar, putMVar, isEmptyMVar, swapMVar) +import Control.Monad.Reader (ReaderT, runReaderT, ask, liftIO) import Data.Binary (Binary(..), encode, decode) -import Data.Binary.Get (Get, isEmpty, getWord32be, getByteString) +import Data.Binary.Get (Get, isEmpty, getWord32be, getWord64be, getByteString) import Data.Binary.Put (putWord16be, putWord64be, putWord32be, putByteString) -import Data.ByteString.Char8 as BC -import Data.ByteString.Lazy (fromStrict) -import Data.Word (Word32) -import Network.Socket (Socket, SockAddr, sendTo, recvFrom) +import Data.ByteString (ByteString) +import qualified Data.ByteString.Char8 as BC +import Data.ByteString.Lazy (fromStrict, toStrict) +import Data.Word (Word16, Word32, Word64) +import Network.Socket (Socket, Family( AF_INET ), SocketType( Datagram ), defaultProtocol, SockAddr(..), socket, close, getAddrInfo, addrAddress, SockAddr(..)) +import Network.Socket.ByteString (sendTo, recvFrom) import System.Random (randomIO) +import System.Timeout (timeout) -import FuncTorrent.Tracker.Types (TrackerEventState(..), IP, Port) +import FuncTorrent.PeerMsgs (Peer(..)) +import FuncTorrent.Tracker.Types (TrackerEventState(..), TState(..), UdpTrackerResponse(..)) +import FuncTorrent.Utils (IP, Port, toIP, toPort, getHostname, getPort) +import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats) -- UDP tracker: http://bittorrent.org/beps/bep_0015.html data Action = Connect @@ -43,12 +51,12 @@ data Action = Connect deriving (Show, Eq) data UDPRequest = ConnectReq Word32 - | AnnounceReq Integer Integer ByteString String Integer Integer Integer TrackerEventState Integer + | AnnounceReq Word64 Word32 ByteString String Word64 Word64 Word64 TrackerEventState Word16 | ScrapeReq Integer Integer ByteString deriving (Show, Eq) -data UDPResponse = ConnectResp Integer Integer -- transaction_id connection_id - | AnnounceResp Integer Integer Integer Integer [(IP, Port)] -- transaction_id interval leechers seeders [(ip, port)] +data UDPResponse = ConnectResp Word32 Word64 -- transaction_id connection_id + | AnnounceResp Word32 Word32 Word32 Word32 [Peer] -- transaction_id interval leechers seeders [(ip, port)] | ScrapeResp Integer Integer Integer Integer | ErrorResp Integer String deriving (Show, Eq) @@ -72,6 +80,7 @@ eventToInteger :: TrackerEventState -> Integer eventToInteger None = 0 eventToInteger Completed = 1 eventToInteger Started = 2 +eventToInteger Stopped = 3 instance Binary UDPRequest where put (ConnectReq transId) = do @@ -87,10 +96,10 @@ instance Binary UDPRequest where putWord64be (fromIntegral down) putWord64be (fromIntegral left) putWord64be (fromIntegral up) - putWord32be $ fromIntegral (eventToInteger None) + putWord32be $ fromIntegral (eventToInteger event) putWord32be 0 - -- key is optional, we will not send it for now - putWord32be $ fromIntegral (-1) + putWord32be 0 + putWord32be 10 putWord16be $ fromIntegral port put (ScrapeReq _ _ _) = undefined get = undefined @@ -100,14 +109,14 @@ instance Binary UDPResponse where get = do a <- getWord32be -- action case a of - 0 -> liftA2 ConnectResp (fromIntegral <$> getWord32be) (fromIntegral <$> getWord32be) + 0 -> liftA2 ConnectResp (fromIntegral <$> getWord32be) (fromIntegral <$> getWord64be) 1 -> do tid <- fromIntegral <$> getWord32be interval' <- fromIntegral <$> getWord32be - _ <- getWord32be -- leechers - _ <- getWord32be -- seeders + l <- getWord32be -- leechers + s <- getWord32be -- seeders ipportpairs <- getIPPortPairs -- [(ip, port)] - return $ AnnounceResp tid interval' 0 0 ipportpairs + return $ AnnounceResp tid interval' l s ipportpairs 2 -> do tid <- fromIntegral <$> getWord32be _ <- getWord32be @@ -117,59 +126,112 @@ instance Binary UDPResponse where 3 -> do -- error response tid <- fromIntegral <$> getWord32be bs <- getByteString 4 - return $ ErrorResp tid $ unpack bs + return $ ErrorResp tid $ BC.unpack bs _ -> error ("unknown response action type: " ++ show a) -sendRequest :: UDPTrackerHandle -> UDPRequest -> IO () +sendRequest :: UDPTrackerHandle -> ByteString -> IO () sendRequest h req = do n <- sendTo (sock h) req (addr h) -- sanity check with n? return () -recvResponse :: UDPTrackerHandle -> ErrorT String IO UDPResponse +recvResponse :: UDPTrackerHandle -> IO UDPResponse recvResponse h = do - (bs, nbytes, saddr) <- recvFrom (sock h) 20 - -- check if nbytes is at least 16 bytes long + (bs, saddr) <- recvFrom (sock h) (16*1024) return $ decode $ fromStrict bs -connectRequest :: ReaderT UDPTrackerHandle IO Integer +connectRequest :: ReaderT UDPTrackerHandle IO Word32 connectRequest = do h <- ask - let pkt = encode $ ConnectReq (tid h) - sendRequest h pkt + tidi <- liftIO randomIO + let pkt = encode $ ConnectReq tidi + liftIO $ sendRequest h (toStrict pkt) + return tidi -connectResponse :: ReaderT UDPTrackerHandle IO Bool -connectResponse = do +connectResponse :: Word32 -> ReaderT UDPTrackerHandle IO Word64 +connectResponse tid = do h <- ask - + resp <- liftIO $ recvResponse h + -- check if nbytes is at least 16 bytes long + case resp of + (ConnectResp tidr cid) -> + if tidr == tid + then do + liftIO $ putStrLn "connect succeeded" + return cid + else + return 0 + _ -> return 0 + +announceRequest :: Word64 -> ByteString -> String -> Word64 -> Word64 -> Word64 -> Word16 -> ReaderT UDPTrackerHandle IO Word32 +announceRequest cid infohash peerId up down left port = do + h <- ask + tidi <- liftIO randomIO + let pkt = encode $ AnnounceReq cid tidi infohash peerId down left up None port + liftIO $ sendRequest h (toStrict pkt) + return tidi -getIPPortPairs :: Get [(IP, Port)] +announceResponse :: Word32 -> ReaderT UDPTrackerHandle IO UdpTrackerResponse +announceResponse tid = do + h <- ask + resp <- liftIO $ recvResponse h + case resp of + (AnnounceResp tidr interval ss ls xs) -> + if tidr == tid + then do + liftIO $ putStrLn "announce succeeded" + return $ UdpTrackerResponse ls ss interval xs + else + return $ UdpTrackerResponse 0 0 0 [] + _ -> return $ UdpTrackerResponse 0 0 0 [] + +getIPPortPairs :: Get [Peer] getIPPortPairs = do empty <- isEmpty if empty then return [] else do - ip <- toIP <$> getByteString 6 + ip <- toIP <$> getByteString 4 port <- toPort <$> getByteString 2 ipportpairs <- getIPPortPairs - return $ (ip, port) : ipportpairs - -getResponse :: Socket -> IO UDPResponse -getResponse s = do - -- connect packet is 16 bytes long - -- announce packet is atleast 20 bytes long - bs <- recv s (16*1024) - return $ decode $ fromStrict bs + return $ (Peer ip port) : ipportpairs - -udpTrackerLoop :: PortNumber -> String -> Metainfo -> TState -> IO String -udpTrackerLoop port peerId m st = do - -- h <- connectTo "exodus.desync.com" (PortNumber 6969) +startSession :: String -> Port -> IO UDPTrackerHandle +startSession host port = do s <- socket AF_INET Datagram defaultProtocol - hostAddr <- inet_addr "185.37.101.229" + addrinfos <- getAddrInfo Nothing (Just host) (Just (show port)) + let (SockAddrInet p ip) = addrAddress $ head addrinfos putStrLn "connected to tracker" - _ <- sendTo s (toStrict $ encode (ConnectReq 42)) (SockAddrInet 2710 hostAddr) - putStrLn "--> sent ConnectReq to tracker" - resp <- recv s 16 - putStrLn "<-- recv ConnectResp from tracker" - return $ show resp + return UDPTrackerHandle { sock = s + , addr = (SockAddrInet (fromIntegral port) ip) } + +closeSession :: UDPTrackerHandle -> IO () +closeSession (UDPTrackerHandle s _ _) = close s + +trackerLoop :: String -> Port -> String -> ByteString -> FS.MsgChannel -> TState -> IO () +trackerLoop url sport peerId infohash fschan tstate = forever $ do + st <- readMVar <$> FS.getStats fschan + up <- fmap FS.bytesRead st + down <- fmap FS.bytesWritten st + handle <- startSession host port + stats <- timeout (15*(10^6)) $ worker handle up down + case stats of + Nothing -> closeSession handle + Just stats' -> do + ps <- isEmptyMVar $ connectedPeers tstate + if ps + then + putMVar (connectedPeers tstate) (peers stats') + else + void $ swapMVar (connectedPeers tstate) (peers stats') + threadDelay $ fromIntegral (interval stats') * (10^6) + return () + where + port = getPort url + host = getHostname url + worker handle up down = flip runReaderT handle $ do + t1 <- connectRequest + cid <- connectResponse t1 + t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport) + stats <- announceResponse t2 + return stats