Building an EFK Stack on EKS
By Bys on April 28, 2021
FluentD Setup
Create Configmap cluster-info
kubectl create configmap cluster-info \
  --from-literal=cluster.name=MyClusterName \
  --from-literal=logs.region=ap-northeast-2 -n amazon-cloudwatch
Create Namespace
kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/cloudwatch-namespace.yaml
FluentD DaemonSet
kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/fluentd/fluentd.yaml
After applying this, pods named fluentd-cloudwatch-* are deployed to the EKS cluster in the amazon-cloudwatch namespace.
The container logs of the deployed pods can then be viewed in CloudWatch Log Groups.
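A quick way to verify this, assuming kubectl is pointed at the cluster and the AWS CLI has CloudWatch Logs read permissions (the log group name below follows the /aws/containerinsights/<cluster>/application convention used by Container Insights):
# Check that the fluentd DaemonSet pods are running
kubectl get pods -n amazon-cloudwatch -l k8s-app=fluentd-cloudwatch
# Check that the application log group has been created (replace MyClusterName)
aws logs describe-log-groups \
  --log-group-name-prefix /aws/containerinsights/MyClusterName/application \
  --region ap-northeast-2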
AWS Elasticsearch Setup
ElasticSearch
When creating an Elasticsearch domain on AWS, configure the basic settings and enable Fine-grained access control to manage permissions.
Under Actions -> Modify authentication you can set an IAM ARN or create a Master user; that master user becomes the master user for the Kibana UI.
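The same authentication change can also be scripted; below is a rough sketch with the AWS CLI (the domain name, master user name, and password are placeholders, and it assumes the domain supports enabling fine-grained access control):
aws es update-elasticsearch-domain-config \
  --domain-name my-es-domain \
  --advanced-security-options '{"Enabled":true,"InternalUserDatabaseEnabled":true,"MasterUserOptions":{"MasterUserName":"admin","MasterUserPassword":"MyMasterPass1!"}}'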
To send the logs collected by FluentD to Elasticsearch, create an Elasticsearch subscription filter on the log group.
(Log Groups -> Subscription filters -> Create -> Create Elasticsearch subscription filter)
Once this is configured, a Lambda function is created automatically, and data flows from CloudWatch through the Lambda into Elasticsearch.
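The same subscription can be scripted once the Lambda exists; a sketch with the AWS CLI, where the function name (LogsToElasticsearch_*) is only the naming pattern the console typically generates, so substitute your actual function ARN, and note that CloudWatch Logs also needs lambda:InvokeFunction permission on the function (the console grants this automatically):
aws logs put-subscription-filter \
  --log-group-name /aws/containerinsights/MyClusterName/application \
  --filter-name es-subscription \
  --filter-pattern "" \
  --destination-arn arn:aws:lambda:ap-northeast-2:111122223333:function:LogsToElasticsearch_my-es-domain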
Looking at the generated Lambda function's source, it writes data to indexes named as follows (cwl-*).
To stream several CloudWatch log groups into the same Amazon ES domain, set indexName so that each log group gets its own index, as below.
var indexName = [
'cwl-' + payload.logGroup.substring(payload.logGroup.lastIndexOf('/') + 1) + '-' + timestamp.getUTCFullYear(),
('0' + (timestamp.getUTCMonth() + 1)).slice(-2),
('0' + timestamp.getUTCDate()).slice(-2)
].join('.');
Reference
Modified Lambda function
// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');
var endpoint = '<aws es endpoint>';
var logFailedResponses = false; // referenced by logFailure() below
exports.handler = function(input, context) {
// decode input from base64
var zippedInput = new Buffer(input.awslogs.data, 'base64');
// decompress the input
zlib.gunzip(zippedInput, function(error, buffer) {
if (error) { context.fail(error); return; }
// parse the input from JSON
var awslogsData = JSON.parse(buffer.toString('utf8'));
// transform the input to Elasticsearch documents
var elasticsearchBulkData = transform(awslogsData);
// skip control messages
if (!elasticsearchBulkData) {
console.log('Received a control message');
context.succeed('Control message handled successfully');
return;
}
// post documents to the Amazon Elasticsearch Service
post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
// modified: log the bulk response status and any failed items
console.log('Success: ' + JSON.stringify(failedItems));
console.log('Response: ' + JSON.stringify({
"statusCode": statusCode
}));
if (error) {
console.log('Error: ' + JSON.stringify(error, null, 2));
if (failedItems && failedItems.length > 0) {
console.log("Failed Items: " +
JSON.stringify(failedItems, null, 2));
}
context.fail(JSON.stringify(error));
} else {
console.log('Success: ' + JSON.stringify(success));
context.succeed('Success');
}
});
});
};
function transform(payload) {
if (payload.messageType === 'CONTROL_MESSAGE') {
return null;
}
var bulkRequestBody = '';
payload.logEvents.forEach(function(logEvent) {
var timestamp = new Date(1 * logEvent.timestamp);
// index name format: mydata-<namespace>-<apiname>-YYYY.MM.DD
var namespace = payload.logGroup.toLowerCase().split('/')[5];
var apiname = payload.logGroup.toLowerCase().split('/')[6];
var indexName = [
'mydata-' + namespace + '-' + apiname + '-' + timestamp.getUTCFullYear(),
('0' + (timestamp.getUTCMonth() + 1)).slice(-2), // month
('0' + timestamp.getUTCDate()).slice(-2) // day
].join('.');
var source = buildSource(logEvent.message, logEvent.extractedFields);
source['@id'] = logEvent.id;
source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
source['@message'] = logEvent.message;
source['@owner'] = payload.owner;
source['@log_group'] = payload.logGroup;
source['@log_stream'] = payload.logStream;
var action = { "index": {} };
action.index._index = indexName;
action.index._type = payload.logGroup;
action.index._id = logEvent.id;
bulkRequestBody += [
JSON.stringify(action),
JSON.stringify(source),
].join('\n') + '\n';
});
return bulkRequestBody;
}
function buildSource(message, extractedFields) {
if (extractedFields) {
var source = {};
for (var key in extractedFields) {
if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
var value = extractedFields[key];
if (isNumeric(value)) {
source[key] = 1 * value;
continue;
}
jsonSubString = extractJson(value);
if (jsonSubString !== null) {
source['$' + key] = JSON.parse(jsonSubString);
}
source[key] = value;
}
}
return source;
}
jsonSubString = extractJson(message);
if (jsonSubString !== null) {
return JSON.parse(jsonSubString);
}
return {};
}
function extractJson(message) {
var jsonStart = message.indexOf('{');
if (jsonStart < 0) return null;
var jsonSubString = message.substring(jsonStart);
return isValidJson(jsonSubString) ? jsonSubString : null;
}
function isValidJson(message) {
try {
JSON.parse(message);
} catch (e) { return false; }
return true;
}
function isNumeric(n) {
return !isNaN(parseFloat(n)) && isFinite(n);
}
function post(body, callback) {
var requestParams = buildRequest(endpoint, body);
var request = https.request(requestParams, function(response) {
var responseBody = '';
response.on('data', function(chunk) {
responseBody += chunk;
});
response.on('end', function() {
var info = JSON.parse(responseBody);
var failedItems;
var success;
if (response.statusCode >= 200 && response.statusCode < 299) {
failedItems = info.items.filter(function(x) {
return x.index.status >= 300;
});
success = {
"attemptedItems": info.items.length,
"successfulItems": info.items.length - failedItems.length,
"failedItems": failedItems.length
};
}
var error = response.statusCode !== 200 || info.errors === true ? {
"statusCode": response.statusCode,
"responseBody": responseBody
} : null;
callback(error, success, response.statusCode, failedItems);
});
}).on('error', function(e) {
callback(e);
});
request.end(requestParams.body);
}
function buildRequest(endpoint, body) {
var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
var region = endpointParts[2];
var service = endpointParts[3];
var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
var date = datetime.substr(0, 8);
var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
var kRegion = hmac(kDate, region);
var kService = hmac(kRegion, service);
var kSigning = hmac(kService, 'aws4_request');
var request = {
host: endpoint,
method: 'POST',
path: '/_bulk',
body: body,
headers: {
'Content-Type': 'application/json',
'Host': endpoint,
'Content-Length': Buffer.byteLength(body),
'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
'X-Amz-Date': datetime
}
};
var canonicalHeaders = Object.keys(request.headers)
.sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
.map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
.join('\n');
var signedHeaders = Object.keys(request.headers)
.map(function(k) { return k.toLowerCase(); })
.sort()
.join(';');
var canonicalString = [
request.method,
request.path, '',
canonicalHeaders, '',
signedHeaders,
hash(request.body, 'hex'),
].join('\n');
var credentialString = [ date, region, service, 'aws4_request' ].join('/');
var stringToSign = [
'AWS4-HMAC-SHA256',
datetime,
credentialString,
hash(canonicalString, 'hex')
] .join('\n');
request.headers.Authorization = [
'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
'SignedHeaders=' + signedHeaders,
'Signature=' + hmac(kSigning, stringToSign, 'hex')
].join(', ');
return request;
}
function hmac(key, str, encoding) {
return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}
function hash(str, encoding) {
return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}
function logFailure(error, failedItems) {
if (logFailedResponses){
console.log('Error: ' + JSON.stringify(error,null, 2));
if (failedItems && failedItems.length > 0) {
console.log("Failed Items: " + JSON.stringify(failedItems, null, 2));
}
}
}
The Elasticsearch security group must allow inbound traffic from the Lambda function; allowing ports 80/443 from the CIDR of the subnets in which the Elasticsearch domain was created covers it.
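A sketch of that rule with the AWS CLI; the security group ID and CIDR are placeholders for the Elasticsearch domain's security group and the subnet range:
aws ec2 authorize-security-group-ingress \
  --group-id sg-0123456789abcdef0 \
  --protocol tcp --port 443 --cidr 10.0.1.0/24
aws ec2 authorize-security-group-ingress \
  --group-id sg-0123456789abcdef0 \
  --protocol tcp --port 80 --cidr 10.0.1.0/24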
Kibana
Kibana is provided as a plugin when you create an AWS ES domain, and it can be reached by appending /_plugin/kibana/ to the Elasticsearch VPC endpoint address.
In the Discover menu, click Create Index Pattern and set the index pattern to the one the Lambda function above writes to (cwl-*).
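The index pattern can also be created through Kibana's saved objects API instead of the UI; a rough sketch assuming the master user credentials created above and the @timestamp field the Lambda writes:
curl -u 'admin:MyMasterPass1!' -X POST \
  'https://<es-vpc-endpoint>/_plugin/kibana/api/saved_objects/index-pattern' \
  -H 'kbn-xsrf: true' -H 'Content-Type: application/json' \
  -d '{"attributes":{"title":"cwl-*","timeFieldName":"@timestamp"}}'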
To create users and permissions, go to Menu -> Security and configure Internal Users and Roles.
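Users can likewise be managed through the Open Distro security REST API that fine-grained access control is based on; a hedged example creating an internal user (the user name, password, and backend role are placeholders):
curl -u 'admin:MyMasterPass1!' -X PUT \
  'https://<es-vpc-endpoint>/_opendistro/_security/api/internalusers/log-viewer' \
  -H 'Content-Type: application/json' \
  -d '{"password":"AnotherPass1!","backend_roles":["readall"]}'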
In an air-gapped environment, the image fields in the yaml below need to be changed to images available from an internal registry.
spec:
serviceAccountName: fluentd
terminationGracePeriodSeconds: 30
# Because the image's entrypoint requires to write on /fluentd/etc but we mount configmap there which is read-only,
# this initContainers workaround or other is needed.
# See https://github.com/fluent/fluentd-kubernetes-daemonset/issues/90
initContainers:
- name: copy-fluentd-config
# change required: use an image from an internal registry in an air-gapped environment
image: busybox
command: ['sh', '-c', 'cp /config-volume/..data/* /fluentd/etc']
volumeMounts:
- name: config-volume
mountPath: /config-volume
- name: fluentdconf
mountPath: /fluentd/etc
- name: update-log-driver
# change required: use an image from an internal registry in an air-gapped environment
image: busybox
command: ['sh','-c','']
containers:
- name: fluentd-cloudwatch
# change required: use an image from an internal registry in an air-gapped environment
image: fluent/fluentd-kubernetes-daemonset:v1.7.3-debian-cloudwatch-1.0
In the EFK environment, log lines sometimes appeared out of order in Kibana; this was fixed by changing the multiline_start_regexp setting in fluentd.yaml.
Lines matching the regular expression start a new record, so a multi-line log entry is grouped into a single record and sent to CloudWatch:
11:09:04[301] logdata~~~~~~
(T) 11:09:04[301] logdata~~~~~~
logdata~~~~~~
11:09:04[301] logdata~~~~~~
<label @containers>
......
<filter **>
@type concat
key log
# modified: new records start at lines like "(T) 11:09:04[301]"
multiline_start_regexp /\((D|E|T|I)\)\s\d{2}[:]\d{2}[:]\d{2}\[\d{3}\]/
#multiline_start_regexp /^(date:){0,1}\d{4}[-/]\d{1,2}[-/]\d{1,2}/
separator ""
flush_interval 5
timeout_label @NORMAL
</filter>
As Kibana indexes kept accumulating, they needed to be managed, so an ISM policy was applied.
A delete policy was created as below, sending indexes older than 30 days from the hot state to delete.
Because indexes are created with the IndexName-yyyy.mm.dd pattern, setting index_patterns in ism_template means the policy is also applied immediately to newly created indexes.
policy
{
"policy": {
"policy_id": "delete_policy",
"description": "Demonstrate a hot-delete workflow.",
"last_updated_time": 1625028517692,
"schema_version": 1,
"error_notification": null,
"default_state": "hot",
"states": [
{
"name": "hot",
"actions": [],
"transitions": [
{
"state_name": "delete",
"conditions": {
"min_index_age": "30d"
}
}
]
},
{
"name": "delete",
"actions": [
{
"delete": {}
}
],
"transitions": []
}
],
"ism_template": [
{
"index_patterns": [
"mydata-*"
],
"priority": 10
}
]
}
}
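A sketch of registering the policy and attaching it to indexes that already exist, using the ISM API (new indexes matching ism_template pick the policy up automatically); the endpoint and credentials are placeholders, and delete_policy.json is the policy body above without the read-only last_updated_time field:
# Register (or update) the policy
curl -u 'admin:MyMasterPass1!' -X PUT \
  'https://<es-vpc-endpoint>/_opendistro/_ism/policies/delete_policy' \
  -H 'Content-Type: application/json' -d @delete_policy.json
# Attach it to existing indexes
curl -u 'admin:MyMasterPass1!' -X POST \
  'https://<es-vpc-endpoint>/_opendistro/_ism/add/mydata-*' \
  -H 'Content-Type: application/json' \
  -d '{"policy_id":"delete_policy"}'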
Reference
Modified fluentd.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: fluentd
namespace: amazon-cloudwatch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: fluentd-role
rules:
- apiGroups: [""]
resources:
- namespaces
- pods
- pods/logs
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: fluentd-role-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: fluentd-role
subjects:
- kind: ServiceAccount
name: fluentd
namespace: amazon-cloudwatch
---
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-config
namespace: amazon-cloudwatch
labels:
k8s-app: fluentd-cloudwatch
data:
fluent.conf: |
@include containers.conf
@include systemd.conf
@include host.conf
<match fluent.**>
@type null
</match>
containers.conf: |
<source>
@type tail
@id in_tail_container_logs
@label @containers
path /var/log/containers/*.log
exclude_path ["/var/log/containers/cloudwatch-agent*", "/var/log/containers/fluentd*"]
pos_file /var/log/fluentd-containers.log.pos
tag *
read_from_head true
<parse>
@type json
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
<source>
@type tail
@id in_tail_cwagent_logs
@label @cwagentlogs
path /var/log/containers/cloudwatch-agent*
pos_file /var/log/cloudwatch-agent.log.pos
tag *
read_from_head true
<parse>
@type json
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
<source>
@type tail
@id in_tail_fluentd_logs
@label @fluentdlogs
path /var/log/containers/fluentd*
pos_file /var/log/fluentd.log.pos
tag *
read_from_head true
<parse>
@type json
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
<label @fluentdlogs>
<filter **>
@type kubernetes_metadata
@id filter_kube_metadata_fluentd
</filter>
<filter **>
@type record_transformer
@id filter_fluentd_stream_transformer
<record>
stream_name ${tag_parts[3]}
group_name "/aws/containerinsights/#{ENV.fetch('CLUSTER_NAME')}/application"
</record>
</filter>
<match **>
@type relabel
@label @NORMAL
</match>
</label>
<label @containers>
<filter **>
@type kubernetes_metadata
@id filter_kube_metadata
</filter>
<filter **>
@type record_transformer
@id filter_containers_stream_transformer
enable_ruby
remove_keys $.docker, $.kubernetes.container_name, $.kubernetes.namespace_name, $.kubernetes.container_image, $.kubernetes.container_image_id, $.kubernetes.pod_id, $.kubernetes.labels, $.kubernetes.master_url, $.kubernetes.namespace_id
<record>
stream_name ${tag_parts[3]}
group_name "/aws/containerinsights/#{ENV.fetch('CLUSTER_NAME')}/application/${record['kubernetes']['namespace_name']}/${record['kubernetes']['container_name']}"
</record>
</filter>
<filter **>
@type concat
key log
multiline_start_regexp /\((D|E|T|I)\)\s\d{2}[:]\d{2}[:]\d{2}\[\d{3}\]/
separator ""
flush_interval 5
timeout_label @NORMAL
</filter>
<match **>
@type relabel
@label @NORMAL
</match>
</label>
<label @cwagentlogs>
<filter **>
@type kubernetes_metadata
@id filter_kube_metadata_cwagent
</filter>
<filter **>
@type record_transformer
@id filter_cwagent_stream_transformer
<record>
stream_name ${tag_parts[3]}
group_name "/aws/containerinsights/#{ENV.fetch('CLUSTER_NAME')}/application"
</record>
</filter>
<filter **>
@type concat
key log
multiline_start_regexp /^(date:){0,1}\d{4}[-/]\d{1,2}[-/]\d{1,2}/
separator ""
flush_interval 5
timeout_label @NORMAL
</filter>
<match **>
@type relabel
@label @NORMAL
</match>
</label>
<label @NORMAL>
<match **>
@type cloudwatch_logs
@id out_cloudwatch_logs_containers
region "#{ENV.fetch('REGION')}"
log_group_name_key group_name
remove_log_group_name_key false
log_stream_name_key stream_name
remove_log_stream_name_key true
auto_create_stream true
<buffer>
flush_interval 5
chunk_limit_size 2m
queued_chunks_limit_size 32
retry_forever true
</buffer>
</match>
</label>
systemd.conf: |
<source>
@type systemd
@id in_systemd_kubelet
@label @systemd
filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
<entry>
field_map {"MESSAGE": "message", "_HOSTNAME": "hostname", "_SYSTEMD_UNIT": "systemd_unit"}
field_map_strict true
</entry>
path /var/log/journal
<storage>
@type local
persistent true
path /var/log/fluentd-journald-kubelet-pos.json
</storage>
read_from_head true
tag kubelet.service
</source>
<source>
@type systemd
@id in_systemd_kubeproxy
@label @systemd
filters [{ "_SYSTEMD_UNIT": "kubeproxy.service" }]
<entry>
field_map {"MESSAGE": "message", "_HOSTNAME": "hostname", "_SYSTEMD_UNIT": "systemd_unit"}
field_map_strict true
</entry>
path /var/log/journal
<storage>
@type local
persistent true
path /var/log/fluentd-journald-kubeproxy-pos.json
</storage>
read_from_head true
tag kubeproxy.service
</source>
<source>
@type systemd
@id in_systemd_docker
@label @systemd
filters [{ "_SYSTEMD_UNIT": "docker.service" }]
<entry>
field_map {"MESSAGE": "message", "_HOSTNAME": "hostname", "_SYSTEMD_UNIT": "systemd_unit"}
field_map_strict true
</entry>
path /var/log/journal
<storage>
@type local
persistent true
path /var/log/fluentd-journald-docker-pos.json
</storage>
read_from_head true
tag docker.service
</source>
<label @systemd>
<filter **>
@type kubernetes_metadata
@id filter_kube_metadata_systemd
</filter>
<filter **>
@type record_transformer
@id filter_systemd_stream_transformer
<record>
stream_name ${tag}-${record["hostname"]}
group_name "/aws/containerinsights/#{ENV.fetch('CLUSTER_NAME')}/dataplane"
</record>
</filter>
<match **>
@type cloudwatch_logs
@id out_cloudwatch_logs_systemd
region "#{ENV.fetch('REGION')}"
log_group_name_key group_name
remove_log_group_name_key false
log_stream_name_key stream_name
auto_create_stream true
remove_log_stream_name_key true
<buffer>
flush_interval 5
chunk_limit_size 2m
queued_chunks_limit_size 32
retry_forever true
</buffer>
</match>
</label>
host.conf: |
<source>
@type tail
@id in_tail_dmesg
@label @hostlogs
path /var/log/dmesg
pos_file /var/log/dmesg.log.pos
tag host.dmesg
read_from_head true
<parse>
@type syslog
</parse>
</source>
<source>
@type tail
@id in_tail_secure
@label @hostlogs
path /var/log/secure
pos_file /var/log/secure.log.pos
tag host.secure
read_from_head true
<parse>
@type syslog
</parse>
</source>
<source>
@type tail
@id in_tail_messages
@label @hostlogs
path /var/log/messages
pos_file /var/log/messages.log.pos
tag host.messages
read_from_head true
<parse>
@type syslog
</parse>
</source>
<label @hostlogs>
<filter **>
@type kubernetes_metadata
@id filter_kube_metadata_host
</filter>
<filter **>
@type record_transformer
@id filter_containers_stream_transformer_host
<record>
stream_name ${tag}-${record["host"]}
group_name "/aws/containerinsights/#{ENV.fetch('CLUSTER_NAME')}/host"
</record>
</filter>
<match host.**>
@type cloudwatch_logs
@id out_cloudwatch_logs_host_logs
region "#{ENV.fetch('REGION')}"
log_group_name_key group_name
remove_log_group_name_key false
log_stream_name_key stream_name
remove_log_stream_name_key true
auto_create_stream true
<buffer>
flush_interval 5
chunk_limit_size 2m
queued_chunks_limit_size 32
retry_forever true
</buffer>
</match>
</label>
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd-cloudwatch
namespace: amazon-cloudwatch
spec:
selector:
matchLabels:
k8s-app: fluentd-cloudwatch
template:
metadata:
labels:
k8s-app: fluentd-cloudwatch
annotations:
configHash: 8915de4cf9c3551a8dc74c0137a3e83569d28c71044b0359c2578d2e0461825
spec:
serviceAccountName: fluentd
terminationGracePeriodSeconds: 30
# Because the image's entrypoint requires to write on /fluentd/etc but we mount configmap there which is read-only,
# this initContainers workaround or other is needed.
# See https://github.com/fluent/fluentd-kubernetes-daemonset/issues/90
initContainers:
- name: copy-fluentd-config
image: busybox
command: ['sh', '-c', 'cp /config-volume/..data/* /fluentd/etc']
volumeMounts:
- name: config-volume
mountPath: /config-volume
- name: fluentdconf
mountPath: /fluentd/etc
- name: update-log-driver
image: busybox
command: ['sh','-c','']
containers:
- name: fluentd-cloudwatch
image: fluent/fluentd-kubernetes-daemonset:v1.7.3-debian-cloudwatch-1.0
env:
- name: REGION
valueFrom:
configMapKeyRef:
name: cluster-info
key: logs.region
- name: CLUSTER_NAME
valueFrom:
configMapKeyRef:
name: cluster-info
key: cluster.name
- name: CI_VERSION
value: "k8s/1.1.0"
resources:
limits:
memory: 400Mi
requests:
cpu: 100m
memory: 200Mi
volumeMounts:
- name: config-volume
mountPath: /config-volume
- name: fluentdconf
mountPath: /fluentd/etc
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: runlogjournal
mountPath: /run/log/journal
readOnly: true
- name: dmesg
mountPath: /var/log/dmesg
readOnly: true
volumes:
- name: config-volume
configMap:
name: fluentd-config
- name: fluentdconf
emptyDir: {}
- name: varlog
hostPath:
path: /var/log
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: runlogjournal
hostPath:
path: /run/log/journal
- name: dmesg
hostPath:
path: /var/log/dmesg
Tags: eks, efk, fluentd, elasticsearch, kibana