python-具有redis pubsub的Django / gevent socket.IO.我在哪里放东西?

我有一个隔离的python脚本,该脚本仅从Twitter的流API捕获数据,然后在接收到每条消息时,使用redis pubsub将其发布到“ tweets”频道.这是该脚本:

def main():
    username = "username"
    password = "password"
    track_list = ["apple", "microsoft", "google"]

    with tweetstream.FilterStream(username, password, track=track_list) as stream:
        for tweet in stream:
            text = tweet["text"]
            user = tweet["user"]["screen_name"]
            message = {"text": text, "user": user}
            db.publish("tweets", message)

if __name__ == '__main__':
    try:
        print "Started..."
        main()
    except KeyboardInterrupt:
        print '\nGoodbye!'

我的服务器端socket.io实现是使用django-socketio(基于gevent-socketio)https://github.com/stephenmcd/django-socketio完成的,它仅提供了一些辅助装饰器以及broadcast_channel方法.因为它是在django中完成的,所以我只是将这段代码简单地放在views.py中,以便将它们导入.我的views.py代码:

def index(request):
    return render_to_response("twitter_app/index.html", {
    }, context_instance=RequestContext(request))

def _listen(socket):
    db = redis.Redis(host="localhost", port=6379, db=0)
    client = db.pubsub()
    client.subscribe("tweets")
    tweets = client.listen()

    while True:
        tweet = tweets.next()
        tweet_data = ast.literal_eval(tweet["data"])
        message = {"text": tweet_data["text"], "user": tweet_data["user"], "type": "tweet"}
        socket.broadcast_channel(message)

@on_subscribe(channel="livestream")
def subscribe(request, socket, context, channel):
    g = Greenlet.spawn(_listen, socket)

客户端socket.io JavaScript仅连接并订阅频道“实时流”并捕获到该频道的所有接收到的消息:

var socket = new io.Socket();
socket.connect();
socket.on('connect', function() {
    socket.subscribe("livestream");
});
socket.on('message', function(data) {
    console.log(data);
});

此代码的明显问题是,每次在页面上打开新用户或浏览器窗口时,都会产生一个新的_listen方法,并且向每个用户订阅并广播这些推文,从而导致客户端上收到重复的消息.我的问题是,将_listen方法放在什么位置才是合适的位置,以便无论客户端数量如何,都仅创建一次该方法?另外,请记住,broadcast_channel方法是套接字实例的方法.

解决方法:

问题是我应该使用socket.send时使用了socket.broadcast_channel.

上一篇:协成与异步IO


下一篇:Python,gevent,urllib2.urlopen.read(),下载加速器