1
1
"""notion target sink class, which handles writing streams."""
2
2
3
3
from __future__ import annotations
4
-
4
+ import os
5
5
from caseconverter import snakecase
6
6
from notion_client import Client
7
7
from notion_client .errors import HTTPResponseError
@@ -18,6 +18,14 @@ def __init__(self, **kwargs) -> None: # noqa: ANN003
18
18
"""Initialize the sink."""
19
19
super ().__init__ (** kwargs )
20
20
self .client = Client (auth = self .config ["api_key" ])
21
+ database_mapping = {
22
+ mapping ["extractor_namespace" ]: {mapping ["stream_name" ]: mapping ["database_id" ]}
23
+ for mapping in self .config ["streams" ]
24
+ }
25
+ self .database_id = database_mapping .get (os .environ .get ("MELTANO_EXTRACTOR_NAMESPACE" ), {}).get (self .stream_name )
26
+ if not self .database_id :
27
+ msg = f"Database ID not found for stream { self .stream_name } ."
28
+ raise ValueError (msg )
21
29
self .database_schema = self .get_database_schema ()
22
30
self .key_property = self .key_properties [0 ]
23
31
self .snake_key_property = snakecase (self .key_property )
@@ -58,7 +66,7 @@ def get_existing_pages(self, records: list[dict]) -> list:
58
66
existing_pages = {}
59
67
while has_more :
60
68
pages = self .client .databases .query (
61
- database_id = self .config [ " database_id" ] ,
69
+ database_id = self .database_id ,
62
70
start_cursor = start_cursor ,
63
71
filter_properties = [],
64
72
filter = _filter ,
@@ -82,7 +90,7 @@ def create_page(self, record: dict) -> None:
82
90
context: Stream partition or context dictionary.
83
91
"""
84
92
self .client .pages .create (
85
- parent = {"database_id" : self .config [ " database_id" ] },
93
+ parent = {"database_id" : self .database_id },
86
94
properties = self .create_page_properties (record ),
87
95
)
88
96
@@ -92,7 +100,7 @@ def get_database_schema(self) -> dict:
92
100
Returns:
93
101
dict: The database schema.
94
102
"""
95
- db = self .client .databases .retrieve (self .config [ " database_id" ] )
103
+ db = self .client .databases .retrieve (self .database_id )
96
104
return {
97
105
snakecase (name ): {"name" : name , "type" : _property ["type" ]} for name , _property in db ["properties" ].items ()
98
106
}
0 commit comments