Skip to content

Commit

Permalink
chore: fix import and remove unuse implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Sma1lboy committed Oct 21, 2024
1 parent 6278447 commit 323d1ca
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 154 deletions.
5 changes: 3 additions & 2 deletions ee/tabby-db/src/slack_workspaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde_json::Value;
use sqlx::{prelude::FromRow, query, query_as};
use tabby_db_macros::query_paged_as;

#[derive(Debug, FromRow, Serialize, Deserialize)]
#[derive(Debug, FromRow, Serialize, Deserialize, Clone)]
pub struct SlackWorkspaceIntegrationDAO {
pub id: i64,
pub workspace_name: String,
Expand Down Expand Up @@ -68,7 +68,8 @@ impl DbConn {
bot_token: String,
channels: Option<Vec<String>>,
) -> Result<i64> {
let channels_json = channels.map(Json).unwrap_or_else(|| Json(vec![]));
let channels_json = serde_json::to_value(channels.unwrap_or_default())?;

let res = query!(
"INSERT INTO slack_workspaces(workspace_name, workspace_id, bot_token, channels) VALUES (?, ?, ?, ?);",
workspace_name,
Expand Down
131 changes: 8 additions & 123 deletions ee/tabby-webserver/src/service/background_job/slack_integration.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::helper::Job;
use chrono::{DateTime, Utc};
use logkit::debug;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tabby_index::public::{DocIndexer, WebDocument};
Expand Down Expand Up @@ -122,145 +123,29 @@ struct SlackChannel {
name: String,
}

//TODO: Implement these functions
async fn fetch_all_channels(
bot_token: &str,
workspace_id: &str,
) -> Result<Vec<SlackChannel>, CoreError> {
let client = slack_api::default_client().map_err(|e| CoreError::Other(e.into()))?;

let mut channels = Vec::new();
let mut cursor = None;

loop {
let request = ListRequest {
token: bot_token.to_string(),
cursor: cursor.clone(),
exclude_archived: Some(true),
types: Some("public_channel,private_channel".to_string()),
..Default::default()
};

let response = slack_api::channels::list(&client, &request)
.await
.map_err(|e| CoreError::Other(e.into()))?;

if let Some(channel_list) = response.channels {
for channel in channel_list {
channels.push(SlackChannel {
id: channel.id.unwrap_or_default(),
name: channel.name.unwrap_or_default(),
});
}
}

if let Some(next_cursor) = response.response_metadata.and_then(|m| m.next_cursor) {
if next_cursor.is_empty() {
break;
}
cursor = Some(next_cursor);
} else {
break;
}
}

Ok(channels)
debug!("unimplemented: fetch_all_channels");
Ok(vec![])
}

async fn fetch_channel_messages(
bot_token: &str,
workspace_id: &str,
channel_id: &str,
) -> Result<Vec<SlackMessage>, CoreError> {
let client = slack_api::default_client().map_err(|e| CoreError::Other(e.into()))?;

let mut messages = Vec::new();
let mut cursor = None;

loop {
let request = HistoryRequest {
token: bot_token.to_string(),
channel: channel_id.to_string(),
cursor: cursor.clone(),
limit: Some(100),
..Default::default()
};

let response = slack_api::conversations::history(&client, &request)
.await
.map_err(|e| CoreError::Other(e.into()))?;

if let Some(message_list) = response.messages {
for msg in message_list {
let replies = if msg.thread_ts.is_some() {
fetch_message_replies(bot_token, channel_id, &msg.ts.unwrap_or_default())
.await?
} else {
Vec::new()
};

messages.push(SlackMessage {
id: msg.ts.unwrap_or_default(),
channel_id: channel_id.to_string(),
user: msg.user.unwrap_or_default(),
text: msg.text.unwrap_or_default(),
timestamp: chrono::DateTime::parse_from_str(
&msg.ts.unwrap_or_default(),
"%s%.f",
)
.map(|dt| dt.with_timezone(&chrono::Utc))
.map_err(|e| CoreError::Other(e.into()))?,
thread_ts: msg.thread_ts,
replies,
});
}
}

if let Some(next_cursor) = response.response_metadata.and_then(|m| m.next_cursor) {
if next_cursor.is_empty() {
break;
}
cursor = Some(next_cursor);
} else {
break;
}
}

Ok(messages)
debug!("unimplemented: fetch_channel_messages");
Ok(vec![])
}

async fn fetch_message_replies(
bot_token: &str,
channel_id: &str,
thread_ts: &str,
) -> Result<Vec<SlackReply>, CoreError> {
let client = slack_api::default_client().map_err(|e| CoreError::Other(e.into()))?;

let request = slack_api::conversations::RepliesRequest {
token: bot_token.to_string(),
channel: channel_id.to_string(),
ts: thread_ts.to_string(),
..Default::default()
};

let response = slack_api::conversations::replies(&client, &request)
.await
.map_err(|e| CoreError::Other(e.into()))?;

let mut replies = Vec::new();

if let Some(message_list) = response.messages {
// Skip the first message as it's the parent message
for msg in message_list.into_iter().skip(1) {
replies.push(SlackReply {
id: msg.ts.unwrap_or_default(),
user: msg.user.unwrap_or_default(),
text: msg.text.unwrap_or_default(),
timestamp: chrono::DateTime::parse_from_str(&msg.ts.unwrap_or_default(), "%s%.f")
.map(|dt| dt.with_timezone(&chrono::Utc))
.map_err(|e| CoreError::Other(e.into()))?,
});
}
}

Ok(replies)
debug!("unimplemented: fetch_message_replies");
Ok(vec![])
}
58 changes: 32 additions & 26 deletions ee/tabby-webserver/src/service/slack_workspaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use async_trait::async_trait;
use juniper::ID;
use tabby_db::DbConn;
use tabby_schema::{
integration,
job::{JobInfo, JobService},
slack_workspaces::{
to_slack_workspace_integration, CreateSlackWorkspaceIntegrationInput,
Expand Down Expand Up @@ -58,7 +59,7 @@ impl SlackWorkspaceIntegrationService for SlackWorkspaceIntegrationServiceImpl {
integration.id.to_string(),
integration.workspace_id.clone(),
integration.bot_token.clone(),
integration.get_channels(),
Some(integration.get_channels().unwrap_or_default()),
));

let job_info = self.job_service.get_job_info(event.to_command()).await?;
Expand All @@ -71,63 +72,75 @@ impl SlackWorkspaceIntegrationService for SlackWorkspaceIntegrationServiceImpl {
&self,
input: CreateSlackWorkspaceIntegrationInput,
) -> Result<ID> {
let workspace_id = input.workspace_id.clone();
let bot_token = input.bot_token.clone();
let channels = input.channels.clone();
//create in db
let id = self
.db
.create_slack_workspace_integration(
input.workspace_name,
input.workspace_id,
input.bot_token,
input.channels,
workspace_id,
bot_token,
channels,
)
.await?;

let workspace_id = input.workspace_id.clone();
let bot_token = input.bot_token.clone();
let channels = input.channels.clone();
//trigger in background job
let _ = self
.job_service
.trigger(
BackgroundJobEvent::SlackIntegration(SlackIntegrationJob::new(
id.to_string(),
input.workspace_id,
input.bot_token,
input.channels,
workspace_id,
bot_token,
channels,
))
.to_command(),
)
.await;

Ok(id.as_id())
}

async fn delete_slack_workspace_integration(&self, id: ID) -> Result<bool> {
let rowid = id.as_rowid()?;
let row_id = id.as_rowid()?;

let integration = {
let mut x = self
.db
.list_slack_workspace_integrations(Some(vec![rowid]), None, None, false)
.list_slack_workspace_integrations(Some(vec![row_id]), None, None, false)
.await?;

x.pop()
.context("Slack workspace integration doesn't exist")?
};
self.db.delete_slack_workspace_integration(rowid).await?;

self.db.delete_slack_workspace_integration(row_id).await?;

// Clone the necessary fields
let workspace_id = integration.workspace_id.clone();
let bot_token = integration.bot_token.clone();
let channels = integration.get_channels().unwrap_or_default();

self.job_service
.clear(
BackgroundJobEvent::SlackIntegration(SlackIntegrationJob::new(
rowid.to_string(),
integration.workspace_id,
integration.bot_token,
integration.get_channels(),
row_id.to_string(),
workspace_id,
bot_token,
Some(channels),
))
.to_command(),
)
.await?;

self.job_service
.trigger(BackgroundJobEvent::IndexGarbageCollection.to_command())
.await?;

Ok(true)
}

// async fn trigger_slack_integration_job(&self, id: ID) -> Result<JobInfo> {
// let integration = self.db.get_slack_workspace_integration(id).await;

Expand All @@ -153,7 +166,7 @@ mod tests {
workspace_name: "Test Workspace".to_string(),
workspace_id: "W12345".to_string(),
bot_token: "xoxb-test-token".to_string(),
channels: vec![],
channels: Some(vec![]),
};
let id = service
.create_slack_workspace_integration(input)
Expand All @@ -168,13 +181,6 @@ mod tests {
assert_eq!(1, integrations.len());
assert_eq!(id, integrations[0].id);

// Test trigger job
let job_info = service
.trigger_slack_integration_job(id.clone())
.await
.unwrap();
assert!(job_info.last_job_run.is_some());

// Test delete
let result = service
.delete_slack_workspace_integration(id)
Expand Down
6 changes: 3 additions & 3 deletions ee/tabby-webserver/src/webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ use crate::{
routes,
service::{
create_service_locator, event_logger::create_event_logger, integration, job, repository,
web_documents,
slack_workspaces, web_documents,
},
slack_workspaces,
};

pub struct Webserver {
Expand Down Expand Up @@ -82,7 +81,8 @@ impl Webserver {

let web_documents = Arc::new(web_documents::create(db.clone(), job.clone()));

let slack = Arc::new(slack_workspaces::create(db, job_service));
let slack = Arc::new(slack_workspaces::create(db, job.clone()));

let context = Arc::new(crate::service::context::create(
repository.clone(),
web_documents.clone(),
Expand Down

0 comments on commit 323d1ca

Please sign in to comment.