Skip to content

Commit 76f3ae5

Browse files
Merge pull request #479 from TogetherCrew/mediawiki-changes
feat: exec mediawiki workflow on module changes
2 parents aea1670 + d1f45f9 commit 76f3ae5

2 files changed

Lines changed: 67 additions & 1 deletion

File tree

src/services/module.service.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { IModule, IModuleUpdateBody, Module, PlatformNames, ModuleNames } from '
44

55
import platformService from './platform.service';
66
import websiteService from './website';
7-
7+
import temporalMediaWiki from './temporal/mediaWiki.service';
88
/**
99
* Create a module
1010
* @param {IModule} ModuleBody
@@ -94,6 +94,9 @@ const updateModule = async (
9494
// if (module.name === ModuleNames.Hivemind && newPlatform.name === PlatformNames.Website) {
9595
// await handleHivemindWebsiteCase(newPlatform);
9696
// }
97+
if (module.name === ModuleNames.Hivemind && newPlatform.name === PlatformNames.MediaWiki) {
98+
await handleHivemindMediaWikiCase(newPlatform);
99+
}
97100
existingPlatform.metadata = newPlatform.metadata;
98101
} else {
99102
module.options.platforms.push(newPlatform);
@@ -132,6 +135,27 @@ const handleHivemindWebsiteCase = async (platform: any) => {
132135
}
133136
};
134137

138+
/**
139+
* Handle special case for Hivemind module with MediaWiki platform
140+
* @param {Object} platform - Platform object
141+
*/
142+
const handleHivemindMediaWikiCase = async (platform: any) => {
143+
const platformDoc = await platformService.getPlatformById(platform.platform);
144+
145+
if (!platformDoc) return;
146+
147+
const isActivated = platform.metadata?.activated;
148+
const existingWorkflowId = platformDoc.get('metadata.workflowId');
149+
150+
if (isActivated === true) {
151+
if (!existingWorkflowId) {
152+
const workflowId = await temporalMediaWiki.executeWorkflow(platform.platform);
153+
platformDoc.set('metadata.workflowId', workflowId);
154+
await platformDoc.save();
155+
}
156+
}
157+
};
158+
135159
/**
136160
* Delete module
137161
* @param {HydratedDocument<IModule>} module - module doc
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import { Types } from 'mongoose';
2+
import { v4 as uuidv4 } from 'uuid';
3+
4+
import { Client } from '@temporalio/client';
5+
6+
import parentLogger from '../../config/logger';
7+
import { queues } from './configs/temporal.config';
8+
import { TemporalCoreService } from './core.service';
9+
10+
const logger = parentLogger.child({ module: 'MediaWikiTemporalService' });
11+
12+
class TemporalMediaWikiService extends TemporalCoreService {
13+
public async executeWorkflow(platformId: Types.ObjectId) {
14+
const client: Client = await this.getClient();
15+
const payload = {
16+
platform_id: platformId,
17+
};
18+
try {
19+
const workflowHandle = await client.workflow.execute('MediaWikiETLWorkflow', {
20+
taskQueue: queues.TEMPORAL_QUEUE_PYTHON_HEAVY,
21+
args: [payload],
22+
workflowId: `mediawiki/${platformId}/${uuidv4()}`,
23+
});
24+
logger.info(`Started MediaWiki workflow with ID: ${workflowHandle}`);
25+
return workflowHandle;
26+
} catch (error) {
27+
logger.error(`Failed to trigger MediaWiki workflow: ${(error as Error).message}`);
28+
throw new Error(`Failed to trigger MediaWiki workflow: ${(error as Error).message}`);
29+
}
30+
}
31+
32+
public async terminateWorkflow(workflowId: string): Promise<void> {
33+
const client: Client = await this.getClient();
34+
const handle = client.workflow.getHandle(workflowId);
35+
const description = await handle.describe();
36+
if (description.status.name !== 'TERMINATED' && description.status.name !== 'COMPLETED') {
37+
await handle.terminate('Terminated due to schedule deletion');
38+
}
39+
}
40+
}
41+
42+
export default new TemporalMediaWikiService();

0 commit comments

Comments
 (0)