Skip to content

Commit 48e99fb

Browse files
committed
More Fusion fixes and testing
1 parent 4d3bec5 commit 48e99fb

File tree

7 files changed

+368
-76
lines changed

7 files changed

+368
-76
lines changed

docs/src/post-process-html

+3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ for file in sorted(txt_files):
5555
# Change ShowAccessor to Connection.show
5656
txt = re.sub(r'>ShowAccessor\.', r'>Connection.show.', txt)
5757

58+
# Change workspace.Stages to workspace.stages
59+
txt = re.sub(r'>workspace\.Stages\.', r'>workspace.stages.', txt)
60+
5861
# Fix class / method links
5962
txt = re.sub(
6063
r'(<a\s+[^>]+>)?(\s*<code[^>]*>\s*<span\s+class="pre">\s*)([\w\.]+)(\s*</span>\s*</code>)',

singlestoredb/fusion/handler.py

+23-9
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@
2020
from ..connection import Connection
2121

2222
CORE_GRAMMAR = r'''
23-
ws = ~r"(\s*(/\*.*\*/)*\s*)*"
23+
ws = ~r"(\s+|(\s*/\*.*\*/\s*)+)"
2424
qs = ~r"\"([^\"]*)\"|'([^\']*)'|`([^\`]*)`|([A-Za-z0-9_\-\.]+)"
2525
number = ~r"[-+]?(\d*\.)?\d+(e[-+]?\d+)?"i
2626
integer = ~r"-?\d+"
27-
comma = ws "," ws
28-
open_paren = ws "(" ws
29-
close_paren = ws ")" ws
30-
select = ~r"SELECT"i ws ~r".+" ws
27+
comma = ws* "," ws*
28+
eq = ws* "=" ws*
29+
open_paren = ws* "(" ws*
30+
close_paren = ws* ")" ws*
31+
select = ~r"SELECT"i ws+ ~r".+" ws*
3132
'''
3233

3334
BUILTINS = {
@@ -56,7 +57,7 @@
5657

5758
def get_keywords(grammar: str) -> Tuple[str, ...]:
5859
"""Return all all-caps words from the beginning of the line."""
59-
m = re.match(r'^\s*((?:[A-Z0-9_]+|=)(\s+|$|;))+', grammar)
60+
m = re.match(r'^\s*((?:[A-Z0-9_]+)(\s+|$|;))+', grammar)
6061
if not m:
6162
return tuple()
6263
return tuple(re.split(r'\s+', m.group(0).replace(';', '').strip()))
@@ -86,7 +87,7 @@ def process_alternates(m: Any) -> str:
8687
def process_repeats(m: Any) -> str:
8788
"""Add repeated patterns."""
8889
sql = m.group(1).strip()
89-
return f'open_paren? {sql} ws ( comma {sql} ws )* close_paren?'
90+
return f'open_paren? {sql} ws* ( comma {sql} ws* )* close_paren?'
9091

9192

9293
def lower_and_regex(m: Any) -> str:
@@ -277,7 +278,7 @@ def process_grammar(grammar: str) -> Tuple[Grammar, Tuple[str, ...], Dict[str, A
277278
sql = re.sub(r"'[^']+'", r'qs', sql)
278279

279280
# Convert special characters to literal tokens
280-
sql = re.sub(r'([=]) ', r" '\1' ", sql)
281+
sql = re.sub(r'([=]) ', r' eq ', sql)
281282

282283
# Convert [...] groups to (...)*
283284
sql = re.sub(r'\[([^\]]+)\]', process_optional, sql)
@@ -310,6 +311,15 @@ def process_grammar(grammar: str) -> Tuple[Grammar, Tuple[str, ...], Dict[str, A
310311

311312
# Make sure every operation ends with ws
312313
sql = re.sub(r'\s+ws\s+ws$', r' ws', sql + ' ws')
314+
sql = re.sub(r'(\s+ws)*\s+ws\*$', r' ws*', sql)
315+
sql = re.sub(r'\s+ws$', r' ws*', sql)
316+
sql = re.sub(r'\s+ws\s+\(', r' ws* (', sql)
317+
sql = re.sub(r'\)\s+ws\s+', r') ws* ', sql)
318+
sql = re.sub(r'\s+ws\s+', r' ws+ ', sql)
319+
sql = re.sub(r'\?\s+ws\+', r'? ws*', sql)
320+
321+
# Remove extra ws around eq
322+
sql = re.sub(r'ws\+\s*eq\b', r'eq', sql)
313323

314324
out.append(f'{op} = {sql}')
315325

@@ -324,7 +334,7 @@ def process_grammar(grammar: str) -> Tuple[Grammar, Tuple[str, ...], Dict[str, A
324334
rules[k] = v
325335

326336
cmds = ' / '.join(x for x in rules if x.endswith('_cmd'))
327-
cmds = f'init = ws ( {cmds} ) ws ";"? ws\n'
337+
cmds = f'init = ws* ( {cmds} ) ws* ";"? ws*\n'
328338

329339
return Grammar(cmds + CORE_GRAMMAR + '\n'.join(out)), command_key, rule_info, help_txt
330340

@@ -500,6 +510,10 @@ def visit_ws(self, node: Node, visited_children: Iterable[Any]) -> Any:
500510
"""Whitespace and comments."""
501511
return
502512

513+
def visit_eq(self, node: Node, visited_children: Iterable[Any]) -> Any:
514+
"""Equals sign."""
515+
return
516+
503517
def visit_comma(self, node: Node, visited_children: Iterable[Any]) -> Any:
504518
"""Single comma."""
505519
return
+192
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
#!/usr/bin/env python3
2+
from typing import Any
3+
from typing import Dict
4+
from typing import Optional
5+
6+
from .. import result
7+
from ..handler import SQLHandler
8+
from ..result import FusionSQLResult
9+
from .utils import dt_isoformat
10+
from .utils import get_workspace_group
11+
12+
13+
class ShowStageFilesHandler(SQLHandler):
14+
"""
15+
SHOW STAGE FILES IN GROUP { group_id | group_name } [ at_path ]
16+
[ <like> ] [ <order-by> ] [ <limit> ] [ recursive ] [ extended ];
17+
18+
group_id = ID '<group-id>'
19+
group_name = '<group-name>'
20+
at_path = AT '<path>'
21+
recursive = RECURSIVE
22+
extended = EXTENDED
23+
24+
"""
25+
26+
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
27+
wg = get_workspace_group(params)
28+
29+
res = FusionSQLResult()
30+
res.add_field('Name', result.STRING)
31+
32+
if params['extended']:
33+
res.add_field('Type', result.STRING)
34+
res.add_field('Size', result.INTEGER)
35+
res.add_field('Writable', result.STRING)
36+
res.add_field('CreatedAt', result.DATETIME)
37+
res.add_field('LastModifiedAt', result.DATETIME)
38+
39+
files = []
40+
for x in wg.stages.listdir(
41+
params['at_path'] or '/',
42+
recursive=params['recursive'],
43+
):
44+
info = wg.stages.info(x)
45+
files.append(
46+
tuple([
47+
x, info.type, info.size or 0, info.writable,
48+
dt_isoformat(info.created_at),
49+
dt_isoformat(info.last_modified_at),
50+
]),
51+
)
52+
res.set_rows(files)
53+
54+
else:
55+
res.set_rows([(x,) for x in wg.stages.listdir(
56+
params['at_path'] or '/',
57+
recursive=params['recursive'],
58+
)])
59+
60+
if params['like']:
61+
res = res.like(Name=params['like'])
62+
63+
return res.order_by(**params['order_by']).limit(params['limit'])
64+
65+
66+
ShowStageFilesHandler.register(overwrite=True)
67+
68+
69+
class UploadStageFileHandler(SQLHandler):
70+
"""
71+
UPLOAD FILE local_path TO STAGE stage_path
72+
IN GROUP { group_id | group_name } [ overwrite ];
73+
74+
local_path = '<local-path>'
75+
stage_path = '<stage-path>'
76+
group_id = ID '<group-id>'
77+
group_name = '<group-name>'
78+
overwrite = OVERWRITE
79+
80+
"""
81+
82+
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
83+
wg = get_workspace_group(params)
84+
wg.stages.upload_file(
85+
params['local_path'], params['stage_path'],
86+
overwrite=params['overwrite'],
87+
)
88+
return None
89+
90+
91+
UploadStageFileHandler.register()
92+
93+
94+
class DownloadStageFileHandler(SQLHandler):
95+
"""
96+
DOWNLOAD STAGE FILE stage_path IN GROUP { group_id | group_name }
97+
[ local_path ] [ overwrite ] [ encoding ];
98+
99+
stage_path = '<stage-path>'
100+
group_id = ID '<group-id>'
101+
group_name = '<group-name>'
102+
local_path = TO '<local-path>'
103+
overwrite = OVERWRITE
104+
encoding = ENCODING '<encoding>'
105+
106+
"""
107+
108+
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
109+
wg = get_workspace_group(params)
110+
111+
out = wg.stages.download(
112+
params['stage_path'],
113+
local_path=params['local_path'] or None,
114+
overwrite=params['overwrite'],
115+
encoding=params['encoding'] or None,
116+
)
117+
118+
if not params['local_path']:
119+
res = FusionSQLResult()
120+
if params['encoding']:
121+
res.add_field('Data', result.STRING)
122+
else:
123+
res.add_field('Data', result.BLOB)
124+
res.set_rows([(out,)])
125+
return res
126+
127+
return None
128+
129+
130+
DownloadStageFileHandler.register()
131+
132+
133+
class DropStageFileHandler(SQLHandler):
134+
"""
135+
DROP STAGE FILE stage_path IN GROUP { group_id | group_name };
136+
137+
stage_path = '<stage-path>'
138+
group_id = ID '<group-id>'
139+
group_name = '<group-name>'
140+
141+
"""
142+
143+
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
144+
wg = get_workspace_group(params)
145+
wg.stages.remove(params['stage_path'])
146+
return None
147+
148+
149+
DropStageFileHandler.register()
150+
151+
152+
class DropStageFolderHandler(SQLHandler):
153+
"""
154+
DROP STAGE FOLDER stage_path IN GROUP { group_id | group_name } [ recursive ];
155+
156+
stage_path = '<stage-path>'
157+
group_id = ID '<group-id>'
158+
group_name = '<group-name>'
159+
recursive = RECURSIVE
160+
161+
"""
162+
163+
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
164+
wg = get_workspace_group(params)
165+
if params['recursive']:
166+
wg.stages.rmdir(params['stage_path'])
167+
else:
168+
wg.stages.removedirs(params['stage_path'])
169+
return None
170+
171+
172+
DropStageFolderHandler.register()
173+
174+
175+
class CreateStageFolderHandler(SQLHandler):
176+
"""
177+
CREATE STAGE FOLDER stage_path IN GROUP { group_id | group_name } [ overwrite ];
178+
179+
stage_path = '<stage-path>'
180+
group_id = ID '<group-id>'
181+
group_name = '<group-name>'
182+
overwrite = OVERWRITE
183+
184+
"""
185+
186+
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
187+
wg = get_workspace_group(params)
188+
wg.stages.mkdir(params['stage_path'], overwrite=params['overwrite'])
189+
return None
190+
191+
192+
CreateStageFolderHandler.register()
+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#!/usr/bin/env python
2+
import datetime
3+
import os
4+
from typing import Any
5+
from typing import Dict
6+
from typing import Optional
7+
from urllib.parse import urlparse
8+
9+
import jwt
10+
11+
from ...config import get_option
12+
from ...management import manage_workspaces
13+
from ...management.workspace import WorkspaceGroup
14+
from ...management.workspace import WorkspaceManager
15+
16+
17+
def get_management_token() -> Optional[str]:
18+
"""Return the token for the Management API."""
19+
# See if an API key is configured
20+
tok = get_option('management.token')
21+
if tok:
22+
return tok
23+
24+
# See if the connection URL contains a JWT
25+
url = os.environ.get('SINGLESTOREDB_URL')
26+
if not url:
27+
return None
28+
29+
urlp = urlparse(url, scheme='singlestoredb', allow_fragments=True)
30+
if urlp.password:
31+
try:
32+
jwt.decode(urlp.password, options={'verify_signature': False})
33+
return urlp.password
34+
except jwt.DecodeError:
35+
pass
36+
37+
# Didn't find a key anywhere
38+
return None
39+
40+
41+
def get_workspace_manager() -> WorkspaceManager:
42+
"""Return a new workspace manager."""
43+
return manage_workspaces(get_management_token())
44+
45+
46+
def dt_isoformat(dt: Optional[datetime.datetime]) -> Optional[str]:
47+
"""Convert datetime to string."""
48+
if dt is None:
49+
return None
50+
return dt.isoformat()
51+
52+
53+
def get_workspace_group(params: Dict[str, Any]) -> WorkspaceGroup:
54+
"""Find a workspace group matching group_id or group_name."""
55+
manager = get_workspace_manager()
56+
57+
if params['group_name']:
58+
workspace_groups = [
59+
x for x in manager.workspace_groups
60+
if x.name == params['group_name']
61+
]
62+
63+
if not workspace_groups:
64+
raise KeyError(
65+
'no workspace group found with name "{}"'.format(params['group_name']),
66+
)
67+
68+
if len(workspace_groups) > 1:
69+
ids = ', '.join(x.id for x in workspace_groups)
70+
raise ValueError(
71+
f'more than one workspace group with given name was found: {ids}',
72+
)
73+
74+
return workspace_groups[0]
75+
76+
return manager.get_workspace_group(params['group_id'])

0 commit comments

Comments
 (0)