Skip to content

Commit 5b45346

Browse files
author
Johan Hermansson
committed
Merge pull request #13 from johandahlberg/quarantine_runfolders
Quarantine runfolders after pickup
2 parents 80f4062 + 82ca795 commit 5b45346

File tree

6 files changed

+84
-66
lines changed

6 files changed

+84
-66
lines changed

requirements/prod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
git+https://github.com/arteria-project/arteria-core.git@v1.0.1#egg=arteria-core
1+
git+https://github.com/arteria-project/arteria-core.git@v1.1.0#egg=arteria-core
22
jsonpickle==0.9.2
33
tornado==4.2.1
44
PyYAML==3.11

runfolder/app.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ def start():
1313
routes = [
1414
(r"/api/1.0/runfolders", ListAvailableRunfoldersHandler, args),
1515
(r"/api/1.0/runfolders/next", NextAvailableRunfolderHandler, args),
16+
(r"/api/1.0/runfolders/pickup", PickupAvailableRunfolderHandler, args),
1617
(r"/api/1.0/runfolders/path(/.*)", RunfolderHandler, args),
1718
(r"/api/1.0/runfolders/test/markasready/path(/.*)", TestFakeSequencerReadyHandler, args)
1819
]

runfolder/handlers.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
2+
import tornado.web
3+
14
import arteria
5+
from arteria.web.state import State
6+
from arteria.exceptions import InvalidArteriaStateException
27
from arteria.web.handlers import BaseRestHandler
3-
from runfolder.services import *
4-
import tornado.web
58

9+
from runfolder.services import *
610

711
class BaseRunfolderHandler(BaseRestHandler):
812
"""Provides core logic for all runfolder handlers"""
@@ -40,7 +44,7 @@ def get(self):
4044
def get_runfolders():
4145
try:
4246
# TODO: This list should be paged. The unfiltered list can be large
43-
state = self.get_argument("state", RunfolderState.READY)
47+
state = self.get_argument("state", State.READY)
4448
if state == "*":
4549
state = None
4650
for runfolder_info in self.runfolder_svc.list_runfolders(state):
@@ -56,15 +60,32 @@ class NextAvailableRunfolderHandler(BaseRunfolderHandler):
5660
"""Handles fetching the next available runfolder"""
5761
def get(self):
5862
"""
59-
Returns the next runfolder to process. Note that it's currently assumed
60-
that only one process polls this endpoint. No locking mechanism is in place.
63+
Returns the next runfolder to process. Note that it will not lock the runfolder, and unless its
64+
state is changed by the polling client quickly enough it will be presented again.
6165
"""
6266
runfolder_info = self.runfolder_svc.next_runfolder()
6367
if runfolder_info:
6468
self.append_runfolder_link(runfolder_info)
6569
self.write_object(runfolder_info)
6670

6771

72+
class PickupAvailableRunfolderHandler(BaseRunfolderHandler):
73+
"""Handles fetching the next available runfolder"""
74+
def get(self):
75+
"""
76+
Returns the next runfolder to process and set it's state to PENDING.
77+
"""
78+
runfolder_info = self.runfolder_svc.next_runfolder()
79+
if runfolder_info:
80+
self.append_runfolder_link(runfolder_info)
81+
self.runfolder_svc.set_runfolder_state(runfolder_info.path, State.PENDING)
82+
runfolder_info.state = State.PENDING
83+
self.write_object(runfolder_info)
84+
else:
85+
self.set_status(204, reason="No ready runfolders available.")
86+
self.write(dict())
87+
88+
6889
class RunfolderHandler(BaseRunfolderHandler):
6990
"""Handles a particular runfolder, identified by path"""
7091
def get(self, path):
@@ -96,7 +117,7 @@ def post(self, path):
96117

97118
try:
98119
self.runfolder_svc.set_runfolder_state(path, state)
99-
except InvalidRunfolderState:
120+
except InvalidArteriaStateException:
100121
raise tornado.web.HTTPError(400, "The state '{}' is not valid".format(state))
101122

102123
@arteria.undocumented

runfolder/services.py

Lines changed: 13 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,8 @@
33
import logging
44
from runfolder import __version__ as version
55

