) where
import Control.Applicative (liftA2)
-import Control.Monad (liftM)
-import Control.Concurrent.MVar (readMVar)
+import Control.Monad (liftM, forever, void)
+import Control.Concurrent (threadDelay)
+import Control.Concurrent.MVar (readMVar, putMVar, isEmptyMVar, swapMVar)
import Control.Monad.Reader (ReaderT, runReaderT, ask, liftIO)
import Data.Binary (Binary(..), encode, decode)
import Data.Binary.Get (Get, isEmpty, getWord32be, getWord64be, getByteString)
import qualified Data.ByteString.Char8 as BC
import Data.ByteString.Lazy (fromStrict, toStrict)
import Data.Word (Word16, Word32, Word64)
+import Network (PortNumber)
import Network.Socket (Socket, Family( AF_INET ), SocketType( Datagram ), defaultProtocol, SockAddr(..), socket, close, getAddrInfo, addrAddress, SockAddr(..))
import Network.Socket.ByteString (sendTo, recvFrom)
import System.Random (randomIO)
+import System.Timeout (timeout)
-import FuncTorrent.Tracker.Types (TrackerEventState(..), TState(..))
+import FuncTorrent.PeerMsgs (Peer(..))
+import FuncTorrent.Tracker.Types (TrackerEventState(..), TState(..), UdpTrackerResponse(..))
import FuncTorrent.Utils (IP, Port, toIP, toPort, getHostname, getPort)
import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats)
deriving (Show, Eq)
data UDPResponse = ConnectResp Word32 Word64 -- transaction_id connection_id
- | AnnounceResp Word32 Word32 Word32 Word32 [(IP, Port)] -- transaction_id interval leechers seeders [(ip, port)]
+ | AnnounceResp Word32 Word32 Word32 Word32 [Peer] -- transaction_id interval leechers seeders [(ip, port)]
| ScrapeResp Integer Integer Integer Integer
| ErrorResp Integer String
deriving (Show, Eq)
putWord32be $ fromIntegral (eventToInteger event)
putWord32be 0
putWord32be 0
- putWord32be 20
+ putWord32be 10
putWord16be $ fromIntegral port
put (ScrapeReq _ _ _) = undefined
get = undefined
sendRequest :: UDPTrackerHandle -> ByteString -> IO ()
sendRequest h req = do
n <- sendTo (sock h) req (addr h)
- print $ BC.length req
-- sanity check with n?
return ()
connectResponse tid = do
h <- ask
resp <- liftIO $ recvResponse h
- liftIO $ print resp
-- check if nbytes is at least 16 bytes long
case resp of
(ConnectResp tidr cid) ->
liftIO $ sendRequest h (toStrict pkt)
return tidi
-data PeerStats = PeerStats { leechers :: Word32
- , seeders :: Word32
- , peers :: [(IP, Port)]
- } deriving (Show)
-
-announceResponse :: Word32 -> ReaderT UDPTrackerHandle IO PeerStats
+announceResponse :: Word32 -> ReaderT UDPTrackerHandle IO UdpTrackerResponse
announceResponse tid = do
h <- ask
resp <- liftIO $ recvResponse h
if tidr == tid
then do
liftIO $ putStrLn "announce succeeded"
- return $ PeerStats ls ss xs
+ return $ UdpTrackerResponse ls ss interval xs
else
- return $ PeerStats 0 0 []
- _ -> return $ PeerStats 0 0 []
+ return $ UdpTrackerResponse 0 0 0 []
+ _ -> return $ UdpTrackerResponse 0 0 0 []
-getIPPortPairs :: Get [(IP, Port)]
+getIPPortPairs :: Get [Peer]
getIPPortPairs = do
empty <- isEmpty
if empty
then return []
else do
- ip <- toIP <$> getByteString 6
+ ip <- toIP <$> getByteString 4
port <- toPort <$> getByteString 2
ipportpairs <- getIPPortPairs
- return $ (ip, port) : ipportpairs
+ return $ (Peer ip port) : ipportpairs
startSession :: String -> Port -> IO UDPTrackerHandle
startSession host port = do
closeSession :: UDPTrackerHandle -> IO ()
closeSession (UDPTrackerHandle s _ _) = close s
-trackerLoop :: String -> Port -> String -> ByteString -> FS.MsgChannel -> TState -> IO ()
-trackerLoop url sport peerId infohash fschan tstate = do
- st' <- FS.getStats fschan
- st <- readMVar st'
- let up = FS.bytesRead st
- down = FS.bytesWritten st
- port = getPort url
- host = getHostname url
- putStrLn $ "host = " ++ (show host) ++ " port= " ++ (show port)
+trackerLoop :: String -> PortNumber -> String -> ByteString -> FS.MsgChannel -> TState -> IO ()
+trackerLoop url sport peerId infohash fschan tstate = forever $ do
+ st <- readMVar <$> FS.getStats fschan
+ up <- fmap FS.bytesRead st
+ down <- fmap FS.bytesWritten st
handle <- startSession host port
- flip runReaderT handle $ do
- t1 <- connectRequest
- cid <- connectResponse t1
- liftIO $ print "connected: connect id"
- liftIO $ print cid
- t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport)
- liftIO $ print "announce request"
- liftIO $ print t2
- liftIO $ print "waiting for announce response"
- stats <- announceResponse t2
- liftIO $ print stats
+ stats <- timeout (15*(10^6)) $ worker handle up down
+ case stats of
+ Nothing -> closeSession handle
+ Just stats' -> do
+ ps <- isEmptyMVar $ connectedPeers tstate
+ if ps
+ then
+ putMVar (connectedPeers tstate) (peers stats')
+ else
+ void $ swapMVar (connectedPeers tstate) (peers stats')
+ threadDelay $ fromIntegral (interval stats') * (10^6)
+ return ()
+ where
+ port = getPort url
+ host = getHostname url
+ worker handle up down = flip runReaderT handle $ do
+ t1 <- connectRequest
+ cid <- connectResponse t1
+ t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport)
+ stats <- announceResponse t2
+ return stats