1use crate::{Channel, ChannelId, ChannelStore};
2use anyhow::Result;
3use client::Client;
4use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle};
5use rpc::{proto, TypedEnvelope};
6use std::sync::Arc;
7use util::ResultExt;
8
9pub(crate) fn init(client: &Arc<Client>) {
10 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
11 client.add_model_message_handler(ChannelBuffer::handle_add_channel_buffer_collaborator);
12 client.add_model_message_handler(ChannelBuffer::handle_remove_channel_buffer_collaborator);
13}
14
15pub struct ChannelBuffer {
16 channel_id: ChannelId,
17 collaborators: Vec<proto::Collaborator>,
18 buffer: ModelHandle<language::Buffer>,
19 channel_store: ModelHandle<ChannelStore>,
20 client: Arc<Client>,
21 _subscription: client::Subscription,
22}
23
/// Events emitted by [`ChannelBuffer`].
pub enum Event {
    /// Emitted after an add-collaborator or remove-collaborator message has been handled.
    CollaboratorsChanged,
}
27
28impl Entity for ChannelBuffer {
29 type Event = Event;
30
31 fn release(&mut self, _: &mut AppContext) {
32 self.client
33 .send(proto::LeaveChannelBuffer {
34 channel_id: self.channel_id,
35 })
36 .log_err();
37 }
38}
39
40impl ChannelBuffer {
41 pub(crate) async fn new(
42 channel_store: ModelHandle<ChannelStore>,
43 channel_id: ChannelId,
44 client: Arc<Client>,
45 mut cx: AsyncAppContext,
46 ) -> Result<ModelHandle<Self>> {
47 let response = client
48 .request(proto::JoinChannelBuffer { channel_id })
49 .await?;
50
51 let base_text = response.base_text;
52 let operations = response
53 .operations
54 .into_iter()
55 .map(language::proto::deserialize_operation)
56 .collect::<Result<Vec<_>, _>>()?;
57
58 let collaborators = response.collaborators;
59
60 let buffer = cx.add_model(|_| {
61 language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
62 });
63 buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
64
65 let subscription = client.subscribe_to_entity(channel_id)?;
66
67 anyhow::Ok(cx.add_model(|cx| {
68 cx.subscribe(&buffer, Self::on_buffer_update).detach();
69
70 Self {
71 buffer,
72 client,
73 channel_id,
74 channel_store,
75 collaborators,
76 _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
77 }
78 }))
79 }
80
81 async fn handle_update_channel_buffer(
82 this: ModelHandle<Self>,
83 update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
84 _: Arc<Client>,
85 mut cx: AsyncAppContext,
86 ) -> Result<()> {
87 let ops = update_channel_buffer
88 .payload
89 .operations
90 .into_iter()
91 .map(language::proto::deserialize_operation)
92 .collect::<Result<Vec<_>, _>>()?;
93
94 this.update(&mut cx, |this, cx| {
95 cx.notify();
96 this.buffer
97 .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
98 })?;
99
100 Ok(())
101 }
102
103 async fn handle_add_channel_buffer_collaborator(
104 this: ModelHandle<Self>,
105 envelope: TypedEnvelope<proto::AddChannelBufferCollaborator>,
106 _: Arc<Client>,
107 mut cx: AsyncAppContext,
108 ) -> Result<()> {
109 let collaborator = envelope.payload.collaborator.ok_or_else(|| {
110 anyhow::anyhow!(
111 "Should have gotten a collaborator in the AddChannelBufferCollaborator message"
112 )
113 })?;
114
115 this.update(&mut cx, |this, cx| {
116 this.collaborators.push(collaborator);
117 cx.emit(Event::CollaboratorsChanged);
118 cx.notify();
119 });
120
121 Ok(())
122 }
123
124 async fn handle_remove_channel_buffer_collaborator(
125 this: ModelHandle<Self>,
126 message: TypedEnvelope<proto::RemoveChannelBufferCollaborator>,
127 _: Arc<Client>,
128 mut cx: AsyncAppContext,
129 ) -> Result<()> {
130 this.update(&mut cx, |this, cx| {
131 this.collaborators.retain(|collaborator| {
132 if collaborator.peer_id == message.payload.peer_id {
133 this.buffer.update(cx, |buffer, cx| {
134 buffer.remove_peer(collaborator.replica_id as u16, cx)
135 });
136 false
137 } else {
138 true
139 }
140 });
141 cx.emit(Event::CollaboratorsChanged);
142 cx.notify();
143 });
144
145 Ok(())
146 }
147
148 fn on_buffer_update(
149 &mut self,
150 _: ModelHandle<language::Buffer>,
151 event: &language::Event,
152 _: &mut ModelContext<Self>,
153 ) {
154 if let language::Event::Operation(operation) = event {
155 let operation = language::proto::serialize_operation(operation);
156 self.client
157 .send(proto::UpdateChannelBuffer {
158 channel_id: self.channel_id,
159 operations: vec![operation],
160 })
161 .log_err();
162 }
163 }
164
165 pub fn buffer(&self) -> ModelHandle<language::Buffer> {
166 self.buffer.clone()
167 }
168
169 pub fn collaborators(&self) -> &[proto::Collaborator] {
170 &self.collaborators
171 }
172
173 pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
174 self.channel_store
175 .read(cx)
176 .channel_for_id(self.channel_id)
177 .cloned()
178 }
179
180 pub fn replica_id(&self, cx: &AppContext) -> u16 {
181 self.buffer.read(cx).replica_id()
182 }
183}