) where
import Control.Applicative (liftA2)
-import Control.Monad (liftM)
-import Control.Concurrent.MVar (readMVar)
+import Control.Monad (liftM, forever, void)
+import Control.Concurrent (threadDelay)
+import Control.Concurrent.MVar (readMVar, putMVar, isEmptyMVar, swapMVar)
import Control.Monad.Reader (ReaderT, runReaderT, ask, liftIO)
import Data.Binary (Binary(..), encode, decode)
-import Data.Binary.Get (Get, isEmpty, getWord32be, getByteString)
+import Data.Binary.Get (Get, isEmpty, getWord32be, getWord64be, getByteString)
import Data.Binary.Put (putWord16be, putWord64be, putWord32be, putByteString)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BC
import Network.Socket (Socket, Family( AF_INET ), SocketType( Datagram ), defaultProtocol, SockAddr(..), socket, close, getAddrInfo, addrAddress, SockAddr(..))
import Network.Socket.ByteString (sendTo, recvFrom)
import System.Random (randomIO)
+import System.Timeout (timeout)
+import FuncTorrent.PeerMsgs (Peer(..))
import FuncTorrent.Tracker.Types (TrackerEventState(..), TState(..))
import FuncTorrent.Utils (IP, Port, toIP, toPort, getHostname, getPort)
import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats)
deriving (Show, Eq)
data UDPResponse = ConnectResp Word32 Word64 -- transaction_id connection_id
- | AnnounceResp Word32 Word32 Word32 Word32 [(IP, Port)] -- transaction_id interval leechers seeders [(ip, port)]
+ | AnnounceResp Word32 Word32 Word32 Word32 [Peer] -- transaction_id interval leechers seeders [(ip, port)]
| ScrapeResp Integer Integer Integer Integer
| ErrorResp Integer String
deriving (Show, Eq)
putWord64be (fromIntegral down)
putWord64be (fromIntegral left)
putWord64be (fromIntegral up)
- putWord32be $ fromIntegral (eventToInteger None)
+ putWord32be $ fromIntegral (eventToInteger event)
putWord32be 0
putWord32be 0
- putWord32be $ fromIntegral (-1)
+ putWord32be 10
putWord16be $ fromIntegral port
put (ScrapeReq _ _ _) = undefined
get = undefined
get = do
a <- getWord32be -- action
case a of
- 0 -> liftA2 ConnectResp (fromIntegral <$> getWord32be) (fromIntegral <$> getWord32be)
+ 0 -> liftA2 ConnectResp (fromIntegral <$> getWord32be) (fromIntegral <$> getWord64be)
1 -> do
tid <- fromIntegral <$> getWord32be
interval' <- fromIntegral <$> getWord32be
sendRequest :: UDPTrackerHandle -> ByteString -> IO ()
sendRequest h req = do
n <- sendTo (sock h) req (addr h)
- print $ BC.length req
-- sanity check with n?
return ()
connectResponse tid = do
h <- ask
resp <- liftIO $ recvResponse h
- liftIO $ print resp
-- check if nbytes is at least 16 bytes long
case resp of
(ConnectResp tidr cid) ->
announceRequest cid infohash peerId up down left port = do
h <- ask
tidi <- liftIO randomIO
- -- connId transId infohash peerId down left up event port)
let pkt = encode $ AnnounceReq cid tidi infohash peerId down left up None port
liftIO $ sendRequest h (toStrict pkt)
return tidi
data PeerStats = PeerStats { leechers :: Word32
, seeders :: Word32
- , peers :: [(IP, Port)]
+ , interval :: Word32
+ , peers :: [Peer]
} deriving (Show)
announceResponse :: Word32 -> ReaderT UDPTrackerHandle IO PeerStats
if tidr == tid
then do
liftIO $ putStrLn "announce succeeded"
- return $ PeerStats ls ss xs
+ return $ PeerStats ls ss interval xs
else
- return $ PeerStats 0 0 []
- _ -> return $ PeerStats 0 0 []
+ return $ PeerStats 0 0 0 []
+ _ -> return $ PeerStats 0 0 0 []
-getIPPortPairs :: Get [(IP, Port)]
+getIPPortPairs :: Get [Peer]
getIPPortPairs = do
empty <- isEmpty
if empty
then return []
else do
- ip <- toIP <$> getByteString 6
+ ip <- toIP <$> getByteString 4
port <- toPort <$> getByteString 2
ipportpairs <- getIPPortPairs
- return $ (ip, port) : ipportpairs
+ return $ (Peer ip port) : ipportpairs
startSession :: String -> Port -> IO UDPTrackerHandle
startSession host port = do
closeSession (UDPTrackerHandle s _ _) = close s
trackerLoop :: String -> Port -> String -> ByteString -> FS.MsgChannel -> TState -> IO ()
-trackerLoop url sport peerId infohash fschan tstate = do
- st' <- FS.getStats fschan
- st <- readMVar st'
- let up = FS.bytesRead st
- down = FS.bytesWritten st
- port = getPort url
- host = getHostname url
- putStrLn $ "host = " ++ (show host) ++ " port= " ++ (show port)
+trackerLoop url sport peerId infohash fschan tstate = forever $ do
+ st <- readMVar <$> FS.getStats fschan
+ up <- fmap FS.bytesRead st
+ down <- fmap FS.bytesWritten st
handle <- startSession host port
- flip runReaderT handle $ do
- t1 <- connectRequest
- cid <- connectResponse t1
- liftIO $ print "connect response"
- liftIO $ print cid
- t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport)
- liftIO $ print "announce request"
- liftIO $ print t2
- stats <- announceResponse t2
- liftIO $ print stats
+ stats <- timeout (15*(10^6)) $ worker handle up down
+ case stats of
+ Nothing -> closeSession handle
+ Just stats' -> do
+ ps <- isEmptyMVar $ connectedPeers tstate
+ if ps
+ then
+ putMVar (connectedPeers tstate) (peers stats')
+ else
+ void $ swapMVar (connectedPeers tstate) (peers stats')
+ threadDelay $ fromIntegral (interval stats') * (10^6)
+ return ()
+ where
+ port = getPort url
+ host = getHostname url
+ worker handle up down = flip runReaderT handle $ do
+ t1 <- connectRequest
+ cid <- connectResponse t1
+ t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport)
+ stats <- announceResponse t2
+ return stats