Skip to content

Commit b49d489

Browse files
committed
Added Discord, Teams, Slack ingest tools.
1 parent 89b9be4 commit b49d489

7 files changed

+499
-2
lines changed

.pylintrc

+2-1
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,8 @@ disable=raw-checker-failed,
449449
too-many-branches,
450450
too-many-positional-arguments,
451451
too-many-statements,
452-
too-few-public-methods
452+
too-few-public-methods,
453+
too-many-locals
453454

454455
# Enable the message, report, category or checker with the given id(s). You can
455456
# either give multiple identifier separated by comma (,) or put this option

__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,8 @@
1717
GitHubIssueIngestTool,
1818
JiraIssueIngestTool,
1919
LinearIssueIngestTool,
20+
MicrosoftTeamsIngestTool,
21+
DiscordIngestTool,
22+
SlackIngestTool,
2023
CrewAIConverter
2124
)

graphlit_tools/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,7 @@
1717
from .ingest.github_issue_ingest_tool import GitHubIssueIngestTool
1818
from .ingest.jira_issue_ingest_tool import JiraIssueIngestTool
1919
from .ingest.linear_issue_ingest_tool import LinearIssueIngestTool
20+
from .ingest.microsoft_teams_ingest_tool import MicrosoftTeamsIngestTool
21+
from .ingest.discord_ingest_tool import DiscordIngestTool
22+
from .ingest.slack_ingest_tool import SlackIngestTool
2023
from .ingest.rss_ingest_tool import RSSIngestTool
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
import logging
2+
import time
3+
import os
4+
from typing import Optional, Type
5+
6+
from graphlit import Graphlit
7+
from graphlit_api import exceptions, input_types, enums
8+
from pydantic import BaseModel, Field
9+
10+
from ..base_tool import BaseTool
11+
from ..exceptions import ToolException
12+
from .. import helpers
13+
14+
logger = logging.getLogger(__name__)
15+
16+
class DiscordIngestInput(BaseModel):
17+
channel_name: str = Field(default=None, description="Discord channel name")
18+
read_limit: Optional[int] = Field(default=None, description="Maximum number of messages from Discord channel to be read")
19+
20+
class DiscordIngestTool(BaseTool):
21+
name: str = "Graphlit Discord ingest tool"
22+
description: str = """Ingests messages from Discord channel into knowledge base.
23+
Accepts Discord channel name.
24+
Returns extracted Markdown text and metadata from messages."""
25+
args_schema: Type[BaseModel] = DiscordIngestInput
26+
27+
graphlit: Graphlit = Field(None, exclude=True)
28+
29+
workflow_id: Optional[str] = Field(None, exclude=True)
30+
correlation_id: Optional[str] = Field(None, exclude=True)
31+
32+
model_config = {
33+
"arbitrary_types_allowed": True
34+
}
35+
36+
def __init__(self, graphlit: Optional[Graphlit] = None, workflow_id: Optional[str] = None, correlation_id: Optional[str] = None, **kwargs):
37+
"""
38+
Initializes the DiscordIngestTool.
39+
40+
Args:
41+
graphlit (Optional[Graphlit]): An optional Graphlit instance to interact with the Graphlit API.
42+
If not provided, a new Graphlit instance will be created.
43+
workflow_id (Optional[str]): ID for the workflow to use when ingesting messages. Defaults to None.
44+
correlation_id (Optional[str]): Correlation ID for tracking requests. Defaults to None.
45+
**kwargs: Additional keyword arguments for the BaseTool superclass.
46+
"""
47+
super().__init__(**kwargs)
48+
self.graphlit = graphlit or Graphlit()
49+
self.workflow_id = workflow_id
50+
self.correlation_id = correlation_id
51+
52+
async def _arun(self, channel_name: str, read_limit: Optional[int] = None) -> Optional[str]:
53+
feed_id = None
54+
55+
token = os.environ['DISCORD_BOT_TOKEN']
56+
57+
if token is None:
58+
raise ToolException('Invalid Discord bot token. Need to assign DISCORD_BOT_TOKEN environment variable.')
59+
60+
try:
61+
response = await self.graphlit.client.create_feed(
62+
feed=input_types.FeedInput(
63+
name='Discord',
64+
type=enums.FeedTypes.DISCORD,
65+
discord=input_types.DiscordFeedPropertiesInput(
66+
type=enums.FeedListingTypes.PAST,
67+
channel=channel_name,
68+
token=token,
69+
includeAttachments=True,
70+
readLimit=read_limit if read_limit is not None else 10
71+
),
72+
workflow=input_types.EntityReferenceInput(id=self.workflow_id) if self.workflow_id is not None else None,
73+
),
74+
correlation_id=self.correlation_id
75+
)
76+
77+
feed_id = response.create_feed.id if response.create_feed is not None else None
78+
79+
if feed_id is None:
80+
return None
81+
82+
logger.debug(f'Created feed [{feed_id}].')
83+
84+
# Wait for feed to complete, since ingestion happens asychronously
85+
done = False
86+
time.sleep(5)
87+
88+
while not done:
89+
done = await self.is_feed_done(feed_id)
90+
91+
if done is None:
92+
break
93+
94+
if not done:
95+
time.sleep(5)
96+
97+
logger.debug(f'Completed feed [{feed_id}].')
98+
except exceptions.GraphQLClientError as e:
99+
logger.error(str(e))
100+
raise ToolException(str(e)) from e
101+
102+
try:
103+
contents = await self.query_contents(feed_id)
104+
105+
results = []
106+
107+
for content in contents:
108+
results.extend(helpers.format_content(content))
109+
110+
text = "\n".join(results)
111+
112+
return text
113+
except exceptions.GraphQLClientError as e:
114+
logger.error(str(e))
115+
raise ToolException(str(e)) from e
116+
117+
def _run(self, channel_name: str, read_limit: Optional[int] = None) -> str:
118+
return helpers.run_async(self._arun, channel_name, read_limit)
119+
120+
async def is_feed_done(self, feed_id: str):
121+
if self.graphlit.client is None:
122+
return None
123+
124+
response = await self.graphlit.client.is_feed_done(feed_id)
125+
126+
return response.is_feed_done.result if response.is_feed_done is not None else None
127+
128+
async def query_contents(self, feed_id: str):
129+
if self.graphlit.client is None:
130+
return None
131+
132+
try:
133+
response = await self.graphlit.client.query_contents(
134+
filter=input_types.ContentFilter(
135+
feeds=[
136+
input_types.EntityReferenceFilter(
137+
id=feed_id
138+
)
139+
]
140+
)
141+
)
142+
143+
return response.contents.results if response.contents is not None else None
144+
except exceptions.GraphQLClientError as e:
145+
logger.error(str(e))
146+
return None

