) where
import Control.Applicative (liftA2)
-import Control.Monad (liftM)
-import Control.Concurrent.MVar (readMVar)
+import Control.Monad (forever, void)
+import Control.Concurrent (threadDelay)
+import Control.Concurrent.MVar (readMVar, putMVar, isEmptyMVar, swapMVar)
import Control.Monad.Reader (ReaderT, runReaderT, ask, liftIO)
import Data.Binary (Binary(..), encode, decode)
-import Data.Binary.Get (Get, isEmpty, getWord32be, getByteString)
+import Data.Binary.Get (Get, isEmpty, getWord32be, getWord64be, getByteString)
import Data.Binary.Put (putWord16be, putWord64be, putWord32be, putByteString)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BC
import Data.ByteString.Lazy (fromStrict, toStrict)
import Data.Word (Word16, Word32, Word64)
+import Network (PortNumber)
import Network.Socket (Socket, Family( AF_INET ), SocketType( Datagram ), defaultProtocol, SockAddr(..), socket, close, getAddrInfo, addrAddress, SockAddr(..))
import Network.Socket.ByteString (sendTo, recvFrom)
import System.Random (randomIO)
+import System.Timeout (timeout)
-import FuncTorrent.Tracker.Types (TrackerEventState(..), TState(..))
-import FuncTorrent.Utils (IP, Port, toIP, toPort, getHostname, getPort)
+import FuncTorrent.PeerMsgs (Peer(..))
+import FuncTorrent.Tracker.Types (TrackerEventState(..), TState(..), UdpTrackerResponse(..))
+import FuncTorrent.Utils (Port, toIP, toPort, getHostname, getPort)
import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats)
-- UDP tracker: http://bittorrent.org/beps/bep_0015.html
deriving (Show, Eq)
data UDPResponse = ConnectResp Word32 Word64 -- transaction_id connection_id
- | AnnounceResp Word32 Word32 Word32 Word32 [(IP, Port)] -- transaction_id interval leechers seeders [(ip, port)]
+ | AnnounceResp Word32 Word32 Word32 Word32 [Peer] -- transaction_id interval leechers seeders [(ip, port)]
| ScrapeResp Integer Integer Integer Integer
| ErrorResp Integer String
deriving (Show, Eq)
data UDPTrackerHandle = UDPTrackerHandle { sock :: Socket
, addr :: SockAddr
- , tid :: Word32
}
actionToInteger :: Action -> Integer
actionToInteger Announce = 1
actionToInteger Scrape = 2
-intToAction :: Integer -> Action
-intToAction 0 = Connect
-intToAction 1 = Announce
-intToAction 2 = Scrape
-
eventToInteger :: TrackerEventState -> Integer
eventToInteger None = 0
eventToInteger Completed = 1
putWord64be 0x41727101980
putWord32be $ fromIntegral (actionToInteger Connect)
putWord32be (fromIntegral transId)
- put (AnnounceReq connId transId infohash peerId down left up event port) = do
+ put (AnnounceReq connId transId infohash peerId down left' up event port) = do
putWord64be $ fromIntegral connId
putWord32be $ fromIntegral (actionToInteger Announce)
putWord32be $ fromIntegral transId
putByteString infohash
putByteString (BC.pack peerId)
putWord64be (fromIntegral down)
- putWord64be (fromIntegral left)
+ putWord64be (fromIntegral left')
putWord64be (fromIntegral up)
- putWord32be $ fromIntegral (eventToInteger None)
+ putWord32be $ fromIntegral (eventToInteger event)
putWord32be 0
putWord32be 0
- putWord32be $ fromIntegral (-1)
+ putWord32be 10
putWord16be $ fromIntegral port
put (ScrapeReq _ _ _) = undefined
get = undefined
get = do
a <- getWord32be -- action
case a of
- 0 -> liftA2 ConnectResp (fromIntegral <$> getWord32be) (fromIntegral <$> getWord32be)
+ 0 -> liftA2 ConnectResp (fromIntegral <$> getWord32be) (fromIntegral <$> getWord64be)
1 -> do
- tid <- fromIntegral <$> getWord32be
+ tid' <- fromIntegral <$> getWord32be
interval' <- fromIntegral <$> getWord32be
l <- getWord32be -- leechers
s <- getWord32be -- seeders
ipportpairs <- getIPPortPairs -- [(ip, port)]
- return $ AnnounceResp tid interval' l s ipportpairs
+ return $ AnnounceResp tid' interval' l s ipportpairs
2 -> do
- tid <- fromIntegral <$> getWord32be
+ tid' <- fromIntegral <$> getWord32be
_ <- getWord32be
_ <- getWord32be
_ <- getWord32be
- return $ ScrapeResp tid 0 0 0
+ return $ ScrapeResp tid' 0 0 0
3 -> do -- error response
- tid <- fromIntegral <$> getWord32be
+ tid' <- fromIntegral <$> getWord32be
bs <- getByteString 4
- return $ ErrorResp tid $ BC.unpack bs
+ return $ ErrorResp tid' $ BC.unpack bs
_ -> error ("unknown response action type: " ++ show a)
sendRequest :: UDPTrackerHandle -> ByteString -> IO ()
sendRequest h req = do
n <- sendTo (sock h) req (addr h)
- print $ BC.length req
-- sanity check with n?
return ()
connectResponse tid = do
h <- ask
resp <- liftIO $ recvResponse h
- liftIO $ print resp
-- check if nbytes is at least 16 bytes long
case resp of
(ConnectResp tidr cid) ->
_ -> return 0
announceRequest :: Word64 -> ByteString -> String -> Word64 -> Word64 -> Word64 -> Word16 -> ReaderT UDPTrackerHandle IO Word32
-announceRequest cid infohash peerId up down left port = do
+announceRequest cid infohash peerId up down left' port = do
h <- ask
tidi <- liftIO randomIO
- -- connId transId infohash peerId down left up event port)
- let pkt = encode $ AnnounceReq cid tidi infohash peerId down left up None port
+ let pkt = encode $ AnnounceReq cid tidi infohash peerId down left' up None port
liftIO $ sendRequest h (toStrict pkt)
return tidi
-data PeerStats = PeerStats { leechers :: Word32
- , seeders :: Word32
- , peers :: [(IP, Port)]
- } deriving (Show)
-
-announceResponse :: Word32 -> ReaderT UDPTrackerHandle IO PeerStats
+announceResponse :: Word32 -> ReaderT UDPTrackerHandle IO UdpTrackerResponse
announceResponse tid = do
h <- ask
resp <- liftIO $ recvResponse h
case resp of
- (AnnounceResp tidr interval ss ls xs) ->
+ (AnnounceResp tidr interval' ss ls xs) ->
if tidr == tid
then do
liftIO $ putStrLn "announce succeeded"
- return $ PeerStats ls ss xs
+ return $ UdpTrackerResponse ls ss interval' xs
else
- return $ PeerStats 0 0 []
- _ -> return $ PeerStats 0 0 []
+ return $ UdpTrackerResponse 0 0 0 []
+ _ -> return $ UdpTrackerResponse 0 0 0 []
-getIPPortPairs :: Get [(IP, Port)]
+getIPPortPairs :: Get [Peer]
getIPPortPairs = do
empty <- isEmpty
if empty
then return []
else do
- ip <- toIP <$> getByteString 6
+ ip <- toIP <$> getByteString 4
port <- toPort <$> getByteString 2
ipportpairs <- getIPPortPairs
- return $ (ip, port) : ipportpairs
+ return $ (Peer ip port) : ipportpairs
startSession :: String -> Port -> IO UDPTrackerHandle
startSession host port = do
, addr = (SockAddrInet (fromIntegral port) ip) }
closeSession :: UDPTrackerHandle -> IO ()
-closeSession (UDPTrackerHandle s _ _) = close s
-
-trackerLoop :: String -> Port -> String -> ByteString -> FS.MsgChannel -> TState -> IO ()
-trackerLoop url sport peerId infohash fschan tstate = do
- st' <- FS.getStats fschan
- st <- readMVar st'
- let up = FS.bytesRead st
- down = FS.bytesWritten st
- port = getPort url
- host = getHostname url
- putStrLn $ "host = " ++ (show host) ++ " port= " ++ (show port)
+closeSession (UDPTrackerHandle s _) = close s
+
+trackerLoop :: String -> PortNumber -> String -> ByteString -> FS.MsgChannel -> TState -> IO ()
+trackerLoop url sport peerId infohash fschan tstate = forever $ do
+ st <- readMVar <$> FS.getStats fschan
+ up <- fmap FS.bytesRead st
+ down <- fmap FS.bytesWritten st
handle <- startSession host port
- flip runReaderT handle $ do
- t1 <- connectRequest
- cid <- connectResponse t1
- liftIO $ print "connect response"
- liftIO $ print cid
- t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport)
- liftIO $ print "announce request"
- liftIO $ print t2
- stats <- announceResponse t2
- liftIO $ print stats
+ stats <- timeout (15*oneSec) $ worker handle up down
+ case stats of
+ Nothing -> closeSession handle
+ Just stats' -> do
+ ps <- isEmptyMVar $ connectedPeers tstate
+ if ps
+ then
+ putMVar (connectedPeers tstate) (peers stats')
+ else
+ void $ swapMVar (connectedPeers tstate) (peers stats')
+ threadDelay $ fromIntegral (interval stats') * oneSec
+ return ()
+ where
+ oneSec = 1000000
+ port = getPort url
+ host = getHostname url
+ worker handle up down = flip runReaderT handle $ do
+ t1 <- connectRequest
+ cid <- connectResponse t1
+ t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport)
+ stats <- announceResponse t2
+ return stats