1
1
import asyncio
2
2
import getpass
3
-
3
+ import os
4
+ import json
4
5
import pyaudio
5
6
import oracledb
7
+ from datetime import datetime
6
8
import oci
7
9
from oci .config import from_file
8
10
from oci .auth .signers .security_token_signer import SecurityTokenSigner
11
13
RealtimeClientListener ,
12
14
RealtimeParameters ,
13
15
)
14
-
16
+ from aiohttp import web
17
+
18
+ # Global variables to store the latest data
19
+ latest_thetime = None
20
+ latest_question = None
21
+ latest_answer = None
22
+ compartment_id = os .getenv ('COMPARTMENT_ID' )
23
+ print (f"compartment_id: { compartment_id } " )
15
24
pw = getpass .getpass ("Enter database user password:" )
16
25
17
26
# Use this when making a connection with a wallet
18
27
connection = oracledb .connect (
19
28
user = "moviestream" ,
20
29
password = pw ,
21
30
dsn = "selectaidb_high" ,
22
- config_dir = "/ Users/pparkins/ Downloads/ Wallet_SelectAIDB" ,
23
- wallet_location = "/ Users/pparkins/ Downloads/ Wallet_SelectAIDB"
31
+ config_dir = r"C:\ Users\paulp\ Downloads\ Wallet_SelectAIDB" ,
32
+ wallet_location = r"C:\ Users\paulp\ Downloads\ Wallet_SelectAIDB"
24
33
)
25
- print ("Successfully connected to Oracle Database" )
26
- print (f"Connection details: { connection } " )
34
+ print (f"Successfully connected to Oracle Database Connection: { connection } " )
27
35
28
36
# Create a FIFO queue
29
37
queue = asyncio .Queue ()
43
51
last_result_time = None
44
52
45
53
def authenticator ():
46
- config = from_file ("~/.oci/config" , "paulspeechai " )
54
+ config = from_file ("~/.oci/config" , "MYSPEECHAIPROFILE " )
47
55
with open (config ["security_token_file" ], "r" ) as f :
48
56
token = f .readline ()
49
57
private_key = oci .signer .load_private_key_from_file (config ["key_file" ])
@@ -69,6 +77,7 @@ def audio_callback(in_data, frame_count, time_info, status):
69
77
70
78
stream .start_stream ()
71
79
config = from_file ()
80
+ isInsertResults = True
72
81
73
82
async def send_audio (client ):
74
83
while True :
@@ -86,10 +95,15 @@ def on_result(self, result):
86
95
print (f"Current cummulative result: { cummulativeResult } " )
87
96
if cummulativeResult .lower ().startswith ("select ai" ):
88
97
isSelect = True
98
+ elif cummulativeResult .lower ().startswith ("Select the eye" ):
99
+ isSelect = True
100
+ else :
101
+ cummulativeResult = ""
89
102
last_result_time = asyncio .get_event_loop ().time ()
90
103
else :
91
104
print (f"Received partial results: { result ['transcriptions' ][0 ]['transcription' ]} " )
92
105
106
+
93
107
def on_ack_message (self , ackmessage ):
94
108
return super ().on_ack_message (ackmessage )
95
109
@@ -113,54 +127,72 @@ async def check_idle():
113
127
isSelect = False
114
128
await asyncio .sleep (1 )
115
129
130
+ # Function to execute AI query and optionally insert results into the table
131
+ # For example Select AI I am looking for the top five selling movies for the latest month please
116
132
def executeSelectAI ():
117
- global cummulativeResult
133
+ global cummulativeResult , isInsertResults , latest_thetime , latest_question , latest_answer
118
134
print (f"executeSelectAI called cummulative result: { cummulativeResult } " )
119
- # for example prompt => 'select ai I am looking for the top 5 selling movies for the latest month please',
135
+
136
+ # AI query
120
137
query = """SELECT DBMS_CLOUD_AI.GENERATE(
121
138
prompt => :prompt,
122
139
profile_name => 'openai_gpt35',
123
140
action => 'narrate')
124
141
FROM dual"""
125
- with connection .cursor () as cursor :
126
- cursor .execute (query , prompt = cummulativeResult )
127
- result = cursor .fetchone ()
128
- if result and isinstance (result [0 ], oracledb .LOB ):
129
- text_result = result [0 ].read ()
130
- print (text_result )
131
- else :
132
- print (result )
142
+
143
+ try :
144
+ with connection .cursor () as cursor :
145
+ # Execute AI query
146
+ cursor .execute (query , prompt = cummulativeResult )
147
+ result = cursor .fetchone ()
148
+
149
+ if result and isinstance (result [0 ], oracledb .LOB ):
150
+ text_result = result [0 ].read ()
151
+ print (text_result )
152
+
153
+ # Update the global variables
154
+ latest_thetime = datetime .now ()
155
+ latest_question = cummulativeResult
156
+ latest_answer = text_result [:3000 ] # Truncate if necessary
157
+ cummulativeResult = ""
158
+
159
+ # Insert the prompt and result into the table if isInsertResults is True
160
+ if isInsertResults :
161
+ insert_query = """
162
+ INSERT INTO selectai_data (thetime, question, answer)
163
+ VALUES (:thetime, :question, :answer)
164
+ """
165
+ cursor .execute (insert_query , {
166
+ 'thetime' : latest_thetime ,
167
+ 'question' : latest_question ,
168
+ 'answer' : latest_answer
169
+ })
170
+ connection .commit ()
171
+ print ("Insert successful." )
172
+ else :
173
+ print (result )
174
+ except Exception as e :
175
+ print (f"An error occurred: { e } " )
176
+
133
177
# Reset cumulativeResult after execution
134
178
cummulativeResult = ""
135
179
180
+ async def handle_request (request ):
181
+ global latest_thetime , latest_question , latest_answer
182
+ data = {
183
+ "thetime" : latest_thetime .isoformat () if latest_thetime else None , # Convert datetime to ISO format
184
+ "question" : latest_question ,
185
+ "answer" : latest_answer
186
+ }
187
+ return web .json_response (data )
136
188
137
- # logic such as the following could be added to make the app further dynamic as far as action type...
138
- # actionValue = 'narrate'
139
- # if cummulativeResult.lower().startswith("select ai narrate"):
140
- # actionValue = "narrate"
141
- # elif cummulativeResult.lower().startswith("select ai chat"):
142
- # actionValue = "chat"
143
- # elif cummulativeResult.lower().startswith("select ai showsql"):
144
- # actionValue = "showsql"
145
- # elif cummulativeResult.lower().startswith("select ai show sql"):
146
- # actionValue = "showsql"
147
- # elif cummulativeResult.lower().startswith("select ai runsql"):
148
- # actionValue = "runsql"
149
- # elif cummulativeResult.lower().startswith("select ai run sql"):
150
- # actionValue = "runsql"
151
- # # Note that "runsql" is not currently supported as action value
152
- # query = """SELECT DBMS_CLOUD_AI.GENERATE(
153
- # prompt => :prompt,
154
- # profile_name => 'openai_gpt35',
155
- # action => :actionValue)
156
- # FROM dual"""
157
189
158
190
if __name__ == "__main__" :
159
191
# Run the event loop
160
192
def message_callback (message ):
161
193
print (f"Received message: { message } " )
162
194
163
- realtime_speech_parameters : RealtimeParameters = RealtimeParameters ()
195
+ realtime_speech_parameters = RealtimeParameters ()
164
196
realtime_speech_parameters .language_code = "en-US"
165
197
realtime_speech_parameters .model_domain = (
166
198
realtime_speech_parameters .MODEL_DOMAIN_GENERIC
@@ -179,15 +211,24 @@ def message_callback(message):
179
211
listener = SpeechListener (),
180
212
service_endpoint = realtime_speech_url ,
181
213
signer = authenticator (),
182
- compartment_id = "ocid1.compartment.oc1..MYCOMPARMENTID" ,
214
+ compartment_id = compartment_id ,
183
215
)
184
216
185
217
loop = asyncio .get_event_loop ()
186
218
loop .create_task (send_audio (client ))
187
219
loop .create_task (check_idle ())
220
+
221
+ # Set up the HTTP server
222
+ app = web .Application ()
223
+ app .router .add_get ('/selectai_data' , handle_request )
224
+ runner = web .AppRunner (app )
225
+ loop .run_until_complete (runner .setup ())
226
+ site = web .TCPSite (runner , 'localhost' , 8080 )
227
+ loop .run_until_complete (site .start ())
228
+
188
229
loop .run_until_complete (client .connect ())
189
230
190
231
if stream .is_active ():
191
232
stream .close ()
192
233
193
- print ("Closed now" )
234
+ print ("Closed now" )
0 commit comments