Taskurotta or managing processes in a distributed system
Hello, Habr users!
Article based on information from habrahabr.ru
We faced the task of integrating various services and existing systems into managed processes. We do not need extreme speed (we are not reacting to stock exchange quotes), but the number of processes and of components (systems) involved is shaping up to be considerable. We do not want point-to-point links. We want something elegant and manageable.
After reviewing the market, we decided to build our own implementation modeled on Amazon Simple Workflow, since we cannot use that service directly. The properties we need from the framework:
- A low entry threshold (good programmers are expensive these days). The threshold is low largely because programming happens at a high level, almost at the level of interaction between interfaces. Managing a process asynchronously and well does, of course, take more experience.
- Saved task parameters and execution results provide material for analysis and a basis for regression testing of the Performers of process tasks.
- Process control logic is concentrated in specific places (the Coordinators). This may not look important at first glance, but it is the biggest benefit compared with the alternative, in which each actor carries its own logic about which component to call (pass control to) next. That alternative often complicates the system and makes components impossible to reuse.
That is the minimum we wanted, and practice shows that the benefits are more numerous. The project is named Taskurotta after "Task" and the Finnish word for gopher, the one that, as the saying goes, you cannot see, yet it is there. The source code is open and available on GitHub. The project is built on Hazelcast, which provides the shared memory space and execution environment across servers, and on Dropwizard, for quick and convenient implementation of REST services. Our thanks go to the friends at Amazon, who were first, made a great product, and inspired us to develop our own. The documentation is currently lagging behind, but it will catch up soon.
Let's move from theory to what exists today, using a real example.
Suppose we need to develop an application that sends a text message to a user. As input we receive the user ID and the message text. From the user's profile (looked up by ID) we get the delivery preference: email or phone. The phone number and the email address are also stored in the profile. We then send the message through the chosen transport. If sending fails (because of an incorrect address or number), this must be recorded in the profile to prevent repeated attempts in the future.
Let's add a non-functional requirement: the message-sending services already exist, live in other subnets, and must be reused.
P.S. All source code for this example is also available on GitHub in taskurotta/taskurotta-getstarted.
Taskurotta lets you implement system components (Actors) that interact through ordinary method calls, familiar to any developer, but asynchronously. Actors come in two types: Performers and Coordinators. Performers must do exactly the task they are given. They are the most independent modules and therefore the most reusable. While performing a task, a Performer may interact with the outside world (any input/output) for as long as it takes. Coordinators, by contrast, perform no tasks that touch the outside world. They must run as quickly as possible and must not stall on direct interaction with a database, the network, or other potentially unstable components. Their responsibility is to hand tasks to Performers and coordinate their actions, thereby implementing (describing) the process. Coordinators can also assign tasks to other Coordinators, which realizes the paradigm of reusable subprocesses.
The Coordinator's job is to issue, as soon as possible, all the tasks known at that moment. That is, it must not block waiting for a result. It must build the dependencies between the tasks it knows about and, where necessary, create asynchronous decision points that determine further actions.
For our process, the Coordinator should do the following:
- Request the user profile
- Wait for the profile
- Send the message to the user
- Wait for the result of sending
We will not encode this sequence with a state machine, by having several Performers exchange messages, or in other familiar but clumsy ways. We will do it simply and elegantly, using the Promise entity and our system, which tracks the Coordinator's actions.
Promise<Profile> profilePromise = userProfileService.get(userId);
Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);
The example shows that a service call returns not a real object but a Promise, a reference to the result of a task's execution. We can pass this Promise as an argument to other services (i.e. tasks). Calls to services are intercepted by the system (no real synchronous call happens), and the dependencies between them are recorded. A task is not handed to a service for execution until all of its Promise arguments are ready, i.e. until all prerequisite tasks have completed.
Thus, process control is shared between the Coordinator and our system. The Coordinator builds the dependencies between tasks, and the system takes on, among other things, waiting for prerequisite tasks to complete and then launching the dependent ones.
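The deferral described here can be imitated with the JDK alone. The sketch below is an analogy, not Taskurotta code: `CompletableFuture` stands in for `Promise`, and the `Profile` class, the `sendToTransport` helper and the sample address are made up for illustration. It shows that a dependent step does not run until the value it depends on is ready.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Analogy only: CompletableFuture plays the role of Taskurotta's Promise.
class PromiseDeferralSketch {

    // Hypothetical stand-in for the profile data object.
    static final class Profile {
        final String email;
        Profile(String email) { this.email = email; }
    }

    // Records the order in which steps actually run.
    static final List<String> events = new ArrayList<>();

    // The dependent "task": runs only once the profile value exists.
    static Boolean sendToTransport(Profile profile, String message) {
        events.add("send '" + message + "' to " + profile.email);
        return Boolean.TRUE;
    }

    static List<String> run() {
        events.clear();
        // The "promise" of a profile; nothing has produced it yet.
        CompletableFuture<Profile> profilePromise = new CompletableFuture<>();
        // Declare the dependency. The lambda is not executed at this point.
        CompletableFuture<Boolean> sendResultPromise =
                profilePromise.thenApply(p -> sendToTransport(p, "hello"));
        events.add("dependency declared, send done: " + sendResultPromise.isDone());
        // The prerequisite "task" completes; only now does the dependent step run.
        profilePromise.complete(new Profile("user@example.org"));
        events.add("send result: " + sendResultPromise.join());
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        for (String event : run()) {
            System.out.println(event);
        }
    }
}
```

In Taskurotta the waiting and launching is done by the server across processes, whereas here everything happens inside one JVM, so the sketch conveys only the ordering guarantee.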
Let's now look at what asynchronous decision points are.
We need to make sure that the notification was sent successfully and, if it was not, block further notifications to the user.
In this case, after sending the notification and before continuing, the result of sending must be analyzed: wait for the task to complete, examine the result, and block or not accordingly. For this, the Coordinator can create a task for itself, i.e. a decision point, and pass the required Promise into it. This is how it looks:
public void start(String userId, String message) {
    Promise<Profile> profilePromise = userProfileService.get(userId);
    Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);
    // Creates a task for the Coordinator itself; nothing is called synchronously here.
    decider.blockOnFail(userId, sendResultPromise);
}

@Asynchronous
public void blockOnFail(String userId, Promise<Boolean> sendResultPromise) {
    logger.info(".blockOnFail(userId = [{}], sendResultPromise = [{}])", userId, sendResultPromise);
    // Invoked only when sendResultPromise is ready, so get() returns the real value.
    if (!sendResultPromise.get()) {
        userProfileService.blockNotification(userId);
    }
}
The start() method starts the process. It produces three tasks. The first, for a Performer, obtains the profile. The second and third the Coordinator sets for itself, for later analysis of the results (the calls to sendToTransport and blockOnFail). In this way the Coordinator in effect waits for the first task to complete, but without blocking. Once that task has completed, Taskurotta invokes the Coordinator's sendToTransport method and passes it a ready Promise, from which the real data can be obtained with get(). After the sendToTransport task completes, the blockOnFail task runs. In it we give the userProfileService Performer the task of blocking message delivery to user userId if sending the notification failed.
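As a rough analogy in plain JDK terms, the point that determines further action (a decision point) amounts to a callback that inspects the send result once it exists. Again `CompletableFuture` stands in for `Promise`; the `blocked` set and the fake `send` function are assumptions made for this sketch and are not part of Taskurotta.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Analogy only: a decision point expressed as a callback on a future.
class DecisionPointSketch {

    // Hypothetical stand-in for the "blocked" mark kept in user profiles.
    static final Set<String> blocked = ConcurrentHashMap.newKeySet();

    // Fake transport: delivery fails when the address is empty.
    static CompletableFuture<Boolean> send(String address, String message) {
        return CompletableFuture.supplyAsync(() -> !address.isEmpty());
    }

    // Counterpart of blockOnFail: decides only after the result is known.
    static void blockOnFail(String userId, boolean sendResult) {
        if (!sendResult) {
            blocked.add(userId);
        }
    }

    // Counterpart of start(): wires the steps together without blocking.
    static CompletableFuture<Void> start(String userId, String address, String message) {
        return send(address, message)
                .thenAccept(result -> blockOnFail(userId, result));
    }

    public static void main(String[] args) {
        // join() is used here only so the demo waits before printing.
        start("alice", "alice@example.org", "hi").join();
        start("bob", "", "hi").join();
        System.out.println("blocked users: " + blocked);
    }
}
```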
Decision points make it possible to implement various process behaviors:
- Parallel execution of different branches
- Subsequent merging of independent process flows at a single point, using Promise forwarding and the @NoWait annotation
- Asynchronous calls
- Parallel execution of similar tasks, such as verifying the digital signatures of all files and waiting for the results at a single decision point
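The last case in the list, fanning similar tasks out and joining them at one decision point, can be pictured with the JDK as follows. This is an analogy rather than Taskurotta code: `CompletableFuture.allOf` plays the role of the single decision point, and `verifySignature` is a made-up check that treats file names ending in `.sig` as valid.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Analogy only: fan out similar tasks, then decide once all results are in.
class FanOutFanInSketch {

    // Made-up check: a file "verifies" if its name ends with ".sig".
    static boolean verifySignature(String fileName) {
        return fileName.endsWith(".sig");
    }

    // Returns a future that is true only if every file verified.
    static CompletableFuture<Boolean> verifyAll(List<String> files) {
        List<CompletableFuture<Boolean>> checks = new ArrayList<>();
        for (String file : files) {
            // Each check is an independent task that may run in parallel.
            checks.add(CompletableFuture.supplyAsync(() -> verifySignature(file)));
        }
        // Single decision point: runs after all checks have completed.
        return CompletableFuture
                .allOf(checks.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    for (CompletableFuture<Boolean> check : checks) {
                        if (!check.join()) {
                            return false;
                        }
                    }
                    return true;
                });
    }

    public static void main(String[] args) {
        System.out.println(verifyAll(Arrays.asList("a.sig", "b.sig")).join());
        System.out.println(verifyAll(Arrays.asList("a.sig", "b.txt")).join());
    }
}
```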
P.S. The call to blockOnFail goes through the decider object. This is an artificial object that intercepts the call without actually invoking the blockOnFail method. We need to create a task, not call the method synchronously.
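The idea of an object that intercepts a call instead of executing it can be sketched with `java.lang.reflect.Proxy`. How Taskurotta actually builds its decider object is not shown in this article, so the `Decider` interface and the `recordedTasks` list below are illustrative assumptions.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: a proxy that records a call as a "task" instead of running it.
class InterceptingProxySketch {

    // Hypothetical coordinator interface used only for this example.
    interface Decider {
        void blockOnFail(String userId, Object sendResultPromise);
    }

    // Tasks captured by the proxy, stored as "method[args]" strings.
    static final List<String> recordedTasks = new ArrayList<>();

    static Decider createProxy() {
        InvocationHandler handler = (proxy, method, args) -> {
            // No real method body runs; the call is only written down.
            recordedTasks.add(method.getName() + Arrays.toString(args));
            return null; // acceptable because the intercepted method returns void
        };
        return (Decider) Proxy.newProxyInstance(
                Decider.class.getClassLoader(),
                new Class<?>[] {Decider.class},
                handler);
    }

    public static void main(String[] args) {
        Decider decider = createProxy();
        decider.blockOnFail("user-1", "pending-result");
        System.out.println(recordedTasks);
    }
}
```

A real system would turn the recorded call into a task description and send it to the server. The sketch stops at recording it.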
Since, per the scenario, Performers for sending email and SMS already exist, we only need to create a Performer for working with the profile. It has two tasks:
- Return the user's profile by ID
- Mark in the profile that messages cannot be sent to the given user
Let's start by declaring its interface. The Coordinator will work through this interface. From here on, comments and other non-essential parts of the code are omitted for brevity.
@Worker
public interface UserProfileService {
    public Profile get(String userId);
    public void blockNotification(String userId);
}
The @Worker annotation marks this interface as a Performer. The annotation has optional attributes that set the name and the version (of the contract). By default the name is the full name of the interface and the version is "1.0". Performers of different versions can run simultaneously for different processes without conflicts.
Let's move on to implementing the interface.
public class UserProfileServiceImpl implements UserProfileService {
    private static final Logger logger = LoggerFactory.getLogger(UserProfileServiceImpl.class);

    @Override
    public Profile get(String userId) {
        // Delegates to the real profile module.
        return ProfileUtil.createRandomProfile(userId);
    }

    @Override
    public void blockNotification(String userId) {
        logger.info(".blockNotification(userId = [{}])", userId);
    }
}
Here we have omitted the initialization of the profile manager (ProfileUtil). It can work with a database, LDAP, or another registry. The example shows that the Performer receives tasks (calls) and delegates them to the real module.
At this point the Performer is complete.
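The name and version defaults described for @Worker can be illustrated with a self-made annotation. `SketchWorker` below is hypothetical and is not the Taskurotta annotation. It only mimics the behavior described in this article: an unset name falls back to the full name of the interface, and the version defaults to "1.0". The `name#version` identifier format is also invented for the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Sketch: resolving optional name/version attributes with defaults.
class WorkerAnnotationSketch {

    // Hypothetical annotation; NOT the real Taskurotta @Worker.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface SketchWorker {
        String name() default "";
        String version() default "1.0";
    }

    // Uses both defaults.
    @SketchWorker
    interface UserProfileService { }

    // Overrides both attributes.
    @SketchWorker(name = "profiles", version = "2.0")
    interface VersionedProfileService { }

    // Builds an identifier "name#version", applying the defaults.
    static String actorId(Class<?> type) {
        SketchWorker worker = type.getAnnotation(SketchWorker.class);
        if (worker == null) {
            throw new IllegalArgumentException(type + " is not annotated");
        }
        String name = worker.name().isEmpty() ? type.getName() : worker.name();
        return name + "#" + worker.version();
    }

    public static void main(String[] args) {
        System.out.println(actorId(UserProfileService.class));
        System.out.println(actorId(VersionedProfileService.class));
    }
}
```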
To accomplish its tasks, the Coordinator must pass a reference to the not-yet-obtained user profile (a Promise) to the decision point. There it will choose the transport, or send nothing if message delivery to this user is already blocked.
However, the Performer's interface, like the Performer itself, receives arguments and returns results synchronously, so its declaration returns a plain data object rather than a Promise. This is as it should be. The Performer is not supposed to know how it is used. For example, our profile Performer can be used when the user ID is already known, or when it is not yet known and a reference to another task that will obtain this ID must be passed instead. This brings us to the Performer's client interface. The Coordinator defines this interface for its own needs, i.e. it is declared in the Coordinator's package (project). Let's add a client interface for the profile Performer:
@WorkerClient(worker = UserProfileService.class)
public interface UserProfileServiceClient {
    public Promise<Profile> get(String userId);
    public void blockNotification(String userId);
}
We see that the interface is marked with the @WorkerClient annotation. The annotation's parameter refers to the Performer's actual interface class. This establishes the link between the existing interface and the interface needed by a specific Coordinator. Let's call it the "Performer's client interface". The client interface must contain all the methods the Coordinator needs (unused ones may be left undeclared), with identical argument signatures. Any argument may be of type Promise if you want to pass the result of a not-yet-completed task.
Now for the fun part: creating the Coordinator. To begin with, here is the Coordinator's interface, through which Taskurotta clients will start the processes they need.
@Decider
public interface NotificationDecider {
    @Execute
    public void start(String userId, String message);
}
This interface is marked with @Decider, i.e. as a Coordinator. The annotation has the same attributes as @Worker: the name and the version. By default the name is the full name of the interface and the version is "1.0".
The start method is marked with @Execute. This means that a process can be started through this method.
Now let's turn to the Coordinator's implementation:
public class NotificationDeciderImpl implements NotificationDecider {
    private static final Logger logger = LoggerFactory.getLogger(NotificationDeciderImpl.class);

    // Initialization of these fields is omitted; see taskurotta-getstarted.
    private UserProfileServiceClient userProfileService;
    private MailServiceClient mailService;
    private SMSServiceClient smsService;
    private NotificationDeciderImpl decider;

    @Override
    public void start(String userId, String message) {
        logger.info(".start(userId = [{}], message = [{}])", userId, message);
        Promise<Profile> profilePromise = userProfileService.get(userId);
        Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);
        decider.blockOnFail(userId, sendResultPromise);
    }

    @Asynchronous
    public Promise<Boolean> sendToTransport(Promise<Profile> profilePromise, String message) {
        logger.info(".sendToTransport(profilePromise = [{}], message = [{}])", profilePromise, message);
        Profile profile = profilePromise.get();
        switch (profile.getDeliveryType()) {
            case SMS: {
                return smsService.send(profile.getPhone(), message);
            }
            case EMAIL: {
                return mailService.send(profile.getEmail(), message);
            }
        }
        // Neither transport applies: nothing to send.
        return Promise.asPromise(Boolean.TRUE);
    }

    @Asynchronous
    public void blockOnFail(String userId, Promise<Boolean> sendResultPromise) {
        logger.info(".blockOnFail(userId = [{}], sendResultPromise = [{}])", userId, sendResultPromise);
        if (!sendResultPromise.get()) {
            userProfileService.blockNotification(userId);
        }
    }
}
In this code we have also omitted the initialization of the private fields. The complete, working code can be found in the taskurotta-getstarted project. Here we note only that the values of the private fields are obtained through a special factory of proxy objects for the Coordinator.
The example implementation has two points where the Coordinator waits for the results of not-yet-completed tasks: the methods sendToTransport and blockOnFail. These methods are invoked only when all of their Promise arguments are ready, i.e. when the corresponding tasks have completed.
The fields of type MailServiceClient and SMSServiceClient are client interfaces to the corresponding Performers. Their initialization can also be seen in the taskurotta-getstarted project.
We have now implemented all the Performers and the Coordinator. Let's move on to running the Actors (i.e. the Performers and Coordinators).
Actors can run inside an application server or as a standalone Java application (this example uses the standalone option, the taskurotta/bootstrap module). A standalone application does the following:
- Registers with the Taskurotta server
- Starts a pool of N threads to execute tasks
- Fetches tasks from the Taskurotta server
- Starts their execution
- Sends the results to the Taskurotta server
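The steps above can be pictured with a small self-contained sketch. It is not the bootstrap module's code: the in-memory queues stand in for the Taskurotta server, registration is left out, and the task itself (upper-casing a string) is invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of an actor host: a thread pool that polls, executes and reports.
class ActorLoopSketch {

    // In-memory stand-ins for the server's task and result endpoints.
    static final BlockingQueue<String> serverTasks = new LinkedBlockingQueue<>();
    static final BlockingQueue<String> serverResults = new LinkedBlockingQueue<>();

    // Invented "task": upper-case the payload.
    static String execute(String task) {
        return task.toUpperCase();
    }

    // Starts poolSize threads; each handles tasks until the queue stays empty.
    static List<String> runWorkers(int poolSize, List<String> tasks) {
        serverTasks.clear();
        serverResults.clear();
        serverTasks.addAll(tasks);
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.execute(() -> {
                try {
                    String task;
                    // Fetch a task; stop when none arrives within the timeout.
                    while ((task = serverTasks.poll(100, TimeUnit.MILLISECONDS)) != null) {
                        serverResults.add(execute(task)); // report the result
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> results = new ArrayList<>(serverResults);
        Collections.sort(results); // completion order is not deterministic
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(3, Arrays.asList("b", "a", "c")));
    }
}
```

A real actor host keeps polling indefinitely. The sketch stops once the queue stays empty so that it terminates.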
To run as a standalone Java application, the bootstrap package is used, specifically the class ru.taskurotta.bootstrap.Main. As its argument it takes the location of a configuration file in YAML format.
How do you try all this out? Very simply. Below are step-by-step instructions for building the server and running it from source. Be careful: minor changes are needed if you are not on Linux.
Let's say you have:
- jdk 1.7
- maven 3
- git
Let's build the Taskurotta server. First, clone the repository:
git clone https://github.com/taskurotta/taskurotta.git
cd taskurotta/
Start the build. To speed it up, skip the tests:
mvn clean install -DskipTests
Now let's run a two-node cluster (for demonstration we use a single machine, hence the different ports in the launch options). In a real environment you can run as many machines as needed with the same configuration.
Run the first cluster node:
java -Xmx64m -Ddw.http.port=8081 -Ddw.http.adminPort=9081 -Ddw.logging.file.currentLogFilename="assemble/target/server1.log" -jar assemble/target/assemble-0.4.0-SNAPSHOT.jar server assemble/src/main/resources/hz.yml
Run the second node. (We deliberately limit memory in order to reveal possible leaks at an early stage. The configuration in this example uses no database, so larger volumes require more memory.)
java -Xmx64m -Ddw.http.port=8082 -Ddw.http.adminPort=9082 -Ddw.logging.file.currentLogFilename="assemble/target/server2.log" -jar assemble/target/assemble-0.4.0-SNAPSHOT.jar server assemble/src/main/resources/hz.yml
When both servers have connected to each other, the log will contain a message similar to this:
Members [2] {
Member [192.168.1.2]:7777
Member [192.168.1.2]:7778 this
}
Open the console in a browser: http://localhost:8081/index.html for the first node, or http://localhost:8082/index.html for the second.
You can use either node to work with the console. They display essentially the same information. Not all console features currently work in this configuration (without a database). Everything works in a configuration with Oracle DB and MongoDB. See the deployment options in the documentation.
Now let's run our process. Clone the taskurotta-getstarted repository and build it:
git clone https://github.com/taskurotta/taskurotta-getstarted.git
cd taskurotta-getstarted/
mvn clean install
For the actors to have something to work on, processes must be started. Let's start, say, 91 of them:
java -cp target/getstarted-process-1.0-SNAPSHOT.jar ru.taskurotta.example.starter.NotificationModule http://localhost:8081 91
Check the console at http://localhost:8081/index.html. Select the "Queues" tab. You will see that the Coordinator's queue contains 91 tasks, corresponding to the 91 started processes.

Now let's run the Coordinator. Its YAML configuration file defines only the Coordinator, without the Performers. So after it starts, the processes' tasks will not all be completed, and we will see tasks for the Performers in the queues.
java -Xmx64m -jar target/getstarted-process-1.0-SNAPSHOT.jar -f src/main/resources/config-decider.yml
The configuration file specifies the first node of our cluster as the Taskurotta server for the Coordinator:
spreader:
  - Spreader:
      class: ru.taskurotta.example.bootstrap.SimpleSpreaderConfig
      instance:
        endpoint: "http://localhost:8081"
        threadPoolSize: 10
        readTimeout: 0
        connectTimeout: 3000
You can refresh the list of queues in the console and see that there are tasks waiting for the Performers.
Now let's run the Performers (leaving the Coordinator running) and, for demonstration, point them at the second node of the cluster. Because the cluster nodes share a common memory space and task execution environment, it does not matter which server receives a request from an Actor.
java -Xmx64m -jar target/getstarted-process-1.0-SNAPSHOT.jar -f src/main/resources/config-workers.yml
The Performers are configured to communicate with the second node of the cluster:
spreader:
  - Spreader:
      class: ru.taskurotta.example.bootstrap.SimpleSpreaderConfig
      instance:
        endpoint: "http://localhost:8082"
        threadPoolSize: 10
        readTimeout: 0
        connectTimeout: 3000
In the end, all processes should run to completion, which can be seen in the queues in the management console.

That is all I wanted to share for now. Suggestions and constructive criticism are welcome.