AWS Kinesis agent 代理服务的安装和使用
环境准备:
- 创建IAM:
创建一个具备kinesis读写权限的role
- 搭建EC2:
我这边是搭建了一台linux-centos的机器,当然也可以是其他的服务器
- 安装java8:
搭建的服务器必须安装java,且版本要大于8的。
- 创建一个DataStream
非常简单,就不截图说明了
说下分片:分片一共有两种模式按需和预置
按需:Kinesis Data Streams 自动管理分片以提供必要的吞吐量。您只需为使用的实际吞吐量付费,而 Kinesis Data Streams 在工作负载上升或下降时自动适应它们的吞吐量需求,默认情况下,采用按需模式的数据流会自动扩展吞吐量,以便容纳最高每秒 200MiB 的流量和每秒 200,000 条记录的写入容量。如果流量超出容量,您的数据流将受到限制。
预置:必须指定数据流的分片数。数据流的总容量是其分片容量的总和。您可以根据需要增加或减少数据流中的分片数,并按小时费率支付分片数。
个人见解:如果流的数据小于1000条/s,那就选预置1-2分片,如果流的数据不确定,设备太多,一秒几千上万 那就选按需的。
- 创建一个Data Firehose
Data Firehose 安装选项比较多,但是也比较简单没有啥。firehose不管分区,但是可以创建分区路径。所以glue table 需要分区查询就需要自己创建。
agent安装和常用命令(Linux):
安装:
- 安装:sudo yum install –y aws-kinesis-agent
- 创建文件路径:sudo mkdir -p /tmp/agent_data
- 配置agent文件
{"cloudwatch.emitMetrics": true,"kinesis.endpoint": "kinesis.ap-northeast-1.amazonaws.com","firehose.endpoint": "firehose.ap-northeast-1.amazonaws.com",“awsAccessKeyId”: “","awsSecretAccessKey": “*","flows": [{"filePattern": "/tmp/agent_data/*.log*","kinesisStream": "poc_test_2","partitionKeyOption": "RANDOM","kinesis.roleArn": "arn:aws:iam::11111111:role/kinesis_DefaultRole"}]
}}
kinesis.endpoint :kinesis data stream endpoint ,如果不是EC2服务器,可能前面还需要加http:
firehose.endpoint :firehose endpoint ,如果不是EC2服务器,可能前面还需要加http:
flows 是数组 所以内部配置可以是多个。
filePattern:需要监测的的文件路径,比如上面就是监测/tmp/agent_data目录下所有后缀包含log的文件。
kinesisStream:kinesis data stream 流名称
partitionKeyOption:分片设置,默认随机
kinesis.roleArn :具有kinesis 读写权限的role
配置文件还有很多其他选项,这些算是常用的
其余命令可参考文档:使用 Kinesis 代理写入 Amazon Kinesis Data Streams - Amazon Kinesis Data Streams
命令:
- 启动或停止:sudo service aws-kinesis-agent start/stop
- 重启:sudo service aws-kinesis-agent restart,每次修改完配置文件后都需要重启服务
- 开机自动启动服务:sudo chkconfig aws-kinesis-agent on
- 查看log日志:taif –F /var/log/aws-kinesis-agent/aws-kinesis-agent.log
样例代码
python写的
import logging
import json
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.FileHandler(‘test_data.log’)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(‘%(message)s’)
handler.setFormatter(formatter)
logger.addHandler(handler)
# data_json.json is sample json format data
with open(‘data_json.json’,‘rb’) as f:js = json.loads(f.read().decode('utf-8'))
# for 100 times and put it in the test_data.log file
for num in range(1, 100):json_str = json.dumps(js)logger.info(json_str)
课题研究:
以下测试都是个人结果,可能存在问题。如有不同,可留言告知。
1.覆写或者修改某一行数据 agent 是否会重复发送
答:不会
如:比如先输入一条,被datastream消费走了,然后覆盖写入4条,结果是只会发送3条,因为第一条数据被覆盖,会报错:Error when processing current input file or when tracking its status. 覆盖写入会缺少一条数据。
2.创建新的流,是否会拉取同路径下其他的 agent log文件里的数据
答:不会
如:创建一个新的datastream,向旧的log文件(有数据)中插入新的数据,新的datastream并不会将旧的数据重新读取,而是只会将新插入的数据传入到datastream-firehose中。
但是如果新建的firehose会消费输入datastream内的所有数据,不管该datastream流是否被消费过,所以datastream有存储功能,数据被消费依然存在。
3.当agent服务意外停止后,继续输入数据,当agent启动后是否会传输这些数据
答:会
当服务停止后,启动服务后,agent会将监测路径下的log文件内的新内容发送到datastream。
意味着,当监测文件中的数据被读取后,将会被标记,将不会被agent二次读取,如果修改了已处理的数据,则会报错,而替换的数据将不会被传输,所以个人感觉agent监测的行号
4.将已传输完成的文件名修改后,agent是否会是认为新文件重新读取
答:不会,mv 修改log文件名称,agent 不会重新读取
5. 直接将数据文件拖到监测文件目录是否可以传输
答:会
6. datastream 会不会自动删除重复数据
答:不会
重复数据需要在consumer端自行处理
7.如果传输数据大于限制(比如一个分片,1s 大于1000条或1MB),数据是否会丢失
答:个人测试不会,只不过速度会慢,分批发送,但是查阅过外网资料,有人测试过会丢失数据,也有可能我测试的数据量不够大,所以尽量还是按照分片的限制来传输,如果数据量超过,添加分片数即可。
8.agent 能否在windows上使用
答:可以,我用的测试版本是EC2-Windows Server 2022 Datacenter
官网-agent代理在windows上使用
必须安装java8以上版本和.NET Framework 大于4.6。如果默认大于4.6 ,但是启动服务错误,就卸载然后重装就行了。
windows版agent代理下载路径
role权限需要加上个sts:AssumeRole
信任关系也需要加上该帐号:这个问题研究了1天,不知道你们需要不需要,就是把自己的帐号加到这个role的信任关系里,当然你的帐号是root的话 应该就不会有这个问题。
{"Version": "2012-10-17","Statement": [{"Sid": "","Effect": "Allow","Principal": {"Service": "ec2.amazonaws.com", "AWS": "arn:aws:iam::<deployment-acct-id>:user/用户名"},"Action": "sts:AssumeRole"}]
}
配置文件也跟linux不一样:
{"Sources": [{"Id": "JsonLogSource", #source唯一标识"SourceType": "DirectorySource", #类型"RecordParser": "SingleLineJson", # 数据类型"Directory": "C:\\\\LogSource\\\\", #监测的文件路径"FileNameFilter": "*.log" #监测的文件格式}],"Sinks": [{"Id": "TestKinesisStreamSink", #sink的唯一标识"SinkType": "KinesisStream", # skin的类型,是datastream"StreamName": "muji_win_stream", #datastream 名称"AccessKey":"*","SecretKey":"*","RoleARN":"arn:aws:iam::11111111:role/kinesis_DefaultRole","Region": "ap-northeast-1"}],"Pipes": [{"Id": "JsonLogSourceToLogStream","SourceRef": "JsonLogSource", ## source id"SinkRef": "TestKinesisStreamSink" #sink id}]
}
启动的话,可以使用windows服务启动也可以使用命令
启动命令:Start-Service -Name AWSKinesisTap
无论是linux 还是windows 只要修改过配置文件都需要重新启动服务