diff --git a/CHANGELOG.md b/CHANGELOG.md index ee02454d..b13636f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ -# v1.8.6-alpha0 +# v1.8.6-alpha1 * Read `type_count(..)` from cache if available, pr #443. +* Add `scope` in format `@collection:...` to room events, pr #444. # v1.8.5 diff --git a/inc/ti/collection.t.h b/inc/ti/collection.t.h index 11e78147..f20a9852 100644 --- a/inc/ti/collection.t.h +++ b/inc/ti/collection.t.h @@ -28,6 +28,7 @@ struct ti_collection_s uint64_t created_at; /* UNIX time-stamp in seconds */ ti_tz_t * tz; ti_raw_t * name; + ti_raw_t * scope; imap_t * things; /* weak map for ti_thing_t */ imap_t * rooms; /* weak map for ti_room_t */ queue_t * gc; /* ti_gc_t */ diff --git a/inc/ti/version.h b/inc/ti/version.h index 13d4b7af..b43362d3 100644 --- a/inc/ti/version.h +++ b/inc/ti/version.h @@ -25,7 +25,7 @@ * "-rc0" * "" */ -#define TI_VERSION_PRE_RELEASE "-alpha0" +#define TI_VERSION_PRE_RELEASE "-alpha1" #define TI_MAINTAINER \ "Jeroen van der Heijden " diff --git a/itest/requirements.txt b/itest/requirements.txt index 74182d78..e0f7a154 100644 --- a/itest/requirements.txt +++ b/itest/requirements.txt @@ -1,7 +1,7 @@ psutil pycodestyle msgpack -python-thingsdb>=1.3.0 +python-thingsdb>=1.4.0 requests pytz websockets diff --git a/itest/test_room.py b/itest/test_room.py index 27a40e87..8b8a696a 100755 --- a/itest/test_room.py +++ b/itest/test_room.py @@ -336,6 +336,38 @@ async def test_room_peer_only(self, cl0, cl1, cl2): 'from_0_to_peers', ]), sorted(actions1)) + async def test_multi_collection_room(self, cl0, cl1, cl2): + await cl0.query(r"""//ti + new_collection('a'); + new_collection('b'); + """, scope='/t') + room_id_a = await cl0.query(r"""//ti + .room = room(); + .room.id(); + """, scope='//a') + room_id_b = await cl0.query(r"""//ti + .room = room(); + .room.id(); + """, scope='//b') + + self.assertEqual(room_id_a, room_id_b) + + actions0 = [] + actions1 = [] + room0 = ORoom(actions0, room_id_a, scope='//a') + room1 = ORoom(actions1, room_id_b, scope='//b') + + await room0.join(cl0) + await room1.join(cl0) + + await room0.emit('add', 'room0') + await room1.emit('add', 'room1') + + await asyncio.sleep(1.5) + + self.assertEqual(['room0'], actions0) + self.assertEqual(['room1'], actions1) + if __name__ == '__main__': run_test(TestRoom()) diff --git a/src/ti/collection.c b/src/ti/collection.c index 7e929b67..f3ae537f 100644 --- a/src/ti/collection.c +++ b/src/ti/collection.c @@ -56,6 +56,7 @@ ti_collection_t * ti_collection_create( collection->id = collection_id; collection->next_free_id = next_free_id; collection->name = ti_str_create(name, n); + collection->scope = ti_str_from_fmt("@collection:%.*s", (int) n, name); collection->things = imap_create(); collection->rooms = imap_create(); collection->gc = queue_new(20); @@ -77,7 +78,7 @@ ti_collection_t * ti_collection_create( if (!collection->name || !collection->things || !collection->gc || !collection->access || !collection->procedures || !collection->lock || !collection->types || !collection->enums || !collection->futures || - !collection->rooms || !collection->named_rooms || + !collection->rooms || !collection->named_rooms || !collection->scope || !collection->ano_types || uv_mutex_init(collection->lock)) { ti_collection_drop(collection); @@ -100,6 +101,7 @@ void ti_collection_destroy(ti_collection_t * collection) imap_destroy(collection->rooms, NULL); queue_destroy(collection->gc, NULL); ti_val_drop((ti_val_t *) collection->name); + ti_val_drop((ti_val_t *) collection->scope); vec_destroy(collection->access, (vec_destroy_cb) ti_auth_destroy); vec_destroy(collection->vtasks, (vec_destroy_cb) ti_vtask_drop); vec_destroy(collection->commits, (vec_destroy_cb) ti_commit_destroy); diff --git a/src/ti/room.c b/src/ti/room.c index c544b574..6f8867f1 100644 --- a/src/ti/room.c +++ b/src/ti/room.c @@ -114,7 +114,7 @@ static void room__emit_delete(ti_room_t * room) msgpack_sbuffer buffer; ti_pkg_t * pkg; - if (mp_sbuffer_alloc_init(&buffer, 32, sizeof(ti_pkg_t))) + if (mp_sbuffer_alloc_init(&buffer, 320, sizeof(ti_pkg_t))) { log_critical(EX_MEMORY_S); return; @@ -122,7 +122,13 @@ static void room__emit_delete(ti_room_t * room) msgpack_packer_init(&pk, &buffer, msgpack_sbuffer_write); - msgpack_pack_map(&pk, 1); + msgpack_pack_map(&pk, 2); + + mp_pack_str(&pk, "scope"); + mp_pack_strn(&pk, + room->collection->scope->data, + room->collection->scope->n); + mp_pack_str(&pk, "id"); msgpack_pack_uint64(&pk, room->id); @@ -189,7 +195,12 @@ int ti_room_emit( sz = buffer.size; - msgpack_pack_map(&vp.pk, 3); + msgpack_pack_map(&vp.pk, 4); + + mp_pack_str(&vp.pk, "scope"); + mp_pack_strn(&vp.pk, + room->collection->scope->data, + room->collection->scope->n); mp_pack_str(&vp.pk, "id"); msgpack_pack_uint64(&vp.pk, room->id); @@ -403,6 +414,7 @@ void ti_room_emit_node_status(ti_room_t * room, const char * status) typedef struct { uint64_t room_id; + char scope[268]; ti_stream_t * stream; } room__async_t; @@ -419,7 +431,7 @@ static void room__async_emit_join_cb(uv_async_t * task) if (ti_stream_is_closed(w->stream)) goto done; - if (mp_sbuffer_alloc_init(&buffer, 32, sizeof(ti_pkg_t))) + if (mp_sbuffer_alloc_init(&buffer, 320, sizeof(ti_pkg_t))) { log_critical(EX_MEMORY_S); goto done; @@ -427,13 +439,13 @@ static void room__async_emit_join_cb(uv_async_t * task) msgpack_packer_init(&pk, &buffer, msgpack_sbuffer_write); - if (msgpack_pack_map(&pk, 1) || - mp_pack_str(&pk, "id") || - msgpack_pack_uint64(&pk, w->room_id)) - { - log_critical(EX_MEMORY_S); - goto done; - } + msgpack_pack_map(&pk, 2); + + mp_pack_str(&pk, "scope"); + mp_pack_str(&pk, w->scope); + + mp_pack_str(&pk, "id"); + msgpack_pack_uint64(&pk, w->room_id); pkg = (ti_pkg_t *) buffer.data; pkg_init(pkg, TI_PROTO_EV_ID, TI_PROTO_CLIENT_ROOM_JOIN, buffer.size); @@ -451,12 +463,16 @@ static void room__async_emit_join(ti_room_t * room, ti_stream_t * stream) { uv_async_t * task = malloc(sizeof(uv_async_t)); room__async_t * w = malloc(sizeof(room__async_t)); + ti_raw_t * scope = room ->collection->scope; if (!task || !w) goto failed; w->stream = stream; w->room_id = room->id; + memcpy(w->scope, scope->data, scope->n); + w->scope[scope->n] = '\0'; + task->data = w; if (uv_async_init(ti.loop, task, (uv_async_cb) room__async_emit_join_cb) || @@ -485,7 +501,7 @@ static void room__emit_leave(ti_room_t * room, ti_stream_t * stream) if (ti_stream_is_closed(stream)) return; - if (mp_sbuffer_alloc_init(&buffer, 32, sizeof(ti_pkg_t))) + if (mp_sbuffer_alloc_init(&buffer, 320, sizeof(ti_pkg_t))) { log_critical(EX_MEMORY_S); return; @@ -493,13 +509,15 @@ static void room__emit_leave(ti_room_t * room, ti_stream_t * stream) msgpack_packer_init(&pk, &buffer, msgpack_sbuffer_write); - if (msgpack_pack_map(&pk, 1) || - mp_pack_str(&pk, "id") || - msgpack_pack_uint64(&pk, room->id)) - { - log_critical(EX_MEMORY_S); - return; - } + msgpack_pack_map(&pk, 2); + + mp_pack_str(&pk, "scope"); + mp_pack_strn(&pk, + room->collection->scope->data, + room->collection->scope->n); + + mp_pack_str(&pk, "id"); + msgpack_pack_uint64(&pk, room->id); pkg = (ti_pkg_t *) buffer.data; pkg_init(pkg, TI_PROTO_EV_ID, TI_PROTO_CLIENT_ROOM_LEAVE, buffer.size);