Skip to content

Build #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,4 @@ fabric.properties
target

pom-development.xml
.vscode/settings.json
9 changes: 4 additions & 5 deletions src/main/java/cws/k8s/scheduler/model/SchedulerConfig.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package cws.k8s.scheduler.model;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.util.List;
import java.util.Map;

@ToString
@NoArgsConstructor(access = AccessLevel.PRIVATE,force = true)
Expand All @@ -17,10 +19,7 @@ public class SchedulerConfig {
public final String namespace;
public final String costFunction;
public final String strategy;

public final Integer maxCopyTasksPerNode;

public final Integer maxWaitingCopyTasksPerNode;
public final Map<String, JsonNode> additional;

@ToString
@NoArgsConstructor(access = AccessLevel.PRIVATE,force = true)
Expand All @@ -30,4 +29,4 @@ public static class VolumeClaim {
public final String subPath;
}

}
}
2 changes: 2 additions & 0 deletions src/main/java/cws/k8s/scheduler/model/TaskInput.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cws.k8s.scheduler.model;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;

Expand All @@ -10,6 +11,7 @@
/**
* Only for testing
*/
@Getter
@RequiredArgsConstructor( access = AccessLevel.PACKAGE )
public class TaskInput {

Expand Down
17 changes: 15 additions & 2 deletions src/main/java/cws/k8s/scheduler/rest/SchedulerRestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import cws.k8s.scheduler.dag.Vertex;
import cws.k8s.scheduler.model.SchedulerConfig;
import cws.k8s.scheduler.model.TaskConfig;
import cws.k8s.scheduler.scheduler.NodeLabelAssign;
import cws.k8s.scheduler.scheduler.PrioritizeAssignScheduler;
import cws.k8s.scheduler.scheduler.Scheduler;
import cws.k8s.scheduler.scheduler.prioritize.*;
import cws.k8s.scheduler.scheduler.nodeassign.FairAssign;
import cws.k8s.scheduler.scheduler.nodeassign.LabelAssign;
import cws.k8s.scheduler.scheduler.nodeassign.NodeAssign;
import cws.k8s.scheduler.scheduler.nodeassign.RandomNodeAssign;
import cws.k8s.scheduler.scheduler.nodeassign.RoundRobinAssign;
Expand All @@ -32,6 +34,8 @@
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this unused import


@RestController
@Slf4j
@EnableScheduling
Expand Down Expand Up @@ -99,16 +103,25 @@ ResponseEntity<String> registerScheduler(

Scheduler scheduler;

ObjectMapper objectMapper = new ObjectMapper();
Map<String,String> nodelabel = objectMapper.convertValue(config.additional.get("myconfig"),Map.class);

if ( schedulerHolder.containsKey( execution ) ) {
return noSchedulerFor( execution );
}

switch ( strategy.toLowerCase() ){
default: {
final String[] split = strategy.split( "-" );
case "nodelabelassign":
Prioritize prioritize;
NodeAssign labelassign;
NodeAssign assign;
prioritize = new RankMaxPrioritize();
labelassign = new LabelAssign(nodelabel);
assign = new FairAssign();
scheduler = new NodeLabelAssign(execution, client, namespace, config, prioritize, labelassign, assign);
break;
default: {
final String[] split = strategy.split( "-" );
if ( split.length <= 2 ) {
switch ( split[0].toLowerCase() ) {
case "fifo": prioritize = new FifoPrioritize(); break;
Expand Down
97 changes: 97 additions & 0 deletions src/main/java/cws/k8s/scheduler/scheduler/NodeLabelAssign.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package cws.k8s.scheduler.scheduler;

import cws.k8s.scheduler.model.*;
import cws.k8s.scheduler.scheduler.prioritize.Prioritize;
import cws.k8s.scheduler.client.Informable;
import cws.k8s.scheduler.client.KubernetesClient;
import cws.k8s.scheduler.scheduler.nodeassign.NodeAssign;
import cws.k8s.scheduler.util.NodeTaskAlignment;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.stream.Collector;
import java.util.stream.Collectors;

@Slf4j
public class NodeLabelAssign extends Scheduler {

private final Prioritize prioritize;
private final NodeAssign nodeAssigner;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe name this defaultNodeAssigner or fallbackNodeAssigner to clearify its function

private final NodeAssign nodeLabelAssigner;

public NodeLabelAssign( String execution,
KubernetesClient client,
String namespace,
SchedulerConfig config,
Prioritize prioritize,
NodeAssign nodeLabelAssigner,
NodeAssign nodeAssigner ) {
super(execution, client, namespace, config);
this.prioritize = prioritize;
this.nodeLabelAssigner = nodeLabelAssigner;
this.nodeAssigner = nodeAssigner;
nodeAssigner.registerScheduler( this );
if ( nodeAssigner instanceof Informable ){
client.addInformable( (Informable) nodeAssigner );
}
}

@Override
public void close() {
super.close();
if ( nodeAssigner instanceof Informable ){
client.removeInformable( (Informable) nodeAssigner );
}
}

@Override
public ScheduleObject getTaskNodeAlignment(
final List<Task> unscheduledTasks,
final Map<NodeWithAlloc, Requirements> availableByNode
){
long start = System.currentTimeMillis();
if ( traceEnabled ) {
int index = 1;
for ( Task unscheduledTask : unscheduledTasks ) {
unscheduledTask.getTraceRecord().setSchedulerPlaceInQueue( index++ );
}
}
prioritize.sortTasks( unscheduledTasks );

// print Tasks
System.out.println("Tasks before Label Alignment");
unscheduledTasks.stream().map(obj -> obj.getConfig().getName()).forEach(System.out::println);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not log this in stable version

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now you use Stdout.
Do not log this at all, or log at debug level if you need


// first alignemnt
List<NodeTaskAlignment> alignmentLabelAssign = nodeLabelAssigner.getTaskNodeAlignment(unscheduledTasks, availableByNode);
List<String> namesList = alignmentLabelAssign.stream().map(obj -> obj.task.getConfig().getName()).collect(Collectors.toList());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listOfUnassignedTasks

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to make variable alignmentLabelAssign from type NodeLabelAssign and then store a list of unscheduled tasks and fetch it here. Then you don't need to calculate this here again.

System.out.println(namesList.toString());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not log this



List<Task> filteredTasks = new LinkedList<>();

for (final Task task : unscheduledTasks) {
if (!namesList.contains(task.getConfig().getName())) {
filteredTasks.add(task);
}
}

// print Tasks
System.out.println("Tasks after Label Alignment");
filteredTasks.stream().map(obj -> obj.getConfig().getName()).forEach(System.out::println);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How rare are these events? I would not log this on a regular base.


// second alignemnt
List<NodeTaskAlignment> alignment = nodeAssigner.getTaskNodeAlignment(filteredTasks, availableByNode);


alignmentLabelAssign.addAll(alignment);
long timeDelta = System.currentTimeMillis() - start;
for ( Task unscheduledTask : unscheduledTasks ) {
unscheduledTask.getTraceRecord().setSchedulerTimeToSchedule( (int) timeDelta );
}

final ScheduleObject scheduleObject = new ScheduleObject(alignmentLabelAssign);
scheduleObject.setCheckStillPossible( false );
return scheduleObject;
}
}
3 changes: 2 additions & 1 deletion src/main/java/cws/k8s/scheduler/scheduler/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public boolean validSchedulePlan( List<NodeTaskAlignment> taskNodeAlignment ){
return true;
}

abstract ScheduleObject getTaskNodeAlignment(
public abstract ScheduleObject getTaskNodeAlignment(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why making this public?

final List<Task> unscheduledTasks,
final Map<NodeWithAlloc, Requirements> availableByNode
);
Expand Down Expand Up @@ -468,6 +468,7 @@ Map<NodeWithAlloc, Requirements> getAvailableByNode(){
}
logInfo.add("------------------------------------");
log.info(String.join("\n", logInfo));

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can leave this class as it was.

return availableByNode;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package cws.k8s.scheduler.scheduler.nodeassign;

import cws.k8s.scheduler.model.NodeWithAlloc;
import cws.k8s.scheduler.model.PodWithAge;
import cws.k8s.scheduler.model.Requirements;
import cws.k8s.scheduler.model.Task;
import cws.k8s.scheduler.util.NodeTaskAlignment;
import lombok.extern.slf4j.Slf4j;

import java.util.*;

@Slf4j
public class LabelAssign extends NodeAssign {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please name this NodeLabelAssign to show it is a node assigner


public Map<String, String> nodelabel;

public LabelAssign(
final Map<String, String> nodelabel
){
this.nodelabel = nodelabel;
}

@Override
public List<NodeTaskAlignment> getTaskNodeAlignment( List<Task> unscheduledTasks, Map<NodeWithAlloc, Requirements> availableByNode ) {
LinkedList<NodeTaskAlignment> alignment = new LinkedList<>();
final ArrayList<Map.Entry<NodeWithAlloc, Requirements>> entries = new ArrayList<>( availableByNode.entrySet() );
for ( final Task task : unscheduledTasks ) {

String taskName = null;
String taskLabel = null;

try {
taskName = task.getConfig().getName();
taskLabel = taskName.split("~")[1];
// ~ is used for a special case in which subtasks from one process in nextflow are generated
// the labels in the nextflow config have to be named like this: ~label~

log.info("Label for task: " + task.getConfig().getName() + " : " + taskLabel);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not log this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change to debug level or remove

} catch ( Exception e ){
log.warn( "Cannot find a label for task: " + task.getConfig().getName(), e );
continue;
}

final PodWithAge pod = task.getPod();
log.info("Pod: " + pod.getName() + " Requested Resources: " + pod.getRequest() );

if(nodelabel.containsKey(taskLabel)){
String nodeName = nodelabel.get(taskLabel);

for ( Map.Entry<NodeWithAlloc, Requirements> e : entries ) {
final NodeWithAlloc node = e.getKey();

if(nodeName.equals(node.getName())){
System.out.println("Aligned Pod to node: " + node.getName());
alignment.add( new NodeTaskAlignment( node, task ) );
availableByNode.get( node ).subFromThis(pod.getRequest());
log.info("--> " + node.getName());
task.getTraceRecord().foundAlignment();
break;
}
}
} else
{
log.info( "Task Label: " + taskLabel + " doesn't exist in config file.");
}
}
return alignment;
}
}