1
- package uploader
1
+ package service
2
2
3
3
import (
4
4
"bufio"
5
5
"encoding/json"
6
6
"fmt"
7
+ "os"
8
+ "strings"
9
+ "sync"
10
+
7
11
"github.com/TRON-US/btfs-migration-toolkit/constants"
8
12
"github.com/TRON-US/btfs-migration-toolkit/core"
9
13
"github.com/TRON-US/btfs-migration-toolkit/log"
10
14
"github.com/TRON-US/soter-sdk-go/soter"
11
- "os"
12
- "strings"
13
- "sync"
15
+
16
+ "github.com/ipfs/go-ipfs-api"
14
17
)
15
18
16
19
func BatchUpload (inputFilename string ) {
17
- batchSize := core .Conf .BatchSize
18
-
19
20
inputHashFile , err := os .Open (inputFilename )
20
21
if err != nil {
21
22
log .Logger ().Error (fmt .Sprintf ("Failed to open file %s, reason=[%v]" , inputFilename , err ))
@@ -27,7 +28,7 @@ func BatchUpload(inputFilename string) {
27
28
}
28
29
}()
29
30
30
- outputHashFile , err := os .Create (fmt . Sprintf ( "./%s" , constants .OutputHashFileName ) )
31
+ outputHashFile , err := os .Create (constants .OutputHashFileName )
31
32
if err != nil {
32
33
log .Logger ().Error (fmt .Sprintf ("Failed to open file %s, reason=[%v]" , constants .OutputHashFileName , err ))
33
34
os .Exit (1 )
@@ -38,10 +39,18 @@ func BatchUpload(inputFilename string) {
38
39
constants .OutputHashFileName , err ))
39
40
}
40
41
}()
41
- outputRetryFile , err := os .Create (fmt .Sprintf ("./%s" , constants .OutputRetryFileName ))
42
+
43
+ outputRetryFile , err := os .Create (constants .OutputRetryFileName )
42
44
if err != nil {
43
- log .Logger ().Error (fmt .Sprintf ("Failed to open %s, reason=[%v]" , constants .OutputRetryFileName , err ))
45
+ log .Logger ().Error (fmt .Sprintf ("Failed to open file %s, reason=[%v]" , constants .OutputRetryFileName , err ))
46
+ os .Exit (1 )
44
47
}
48
+ defer func () {
49
+ if err := outputRetryFile .Close (); err != nil {
50
+ log .Logger ().Error (fmt .Sprintf ("Failed to close file %s, reason=[%v]" ,
51
+ constants .OutputRetryFileName , err ))
52
+ }
53
+ }()
45
54
46
55
wg := sync.WaitGroup {}
47
56
scanner := bufio .NewScanner (inputHashFile )
@@ -50,44 +59,50 @@ func BatchUpload(inputFilename string) {
50
59
hash := scanner .Text ()
51
60
wg .Add (1 )
52
61
counter ++
53
- go func (h string , outFile * os. File , retryFile * os.File ) {
62
+ go func (h string , outFile , retryFile * os.File ) {
54
63
defer wg .Done ()
55
64
res , err := migrate (h )
56
65
if err != nil {
57
- log .Logger ().Error (fmt .Sprintf ("ipfs_hash=%s, reason=[%v]" , h , err ))
66
+ log .Logger ().Error (fmt .Sprintf ("[ ipfs_hash=%s] Failed to migrate , reason=[%v]" , h , err ))
58
67
// definitely failed to upload through soter; write to output_retry.csv
59
68
_ , err = fmt .Fprintln (retryFile , h )
60
69
if err != nil {
61
- errMsg := fmt .Sprintf ("Failed to write to file %s, hash= %s, reason=[%v]" ,
62
- constants .OutputRetryFileName , h , err )
70
+ errMsg := fmt .Sprintf ("[ipfs_hash=%s] Failed to write to file %s, reason=[%v]" ,
71
+ h , constants .OutputRetryFileName , err )
63
72
log .Logger ().Error (errMsg )
64
73
}
65
74
return
66
75
}
67
- log .Logger ().Debug (fmt .Sprintf ("[%s,%s,%s]" , h , res [0 ], res [1 ]))
68
- // write <ipfs_hash, request_id, btfs_hash> to output_hash.csv
76
+ // write (ipfs_hash, request_id, btfs_hash) to output_hash.csv
77
+ log .Logger ().Debug (fmt .Sprintf ("[ipfs_hash=%s] Write to file %s, (%s,%s,%s)" ,
78
+ h , constants .OutputHashFileName , h , res [0 ], res [1 ]))
69
79
line := fmt .Sprintf ("%s,%s,%s" , h , res [0 ], res [1 ])
70
80
_ , err = fmt .Fprintln (outFile , line )
71
81
if err != nil {
72
82
log .Logger ().Error (err .Error ())
73
83
}
74
84
}(hash , outputHashFile , outputRetryFile )
75
- if counter % batchSize == 0 {
85
+ if counter % core . Conf . BatchSize == 0 {
76
86
wg .Wait ()
77
87
counter = 0
78
88
}
79
89
}
80
90
// wait here because counter < batchSize and no more lines to read
81
91
wg .Wait ()
82
92
if err := scanner .Err (); err != nil {
83
- log .Logger ().Error (err .Error ())
93
+ errMsg := fmt .Sprintf ("Failed to scan input file, reason=[%v]" , err )
94
+ log .Logger ().Error (errMsg )
84
95
}
96
+
97
+ fmt .Printf ("\n Migration complete.\n " +
98
+ "Please checkout %s and %s for batch migration.\n " ,
99
+ constants .OutputHashFileName , constants .OutputRetryFileName )
85
100
}
86
101
87
102
func SingleUpload (ipfsHash string ) {
88
103
res , err := migrate (ipfsHash )
89
104
if err != nil {
90
- log .Logger ().Error (err . Error ( ))
105
+ log .Logger ().Error (fmt . Sprintf ( "[ipfs_hash=%s] Failed to migrate, reason=[%v]" , ipfsHash , err ))
91
106
os .Exit (1 )
92
107
}
93
108
fmt .Printf ("IPFS hash: %s\n " , ipfsHash )
@@ -97,12 +112,11 @@ func SingleUpload(ipfsHash string) {
97
112
98
113
func migrate (ipfsHash string ) ([]string , error ) {
99
114
if ! strings .HasPrefix (ipfsHash , "Qm" ) {
100
- errMsg := fmt .Sprintf ("input with invalid IPFS hash [%s]" , ipfsHash )
101
- log .Logger ().Debug (errMsg )
115
+ errMsg := fmt .Sprintf ("[ipfs_hash=%s] Input with invalid ipfs hash" , ipfsHash )
102
116
return nil , fmt .Errorf (errMsg )
103
117
}
104
118
// download file from IPFS network to local file system
105
- log .Logger ().Debug (fmt .Sprintf ("downloading file from IPFS network, %s " , ipfsHash ))
119
+ log .Logger ().Debug (fmt .Sprintf ("[ipfs_hash=%s] Downloading the file from IPFS network" , ipfsHash ))
106
120
if err := downloadFromIPFS (ipfsHash ); err != nil {
107
121
return nil , err
108
122
}
@@ -111,12 +125,13 @@ func migrate(ipfsHash string) ([]string, error) {
111
125
defer func (h string ) {
112
126
// delete local files
113
127
if err := os .Remove (fmt .Sprintf ("./%s" , h )); err != nil {
114
- errMsg := fmt .Sprintf ("Failed to delete file %s " , h )
128
+ errMsg := fmt .Sprintf ("[ipfs_hash=%s] Failed to delete local file " , h )
115
129
log .Logger ().Error (errMsg )
116
130
}
117
131
}(ipfsHash )
118
132
119
133
// upload the file to BTFS through soter
134
+ log .Logger ().Debug (fmt .Sprintf ("[ipfs_hash=%s] Uploading the file to BTFS network" , ipfsHash ))
120
135
res , err := uploadToBTFS (ipfsHash )
121
136
if err != nil {
122
137
return nil , err
@@ -126,8 +141,9 @@ func migrate(ipfsHash string) ([]string, error) {
126
141
127
142
func downloadFromIPFS (hash string ) error {
128
143
// go-ipfs-api, sdk: get
129
- if err := core .Sh .Get (hash , hash ); err != nil {
130
- return err
144
+ sh := shell .NewShell (core .Conf .IpfsUrl )
145
+ if err := sh .Get (hash , hash ); err != nil {
146
+ return fmt .Errorf ("downloading from IPFS errors out, %v" , err )
131
147
}
132
148
133
149
return nil
@@ -138,27 +154,24 @@ func uploadToBTFS(filename string) ([]string, error) {
138
154
filePath := fmt .Sprintf ("./%s" , filename )
139
155
resp , err := sh .AddFile (core .Conf .UserAddress , filePath )
140
156
if err != nil {
141
- log . Logger (). Error ( err . Error () )
142
- return nil , err
157
+ errMsg := fmt . Sprintf ( "failed to add file, reason=[%v]" , err )
158
+ return nil , fmt . Errorf ( errMsg )
143
159
}
144
160
if resp .Code != constants .OkCode {
145
- errMsg := fmt .Sprintf ("Error: code=%d, message=%s" , resp .Code , resp .Message )
146
- log .Logger ().Error (errMsg )
161
+ errMsg := fmt .Sprintf ("response code error: code=%d, message=%s" , resp .Code , resp .Message )
147
162
if resp .Code == constants .InsufficientBalanceCode {
148
163
os .Exit (1 )
149
164
}
150
165
return nil , fmt .Errorf (errMsg )
151
166
}
152
167
s , err := json .Marshal (resp .Data )
153
168
if err != nil {
154
- log .Logger ().Error (err .Error ())
155
- return nil , err
169
+ return nil , fmt .Errorf ("failed to marshal response data, %v" , err )
156
170
}
157
- var soterResponse core.SoterResponse
171
+ var soterResponse core.SoterAddFileResponse
158
172
err = json .Unmarshal (s , & soterResponse )
159
173
if err != nil {
160
- log .Logger ().Error (err .Error ())
161
- return nil , err
174
+ return nil , fmt .Errorf ("failed to unmarshal soter response data, %v" , err )
162
175
}
163
176
res := [... ]string {soterResponse .RequestId , soterResponse .Cid }
164
177
return res [:], nil
0 commit comments