6-
7-
class Enum(set):
8-
"""
9-
Defines an enumeration which values are a string representation of the
10-
specified attribute, i.e. EnumInstance.VAL1 == "VAL1"
11-
12-
Usage: EnumInstance = Enum(["VAL1", "VAL2"])
13-
14-
print EnumInstance.VAL1
15-
> "VAL1"
16-
17-
if "VAL3" not in EnumInstance:
18-
raise ...
19-
"""
20-
def __getattr__(self, name):
21-
if name in self:
22-
return name
23-
raise AttributeError
24-
25-
def __setattr__(self, key, value):
26-
raise NotImplementedError("Values cannot be set directly")
27-
28-
"""
29-
NONE: Not ready for processing or invalid
30-
READY: Ready for processing
31-
STARTED: Started processing the runfolder
32-
DONE: Done processing the runfolder
33-
ERROR: Started processing the runfolder but there was an error
34-
"""
35-
RunfolderState = Enum(["NONE", "READY", "STARTED", "DONE", "ERROR"])
36-
6+
from arteria.web.state import State
7+
from arteria.web.state import validate_state
378

389
class RunfolderInfo:
3910
"""
@@ -163,7 +134,7 @@ def get_runfolder_by_path(self, path):
163134
def _get_runfolder_state_from_state_file(self, runfolder):
164135
"""
165136
Reads the state in the state file at .arteria/state, returns
166-
RunfolderState.NONE if nothing is available
137+
State.NONE if nothing is available
167138
"""
168139
state_file = os.path.join(runfolder, ".arteria", "state")
169140
if self._file_exists(state_file):
@@ -172,34 +143,28 @@ def _get_runfolder_state_from_state_file(self, runfolder):
172143
state = state.strip()
173144
return state
174145
else:
175-
return RunfolderState.NONE
146+
return State.NONE
176147

177148
def get_runfolder_state(self, runfolder):
178149
"""
179150
Returns the state of a runfolder. The possible states are defined in
180-
RunfolderState
151+
State
181152
182153
If the file .arteria/state exists, it will determine the state. If it doesn't
183154
exist, the existence of the marker file RTAComplete.txt determines the state.
184155
"""
185156
state = self._get_runfolder_state_from_state_file(runfolder)
186-
if state == RunfolderState.NONE:
157+
if state == State.NONE:
187158
completed_marker = os.path.join(runfolder, "RTAComplete.txt")
188159
ready = self._file_exists(completed_marker)
189160
if ready:
190-
state = RunfolderState.READY
161+
state = State.READY
191162
return state
192163

193-
@staticmethod
194-
def validate_state(state):
195-
"""Raises InvalidRunfolderState if the state is not known"""
196-
if state not in RunfolderState:
197-
raise InvalidRunfolderState("The state '{}' is not valid".format(state))
198-
199164
@staticmethod
200165
def set_runfolder_state(runfolder, state):
201166
"""Sets the state of a runfolder"""
202-
RunfolderService.validate_state(state)
167+
validate_state(state)
203168
arteria_dir = os.path.join(runfolder, ".arteria")
204169
state_file = os.path.join(arteria_dir, "state")
205170
if not os.path.exists(arteria_dir):
@@ -210,8 +175,8 @@ def set_runfolder_state(runfolder, state):
210175
def is_runfolder_ready(self, directory):
211176
"""Returns True if the runfolder is ready"""
212177
state = self.get_runfolder_state(directory)
213-
self._logger.debug("Checking {0}. state={1}".format(directory, state))
214-
return state == RunfolderState.READY
178+
from arteria.testhelpers import TestFunctionDelta, BaseRestTest
179+
return state == State.READY
215180

216181
def _monitored_directories(self):
217182
"""Lists all directories monitored for new runfolders"""
@@ -225,7 +190,7 @@ def _monitored_directories(self):
225190

226191
def next_runfolder(self):
227192
"""Returns the next available runfolder. Returns None if there is none available."""
228-
available = self.list_runfolders(state=RunfolderState.READY)
193+
available = self.list_runfolders(state=State.READY)
229194
try:
230195
first = available.next()
231196
except StopIteration:
@@ -236,7 +201,7 @@ def next_runfolder(self):
236201
return first
237202

238203
def list_available_runfolders(self):
239-
return self.list_runfolders(RunfolderState.READY)
204+
return self.list_runfolders(State.READY)
240205

241206
def list_runfolders(self, state):
242207
"""
@@ -245,7 +210,7 @@ def list_runfolders(self, state):
245210
"""
246211
runfolders = self._enumerate_runfolders()
247212
if state:
248-
RunfolderService.validate_state(state)
213+
validate_state(state)
249214
return (runfolder for runfolder in runfolders if runfolder.state == state)
250215
else:
251216
return runfolders

runfolder_tests/integration/rest_tests.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
import unittest
22
import time
3-
from arteria.testhelpers import TestFunctionDelta, BaseRestTest
43
import os
54
import logging
65
import requests
76
import jsonpickle
87
import mock
8+
import shutil
9+
10+
from arteria.testhelpers import TestFunctionDelta, BaseRestTest
11+
from arteria.web.state import State
12+
913

1014
log = logging.getLogger(__name__)
1115

@@ -85,6 +89,9 @@ def test_can_create_and_update_state(self):
8589
matching = [runfolder for runfolder in runfolders if runfolder["path"] == path]
8690
self.assertEqual(len(matching), 1)
8791

92+
# Remove the path created, so it does not interfere with other tests
93+
shutil.rmtree(path)
94+
8895
# TODO: Change state to "processing" and ensure it doesn't show up in /runfolders
8996
self.messages_logged.assert_changed_by_total(2)
9097

@@ -98,14 +105,39 @@ def test_updating_state_removes_runfolder_from_candidates(self):
98105
path = self._create_ready_runfolder()
99106
self.assertTrue(self._exists(path))
100107
# Mark the folder as processing
101-
self.post("./runfolders/path{}".format(path), {"state": "STARTED"}, expect=200)
108+
self.post("./runfolders/path{}".format(path), {"state": State.STARTED}, expect=200)
102109
# Ensure that the folder is not listed anymore:
103110
self.assertFalse(self._exists(path))
111+
# Remove the path created, so it does not interfere with other tests
112+
shutil.rmtree(path)
104113

105114
def test_invalid_state_is_not_accepted(self):
106115
path = self._create_ready_runfolder()
107116
self.assertTrue(self._exists(path))
108117
self.post("./runfolders/path{}".format(path), {"state": "NOT-AVAILABLE"}, expect=400)
118+
# Remove the path created, so it does not interfere with other tests
119+
shutil.rmtree(path)
120+
121+
def test_pickup_runfolder(self):
122+
path = self._create_ready_runfolder()
123+
self.assertTrue(self._exists(path))
124+
response = self.get("./runfolders/pickup", expect=200)
125+
response_json = jsonpickle.loads(response.text)
126+
self.assertEqual(response_json["path"], path)
127+
self.assertEqual(response_json["state"], State.PENDING)
128+
# Remove the path created, so it does not interfere with other tests
129+
shutil.rmtree(path)
130+
131+
132+
def test_next_runfolder(self):
133+
path = self._create_ready_runfolder()
134+
self.assertTrue(self._exists(path))
135+
response = self.get("./runfolders/next", expect=200)
136+
response_json = jsonpickle.loads(response.text)
137+
self.assertEqual(response_json["path"], path)
138+
self.assertEqual(response_json["state"], State.READY)
139+
# Remove the path created, so it does not interfere with other tests
140+
shutil.rmtree(path)
109141

110142
def _exists(self, path):
111143
resp = self.get("./runfolders")

runfolder_tests/unit/runfolder_tests.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import unittest
22
import logging
3-
from runfolder.services import RunfolderService, RunfolderState
3+
4+
from arteria.web.state import State
5+
6+
from runfolder.services import RunfolderService
7+
48

59
logger = logging.getLogger(__name__)
610

@@ -34,8 +38,8 @@ def test_list_available_runfolders(self):
3438
self.assertEqual(len(runfolders), 2)
3539

3640
runfolders_str = sorted([str(runfolder) for runfolder in runfolders])
37-
expected = ["READY: /data/testarteria1/mon1/runfolder001@localhost",
38-
"READY: /data/testarteria1/mon2/runfolder001@localhost"]
41+
expected = ["ready: /data/testarteria1/mon1/runfolder001@localhost",
42+
"ready: /data/testarteria1/mon2/runfolder001@localhost"]
3943
self.assertEqual(runfolders_str, expected)
4044

4145
def test_next_runfolder(self):
@@ -54,14 +58,9 @@ def test_next_runfolder(self):
5458

5559
# Test
5660
runfolder = runfolder_svc.next_runfolder()
57-
expected = "READY: /data/testarteria1/mon1/runfolder001@localhost"
61+
expected = "ready: /data/testarteria1/mon1/runfolder001@localhost"
5862
self.assertEqual(str(runfolder), expected)
5963

60-
def test_runfolder_state_cant_be_set(self):
61-
def assign_by_accident():
62-
RunfolderState.READY = "by accident"
63-
self.assertRaises(NotImplementedError, assign_by_accident)
64-
6564
def test_monitored_directory_validates(self):
6665
configuration_svc = dict()
6766
configuration_svc["monitored_directories"] = ["/data/testarteria1/runfolders"]

0 commit comments

Comments
 (0)