Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This is the Cocytus project for tracking citations on Wikipedia.

We are changing a __diff stream__ into a __citation delta__ stream.

+ we use the [recent changes stream](https://wikitech.wikimedia.org/wiki/RCStream)
+ we use the [RecentChange stream](https://www.mediawiki.org/wiki/Manual:RCFeed) published by [EventStreams](https://wikitech.wikimedia.org/wiki/EventStreams)
+ to make queue of diffs to be inspected
+ Keep a database table of the latest version we have seen so far
+ call the wikimedia api to fetch the diff text
Expand Down
35 changes: 14 additions & 21 deletions cocytus-input.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from redis import Redis
import compare_change
import crossref_push
import socketIO_client
from sseclient import SSEClient as EventSource
import json
import time
import signal
import logging
Expand All @@ -27,23 +28,15 @@ def alarm_handle(signal_number, current_stack_frame):
signal.siginterrupt(signal.SIGALRM, False)
signal.alarm(alarm_interval)

class WikiNamespace(socketIO_client.BaseNamespace):

def on_change(self, change):
logging.info(u"enqueing "+str(change))
while True:
try:
queue.enqueue(compare_change.get_changes, change)
break
except Exception as e:
logging.error(e.message)
time.sleep(1.0)

def on_connect(self):
self.emit(u"subscribe", u"*")


while True:
socketIO = socketIO_client.SocketIO(u'stream.wikimedia.org', 80)
socketIO.define(WikiNamespace, u'/rc')
socketIO.wait(HEARTBEAT_INTERVAL + 2) # 10 minutes, in prime seconds
for event in EventSource('https://stream.wikimedia.org/v2/stream/recentchange'):
try:
if event.event == 'message' and event.data:
change = json.loads(event.data)
logging.info(u"enqueing " + str(change))
queue.enqueue(compare_change.get_changes, change)
elif event.event == 'error':
logging.error(event.data)
time.sleep(1.0)
except Exception as e:
logging.error(e.message)
time.sleep(1.0)