graphlit_tools/ingest/microsoft_email_ingest_tool.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class MicrosoftEmailIngestTool(BaseTool):
3333

3434
def __init__(self, graphlit: Optional[Graphlit] = None, workflow_id: Optional[str] = None, correlation_id: Optional[str] = None, **kwargs):
3535
"""
36-
Initializes the GmailIngestTool.
36+
Initializes the MicrosoftEmailIngestTool.
3737
3838
Args:
3939
graphlit (Optional[Graphlit]): An optional Graphlit instance to interact with the Graphlit API.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
import logging
2+
import time
3+
import os
4+
from typing import Optional, Type
5+
6+
from graphlit import Graphlit
7+
from graphlit_api import exceptions, input_types, enums
8+
from pydantic import BaseModel, Field
9+
10+
from ..base_tool import BaseTool
11+
from ..exceptions import ToolException
12+
from .. import helpers
13+
14+
logger = logging.getLogger(__name__)
15+
16+
class MicrosoftTeamsIngestInput(BaseModel):
17+
team_name: str = Field(default=None, description="Microsoft Teams team name")
18+
channel_name: str = Field(default=None, description="Microsoft Teams channel name")
19+
read_limit: Optional[int] = Field(default=None, description="Maximum number of messages from Microsoft Teams channel to be read")
20+
21+
class MicrosoftTeamsIngestTool(BaseTool):
22+
name: str = "Graphlit Microsoft Teams ingest tool"
23+
description: str = """Ingests messages from Microsoft Teams channel into knowledge base.
24+
Returns extracted Markdown text and metadata from messages."""
25+
args_schema: Type[BaseModel] = MicrosoftTeamsIngestInput
26+
27+
graphlit: Graphlit = Field(None, exclude=True)
28+
29+
workflow_id: Optional[str] = Field(None, exclude=True)
30+
correlation_id: Optional[str] = Field(None, exclude=True)
31+
32+
model_config = {
33+
"arbitrary_types_allowed": True
34+
}
35+
36+
def __init__(self, graphlit: Optional[Graphlit] = None, workflow_id: Optional[str] = None, correlation_id: Optional[str] = None, **kwargs):
37+
"""
38+
Initializes the MicrosoftTeamsIngestTool.
39+
40+
Args:
41+
graphlit (Optional[Graphlit]): An optional Graphlit instance to interact with the Graphlit API.
42+
If not provided, a new Graphlit instance will be created.
43+
workflow_id (Optional[str]): ID for the workflow to use when ingesting messages. Defaults to None.
44+
correlation_id (Optional[str]): Correlation ID for tracking requests. Defaults to None.
45+
**kwargs: Additional keyword arguments for the BaseTool superclass.
46+
"""
47+
super().__init__(**kwargs)
48+
self.graphlit = graphlit or Graphlit()
49+
self.workflow_id = workflow_id
50+
self.correlation_id = correlation_id
51+
52+
async def _arun(self, team_name: Optional[str] = None, channel_name: Optional[str] = None, read_limit: Optional[int] = None) -> Optional[str]:
53+
feed_id = None
54+
55+
team_id = os.environ['MICROSOFT_TEAMS_TEAM_ID']
56+
channel_id = os.environ['MICROSOFT_TEAMS_CHANNEL_ID']
57+
58+
refresh_token = os.environ['MICROSOFT_TEAMS_REFRESH_TOKEN']
59+
60+
if refresh_token is None:
61+
raise ToolException('Invalid Microsoft Teams refresh token. Need to assign MICROSOFT_TEAMS_REFRESH_TOKEN environment variable.')
62+
63+
teams = None
64+
65+
try:
66+
response = await self.graphlit.client.query_microsoft_teams_teams(
67+
properties=input_types.MicrosoftTeamsTeamsInput(
68+
refreshToken=refresh_token,
69+
)
70+
)
71+
72+
teams = response.microsoft_teams_teams.results if response.microsoft_teams_teams is not None else None
73+
except exceptions.GraphQLClientError as e:
74+
logger.error(str(e))
75+
raise ToolException(str(e)) from e
76+
77+
if len(teams) == 0:
78+
raise ToolException('No Microsoft Teams teams were found.')
79+
80+
team = next(filter(lambda x: x['team_name'] == team_name, teams), None) if team_name is not None else teams[0]
81+
82+
team_id = team.team_id if team is not None else team_id
83+
84+
if team_id is None:
85+
raise ToolException('Invalid Microsoft Teams team identifier. Need to assign MICROSOFT_TEAMS_TEAM_ID environment variable.')
86+
87+
channels = None
88+
89+
try:
90+
response = await self.graphlit.client.query_microsoft_teams_channels(
91+
properties=input_types.MicrosoftTeamsChannelsInput(
92+
refreshToken=refresh_token,
93+
),
94+
team_id=team_id
95+
)
96+
97+
channels = response.microsoft_teams_channels.results if response.microsoft_teams_channels is not None else None
98+
except exceptions.GraphQLClientError as e:
99+
logger.error(str(e))
100+
raise ToolException(str(e)) from e
101+
102+
if len(channels) == 0:
103+
raise ToolException('No Microsoft Teams channels were found.')
104+
105+
channel = next(filter(lambda x: x['channel_name'] == channel_name, channels), None) if channel_name is not None else channels[0]
106+
107+
channel_id = channel.channel_id if channel is not None else channel_id
108+
109+
if channel_id is None:
110+
raise ToolException('Invalid Microsoft Teams channel identifier. Need to assign MICROSOFT_TEAMS_CHANNEL_ID environment variable.')
111+
112+
try:
113+
response = await self.graphlit.client.create_feed(
114+
feed=input_types.FeedInput(
115+
name='Microsoft Teams',
116+
type=enums.FeedTypes.MICROSOFT_TEAMS,
117+
microsoftTeams=input_types.MicrosoftTeamsFeedPropertiesInput(
118+
type=enums.FeedListingTypes.PAST,
119+
teamId=team_id,
120+
channelId=channel_id,
121+
refreshToken=refresh_token,
122+
readLimit=read_limit if read_limit is not None else 10
123+
),
124+
workflow=input_types.EntityReferenceInput(id=self.workflow_id) if self.workflow_id is not None else None,
125+
),
126+
correlation_id=self.correlation_id
127+
)
128+
129+
feed_id = response.create_feed.id if response.create_feed is not None else None
130+
131+
if feed_id is None:
132+
return None
133+
134+
logger.debug(f'Created feed [{feed_id}].')
135+
136+
# Wait for feed to complete, since ingestion happens asychronously
137+
done = False
138+
time.sleep(5)
139+
140+
while not done:
141+
done = await self.is_feed_done(feed_id)
142+
143+
if done is None:
144+
break
145+
146+
if not done:
147+
time.sleep(5)
148+
149+
logger.debug(f'Completed feed [{feed_id}].')
150+
except exceptions.GraphQLClientError as e:
151+
logger.error(str(e))
152+
raise ToolException(str(e)) from e
153+
154+
try:
155+
contents = await self.query_contents(feed_id)
156+
157+
results = []
158+
159+
for content in contents:
160+
results.extend(helpers.format_content(content))
161+
162+
text = "\n".join(results)
163+
164+
return text
165+
except exceptions.GraphQLClientError as e:
166+
logger.error(str(e))
167+
raise ToolException(str(e)) from e
168+
169+
def _run(self, team_name: Optional[str] = None, channel_name: Optional[str] = None, read_limit: Optional[int] = None) -> str:
170+
return helpers.run_async(self._arun, team_name, channel_name, read_limit)
171+
172+
async def is_feed_done(self, feed_id: str):
173+
if self.graphlit.client is None:
174+
return None
175+
176+
response = await self.graphlit.client.is_feed_done(feed_id)
177+
178+
return response.is_feed_done.result if response.is_feed_done is not None else None
179+
180+
async def query_contents(self, feed_id: str):
181+
if self.graphlit.client is None:
182+
return None
183+
184+
try:
185+
response = await self.graphlit.client.query_contents(
186+
filter=input_types.ContentFilter(
187+
feeds=[
188+
input_types.EntityReferenceFilter(
189+
id=feed_id
190+
)
191+
]
192+
)
193+
)
194+
195+
return response.contents.results if response.contents is not None else None
196+
except exceptions.GraphQLClientError as e:
197+
logger.error(str(e))
198+
return None

0 commit comments

Comments
 (0)