sendMsg,
getMsg,
Peer(..),
+ makePeer,
PeerMsg(..)
) where
import System.IO (Handle)
import Data.ByteString (ByteString, pack, unpack, concat, hGet, hPut, singleton)
import Data.ByteString.Lazy (fromStrict, fromChunks, toStrict)
+import Data.ByteString.Char8 as BC (splitAt)
import qualified Data.ByteString.Char8 as BC (replicate, pack)
import Control.Monad (replicateM, liftM)
import Control.Applicative (liftA3)
import Data.Binary.Put (putWord32be, putWord16be, putWord8)
import Data.Binary.Get (getWord32be, getWord16be, getWord8, runGet)
+import FuncTorrent.Utils (toIP, toPort)
+
-- | Peer is a PeerID, IP address, port tuple
data Peer = Peer ID IP Port
deriving (Show, Eq)
bsToInt :: ByteString -> Int
bsToInt x = fromIntegral (runGet getWord32be (fromChunks (return x)))
+
+makePeer :: ByteString -> Peer
+makePeer peer = Peer "" (toIP ip') (toPort port')
+ where (ip', port') = splitAt 4 peer
import FuncTorrent.Bencode (BVal(..))
import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats)
import FuncTorrent.Network (sendGetRequest)
-import FuncTorrent.Peer (Peer(..))
-import FuncTorrent.Utils (splitN, toIP, toPort, IP, Port)
+import FuncTorrent.PeerMsgs (Peer(..), makePeer)
+import FuncTorrent.Utils (splitN, IP, Port)
import FuncTorrent.Tracker.Types(TState(..), TrackerResponse(..))
("event", "started")]
trackerLoop :: String -> PortNumber -> String -> ByteString -> FS.MsgChannel -> TState -> IO ()
-trackerLoop url port peerId infohash fschan tstate = forever $ do
+trackerLoop url sport peerId infohash fschan tstate = forever $ do
st' <- FS.getStats fschan
st <- readMVar st'
let up = FS.bytesRead st
down = FS.bytesWritten st
- resp <- sendGetRequest url $ mkArgs port peerId up down (left tstate) infohash
+ resp <- sendGetRequest url $ mkArgs sport peerId up down (left tstate) infohash
case Benc.decode resp of
Left e ->
return () -- $ pack (show e)
where
(Bdict body) = resp
-makePeer :: ByteString -> Peer
-makePeer peer = Peer "" (toIP ip') (toPort port')
- where (ip', port') = splitAt 4 peer
) where
import Control.Applicative (liftA2)
-import Control.Monad (liftM)
-import Control.Concurrent.MVar (readMVar)
+import Control.Monad (liftM, forever, void)
+import Control.Concurrent (threadDelay)
+import Control.Concurrent.MVar (readMVar, putMVar, isEmptyMVar, swapMVar)
import Control.Monad.Reader (ReaderT, runReaderT, ask, liftIO)
import Data.Binary (Binary(..), encode, decode)
import Data.Binary.Get (Get, isEmpty, getWord32be, getWord64be, getByteString)
import Network.Socket (Socket, Family( AF_INET ), SocketType( Datagram ), defaultProtocol, SockAddr(..), socket, close, getAddrInfo, addrAddress, SockAddr(..))
import Network.Socket.ByteString (sendTo, recvFrom)
import System.Random (randomIO)
+import System.Timeout (timeout)
+import FuncTorrent.Peer (Peer(..))
import FuncTorrent.Tracker.Types (TrackerEventState(..), TState(..))
import FuncTorrent.Utils (IP, Port, toIP, toPort, getHostname, getPort)
import qualified FuncTorrent.FileSystem as FS (MsgChannel, Stats(..), getStats)
deriving (Show, Eq)
data UDPResponse = ConnectResp Word32 Word64 -- transaction_id connection_id
- | AnnounceResp Word32 Word32 Word32 Word32 [(IP, Port)] -- transaction_id interval leechers seeders [(ip, port)]
+ | AnnounceResp Word32 Word32 Word32 Word32 [Peer] -- transaction_id interval leechers seeders [(ip, port)]
| ScrapeResp Integer Integer Integer Integer
| ErrorResp Integer String
deriving (Show, Eq)
data PeerStats = PeerStats { leechers :: Word32
, seeders :: Word32
- , peers :: [(IP, Port)]
+ , interval :: Word32
+ , peers :: [Peer]
} deriving (Show)
announceResponse :: Word32 -> ReaderT UDPTrackerHandle IO PeerStats
if tidr == tid
then do
liftIO $ putStrLn "announce succeeded"
- return $ PeerStats ls ss xs
+ return $ PeerStats ls ss interval xs
else
- return $ PeerStats 0 0 []
- _ -> return $ PeerStats 0 0 []
+ return $ PeerStats 0 0 0 []
+ _ -> return $ PeerStats 0 0 0 []
-getIPPortPairs :: Get [(IP, Port)]
+getIPPortPairs :: Get [Peer]
getIPPortPairs = do
empty <- isEmpty
if empty
ip <- toIP <$> getByteString 4
port <- toPort <$> getByteString 2
ipportpairs <- getIPPortPairs
- return $ (ip, port) : ipportpairs
+ return $ (Peer "" ip port) : ipportpairs
startSession :: String -> Port -> IO UDPTrackerHandle
startSession host port = do
closeSession (UDPTrackerHandle s _ _) = close s
trackerLoop :: String -> Port -> String -> ByteString -> FS.MsgChannel -> TState -> IO ()
-trackerLoop url sport peerId infohash fschan tstate = do
- st' <- FS.getStats fschan
- st <- readMVar st'
- let up = FS.bytesRead st
- down = FS.bytesWritten st
- port = getPort url
- host = getHostname url
- putStrLn $ "host = " ++ (show host) ++ " port= " ++ (show port)
+trackerLoop url sport peerId infohash fschan tstate = forever $ do
+ st <- fmap readMVar $ FS.getStats fschan
+ up <- fmap FS.bytesRead st
+ down <- fmap FS.bytesWritten st
handle <- startSession host port
- flip runReaderT handle $ do
- t1 <- connectRequest
- cid <- connectResponse t1
- t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport)
- stats <- announceResponse t2
- liftIO $ print stats
--- _ <- threadDelay $
+ stats <- timeout (15*(10^6)) $ worker handle up down
+ case stats of
+ Nothing -> closeSession handle
+ Just stats' -> do
+ ps <- isEmptyMVar $ connectedPeers tstate
+ if ps
+ then
+ putMVar (connectedPeers tstate) (peers stats')
+ else
+ void $ swapMVar (connectedPeers tstate) (peers stats')
+ threadDelay $ fromIntegral (interval stats') * (10^6)
+ return ()
+ where
+ port = getPort url
+ host = getHostname url
+ worker handle up down = flip runReaderT handle $ do
+ t1 <- connectRequest
+ cid <- connectResponse t1
+ t2 <- announceRequest cid infohash peerId (fromIntegral up) (fromIntegral down) (fromIntegral (left tstate)) (fromIntegral sport)
+ stats <- announceResponse t2
+ return stats