Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# v1.8.6-alpha0
# v1.8.6-alpha1

* Read `type_count(..)` from cache if available, pr #443.
* Add `scope` in format `@collection:...` to room events, pr #444.

# v1.8.5

Expand Down
1 change: 1 addition & 0 deletions inc/ti/collection.t.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ struct ti_collection_s
uint64_t created_at; /* UNIX time-stamp in seconds */
ti_tz_t * tz;
ti_raw_t * name;
ti_raw_t * scope;
imap_t * things; /* weak map for ti_thing_t */
imap_t * rooms; /* weak map for ti_room_t */
queue_t * gc; /* ti_gc_t */
Expand Down
2 changes: 1 addition & 1 deletion inc/ti/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* "-rc0"
* ""
*/
#define TI_VERSION_PRE_RELEASE "-alpha0"
#define TI_VERSION_PRE_RELEASE "-alpha1"

#define TI_MAINTAINER \
"Jeroen van der Heijden <jeroen@cesbit.com>"
Expand Down
2 changes: 1 addition & 1 deletion itest/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
psutil
pycodestyle
msgpack
python-thingsdb>=1.3.0
python-thingsdb>=1.4.0
requests
pytz
websockets
32 changes: 32 additions & 0 deletions itest/test_room.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,38 @@ async def test_room_peer_only(self, cl0, cl1, cl2):
'from_0_to_peers',
]), sorted(actions1))

async def test_multi_collection_room(self, cl0, cl1, cl2):
await cl0.query(r"""//ti
new_collection('a');
new_collection('b');
""", scope='/t')
room_id_a = await cl0.query(r"""//ti
.room = room();
.room.id();
""", scope='//a')
room_id_b = await cl0.query(r"""//ti
.room = room();
.room.id();
""", scope='//b')

self.assertEqual(room_id_a, room_id_b)

actions0 = []
actions1 = []
room0 = ORoom(actions0, room_id_a, scope='//a')
room1 = ORoom(actions1, room_id_b, scope='//b')

await room0.join(cl0)
await room1.join(cl0)

await room0.emit('add', 'room0')
await room1.emit('add', 'room1')

await asyncio.sleep(1.5)

self.assertEqual(['room0'], actions0)
self.assertEqual(['room1'], actions1)


