These instructions have been tested on a Mac. I would expect them to work on Linux as well. This project was built and tested with JDK11. The web server uses port 8080.
Get a copy (clone or unzip), then change into the project directory:
cd log_aggregator
Build the project:
./gradlew build
Make a logs directory so we can redirect stdout and stderr:
mkdir logs
Run the agent:
java -jar agent/build/libs/agent.jar > logs/agent.out 2>&1 &
Run the datapump:
java -jar datapump/build/libs/datapump.jar > logs/datapump.out 2>&1 &
Run the server:
java -jar server/build/libs/server.jar > logs/server.out 2>&1 &
Run the datadecoder:
java -jar datadecoder/build/libs/datadecoder.jar > logs/datadecoder.out 2>&1 &
Generate some logs:
- Make the log source directory
mkdir LogSource
- Start three log generators
nohup ./src/test/logGen > LogSource/log1.log &
nohup ./src/test/logGen > LogSource/log2.log &
nohup ./src/test/logGen > LogSource/log3.log &
Sanity check the logs.
./src/test/logCheck
This script can be used as a simple check for the logs. The logGen script emits a counter on each line. This script does a wc -l on each log file and also reads the last counter value. If the two are the same, all is well. There's an outside chance that the file changes between the wc and the tail, so a discrepancy that goes away on the next run can be ignored. The script also won't behave well if multiple sessions are present in the log file.
Here's an example output:
log1.log WC=3964 LAST=3964 DELTA=0
log2.log WC=2492 LAST=2492 DELTA=0
log3.log WC=1473 LAST=1473 DELTA=0
This tells us that log1.log has 3964 lines and the last counter value is also 3964. DELTA is just the difference between WC and LAST.
Here's an example of the first two lines of the log file:
Monday October 28, 2019 23:00:16.124950000 Hello, World! 1
Monday October 28, 2019 23:00:16.133206000 Hello, World! 2
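For anyone who wants the same check without the shell script, here is a minimal Java sketch (JDK 11, matching the project). `LogCheck` is a hypothetical class, not the logCheck script itself. It assumes each line ends with a space followed by the logGen counter, as in the sample lines above, and it ignores a trailing partial line so the count matches what wc -l reports while a generator is still writing.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical Java version of the logCheck idea; not the project's script.
final class LogCheck {

    static final class Result {
        final long wc;   // number of newline-terminated lines, like `wc -l`
        final long last; // counter at the end of the last complete line

        Result(long wc, long last) {
            this.wc = wc;
            this.last = last;
        }

        long delta() {
            return wc - last;
        }

        @Override
        public String toString() {
            return "WC=" + wc + " LAST=" + last + " DELTA=" + delta();
        }
    }

    // Assumes each line ends with a space followed by the logGen counter.
    static long trailingCounter(String line) {
        String trimmed = line.trim();
        return Long.parseLong(trimmed.substring(trimmed.lastIndexOf(' ') + 1));
    }

    static Result check(Path logFile) {
        final String text;
        try {
            text = Files.readString(logFile); // JDK 11+, decodes as UTF-8
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Only count newline-terminated lines; a generator may be mid-write.
        int end = text.lastIndexOf('\n');
        if (end < 0) {
            return new Result(0, 0);
        }
        String[] lines = text.substring(0, end).split("\n", -1);
        return new Result(lines.length, trailingCounter(lines[lines.length - 1]));
    }
}
```

For example, printing `LogCheck.check(Path.of("LogSource/log1.log"))` gives a line in the same WC/LAST/DELTA form as the script output above.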
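Check the current backpressure setting (a negative value leaves it unchanged; the URL is quoted so that shells such as zsh don't treat ? as a wildcard):
curl 'http://localhost:8080/throttle?seconds=-1'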
Change the backpressure to 2 minutes:
curl 'http://localhost:8080/throttle?seconds=120'
After a few minutes you should be able to check the logs to see if the datapump has backed off:
grep 'DPumpPusher-1' logs/datapump.out | grep -A1 sleep
Here's an example output:
2019-10-28 22:23:18.716 WARN 40052 --- [ DPumpPusher-1] tjmike.logaggregator.datapump.AsyncPusher : Will sleep: 120000
2019-10-28 22:25:18.728 INFO 40052 --- [ DPumpPusher-1] tjmike.logaggregator.datapump.AsyncPusher : PUSH: log3.log_1572314081_260.pbData Code: 200 Message: Throttle: 120
We can see that the DPumpPusher-1 thread received the throttle message and slept for 2 minutes (120000 ms) before pushing again. The data pump has 5 pusher threads, and they should all behave the same way.
Set the backpressure to 0 seconds:
curl 'http://localhost:8080/throttle?seconds=0'
Give the jobs a couple of minutes to catch up.
Kill all the processes. Another test can be performed if we kill the logGen jobs first:
jobs
[1] Running java -jar agent/build/libs/agent.jar > logs/agent.out 2>&1 &
[2] Running java -jar datapump/build/libs/datapump.jar > logs/datapump.out 2>&1 &
[3] Running java -jar server/build/libs/server.jar > logs/server.out 2>&1 &
[4] Running java -jar datadecoder/build/libs/datadecoder.jar > logs/datadecoder.out 2>&1 &
[5] Running nohup ./src/test/logGen > LogSource/log1.log &
[6]- Running nohup ./src/test/logGen > LogSource/log2.log &
[7]+ Running nohup ./src/test/logGen > LogSource/log3.log &
kill %5 %6 %7
Now that all three logGen jobs have been stopped we can perform another test. If the throttle setting has been put back to 0 for a couple of minutes then the data should catch up quickly. This can be tested by performing a diff on the input and output files:
```
diff LogSource/log1.log ServerLog/log1.log*[0-9]
diff LogSource/log2.log ServerLog/log2.log*[0-9]
diff LogSource/log3.log ServerLog/log3.log*[0-9]
```
These files should be identical. The output log file has the session id (the UNIX timestamp of when the agent
was started) appended to its name, which lets us distinguish between logging sessions.
Kill the rest of the processes:
```
kill %1 %2 %3 %4
```
Below is a discussion of each component.
Agent:
- Tail and encode
- Monitor multiple log files via continuous tail
- Accept a log chunk and serialize into a protocol buffer
- Save the serialized Protocol Buffer (cache chunks) to a local cache
The agent continuously tails a number of log files. Data is read from each file, serialized and pushed to the cache. If no data is captured for a short time (5 seconds) the agent will sleep just to prevent spinning. This approach represents an effort to be timely at the expense of capturing smaller chunks.
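The polling tail described above can be sketched as follows. `FileTailer` is a hypothetical class, and the maximum chunk size is a caller-chosen assumption; the agent's real classes and chunk sizes may differ. The file stays open and the sketch remembers the offset of the next unread byte, so each call returns only data appended since the previous call.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical polling tail over one log file; not the agent's actual code.
final class FileTailer implements Closeable {
    private final RandomAccessFile file;
    private final int maxChunk; // upper bound on bytes returned per poll
    private long position;      // offset of the next unread byte

    FileTailer(Path path, int maxChunk) throws IOException {
        this.file = new RandomAccessFile(path.toFile(), "r"); // kept open between polls
        this.maxChunk = maxChunk;
    }

    // Returns newly appended bytes, or an empty array when nothing new is available.
    byte[] poll() throws IOException {
        long available = file.length() - position;
        if (available <= 0) {
            return new byte[0];
        }
        byte[] buf = new byte[(int) Math.min(available, maxChunk)];
        file.seek(position);
        int read = file.read(buf);
        if (read <= 0) {
            return new byte[0];
        }
        position += read;
        return read == buf.length ? buf : Arrays.copyOf(buf, read);
    }

    @Override
    public void close() throws IOException {
        file.close();
    }
}
```

An agent loop could call poll() on each open tailer in turn, serialize any non-empty result into a chunk for the cache, and sleep briefly when every tailer comes back empty.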
Data pump:
- Monitor the file system cache of log file chunks
- When new files are detected use http to send the files from the cache to the server
- Delete files from the cache upon successful delivery to server
- Read a throttle value from the server which will cause the client to sleep for a specified number of seconds before sending more chunks
The data pump is a separate program that monitors the log cache and sends data to the server. Because it is a separate program, it can be deployed and maintained separately from the agent.
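The project watches the cache with a file watch service; the sketch below polls the directory instead, purely to illustrate the scan and the delete-on-success rule. `CacheScanner` is a hypothetical name, and the convention that finished chunks end in `.pbData` while chunks still being written end in `.pbData.tmp` is an assumption taken from the cache file names that appear in this README.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical polling version of the data pump's cache scan.
final class CacheScanner {
    // Assumed suffix of finished chunks; ".pbData.tmp" files are still being written.
    static final String CHUNK_SUFFIX = ".pbData";

    // Completed chunk files currently in the cache, sorted by file name.
    static List<Path> readyChunks(Path cacheDir) {
        try (Stream<Path> files = Files.list(cacheDir)) {
            return files
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(CHUNK_SUFFIX))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deletes a chunk only when the server accepted it with HTTP 200.
    static boolean deleteIfDelivered(Path chunk, int httpStatus) {
        if (httpStatus != 200) {
            return false; // keep the file so the next scan retries it
        }
        try {
            return Files.deleteIfExists(chunk);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Deleting only after a 200 gives at-least-once delivery: if the pump stops after the server accepted a chunk but before the delete, the chunk is sent again, so the server side has to tolerate duplicates.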
Server:
- Accept log file chunks
- Write to local cache
- Accept throttling parameter to forward to clients
The server listens for data to be pushed to /data. The server responds with a string of the form:
Throttle: 123
The client is expected to parse this and back off future requests by the number of seconds provided.
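A sketch of the client-side parsing, assuming the reply body is exactly `Throttle: N` with N a non-negative whole number of seconds. `ThrottleReply` is a hypothetical name, and treating an unrecognized reply as "no throttle" is our assumption rather than documented behavior.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for the server's "Throttle: N" reply.
final class ThrottleReply {
    // Digits are capped at 9 so Long.parseLong can never overflow.
    private static final Pattern THROTTLE =
            Pattern.compile("\\s*Throttle:\\s*(\\d{1,9})\\s*");

    // Back-off to apply before the next push.
    static Duration backoff(String body) {
        Matcher m = THROTTLE.matcher(body);
        if (!m.matches()) {
            return Duration.ZERO; // assumption: an unrecognized reply means no throttle
        }
        return Duration.ofSeconds(Long.parseLong(m.group(1)));
    }

    // Blocks the calling pusher thread for the back-off, if any.
    static void sleepFor(Duration backoff) throws InterruptedException {
        if (!backoff.isZero()) {
            Thread.sleep(backoff.toMillis());
        }
    }
}
```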
The server also listens at /throttle. A request like this:
http://localhost:8080/throttle?seconds=60
tells the server to inform clients to delay by the amount specified. This request also returns the current throttle setting. A negative value is ignored, but the current setting is still returned.
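Both endpoints can be sketched with the JDK's built-in `com.sun.net.httpserver.HttpServer`. This is not necessarily how the project's server is built: `ThrottleServer` is a hypothetical class, and the `/throttle` response is assumed to use the same `Throttle: N` string that `/data` returns. A real `/data` handler would write the chunk to the server's cache before replying 200.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the /data and /throttle endpoints.
final class ThrottleServer {
    private final AtomicLong throttleSeconds = new AtomicLong(0);

    // Applies a /throttle query; negative, missing, or invalid values keep the current setting.
    long applyQuery(String query) {
        if (query != null) {
            for (String param : query.split("&")) {
                String[] kv = param.split("=", 2);
                if (kv.length == 2 && kv[0].equals("seconds")) {
                    try {
                        long seconds = Long.parseLong(kv[1]);
                        if (seconds >= 0) {
                            throttleSeconds.set(seconds);
                        }
                    } catch (NumberFormatException ignored) {
                        // Not a number: keep the current setting.
                    }
                }
            }
        }
        return throttleSeconds.get();
    }

    HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/throttle", exchange ->
                reply(exchange, "Throttle: " + applyQuery(exchange.getRequestURI().getQuery())));
        server.createContext("/data", exchange -> {
            try (InputStream in = exchange.getRequestBody()) {
                in.readAllBytes(); // a real server would persist this chunk before replying
            }
            reply(exchange, "Throttle: " + throttleSeconds.get());
        });
        server.start();
        return server;
    }

    private static void reply(HttpExchange exchange, String body) throws IOException {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        exchange.sendResponseHeaders(200, bytes.length);
        try (OutputStream out = exchange.getResponseBody()) {
            out.write(bytes);
        }
    }
}
```

Started with `new ThrottleServer().start(8080)`, this sketch would answer the curl commands from the walkthrough, for example returning `Throttle: 120` for `seconds=120`.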
For a better throttle strategy a separate process could monitor key metrics such as disk space, bandwidth, load, etc. and the process could tell the server what throttling should be set. The command to the server could be more sophisticated too. For example, the command could accept a logID and could then set throttling values on a per log basis.
Data decoder:
- Monitor the local server cache for new files
- Assemble the log file chunks into a new log file
The data decoder reassembles the log file chunks into a single log file. The reconstituted log file will have a name of the form NAME.SESSION, where NAME is the original log file name and SESSION is the session id of the agent session that captured the log. The order of log files with the same name can be determined from SESSION, but there's no way to know how much (if any) data was lost between sessions.
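The core of the reassembly can be sketched in memory, one instance per log id and session. `SessionReassembler` is hypothetical and not the decoder's actual code, which works from the server's file cache. Chunks may arrive in any order, output is released strictly in sequence starting at 1, nothing past a missing sequence number is written, and a chunk that was already seen is dropped so a retransmission cannot produce duplicate lines.

```java
import java.io.ByteArrayOutputStream;
import java.util.TreeMap;

// Hypothetical in-memory reassembly for one (id, session) pair.
final class SessionReassembler {
    private final TreeMap<Long, byte[]> pending = new TreeMap<>(); // buffered out-of-order chunks
    private long nextSeq = 1; // sequence numbers start at 1

    // Buffers a chunk; returns false for a duplicate (already written or already buffered).
    boolean offer(long seq, byte[] payload) {
        if (seq < nextSeq || pending.containsKey(seq)) {
            return false;
        }
        pending.put(seq, payload);
        return true;
    }

    // Bytes that can be appended to the output log now; empty while a gap remains.
    byte[] drain() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while (!pending.isEmpty() && pending.firstKey() == nextSeq) {
            out.writeBytes(pending.pollFirstEntry().getValue()); // JDK 11+
            nextSeq++;
        }
        return out.toByteArray();
    }

    long nextExpectedSeq() {
        return nextSeq;
    }
}
```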
The server supports multiple clients as long as the rules are followed. Each log file name must be unique. Multiple agent instances may be run sharing the same datapump, or multiple agent/datapump pairs may be run. It should also be possible to run agent/datapump pairs on multiple hosts.
Protocol Buffers are used as the format for serializing log file chunks. The format is:
message LogPart {
string id = 1;
fixed64 session = 2;
int64 seq = 3;
bytes payload = 4;
}
The ID is a unique string that encapsulates the log file name; for this exercise we use the log file name itself as the id. The session is the milliseconds since the UNIX epoch from when the agent was started. Within a session, seq is the sequence number of the log chunk, starting at 1. The payload is the binary log data. The log server makes no assumptions about the nature of the log file; it just captures bytes.
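To make the wire format concrete, here is a hand-rolled encoder for LogPart that follows the standard protobuf encoding rules: each field starts with a tag of (field number << 3) | wire type, int64 values and lengths are base-128 varints, and fixed64 is always 8 little-endian bytes. It is illustration only; in practice the class generated by protoc from the message above would be used. `LogPartWire` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical hand-rolled LogPart encoder, for illustrating the wire format only.
// Proto3 rule: fields holding their default value (empty, 0) are not written.
final class LogPartWire {
    private static final int VARINT = 0, FIXED64 = 1, LENGTH_DELIMITED = 2;

    static byte[] encode(String id, long session, long seq, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
        if (idBytes.length > 0) {               // string id = 1
            writeTag(out, 1, LENGTH_DELIMITED);
            writeVarint(out, idBytes.length);
            out.writeBytes(idBytes);
        }
        if (session != 0) {                     // fixed64 session = 2
            writeTag(out, 2, FIXED64);
            for (int i = 0; i < 8; i++) {       // little-endian, always 8 bytes
                out.write((int) (session >>> (8 * i)) & 0xFF);
            }
        }
        if (seq != 0) {                         // int64 seq = 3
            writeTag(out, 3, VARINT);
            writeVarint(out, seq);
        }
        if (payload.length > 0) {               // bytes payload = 4
            writeTag(out, 4, LENGTH_DELIMITED);
            writeVarint(out, payload.length);
            out.writeBytes(payload);
        }
        return out.toByteArray();
    }

    private static void writeTag(ByteArrayOutputStream out, int field, int wireType) {
        writeVarint(out, ((long) field << 3) | wireType);
    }

    // Base-128 varint, low 7 bits first; a negative int64 takes 10 bytes.
    private static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }
}
```

As a varint, seq fits in a single byte for the first 127 chunks of a session.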
- A log file has a globally unique name string (id).
- Every logging session for a particular log file has a sessionID that's unique to that log file. The sessionID must increase for each logging agent session. The UNIX millis timestamp is a good candidate for this id.
- Every logging chunk must have a sequence number that starts at one and increases by one for each chunk.
- There are no constraints on the payload; it's opaque to the system. Very small payloads are inefficient, and large ones use a lot of resources and cause delays in the system. Extremely large payloads will likely cause instability.
- Logging chunks must be serialized into protocol buffers (v3). See the LogPart message definition above.
- The protocol buffer chunks are sent to the server via an HTTP POST to the path /data. A 200 reply means the post was accepted and the sender need not keep that buffer around anymore.
- The server will reply to every post with a Throttle command. The client is expected to back off sending data for the number of seconds specified in the command.
- The client is responsible for ensuring that buffer segments are sequential and increase by 1. The server will not assemble log entries if a segment is missing.
- Log segments need not arrive at the server in order.
The current design supports tailing multiple files concurrently. The approach is to leave the files open for reading and to continuously poll for new data. The protocol has the concept of an ID that uniquely identifies a log file, and that ID could be defined differently. For example, rather than the log file name, it could be a URL describing a unique host and file on that host.
It would be possible to add more fields to the protocol as well to better identify logs. Examples include hosts, applications, instances etc. These fields could be incorporated into the name to support some form of routing.
The existing model could be scaled out by using a cluster of web servers to accept log file chunks. The web servers could store the data, and a back end process similar to the data decoder could then process the stored data to rebuild the logs.
An interesting approach to investigate would be to have the agent send data directly to an S3 bucket. A lambda function would get fired when the data shows up. The lambda could stitch the files back together. If the lambda approach uses too many resources the lambda could be changed to just be a notifier that fires an event to a back end service that processes the bucket. This back end service could be serverless as well - something like Fargate.
The file watch service occasionally throws an exception:
JNA: Callback io.methvin.watchservice.MacOSXListeningWatchService$MacOSXListeningCallback@3013909b threw the following exception:
java.lang.IllegalStateException: Could not recursively list files for {PATH}/AgentLogCache/
Caused by: java.nio.file.NoSuchFileException: {PATH}/AgentLogCache/log2.log_1572297490_1786.pbData.tmp
This appears to be related to the file system changing state in the middle of a watch operation. The exception doesn't appear to have a negative impact on the program. I would try to resolve this before placing this library into production.
The system will consist of two components - a log forwarding agent and a log aggregation service.
The agent is responsible for tailing a given log file and delivering its contents to a remote service in a timely manner.
The service is responsible for accepting chunked log contents from the client and stitching it together, creating a copy of the original log. The contents of the log reconstructed on the server should preserve the original order of entries (lines).
Design a protocol that will satisfy the above requirements and implement a server and client using this protocol.
- The protocol itself could be based on HTTP or a custom TCP/UDP protocol, as long as it is documented and has a working service/agent
- The server can accept log files delivered from multiple concurrent agents
- Avoid losing parts of a log due to network unreliability
- Make sure that the resulting log files on the server don’t contain any duplicate lines
- Consider strategies for coping with back pressure and overflow