Skip to content

Commit 31db17a

Browse files
committed
command line option to switch between threads and processes
This commit enables the choice between using threads or (forked) processes when using parallel seeding. The default is to use a single thread. Support for multiple processes is disabled on windows platforms due to the lack of ipc message queues. Setting the size of the ipc message queue (msg_qbytes) might be unsupported on osx, needs checking.
1 parent fd78257 commit 31db17a

1 file changed

Lines changed: 121 additions & 80 deletions

File tree

util/mapcache_seed.c

Lines changed: 121 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
#include <time.h>
3737
#ifndef _WIN32
3838
#include <unistd.h>
39-
#define nUSE_FORK
39+
#define USE_FORK
4040
#include <sys/time.h>
4141
#endif
4242

@@ -47,10 +47,11 @@
4747
int msqid;
4848
#include <sys/ipc.h>
4949
#include <sys/msg.h>
50-
#else
50+
#include <errno.h>
51+
#endif
52+
5153
#include <apr_queue.h>
5254
apr_queue_t *work_queue;
53-
#endif
5455

5556
#if defined(USE_OGR) && defined(USE_GEOS)
5657
#define USE_CLIPPERS
@@ -71,7 +72,8 @@ apr_table_t *dimensions;
7172
int minzoom=-1;
7273
int maxzoom=-1;
7374
mapcache_grid_link *grid_link;
74-
int nthreads=1;
75+
int nthreads=0;
76+
int nprocesses=0;
7577
int quiet = 0;
7678
int verbose = 0;
7779
int force = 0;
@@ -118,67 +120,72 @@ cmd mode = MAPCACHE_CMD_SEED; /* the mode the utility will be running in: either
118120
int push_queue(struct seed_cmd cmd)
119121
{
120122
#ifdef USE_FORK
121-
struct msg_cmd mcmd;
122-
mcmd.mtype = 1;
123-
mcmd.cmd = cmd;
124-
if (msgsnd(msqid, &mcmd, sizeof(struct seed_cmd), 0) == -1) {
125-
printf("failed to push tile %d %d %d\n",cmd.z,cmd.y,cmd.x);
126-
return APR_EGENERAL;
123+
if(nprocesses > 1) {
124+
struct msg_cmd mcmd;
125+
mcmd.mtype = 1;
126+
mcmd.cmd = cmd;
127+
if (msgsnd(msqid, &mcmd, sizeof(struct seed_cmd), 0) == -1) {
128+
printf("failed to push tile %d %d %d\n",cmd.z,cmd.y,cmd.x);
129+
return APR_EGENERAL;
130+
}
131+
return APR_SUCCESS;
127132
}
128-
return APR_SUCCESS;
129-
#else
133+
#endif
130134
struct seed_cmd *pcmd = calloc(1,sizeof(struct seed_cmd));
131135
*pcmd = cmd;
132136
return apr_queue_push(work_queue,pcmd);
133-
#endif
134137
}
135138

136139
int pop_queue(struct seed_cmd *cmd)
137140
{
138-
#ifdef USE_FORK
139-
struct msg_cmd mcmd;
140-
if (msgrcv(msqid, &mcmd, sizeof(struct seed_cmd), 1, 0) == -1) {
141-
printf("failed to pop tile\n");
142-
return APR_EGENERAL;
143-
}
144-
*cmd = mcmd.cmd;
145-
return APR_SUCCESS;
146-
#else
147141
int ret;
148142
struct seed_cmd *pcmd;
143+
144+
#ifdef USE_FORK
145+
if(nprocesses > 1) {
146+
struct msg_cmd mcmd;
147+
if (msgrcv(msqid, &mcmd, sizeof(struct seed_cmd), 1, 0) == -1) {
148+
printf("failed to pop tile\n");
149+
return APR_EGENERAL;
150+
}
151+
*cmd = mcmd.cmd;
152+
return APR_SUCCESS;
153+
}
154+
#endif
155+
149156
ret = apr_queue_pop(work_queue, (void**)&pcmd);
150157
if(ret == APR_SUCCESS) {
151158
*cmd = *pcmd;
152159
free(pcmd);
153160
}
154161
return ret;
155-
#endif
156162
}
157163

158164
int trypop_queue(struct seed_cmd *cmd)
159165
{
160-
#ifdef USE_FORK
161-
int ret;
162-
struct msg_cmd mcmd;
163-
ret = msgrcv(msqid, &mcmd, sizeof(struct seed_cmd), 1, IPC_NOWAIT);
164-
if(errno == ENOMSG) return APR_EAGAIN;
165-
if(ret>0) {
166-
*cmd = mcmd.cmd;
167-
return APR_SUCCESS;
168-
} else {
169-
printf("failed to trypop tile\n");
170-
return APR_EGENERAL;
171-
}
172-
#else
173166
int ret;
174167
struct seed_cmd *pcmd;
168+
169+
#ifdef USE_FORK
170+
if(nprocesses>1) {
171+
struct msg_cmd mcmd;
172+
ret = msgrcv(msqid, &mcmd, sizeof(struct seed_cmd), 1, IPC_NOWAIT);
173+
if(errno == ENOMSG) return APR_EAGAIN;
174+
if(ret>0) {
175+
*cmd = mcmd.cmd;
176+
return APR_SUCCESS;
177+
} else {
178+
printf("failed to trypop tile\n");
179+
return APR_EGENERAL;
180+
}
181+
}
182+
#endif
175183
ret = apr_queue_trypop(work_queue,(void**)&pcmd);
176184
if(ret == APR_SUCCESS) {
177185
*cmd = *pcmd;
178186
free(pcmd);
179187
}
180188
return ret;
181-
#endif
182189
}
183190

184191
static const apr_getopt_option_t seed_options[] = {
@@ -189,7 +196,8 @@ static const apr_getopt_option_t seed_options[] = {
189196
{ "zoom", 'z', TRUE, "min and max zoomlevels to seed, separated by a comma. eg 0,6" },
190197
{ "metasize", 'M', TRUE, "override metatile size while seeding, eg 8,8" },
191198
{ "extent", 'e', TRUE, "extent to seed, format: minx,miny,maxx,maxy" },
192-
{ "nthreads", 'n', TRUE, "number of parallel threads to use" },
199+
{ "nthreads", 'n', TRUE, "number of parallel threads to use (incompatible with -p/--nprocesses)" },
200+
{ "nprocesses", 'p', TRUE, "number of parallel processes to use (incompatible with -n/--nthreads)" },
193201
{ "mode", 'm', TRUE, "mode: seed (default), delete or transfer" },
194202
{ "older", 'o', TRUE, "reseed tiles older than supplied date (format: year/month/day hour:minute, eg: 2011/01/31 20:45" },
195203
{ "dimension", 'D', TRUE, "set the value of a dimension (format DIMENSIONNAME=VALUE). Can be used multiple times for multiple dimensions" },
@@ -277,7 +285,8 @@ void progresslog(int x, int y, int z)
277285
{
278286
char msg[1024];
279287
if(quiet) return;
280-
288+
int nworkers = nthreads;
289+
if(nprocesses >= 1) nworkers = nprocesses;
281290

282291
sprintf(msg,"seeding tile %d %d %d",x,y,z);
283292
if(lastmsglen) {
@@ -293,11 +302,11 @@ void progresslog(int x, int y, int z)
293302
fflush(NULL);
294303
return;
295304

296-
if(queuedtilestot>nthreads) {
305+
if(queuedtilestot>nworkers) {
297306
struct mctimeval now_t;
298307
float duration;
299308
float totalduration;
300-
seededtilestot = queuedtilestot - nthreads;
309+
seededtilestot = queuedtilestot - nworkers;
301310

302311
mapcache_gettimeofday(&now_t,NULL);
303312
duration = ((now_t.tv_sec-lastlogtime.tv_sec)*1000000+(now_t.tv_usec-lastlogtime.tv_usec))/1000000.0;
@@ -503,14 +512,16 @@ void cmd_recurse(mapcache_context *cmd_ctx, mapcache_tile *tile)
503512
tile->z = curz;
504513
}
505514

506-
void cmd_thread()
515+
void cmd_worker()
507516
{
508517
int n;
509518
mapcache_tile *tile;
510519
int z = minzoom;
511520
int x = grid_link->grid_limits[z][0];
512521
int y = grid_link->grid_limits[z][1];
513522
mapcache_context cmd_ctx = ctx;
523+
int nworkers = nthreads;
524+
if(nprocesses >= 1) nworkers = nprocesses;
514525
apr_pool_create(&cmd_ctx.pool,ctx.pool);
515526
tile = mapcache_tileset_tile_create(ctx.pool, tileset, grid_link);
516527
tile->dimensions = dimensions;
@@ -578,7 +589,7 @@ void cmd_thread()
578589
}
579590
//instruct rendering threads to stop working
580591

581-
for(n=0; n<nthreads; n++) {
592+
for(n=0; n<nworkers; n++) {
582593
struct seed_cmd cmd;
583594
cmd.command = MAPCACHE_CMD_STOP;
584595
push_queue(cmd);
@@ -589,11 +600,8 @@ void cmd_thread()
589600
}
590601
}
591602

592-
#ifdef USE_FORK
593-
int seed_thread(void *data)
594-
#else
595-
static void* APR_THREAD_FUNC seed_thread(apr_thread_t *thread, void *data)
596-
#endif
603+
604+
void seed_worker()
597605
{
598606
mapcache_tile *tile;
599607
mapcache_context seed_ctx = ctx;
@@ -637,12 +645,17 @@ static void* APR_THREAD_FUNC seed_thread(apr_thread_t *thread, void *data)
637645
ctx.log(&ctx,MAPCACHE_INFO,seed_ctx.get_error_message(&seed_ctx));
638646
}
639647
}
648+
}
649+
640650
#ifdef USE_FORK
651+
int seed_process() {
652+
seed_worker();
641653
return 0;
642-
#else
643-
apr_thread_exit(thread,MAPCACHE_SUCCESS);
644-
return NULL;
654+
}
645655
#endif
656+
static void* APR_THREAD_FUNC seed_thread(apr_thread_t *thread, void *data) {
657+
seed_worker();
658+
return NULL;
646659
}
647660

648661
void
@@ -703,10 +716,8 @@ int main(int argc, const char **argv)
703716
/* initialize apr_getopt_t */
704717
apr_getopt_t *opt;
705718
const char *configfile=NULL;
706-
#ifndef USE_FORK
707719
apr_thread_t **threads;
708720
apr_threadattr_t *thread_attrs;
709-
#endif
710721
const char *tileset_name=NULL;
711722
const char *tileset_transfer_name=NULL;
712723
const char *grid_name = NULL;
@@ -734,11 +745,6 @@ int main(int argc, const char **argv)
734745
apr_pool_create(&ctx.pool,NULL);
735746
mapcache_context_init(&ctx);
736747
ctx.process_pool = ctx.pool;
737-
#ifndef USE_FORK
738-
apr_thread_mutex_create((apr_thread_mutex_t**)&ctx.threadlock,APR_THREAD_MUTEX_DEFAULT,ctx.pool);
739-
#else
740-
ctx.threadlock = NULL;
741-
#endif
742748
cfg = mapcache_configuration_create(ctx.pool);
743749
ctx.config = cfg;
744750
ctx.log= mapcache_context_seeding_log;
@@ -790,7 +796,18 @@ int main(int argc, const char **argv)
790796
break;
791797
case 'n':
792798
nthreads = (int)strtol(optarg, NULL, 10);
799+
if(nthreads <=0 )
800+
return usage(argv[0], "failed to parse nthreads, expecting positive integer");
801+
break;
802+
case 'p':
803+
#ifdef USE_FORK
804+
nprocesses = (int)strtol(optarg, NULL, 10);
805+
if(nprocesses <=0 )
806+
return usage(argv[0], "failed to parse nprocesses, expecting positive integer");
793807
break;
808+
#else
809+
return usage(argv[0], "multi process seeding not available on this platform");
810+
#endif
794811
case 'e':
795812
if ( MAPCACHE_SUCCESS != mapcache_util_extract_double_list(&ctx, (char*)optarg, ",", &extent, &n) ||
796813
n != 4 || extent[0] >= extent[2] || extent[1] >= extent[3] ) {
@@ -1079,40 +1096,65 @@ int main(int argc, const char **argv)
10791096

10801097
}
10811098

