Detailed changes
@@ -36,7 +36,7 @@ impl ChannelBuffer {
) -> Task<Result<ModelHandle<Self>>> {
cx.spawn(|mut cx| async move {
let response = client
- .request(proto::OpenChannelBuffer { channel_id })
+ .request(proto::JoinChannelBuffer { channel_id })
.await?;
let base_text = response.base_text;
@@ -238,15 +238,16 @@ CREATE TABLE "buffer_snapshots" (
CREATE TABLE "channel_buffer_collaborators" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
- "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
+ "channel_id" INTEGER NOT NULL REFERENCES channels (id) ON DELETE CASCADE,
"connection_id" INTEGER NOT NULL,
"connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE,
+ "connection_lost" BOOLEAN NOT NULL DEFAULT false,
"user_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE,
"replica_id" INTEGER NOT NULL
);
-CREATE INDEX "index_channel_buffer_collaborators_on_buffer_id" ON "channel_buffer_collaborators" ("buffer_id");
-CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_buffer_id_and_replica_id" ON "channel_buffer_collaborators" ("buffer_id", "replica_id");
+CREATE INDEX "index_channel_buffer_collaborators_on_channel_id" ON "channel_buffer_collaborators" ("channel_id");
+CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_channel_id_and_replica_id" ON "channel_buffer_collaborators" ("channel_id", "replica_id");
CREATE INDEX "index_channel_buffer_collaborators_on_connection_server_id" ON "channel_buffer_collaborators" ("connection_server_id");
CREATE INDEX "index_channel_buffer_collaborators_on_connection_id" ON "channel_buffer_collaborators" ("connection_id");
-CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_buffer_id_connection_id_and_server_id" ON "channel_buffer_collaborators" ("buffer_id", "connection_id", "connection_server_id");
+CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_channel_id_connection_id_and_server_id" ON "channel_buffer_collaborators" ("channel_id", "connection_id", "connection_server_id");
@@ -27,15 +27,16 @@ CREATE TABLE "buffer_snapshots" (
CREATE TABLE "channel_buffer_collaborators" (
"id" SERIAL PRIMARY KEY,
- "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
+ "channel_id" INTEGER NOT NULL REFERENCES channels (id) ON DELETE CASCADE,
"connection_id" INTEGER NOT NULL,
"connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE,
+ "connection_lost" BOOLEAN NOT NULL DEFAULT FALSE,
"user_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE,
"replica_id" INTEGER NOT NULL
);
-CREATE INDEX "index_channel_buffer_collaborators_on_buffer_id" ON "channel_buffer_collaborators" ("buffer_id");
-CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_buffer_id_and_replica_id" ON "channel_buffer_collaborators" ("buffer_id", "replica_id");
+CREATE INDEX "index_channel_buffer_collaborators_on_channel_id" ON "channel_buffer_collaborators" ("channel_id");
+CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_channel_id_and_replica_id" ON "channel_buffer_collaborators" ("channel_id", "replica_id");
CREATE INDEX "index_channel_buffer_collaborators_on_connection_server_id" ON "channel_buffer_collaborators" ("connection_server_id");
CREATE INDEX "index_channel_buffer_collaborators_on_connection_id" ON "channel_buffer_collaborators" ("connection_id");
-CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_buffer_id_connection_id_and_server_id" ON "channel_buffer_collaborators" ("buffer_id", "connection_id", "connection_server_id");
+CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_channel_id_connection_id_and_server_id" ON "channel_buffer_collaborators" ("channel_id", "connection_id", "connection_server_id");
@@ -2,66 +2,12 @@ use super::*;
use prost::Message;
impl Database {
- pub async fn update_buffer(
- &self,
- buffer_id: BufferId,
- operations: &[proto::Operation],
- ) -> Result<()> {
- self.transaction(|tx| async move {
- let buffer = buffer::Entity::find_by_id(buffer_id)
- .one(&*tx)
- .await?
- .ok_or_else(|| anyhow!("no such buffer"))?;
- buffer_operation::Entity::insert_many(operations.iter().filter_map(|operation| {
- match operation.variant.as_ref()? {
- proto::operation::Variant::Edit(operation) => {
- let value =
- serialize_edit_operation(&operation.ranges, &operation.new_text);
- let version = serialize_version(&operation.version);
- Some(buffer_operation::ActiveModel {
- buffer_id: ActiveValue::Set(buffer_id),
- epoch: ActiveValue::Set(buffer.epoch),
- replica_id: ActiveValue::Set(operation.replica_id as i32),
- lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
- local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
- is_undo: ActiveValue::Set(false),
- version: ActiveValue::Set(version),
- value: ActiveValue::Set(value),
- })
- }
- proto::operation::Variant::Undo(operation) => {
- let value = serialize_undo_operation(&operation.counts);
- let version = serialize_version(&operation.version);
- Some(buffer_operation::ActiveModel {
- buffer_id: ActiveValue::Set(buffer_id),
- epoch: ActiveValue::Set(buffer.epoch),
- replica_id: ActiveValue::Set(operation.replica_id as i32),
- lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
- local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
- is_undo: ActiveValue::Set(true),
- version: ActiveValue::Set(version),
- value: ActiveValue::Set(value),
- })
- }
- proto::operation::Variant::UpdateSelections(_) => None,
- proto::operation::Variant::UpdateDiagnostics(_) => None,
- proto::operation::Variant::UpdateCompletionTriggers(_) => None,
- }
- }))
- .exec(&*tx)
- .await?;
-
- Ok(())
- })
- .await
- }
-
- pub async fn join_buffer_for_channel(
+ pub async fn join_channel_buffer(
&self,
channel_id: ChannelId,
user_id: UserId,
connection: ConnectionId,
- ) -> Result<proto::OpenChannelBufferResponse> {
+ ) -> Result<proto::JoinChannelBufferResponse> {
self.transaction(|tx| async move {
let tx = tx;
@@ -90,8 +36,8 @@ impl Database {
};
// Join the collaborators
- let mut collaborators = buffer
- .find_related(channel_buffer_collaborator::Entity)
+ let mut collaborators = channel_buffer_collaborator::Entity::find()
+ .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
.all(&*tx)
.await?;
let replica_ids = collaborators
@@ -103,7 +49,7 @@ impl Database {
replica_id.0 += 1;
}
let collaborator = channel_buffer_collaborator::ActiveModel {
- buffer_id: ActiveValue::Set(buffer.id),
+ channel_id: ActiveValue::Set(channel_id),
connection_id: ActiveValue::Set(connection.id as i32),
connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
user_id: ActiveValue::Set(user_id),
@@ -168,7 +114,7 @@ impl Database {
})
}
- Ok(proto::OpenChannelBufferResponse {
+ Ok(proto::JoinChannelBufferResponse {
buffer_id: buffer.id.to_proto(),
base_text,
operations,
@@ -185,32 +131,112 @@ impl Database {
.await
}
- pub async fn get_buffer_collaborators(&self, buffer: BufferId) -> Result<()> {
+ pub async fn leave_channel_buffer(
+ &self,
+ channel_id: ChannelId,
+ connection: ConnectionId,
+ ) -> Result<Vec<ConnectionId>> {
+ self.transaction(|tx| async move {
+ let result = channel_buffer_collaborator::Entity::delete_many()
+ .filter(
+ Condition::all()
+ .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
+ .add(
+ channel_buffer_collaborator::Column::ConnectionId
+ .eq(connection.id as i32),
+ )
+ .add(
+ channel_buffer_collaborator::Column::ConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
+ )
+ .exec(&*tx)
+ .await?;
+ if result.rows_affected == 0 {
+ Err(anyhow!("not a collaborator on this project"))?;
+ }
+
+ let mut connections = Vec::new();
+ let mut rows = channel_buffer_collaborator::Entity::find()
+ .filter(
+ Condition::all()
+ .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
+ )
+ .stream(&*tx)
+ .await?;
+ while let Some(row) = rows.next().await {
+ let row = row?;
+ connections.push(ConnectionId {
+ id: row.connection_id as u32,
+ owner_id: row.connection_server_id.0 as u32,
+ });
+ }
+
+ Ok(connections)
+ })
+ .await
+ }
+
+ pub async fn leave_channel_buffers(
+ &self,
+ connection: ConnectionId,
+ ) -> Result<Option<LeftChannelBuffers>> {
+ //
+ }
+
+ pub async fn get_channel_buffer_collaborators(&self, channel_id: ChannelId) -> Result<()> {
todo!()
}
- pub async fn leave_buffer(&self, buffer: BufferId, user: UserId) -> Result<()> {
+ pub async fn update_channel_buffer(
+ &self,
+ buffer_id: BufferId,
+ operations: &[proto::Operation],
+ ) -> Result<()> {
self.transaction(|tx| async move {
- //TODO
- // let tx = tx;
- // let channel = channel::Entity::find_by_id(channel_id)
- // .one(&*tx)
- // .await?
- // .ok_or_else(|| anyhow!("invalid channel"))?;
+ let buffer = buffer::Entity::find_by_id(buffer_id)
+ .one(&*tx)
+ .await?
+ .ok_or_else(|| anyhow!("no such buffer"))?;
+ buffer_operation::Entity::insert_many(operations.iter().filter_map(|operation| {
+ match operation.variant.as_ref()? {
+ proto::operation::Variant::Edit(operation) => {
+ let value =
+ serialize_edit_operation(&operation.ranges, &operation.new_text);
+ let version = serialize_version(&operation.version);
+ Some(buffer_operation::ActiveModel {
+ buffer_id: ActiveValue::Set(buffer_id),
+ epoch: ActiveValue::Set(buffer.epoch),
+ replica_id: ActiveValue::Set(operation.replica_id as i32),
+ lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
+ local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
+ is_undo: ActiveValue::Set(false),
+ version: ActiveValue::Set(version),
+ value: ActiveValue::Set(value),
+ })
+ }
+ proto::operation::Variant::Undo(operation) => {
+ let value = serialize_undo_operation(&operation.counts);
+ let version = serialize_version(&operation.version);
+ Some(buffer_operation::ActiveModel {
+ buffer_id: ActiveValue::Set(buffer_id),
+ epoch: ActiveValue::Set(buffer.epoch),
+ replica_id: ActiveValue::Set(operation.replica_id as i32),
+ lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
+ local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
+ is_undo: ActiveValue::Set(true),
+ version: ActiveValue::Set(version),
+ value: ActiveValue::Set(value),
+ })
+ }
+ proto::operation::Variant::UpdateSelections(_) => None,
+ proto::operation::Variant::UpdateDiagnostics(_) => None,
+ proto::operation::Variant::UpdateCompletionTriggers(_) => None,
+ }
+ }))
+ .exec(&*tx)
+ .await?;
- // if let Some(id) = channel.main_buffer_id {
- // return Ok(id);
- // } else {
- // let buffer = buffer::ActiveModel::new().insert(&*tx).await?;
- // channel::ActiveModel {
- // id: ActiveValue::Unchanged(channel_id),
- // main_buffer_id: ActiveValue::Set(Some(buffer.id)),
- // ..Default::default()
- // }
- // .update(&*tx)
- // .await?;
- // Ok(buffer.id)
- // }
Ok(())
})
.await
@@ -903,15 +903,35 @@ impl Database {
),
)
.one(&*tx)
- .await?
- .ok_or_else(|| anyhow!("not a participant in any room"))?;
+ .await?;
- room_participant::Entity::update(room_participant::ActiveModel {
- answering_connection_lost: ActiveValue::set(true),
- ..participant.into_active_model()
- })
- .exec(&*tx)
- .await?;
+ if let Some(participant) = participant {
+ room_participant::Entity::update(room_participant::ActiveModel {
+ answering_connection_lost: ActiveValue::set(true),
+ ..participant.into_active_model()
+ })
+ .exec(&*tx)
+ .await?;
+ }
+
+ channel_buffer_collaborator::Entity::update_many()
+ .filter(
+ Condition::all()
+ .add(
+ channel_buffer_collaborator::Column::ConnectionId
+ .eq(connection.id as i32),
+ )
+ .add(
+ channel_buffer_collaborator::Column::ConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
+ )
+ .set(channel_buffer_collaborator::ActiveModel {
+ connection_lost: ActiveValue::set(true),
+ ..Default::default()
+ })
+ .exec(&*tx)
+ .await?;
Ok(())
})
@@ -22,8 +22,6 @@ pub enum Relation {
to = "super::channel::Column::Id"
)]
Channel,
- #[sea_orm(has_many = "super::channel_buffer_collaborator::Entity")]
- Collaborators,
}
impl Related<super::buffer_operation::Entity> for Entity {
@@ -44,10 +42,4 @@ impl Related<super::channel::Entity> for Entity {
}
}
-impl Related<super::channel_buffer_collaborator::Entity> for Entity {
- fn to() -> RelationDef {
- Relation::Collaborators.def()
- }
-}
-
impl ActiveModelBehavior for ActiveModel {}
@@ -19,6 +19,8 @@ pub enum Relation {
Buffer,
#[sea_orm(has_many = "super::channel_member::Entity")]
Member,
+ #[sea_orm(has_many = "super::channel_buffer_collaborator::Entity")]
+ BufferCollaborators,
}
impl Related<super::channel_member::Entity> for Entity {
@@ -38,3 +40,9 @@ impl Related<super::buffer::Entity> for Entity {
Relation::Buffer.def()
}
}
+
+impl Related<super::channel_buffer_collaborator::Entity> for Entity {
+ fn to() -> RelationDef {
+ Relation::BufferCollaborators.def()
+ }
+}
@@ -1,4 +1,4 @@
-use crate::db::{BufferId, ChannelBufferCollaboratorId, ReplicaId, ServerId, UserId};
+use crate::db::{ChannelBufferCollaboratorId, ChannelId, ReplicaId, ServerId, UserId};
use rpc::ConnectionId;
use sea_orm::entity::prelude::*;
@@ -7,9 +7,10 @@ use sea_orm::entity::prelude::*;
pub struct Model {
#[sea_orm(primary_key)]
pub id: ChannelBufferCollaboratorId,
- pub buffer_id: BufferId,
+ pub channel_id: ChannelId,
pub connection_id: i32,
pub connection_server_id: ServerId,
+ pub connection_lost: bool,
pub user_id: UserId,
pub replica_id: ReplicaId,
}
@@ -26,16 +27,16 @@ impl Model {
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
- belongs_to = "super::buffer::Entity",
- from = "Column::BufferId",
- to = "super::buffer::Column::Id"
+ belongs_to = "super::channel::Entity",
+ from = "Column::ChannelId",
+ to = "super::channel::Column::Id"
)]
- Buffer,
+ Channel,
}
-impl Related<super::buffer::Entity> for Entity {
+impl Related<super::channel::Entity> for Entity {
fn to() -> RelationDef {
- Relation::Buffer.def()
+ Relation::Channel.def()
}
}
@@ -37,13 +37,14 @@ async fn test_channel_buffers(db: &Arc<Database>) {
.await
.unwrap()
.user_id;
+
// This user will not be a part of the channel
let c_id = db
.create_user(
- "user_b@example.com",
+ "user_c@example.com",
false,
NewUserParams {
- github_login: "user_b".into(),
+ github_login: "user_c".into(),
github_user_id: 102,
invite_count: 0,
},
@@ -64,8 +65,9 @@ async fn test_channel_buffers(db: &Arc<Database>) {
.await
.unwrap();
+ let connection_id_a = ConnectionId { owner_id, id: 1 };
let buffer_response_a = db
- .join_buffer_for_channel(zed_id, a_id, ConnectionId { owner_id, id: 1 })
+ .join_channel_buffer(zed_id, a_id, connection_id_a)
.await
.unwrap();
let buffer_id = BufferId::from_proto(buffer_response_a.buffer_id);
@@ -83,10 +85,13 @@ async fn test_channel_buffers(db: &Arc<Database>) {
.map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
.collect::<Vec<_>>();
- db.update_buffer(buffer_id, &operations).await.unwrap();
+ db.update_channel_buffer(buffer_id, &operations)
+ .await
+ .unwrap();
+ let connection_id_b = ConnectionId { owner_id, id: 2 };
let buffer_response_b = db
- .join_buffer_for_channel(zed_id, b_id, ConnectionId { owner_id, id: 2 })
+ .join_channel_buffer(zed_id, b_id, connection_id_b)
.await
.unwrap();
@@ -106,7 +111,7 @@ async fn test_channel_buffers(db: &Arc<Database>) {
// Ensure that C fails to open the buffer
assert!(db
- .join_buffer_for_channel(zed_id, c_id, ConnectionId { owner_id, id: 3 })
+ .join_channel_buffer(zed_id, c_id, ConnectionId { owner_id, id: 3 })
.await
.is_err());
@@ -127,5 +132,14 @@ async fn test_channel_buffers(db: &Arc<Database>) {
]
);
- // Leave buffer
+ let collaborators = db
+ .leave_channel_buffer(zed_id, connection_id_b)
+ .await
+ .unwrap();
+
+ assert_eq!(collaborators, &[connection_id_a],);
+
+ db.connection_lost(connection_id_a).await.unwrap();
+ // assert!()
+ // Test buffer epoch incrementing?
}
@@ -39,7 +39,7 @@ use prometheus::{register_int_gauge, IntGauge};
use rpc::{
proto::{
self, Ack, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, LiveKitConnectionInfo,
- OpenChannelBufferResponse, RequestMessage,
+ RequestMessage,
},
Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
};
@@ -251,8 +251,8 @@ impl Server {
.add_request_handler(remove_channel_member)
.add_request_handler(set_channel_member_admin)
.add_request_handler(rename_channel)
- .add_request_handler(open_channel_buffer)
- .add_request_handler(close_channel_buffer)
+ .add_request_handler(join_channel_buffer)
+ .add_request_handler(leave_channel_buffer)
.add_message_handler(update_channel_buffer)
.add_request_handler(get_channel_members)
.add_request_handler(respond_to_channel_invite)
@@ -2484,16 +2484,16 @@ async fn join_channel(
Ok(())
}
-async fn open_channel_buffer(
- request: proto::OpenChannelBuffer,
- response: Response<proto::OpenChannelBuffer>,
+async fn join_channel_buffer(
+ request: proto::JoinChannelBuffer,
+ response: Response<proto::JoinChannelBuffer>,
session: Session,
) -> Result<()> {
let db = session.db().await;
let channel_id = ChannelId::from_proto(request.channel_id);
let open_response = db
- .join_buffer_for_channel(channel_id, session.user_id, session.connection_id)
+ .join_channel_buffer(channel_id, session.user_id, session.connection_id)
.await?;
response.send(open_response)?;
@@ -2501,16 +2501,18 @@ async fn open_channel_buffer(
Ok(())
}
-async fn close_channel_buffer(
- request: proto::CloseChannelBuffer,
- response: Response<proto::CloseChannelBuffer>,
+async fn leave_channel_buffer(
+ request: proto::LeaveChannelBuffer,
+ response: Response<proto::LeaveChannelBuffer>,
session: Session,
) -> Result<()> {
let db = session.db().await;
- let buffer_id = BufferId::from_proto(request.buffer_id);
+ let channel_id = ChannelId::from_proto(request.channel_id);
+
+ let collaborators_to_notify = db
+ .leave_channel_buffer(channel_id, session.connection_id)
+ .await?;
- // TODO: close channel buffer here
- //
response.send(Ack {})?;
Ok(())
@@ -143,10 +143,10 @@ message Envelope {
SetChannelMemberAdmin set_channel_member_admin = 129;
RenameChannel rename_channel = 130;
- OpenChannelBuffer open_channel_buffer = 131;
- OpenChannelBufferResponse open_channel_buffer_response = 132;
+ JoinChannelBuffer join_channel_buffer = 131;
+ JoinChannelBufferResponse join_channel_buffer_response = 132;
UpdateChannelBuffer update_channel_buffer = 133;
- CloseChannelBuffer close_channel_buffer = 134;
+ LeaveChannelBuffer leave_channel_buffer = 134;
}
}
@@ -958,19 +958,19 @@ message RenameChannel {
string name = 2;
}
-message OpenChannelBuffer {
+message JoinChannelBuffer {
uint64 channel_id = 1;
}
-message OpenChannelBufferResponse {
+message JoinChannelBufferResponse {
uint64 buffer_id = 1;
string base_text = 2;
repeated Operation operations = 3;
repeated Collaborator collaborators = 4;
}
-message CloseChannelBuffer {
- uint64 buffer_id = 1;
+message LeaveChannelBuffer {
+ uint64 channel_id = 1;
}
message RespondToChannelInvite {
@@ -249,9 +249,9 @@ messages!(
(GetPrivateUserInfoResponse, Foreground),
(GetChannelMembers, Foreground),
(GetChannelMembersResponse, Foreground),
- (OpenChannelBuffer, Foreground),
- (OpenChannelBufferResponse, Foreground),
- (CloseChannelBuffer, Background),
+ (JoinChannelBuffer, Foreground),
+ (JoinChannelBufferResponse, Foreground),
+ (LeaveChannelBuffer, Background),
(UpdateChannelBuffer, Foreground)
);
@@ -319,8 +319,8 @@ request_messages!(
(UpdateParticipantLocation, Ack),
(UpdateProject, Ack),
(UpdateWorktree, Ack),
- (OpenChannelBuffer, OpenChannelBufferResponse),
- (CloseChannelBuffer, Ack)
+ (JoinChannelBuffer, JoinChannelBufferResponse),
+ (LeaveChannelBuffer, Ack)
);
entity_messages!(