1use crate::{Channel, ChannelId, ChannelStore};
2use anyhow::Result;
3use client::Client;
4use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
5use rpc::{proto, TypedEnvelope};
6use std::sync::Arc;
7use util::ResultExt;
8
9pub(crate) fn init(client: &Arc<Client>) {
10 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
11 client.add_model_message_handler(ChannelBuffer::handle_add_channel_buffer_collaborator);
12 client.add_model_message_handler(ChannelBuffer::handle_remove_channel_buffer_collaborator);
13}
14
15pub struct ChannelBuffer {
16 channel_id: ChannelId,
17 collaborators: Vec<proto::Collaborator>,
18 buffer: ModelHandle<language::Buffer>,
19 channel_store: ModelHandle<ChannelStore>,
20 client: Arc<Client>,
21 _subscription: client::Subscription,
22}
23
24impl Entity for ChannelBuffer {
25 type Event = ();
26
27 fn release(&mut self, _: &mut AppContext) {
28 self.client
29 .send(proto::LeaveChannelBuffer {
30 channel_id: self.channel_id,
31 })
32 .log_err();
33 }
34}
35
36impl ChannelBuffer {
37 pub(crate) fn new(
38 channel_store: ModelHandle<ChannelStore>,
39 channel_id: ChannelId,
40 client: Arc<Client>,
41 cx: &mut AppContext,
42 ) -> Task<Result<ModelHandle<Self>>> {
43 cx.spawn(|mut cx| async move {
44 let response = client
45 .request(proto::JoinChannelBuffer { channel_id })
46 .await?;
47
48 let base_text = response.base_text;
49 let operations = response
50 .operations
51 .into_iter()
52 .map(language::proto::deserialize_operation)
53 .collect::<Result<Vec<_>, _>>()?;
54
55 let collaborators = response.collaborators;
56
57 let buffer =
58 cx.add_model(|cx| language::Buffer::new(response.replica_id as u16, base_text, cx));
59 buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
60
61 let subscription = client.subscribe_to_entity(channel_id)?;
62
63 anyhow::Ok(cx.add_model(|cx| {
64 cx.subscribe(&buffer, Self::on_buffer_update).detach();
65
66 Self {
67 buffer,
68 client,
69 channel_id,
70 channel_store,
71 collaborators,
72 _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
73 }
74 }))
75 })
76 }
77
78 async fn handle_update_channel_buffer(
79 this: ModelHandle<Self>,
80 update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
81 _: Arc<Client>,
82 mut cx: AsyncAppContext,
83 ) -> Result<()> {
84 let ops = update_channel_buffer
85 .payload
86 .operations
87 .into_iter()
88 .map(language::proto::deserialize_operation)
89 .collect::<Result<Vec<_>, _>>()?;
90
91 this.update(&mut cx, |this, cx| {
92 cx.notify();
93 this.buffer
94 .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
95 })?;
96
97 Ok(())
98 }
99
100 async fn handle_add_channel_buffer_collaborator(
101 this: ModelHandle<Self>,
102 envelope: TypedEnvelope<proto::AddChannelBufferCollaborator>,
103 _: Arc<Client>,
104 mut cx: AsyncAppContext,
105 ) -> Result<()> {
106 let collaborator = envelope.payload.collaborator.ok_or_else(|| {
107 anyhow::anyhow!(
108 "Should have gotten a collaborator in the AddChannelBufferCollaborator message"
109 )
110 })?;
111
112 this.update(&mut cx, |this, cx| {
113 this.collaborators.push(collaborator);
114 cx.notify();
115 });
116
117 Ok(())
118 }
119
120 async fn handle_remove_channel_buffer_collaborator(
121 this: ModelHandle<Self>,
122 message: TypedEnvelope<proto::RemoveChannelBufferCollaborator>,
123 _: Arc<Client>,
124 mut cx: AsyncAppContext,
125 ) -> Result<()> {
126 this.update(&mut cx, |this, cx| {
127 this.collaborators.retain(|collaborator| {
128 if collaborator.peer_id == message.payload.peer_id {
129 this.buffer.update(cx, |buffer, cx| {
130 buffer.remove_peer(collaborator.replica_id as u16, cx)
131 });
132 false
133 } else {
134 true
135 }
136 });
137 cx.notify();
138 });
139
140 Ok(())
141 }
142
143 fn on_buffer_update(
144 &mut self,
145 _: ModelHandle<language::Buffer>,
146 event: &language::Event,
147 _: &mut ModelContext<Self>,
148 ) {
149 if let language::Event::Operation(operation) = event {
150 let operation = language::proto::serialize_operation(operation);
151 self.client
152 .send(proto::UpdateChannelBuffer {
153 channel_id: self.channel_id,
154 operations: vec![operation],
155 })
156 .log_err();
157 }
158 }
159
160 pub fn buffer(&self) -> ModelHandle<language::Buffer> {
161 self.buffer.clone()
162 }
163
164 pub fn collaborators(&self) -> &[proto::Collaborator] {
165 &self.collaborators
166 }
167
168 pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
169 self.channel_store
170 .read(cx)
171 .channel_for_id(self.channel_id)
172 .cloned()
173 }
174
175 pub fn replica_id(&self, cx: &AppContext) -> u16 {
176 self.buffer.read(cx).replica_id()
177 }
178}