1082-
if( ! nthreads ) {
1083-
return usage(argv[0],"failed to parse nthreads, must be int");
1084-
} else {
1085-
1099+
if(nthreads == 0 && nprocesses == 0) {
1100+
nthreads = 1;
1101+
}
1102+
if(nthreads >= 1 && nprocesses >= 1) {
1103+
return usage(argv[0],"cannot set both nthreads and nprocesses");
1104+
}
1105+
if(nprocesses > 1) {
10861106
#ifdef USE_FORK
10871107
key_t key;
10881108
int i;
1089-
pid_t *pids = malloc(nthreads*sizeof(pid_t));
1109+
pid_t *pids = malloc(nprocesses*sizeof(pid_t));
10901110
struct msqid_ds queue_ds;
1111+
ctx.threadlock = NULL;
10911112
key = ftok(argv[0], 'B');
10921113
if ((msqid = msgget(key, 0644 | IPC_CREAT|S_IRUSR|S_IWUSR)) == -1) {
10931114
return usage(argv[0],"failed to create sysv ipc message queue");
10941115
}
10951116
if (-1 == msgctl(msqid, IPC_STAT, &queue_ds)) {
1096-
return usage(argv[0], "\nFailure in msgctl() 1");
1117+
return usage(argv[0], "\nFailure in msgctl() stat");
1118+
}
1119+
queue_ds.msg_qbytes = nprocesses*sizeof(struct seed_cmd);
1120+
if(-1 == msgctl(msqid, IPC_SET, &queue_ds)) {
1121+
switch(errno) {
1122+
case EACCES:
1123+
return usage(argv[0], "\nFailure in msgctl() set qbytes: EACCESS (should not happen here)");
1124+
case EFAULT:
1125+
return usage(argv[0], "\nFailure in msgctl() set qbytes: EFAULT queue not accessible");
1126+
case EIDRM:
1127+
return usage(argv[0], "\nFailure in msgctl() set qbytes: EIDRM message queue removed");
1128+
case EINVAL:
1129+
return usage(argv[0], "\nFailure in msgctl() set qbytes: EINVAL invalid value for msg_qbytes");
1130+
case EPERM:
1131+
return usage(argv[0], "\nFailure in msgctl() set qbytes: EPERM permission denied on msg_qbytes");
1132+
default:
1133+
return usage(argv[0], "\nFailure in msgctl() set qbytes: unknown");
1134+
}
10971135
}
10981136

1099-
for(i=0; i<nthreads; i++) {
1137+
for(i=0; i<nprocesses; i++) {
11001138
int pid = fork();
11011139
if(pid==0) {
1102-
seed_thread(NULL);
1140+
seed_process();
11031141
exit(0);
11041142
} else {
11051143
pids[i] = pid;
11061144
}
11071145
}
1108-
cmd_thread();
1109-
for(i=0; i<nthreads; i++) {
1146+
cmd_worker();
1147+
for(i=0; i<nprocesses; i++) {
11101148
int stat_loc;
11111149
waitpid(pids[i],&stat_loc,0);
11121150
}
11131151
msgctl(msqid,IPC_RMID,NULL);
11141152
#else
1153+
return usage(argv[0],"bug: multi process support not available");
1154+
#endif
1155+
} else {
11151156
//start the thread that will populate the queue.
1157+
apr_thread_mutex_create((apr_thread_mutex_t**)&ctx.threadlock,APR_THREAD_MUTEX_DEFAULT,ctx.pool);
11161158
//create the queue where tile requests will be put
11171159
apr_queue_create(&work_queue,nthreads,ctx.pool);
11181160

@@ -1122,22 +1164,21 @@ int main(int argc, const char **argv)
11221164
for(n=0; n<nthreads; n++) {
11231165
apr_thread_create(&threads[n], thread_attrs, seed_thread, NULL, ctx.pool);
11241166
}
1125-
cmd_thread();
1167+
cmd_worker();
11261168
for(n=0; n<nthreads; n++) {
11271169
apr_thread_join(&rv, threads[n]);
11281170
}
1129-
#endif
1130-
if(ctx.get_error(&ctx)) {
1131-
printf("%s",ctx.get_error_message(&ctx));
1132-
}
1171+
}
1172+
if(ctx.get_error(&ctx)) {
1173+
printf("%s",ctx.get_error_message(&ctx));
1174+
}
11331175

1134-
if(seededtilestot>0) {
1135-
struct mctimeval now_t;
1136-
float duration;
1137-
mapcache_gettimeofday(&now_t,NULL);
1138-
duration = ((now_t.tv_sec-starttime.tv_sec)*1000000+(now_t.tv_usec-starttime.tv_usec))/1000000.0;
1139-
printf("\nseeded %d metatiles at %g tiles/sec\n",seededtilestot, seededtilestot/duration);
1140-
}
1176+
if(seededtilestot>0) {
1177+
struct mctimeval now_t;
1178+
float duration;
1179+
mapcache_gettimeofday(&now_t,NULL);
1180+
duration = ((now_t.tv_sec-starttime.tv_sec)*1000000+(now_t.tv_usec-starttime.tv_usec))/1000000.0;
1181+
printf("\nseeded %d metatiles at %g tiles/sec\n",seededtilestot, seededtilestot/duration);
11411182
}
11421183
apr_terminate();
11431184
return 0;

0 commit comments

Comments
 (0)