diff --git a/README.md b/README.md index c8edce9..b80dc71 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ This is the Cocytus project for tracking citations on Wikipedia. We are changing a __diff stream__ into a __citation delta__ stream. -+ we use the [recent changes stream](https://wikitech.wikimedia.org/wiki/RCStream) ++ we use the [RecentChange stream](https://www.mediawiki.org/wiki/Manual:RCFeed) published by [EventStreams](https://wikitech.wikimedia.org/wiki/EventStreams) + to make queue of diffs to be inspected + Keep a database table of the latest version we have seen so far + call the wikimedia api to fetch the diff text diff --git a/cocytus-input.py b/cocytus-input.py index ad12f92..1977089 100644 --- a/cocytus-input.py +++ b/cocytus-input.py @@ -2,7 +2,8 @@ from redis import Redis import compare_change import crossref_push -import socketIO_client +from sseclient import SSEClient as EventSource +import json import time import signal import logging @@ -27,23 +28,15 @@ def alarm_handle(signal_number, current_stack_frame): signal.siginterrupt(signal.SIGALRM, False) signal.alarm(alarm_interval) -class WikiNamespace(socketIO_client.BaseNamespace): - - def on_change(self, change): - logging.info(u"enqueing "+str(change)) - while True: - try: - queue.enqueue(compare_change.get_changes, change) - break - except Exception as e: - logging.error(e.message) - time.sleep(1.0) - - def on_connect(self): - self.emit(u"subscribe", u"*") - - -while True: - socketIO = socketIO_client.SocketIO(u'stream.wikimedia.org', 80) - socketIO.define(WikiNamespace, u'/rc') - socketIO.wait(HEARTBEAT_INTERVAL + 2) # 10 minutes, in prime seconds +for event in EventSource('https://stream.wikimedia.org/v2/stream/recentchange'): + try: + if event.event == 'message' and event.data: + change = json.loads(event.data) + logging.info(u"enqueing " + str(change)) + queue.enqueue(compare_change.get_changes, change) + elif event.event == 'error': + logging.error(event.data) + time.sleep(1.0) + except Exception as e: + logging.error(e.message) + time.sleep(1.0)