if __name__ == '__main__':
run_test(TestRoom())
4 changes: 3 additions & 1 deletion src/ti/collection.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ ti_collection_t * ti_collection_create(
collection->id = collection_id;
collection->next_free_id = next_free_id;
collection->name = ti_str_create(name, n);
collection->scope = ti_str_from_fmt("@collection:%.*s", (int) n, name);
collection->things = imap_create();
collection->rooms = imap_create();
collection->gc = queue_new(20);
Expand All @@ -77,7 +78,7 @@ ti_collection_t * ti_collection_create(
if (!collection->name || !collection->things || !collection->gc ||
!collection->access || !collection->procedures || !collection->lock ||
!collection->types || !collection->enums || !collection->futures ||
!collection->rooms || !collection->named_rooms ||
!collection->rooms || !collection->named_rooms || !collection->scope ||
!collection->ano_types || uv_mutex_init(collection->lock))
{
ti_collection_drop(collection);
Expand All @@ -100,6 +101,7 @@ void ti_collection_destroy(ti_collection_t * collection)
imap_destroy(collection->rooms, NULL);
queue_destroy(collection->gc, NULL);
ti_val_drop((ti_val_t *) collection->name);
ti_val_drop((ti_val_t *) collection->scope);
vec_destroy(collection->access, (vec_destroy_cb) ti_auth_destroy);
vec_destroy(collection->vtasks, (vec_destroy_cb) ti_vtask_drop);
vec_destroy(collection->commits, (vec_destroy_cb) ti_commit_destroy);
Expand Down
56 changes: 37 additions & 19 deletions src/ti/room.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,21 @@ static void room__emit_delete(ti_room_t * room)
msgpack_sbuffer buffer;
ti_pkg_t * pkg;

if (mp_sbuffer_alloc_init(&buffer, 32, sizeof(ti_pkg_t)))
if (mp_sbuffer_alloc_init(&buffer, 320, sizeof(ti_pkg_t)))
{
log_critical(EX_MEMORY_S);
return;
}

msgpack_packer_init(&pk, &buffer, msgpack_sbuffer_write);

msgpack_pack_map(&pk, 1);
msgpack_pack_map(&pk, 2);

mp_pack_str(&pk, "scope");
mp_pack_strn(&pk,
room->collection->scope->data,
room->collection->scope->n);

mp_pack_str(&pk, "id");
msgpack_pack_uint64(&pk, room->id);

Expand Down Expand Up @@ -189,7 +195,12 @@ int ti_room_emit(

sz = buffer.size;

msgpack_pack_map(&vp.pk, 3);
msgpack_pack_map(&vp.pk, 4);

mp_pack_str(&vp.pk, "scope");
mp_pack_strn(&vp.pk,
room->collection->scope->data,
room->collection->scope->n);

mp_pack_str(&vp.pk, "id");
msgpack_pack_uint64(&vp.pk, room->id);
Expand Down Expand Up @@ -403,6 +414,7 @@ void ti_room_emit_node_status(ti_room_t * room, const char * status)
typedef struct
{
uint64_t room_id;
char scope[268];
ti_stream_t * stream;
} room__async_t;

Expand All @@ -419,21 +431,21 @@ static void room__async_emit_join_cb(uv_async_t * task)
if (ti_stream_is_closed(w->stream))
goto done;

if (mp_sbuffer_alloc_init(&buffer, 32, sizeof(ti_pkg_t)))
if (mp_sbuffer_alloc_init(&buffer, 320, sizeof(ti_pkg_t)))
{
log_critical(EX_MEMORY_S);
goto done;
}

msgpack_packer_init(&pk, &buffer, msgpack_sbuffer_write);

if (msgpack_pack_map(&pk, 1) ||
mp_pack_str(&pk, "id") ||
msgpack_pack_uint64(&pk, w->room_id))
{
log_critical(EX_MEMORY_S);
goto done;
}
msgpack_pack_map(&pk, 2);

mp_pack_str(&pk, "scope");
mp_pack_str(&pk, w->scope);

mp_pack_str(&pk, "id");
msgpack_pack_uint64(&pk, w->room_id);

pkg = (ti_pkg_t *) buffer.data;
pkg_init(pkg, TI_PROTO_EV_ID, TI_PROTO_CLIENT_ROOM_JOIN, buffer.size);
Expand All @@ -451,12 +463,16 @@ static void room__async_emit_join(ti_room_t * room, ti_stream_t * stream)
{
uv_async_t * task = malloc(sizeof(uv_async_t));
room__async_t * w = malloc(sizeof(room__async_t));
ti_raw_t * scope = room ->collection->scope;

if (!task || !w)
goto failed;

w->stream = stream;
w->room_id = room->id;
memcpy(w->scope, scope->data, scope->n);
w->scope[scope->n] = '\0';

task->data = w;

if (uv_async_init(ti.loop, task, (uv_async_cb) room__async_emit_join_cb) ||
Expand Down Expand Up @@ -485,21 +501,23 @@ static void room__emit_leave(ti_room_t * room, ti_stream_t * stream)
if (ti_stream_is_closed(stream))
return;

if (mp_sbuffer_alloc_init(&buffer, 32, sizeof(ti_pkg_t)))
if (mp_sbuffer_alloc_init(&buffer, 320, sizeof(ti_pkg_t)))
{
log_critical(EX_MEMORY_S);
return;
}

msgpack_packer_init(&pk, &buffer, msgpack_sbuffer_write);

if (msgpack_pack_map(&pk, 1) ||
mp_pack_str(&pk, "id") ||
msgpack_pack_uint64(&pk, room->id))
{
log_critical(EX_MEMORY_S);
return;
}
msgpack_pack_map(&pk, 2);

mp_pack_str(&pk, "scope");
mp_pack_strn(&pk,
room->collection->scope->data,
room->collection->scope->n);

mp_pack_str(&pk, "id");
msgpack_pack_uint64(&pk, room->id);

pkg = (ti_pkg_t *) buffer.data;
pkg_init(pkg, TI_PROTO_EV_ID, TI_PROTO_CLIENT_ROOM_LEAVE, buffer.size);
Expand Down