A Step-by-Step Guide to IoT Time-Series Data Storage and Analysis with Timestream


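Amazon Timestream is a fast, scalable, serverless time-series database service for IoT and operational applications. According to AWS, it lets you store and analyze trillions of events per day up to 1,000 times faster than relational databases, at as little as one-tenth of their cost. Timestream keeps recent data in memory and moves historical data to a cost-optimized storage tier based on user-defined policies, which saves customers the time and cost of managing the lifecycle of time-series data. Its purpose-built query engine can access and analyze both recent and historical data. A query does not have to specify whether the data resides in memory or in the cost-optimized tier. Built-in time-series analytics functions help you identify trends and patterns in your data in near real time. Timestream is serverless and scales capacity and performance automatically, so you do not need to manage the underlying infrastructure and can focus on building your applications.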

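This article shows how to collect, store, and analyze IoT time-series data in real time, using PM2.5 monitoring as the example scenario. It uses the managed services Amazon Timestream and Amazon Kinesis Data Streams together with the open-source Grafana and Flink connector. It covers the deployment architecture, environment setup, data ingestion, data storage, and analysis. We hope it gives you ideas when you have similar IoT time-series storage and analysis needs.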

Architecture

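Amazon Timestream can quickly analyze time-series data generated by IoT applications using built-in analytics functions such as smoothing, approximation, and interpolation. For example, a smart-home device manufacturer can use Timestream to collect motion or temperature data from device sensors. It can then interpolate that data to identify time ranges with no motion and prompt consumers to take action, such as turning down the heat, to save energy.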

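This article uses PM2.5 monitoring as its IoT example and implements real-time PM2.5 data collection, time-series storage, and real-time analysis. The architecture consists of three main parts: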

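  • Real-time time-series data ingestion: a Python data-collection program simulates PM2.5 monitoring devices. Together with Kinesis Data Streams and the Kinesis Data Analytics for Apache Flink connector, it ingests their data into Timestream in real time.
  • Time-series data storage: Amazon Timestream stores the time-series data. You set retention periods for the memory store and for the magnetic store (the cost-optimized tier). Recent data then stays in memory, and historical data moves to the cost-optimized tier according to those user-defined policies.
  • Real-time time-series analysis: Grafana, with the Timestream plugin for Grafana installed, queries Timestream data in real time. Combining Grafana's chart types with Timestream's built-in time-series analytics functions lets you identify trends and patterns in IoT data in near real time.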
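The Python sketch below makes the memory/magnetic split concrete by mapping a record's age to a storage tier. It rests on two simplifying assumptions:

- Both retention periods are measured from the record's timestamp.
- Data moves between tiers instantly. The real service does this asynchronously.

The two dictionary keys are the RetentionProperties fields of the Timestream CreateTable API. The 24-hour and 7-day values are only examples, not the settings used in this article.

```python
from datetime import datetime, timedelta, timezone


def retention_properties(memory_hours: int, magnetic_days: int) -> dict:
    """Build a RetentionProperties dict as accepted by Timestream's CreateTable API."""
    return {
        "MemoryStoreRetentionPeriodInHours": memory_hours,
        "MagneticStoreRetentionPeriodInDays": magnetic_days,
    }


def storage_tier(record_time: datetime, now: datetime, props: dict) -> str:
    """Simplified model: which tier holds a record of a given age.

    Assumes both periods are measured from the record's timestamp and that
    data moves between tiers instantly (the real service does this asynchronously).
    """
    age = now - record_time
    if age <= timedelta(hours=props["MemoryStoreRetentionPeriodInHours"]):
        return "memory"
    if age <= timedelta(days=props["MagneticStoreRetentionPeriodInDays"]):
        return "magnetic"
    return "expired"


if __name__ == "__main__":
    now = datetime(2021, 1, 1, tzinfo=timezone.utc)
    props = retention_properties(memory_hours=24, magnetic_days=7)  # example values only
    for hours in (1, 48, 24 * 30):
        print(f"{hours:>4} h old -> {storage_tier(now - timedelta(hours=hours), now, props)}")
```

With these example settings, a 48-hour-old reading is served from the magnetic store. Queries still read it transparently, without naming the tier.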

The architecture diagram is shown below:

[Figure: architecture diagram]

Deploying the Environment

1.1 Create the CloudFormation stack

Use your own AWS account (select the us-east-1 Region).

Download the CloudFormation YAML file from GitHub:

git clone https://github.com/bingbingliu18/Timestream-pm25

The Timestream-pm25 directory contains timestream-short-new.yaml, the template file used by CloudFormation below.


Leave everything else at the defaults and click Create Stack.


The CloudFormation stack has been created successfully.

1.2 Connect to the new EC2 bastion host:

Change the permissions of the downloaded .pem key file, then connect over SSH:

chmod 0600 [path to downloaded .pem file]

ssh -i [path to downloaded .pem file] ec2-user@[bastionEndpoint]

Run aws configure:

aws configure

For Default region name, enter "us-east-1"; accept the defaults for the other settings.

1.3 Connect to the EC2 bastion host and install the required software

Set the time zone:

TZ='Asia/Shanghai'; export TZ
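The TZ variable only changes how programs on the bastion host render local times. Epoch timestamps themselves do not depend on the time zone. The Python sketch below shows the conversion that TZ='Asia/Shanghai' implies. It uses a fixed UTC+8 offset because China currently does not observe daylight saving time. The function name is illustrative, and this article does not say whether the demo generator uses local time at all.

```python
from datetime import datetime, timedelta, timezone

# Asia/Shanghai is UTC+8 with no daylight saving time today, so a fixed offset suffices here.
SHANGHAI = timezone(timedelta(hours=8), "Asia/Shanghai")


def epoch_ms_to_shanghai(epoch_ms: int) -> datetime:
    """Convert an epoch timestamp in milliseconds to Shanghai local time."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).astimezone(SHANGHAI)


if __name__ == "__main__":
    # 1609459200000 ms is 2021-01-01 00:00:00 UTC
    print(epoch_ms_to_shanghai(1_609_459_200_000).isoformat())  # → 2021-01-01T08:00:00+08:00
```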

Install python3

sudo yum install -y python3

Install python3 pip

sudo yum install -y python3-pip

pip3 install boto3

sudo pip3 install boto3

pip3 install numpy

sudo pip3 install numpy

Install git

sudo yum install -y git

1.4 Download the Timestream sample repository from GitHub

git clone https://github.com/awslabs/amazon-timestream-tools amazon-timestream-tools

1.5 Install Grafana Server

Connect to the EC2 bastion host:

sudo vi /etc/yum.repos.d/grafana.repo

For OSS releases (copy the following content into grafana.repo):

[grafana]
name=grafana
baseurl=https://packages.grafana.com/oss/rpm
repo_gpgcheck=1
enabled=1
gpgcheck=1
gpgkey=https://packages.grafana.com/gpg.key
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt

Install Grafana server:

sudo yum install -y grafana

Start Grafana server and check its status:

sudo service grafana-server start
sudo service grafana-server status

Configure Grafana server to start automatically when the operating system boots:

sudo /sbin/chkconfig --add grafana-server

1.6 Install the Timestream plugin

sudo grafana-cli plugins install grafana-timestream-datasource

Restart Grafana:

sudo service grafana-server restart

1.7 Configure the IAM role that Grafana uses to access Timestream

Get the IAM role name.


In the IAM console, select the role to modify. The role name is:

timestream-iot-grafanaEC2rolelabview-us-east-1

Edit the role's trust relationship.


Select the entire policy document and replace it with the following:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid":"",
      "Effect": "Allow",
      "Principal": {
        "Service": "ec2.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Sid":"",
      "Effect": "Allow",
      "Principal": {
        "AWS": "[replace with the role ARN from the CloudFormation outputs]"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
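If you prefer to generate the trust policy rather than edit it by hand, the following Python sketch produces the same document with the role ARN filled in. The function name and the example ARN are hypothetical. Take the real ARN from the CloudFormation outputs.

```python
import json


def grafana_trust_policy(role_arn: str) -> str:
    """Return the step 1.7 trust policy as JSON, with the given role ARN substituted."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {   # allows EC2 instances (the bastion host) to assume this role
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"Service": "ec2.amazonaws.com"},
                "Action": "sts:AssumeRole",
            },
            {   # allows the role from the CloudFormation outputs to assume this role
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": role_arn},
                "Action": "sts:AssumeRole",
            },
        ],
    }
    return json.dumps(policy, indent=2)


if __name__ == "__main__":
    # Hypothetical ARN; substitute the value from your CloudFormation outputs.
    print(grafana_trust_policy("arn:aws:iam::123456789012:role/example-grafana-role"))
```

You could save the output as trust.json and apply it with the AWS CLI instead of the console:
aws iam update-assume-role-policy --role-name timestream-iot-grafanaEC2rolelabview-us-east-1 --policy-document file://trust.json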

The updated trust relationship:

[Figure: the updated trust relationship]

1.8 Log in to Grafana server

Log in to Grafana server for the first time:

  1. Open a browser and go to http://[Grafana server public ip]:3000
  2. The default Grafana server listening port is 3000.

To find the EC2 public IP address, open the CloudFormation Outputs tab, as shown below:

[Figure: EC2 public IP address in the CloudFormation outputs]

  1. On the login page, enter username admin and password admin.
  2. Click Log In. After you log in successfully, you are prompted to change the password.

1.9 Add a Timestream data source in Grafana server

Add a Timestream data source.


1.10 Configure the Timestream data source in Grafana server

Copy the role ARN required for the configuration from the CloudFormation Outputs tab, and set Default Region to us-east-1.


IoT Data Storage

2.1 Create the Timestream database iot


2.2 Create the Timestream table pm25

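The database and table are created in the console above. As an alternative, here is a minimal Python sketch that does the same with the create_database and create_table calls of boto3's timestream-write client. Three parts are assumptions:

- The retention values (24 hours in memory, 7 days in the magnetic store), because this article does not state the values it uses.
- The function name.
- RecordingClient, a hypothetical stand-in. The client is passed in so that the sketch can run offline against it.

```python
def create_iot_database_and_table(client, memory_hours: int = 24, magnetic_days: int = 7) -> None:
    """Create database "iot" and table "pm25" using a Timestream write client.

    The default retention values are assumptions, not the article's settings.
    """
    client.create_database(DatabaseName="iot")
    client.create_table(
        DatabaseName="iot",
        TableName="pm25",
        RetentionProperties={
            "MemoryStoreRetentionPeriodInHours": memory_hours,
            "MagneticStoreRetentionPeriodInDays": magnetic_days,
        },
    )


class RecordingClient:
    """Hypothetical offline stand-in for boto3.client("timestream-write") that records calls."""

    def __init__(self):
        self.calls = []

    def create_database(self, **kwargs):
        self.calls.append(("create_database", kwargs))

    def create_table(self, **kwargs):
        self.calls.append(("create_table", kwargs))


if __name__ == "__main__":
    # Against AWS you would pass boto3.client("timestream-write", region_name="us-east-1").
    fake = RecordingClient()
    create_iot_database_and_table(fake)
    for name, kwargs in fake.calls:
        print(name, kwargs)
```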

IoT Data Ingestion

3.1 Install the Flink connector for Timestream

Install Java 8:

sudo yum install -y java-1.8.0-openjdk*

java -version

Install the debug info package; otherwise jmap throws an exception:

sudo yum --enablerepo='*-debug*' install -y java-1.8.0-openjdk-debuginfo

Install Maven:

sudo wget https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
# Escape the $ so the shell does not expand $releasever before sed sees it
sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo
sudo yum install -y apache-maven
mvn --version

Change the default Java version from 1.7 to 1.8:

sudo update-alternatives --config java

sudo update-alternatives --config javac

Install Apache Flink

At the time of writing, the latest Apache Flink version supported by Kinesis Data Analytics was 1.8.2.

  1. Create a flink folder:

cd

mkdir flink

cd flink

  2. Download the Apache Flink 1.8.2 source code:

wget https://archive.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-src.tgz

  3. Extract the Apache Flink source code:

tar -xvf flink-1.8.2-src.tgz

  4. Change to the Apache Flink source directory:

cd flink-1.8.2

  5. Compile and install Apache Flink (the build takes a while, roughly 20 minutes):

mvn clean install -Pinclude-kinesis -DskipTests

3.2 Create the Kinesis data stream Timestreampm25Stream

aws kinesis create-stream --stream-name Timestreampm25Stream --shard-count 1
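One shard is enough for this demo. For a larger deployment, you can size the stream from the per-shard write limits of Kinesis Data Streams in provisioned mode: up to 1,000 records per second and up to 1 MB per second per shard. The Python sketch below takes the larger of the two requirements. The function name and the example rates are hypothetical. Counting 1 MB as 10^6 bytes is a conservative assumption.

```python
import math

# Per-shard write limits for Kinesis Data Streams in provisioned mode.
MAX_RECORDS_PER_SHARD = 1_000      # records per second
MAX_BYTES_PER_SHARD = 1_000_000    # 1 MB per second, counted conservatively as 10**6 bytes


def shards_needed(records_per_sec: float, avg_record_bytes: int) -> int:
    """Smallest shard count that satisfies both per-shard write limits (minimum 1)."""
    by_records = records_per_sec / MAX_RECORDS_PER_SHARD
    by_bytes = records_per_sec * avg_record_bytes / MAX_BYTES_PER_SHARD
    return max(1, math.ceil(max(by_records, by_bytes)))
```

For instance, shards_needed(2500, 200) returns 3. The record-count limit requires 2.5 shards, which outweighs the 0.5 shards the byte limit requires.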

3.3 Run the Flink connector to connect Kinesis to Timestream:

cd
cd amazon-timestream-tools/integrations/flink_connector
mvn clean compile

Keep the following command running throughout data collection:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.kinesisanalytics.StreamingJob" -Dexec.args="--InputStreamName Timestreampm25Stream --Region us-east-1 --TimestreamDbName iot --TimestreamTableName pm25"
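The connector's internals are not reproduced here. From the queries in section 4, we can infer what each row in the table looks like:

- dimensions city and location
- measure_name 'pm2.5'
- a measure value stored as BIGINT (the queries read measure_value::bigint)

The Python sketch below builds one such record in the shape that Timestream's WriteRecords API (boto3 write_records) accepts. This is handy if you want to write test data directly, bypassing Kinesis. The function name is hypothetical, and the real table may carry additional dimensions.

```python
import time


def to_timestream_record(city: str, location: str, pm25: int, epoch_ms: int) -> dict:
    """Build one WriteRecords record whose names mirror the columns queried in section 4."""
    return {
        "Dimensions": [
            {"Name": "city", "Value": city},
            {"Name": "location", "Value": location},
        ],
        "MeasureName": "pm2.5",
        "MeasureValue": str(pm25),       # WriteRecords takes measure values as strings
        "MeasureValueType": "BIGINT",    # matches measure_value::bigint in the queries
        "Time": str(epoch_ms),
        "TimeUnit": "MILLISECONDS",
    }


if __name__ == "__main__":
    print(to_timestream_record("Shenzhen", "huaqiao city", 42, int(time.time() * 1000)))
```

To write the record, you would call client.write_records(DatabaseName="iot", TableName="pm25", Records=[record]) on a boto3 timestream-write client.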

3.4 Prepare the PM2.5 demo data:

Connect to the EC2 bastion host.

Download the PM2.5 demo data generator:

cd

mkdir pm25

cd pm25

  1. Download the Python data-collection program from GitHub:

git clone https://github.com/bingbingliu18/Timestream-pm25

cd Timestream-pm25

  2. Run the PM2.5 demo data generator. The Python program takes two parameters: --region (default: us-east-1) and --stream (default: Timestreampm25Stream).

Keep the following command running throughout data collection:

python3 pm25_new_kinisis_test.py
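The repository's pm25_new_kinisis_test.py is not reproduced here. As a rough idea of what such a generator does, the sketch below parses the same two options with the same defaults. It then sends one JSON record per station to Kinesis with boto3's put_record. The following are all hypothetical and may not match what the Flink connector expects:

- the payload field names
- the value range
- the station list
- the helper names parse_args, make_reading, and send_forever

```python
import argparse
import json
import random
import time


def parse_args(argv=None):
    """The two options the demo program documents, with the same defaults."""
    parser = argparse.ArgumentParser(description="Hypothetical PM2.5 demo generator")
    parser.add_argument("--region", default="us-east-1")
    parser.add_argument("--stream", default="Timestreampm25Stream")
    return parser.parse_args(argv)


def make_reading(city, location, rng):
    """One simulated reading; field names and value range are hypothetical."""
    return {
        "city": city,
        "location": location,
        "pm2.5": rng.randint(0, 300),
        "time": int(time.time() * 1000),  # epoch milliseconds
    }


def send_forever(args, stations, interval_sec=1.0):
    """Send one reading per station every interval_sec seconds until interrupted."""
    import boto3  # imported here so the helpers above work without boto3 installed

    kinesis = boto3.client("kinesis", region_name=args.region)
    rng = random.Random()
    while True:
        for city, location in stations:
            kinesis.put_record(
                StreamName=args.stream,
                Data=json.dumps(make_reading(city, location, rng)).encode("utf-8"),
                PartitionKey=location,  # the key's hash determines which shard receives the record
            )
        time.sleep(interval_sec)
```

On the bastion host you could start it with send_forever(parse_args(), [("Shenzhen", "huaqiao city")]). It loops until you interrupt it.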

IoT Data Analysis

4.1 Log in to Grafana server and create a dashboard and panel


When creating the dashboard, set its time zone to the local browser time.


Create a new panel.


Select the data source to query, and paste the SQL statement for the analysis into the new panel.


4.2 Create the time-series analysis dashboard PM2.5 Analysis 1 (save it as PM2.5 Analysis 1)

4.2.1 Query the average PM2.5 at each monitoring station in Beijing

New Panel

SELECT CASE WHEN location = 'fengtai_xiaotun' THEN avg_pm25 ELSE NULL END AS fengtai_xiaotun,
CASE WHEN location = 'fengtai_yungang' THEN avg_pm25 ELSE NULL END AS fengtai_yungang,
CASE WHEN location = 'daxing' THEN avg_pm25 ELSE NULL END AS daxing,
CASE WHEN location = 'wanshou' THEN avg_pm25 ELSE NULL END AS wanshou,
CASE WHEN location = 'gucheng' THEN avg_pm25 ELSE NULL END AS gucheng,
CASE WHEN location = 'tiantan' THEN avg_pm25 ELSE NULL END AS tiantan,
CASE WHEN location = 'yanshan' THEN avg_pm25 ELSE NULL END AS yanshan,
CASE WHEN location = 'miyun' THEN avg_pm25 ELSE NULL END AS miyun,
CASE WHEN location = 'changping' THEN avg_pm25 ELSE NULL END AS changping,
CASE WHEN location = 'aoti' THEN avg_pm25 ELSE NULL END AS aoti,
CASE WHEN location = 'mengtougou' THEN avg_pm25 ELSE NULL END AS mentougou,
CASE WHEN location = 'huairou' THEN avg_pm25 ELSE NULL END AS huairou,
CASE WHEN location = 'haidian' THEN avg_pm25 ELSE NULL END AS haidian,
CASE WHEN location = 'nongzhan' THEN avg_pm25 ELSE NULL END AS nongzhan,
CASE WHEN location = 'tongzhou' THEN avg_pm25 ELSE NULL END AS tongzhou,
CASE WHEN location = 'dingling' THEN avg_pm25 ELSE NULL END AS dingling,
CASE WHEN location = 'yanqing' THEN avg_pm25 ELSE NULL END AS yanqing,
CASE WHEN location = 'guanyuan' THEN avg_pm25 ELSE NULL END AS guanyuan,
CASE WHEN location = 'dongsi' THEN avg_pm25 ELSE NULL END AS dongsi,
CASE WHEN location = 'shunyi' THEN avg_pm25 ELSE NULL END AS shunyi
FROM 
(SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Beijing'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)
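The query works in two steps. The inner SELECT averages each station's readings per 30-second bin. The outer SELECT then pivots those rows so that each station gets its own column, with NULL in the other stations' columns, which lets a Gauge panel show one gauge per station. The Python sketch below reproduces that logic in memory so you can check it against sample data. The function names and the (location, epoch seconds, value) row format are assumptions for illustration.

```python
from collections import defaultdict


def bin_time(epoch_sec: int, width_sec: int) -> int:
    """Floor a timestamp to the start of its bucket, like BIN(time, 30s)."""
    return epoch_sec - epoch_sec % width_sec


def avg_by_location_bin(rows, width_sec=30):
    """rows: (location, epoch_sec, pm25). Returns {(location, bin_start): rounded average}.

    Note: Python's round() rounds halves to even, whereas SQL ROUND usually rounds
    halves away from zero, so exact .5 averages may differ from Timestream's result.
    """
    sums = defaultdict(lambda: [0, 0])
    for location, ts, value in rows:
        key = (location, bin_time(ts, width_sec))
        sums[key][0] += value
        sums[key][1] += 1
    return {key: round(total / count) for key, (total, count) in sums.items()}


def pivot(averages, locations):
    """One output row per (location, bin), ordered by average descending; the row's own
    location column holds the average and every other column is None (the CASE WHEN ... ELSE NULL)."""
    ordered = sorted(averages.items(), key=lambda kv: -kv[1])
    return [{col: (avg if col == loc else None) for col in locations} for (loc, _bin), avg in ordered]
```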

Select Gauge as the visualization.


Save Panel as Beijing PM2.5 analysis

Edit Panel Title:Beijing PM2.5 analysis


Save Dashboard PM2.5 analysis 1:


4.2.2 Query the average PM2.5 at each monitoring station in Shanghai

New Panel

SELECT CASE WHEN location = 'songjiang' THEN avg_pm25 ELSE NULL END AS songjiang,
CASE WHEN location = 'fengxian' THEN avg_pm25 ELSE NULL END AS fengxian, 
CASE WHEN location = 'no 15 factory' THEN avg_pm25 ELSE NULL END AS No15_factory, 
CASE WHEN location = 'xujing' THEN avg_pm25 ELSE NULL END AS xujing,
CASE WHEN location = 'pujiang' THEN avg_pm25 ELSE NULL END AS pujiang,
CASE WHEN location = 'putuo' THEN avg_pm25 ELSE NULL END AS putuo,
CASE WHEN location = 'shangshida' THEN avg_pm25 ELSE NULL END AS shangshida,
CASE WHEN location = 'jingan' THEN avg_pm25 ELSE NULL END AS jingan, 
CASE WHEN location = 'xianxia' THEN avg_pm25 ELSE NULL END AS xianxia, 
CASE WHEN location = 'hongkou' THEN avg_pm25 ELSE NULL END AS hongkou, 
CASE WHEN location = 'jiading' THEN avg_pm25 ELSE NULL END AS jiading, 
CASE WHEN location = 'zhangjiang' THEN avg_pm25 ELSE NULL END AS zhangjiang, 
CASE WHEN location = 'miaohang' THEN avg_pm25 ELSE NULL END AS miaohang, 
CASE WHEN location = 'yangpu' THEN avg_pm25 ELSE NULL END AS yangpu, 
CASE WHEN location = 'huinan' THEN avg_pm25 ELSE NULL END AS huinan, 
CASE WHEN location = 'chongming' THEN avg_pm25 ELSE NULL END AS chongming
From(
SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Shanghai'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)


Save Panel as Shanghai PM2.5 analysis

Edit Panel Title:Shanghai PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.3 Query the average PM2.5 at each monitoring station in Guangzhou

New Panel

SELECT CASE WHEN location = 'panyu' THEN avg_pm25 ELSE NULL END AS panyu,
CASE WHEN location = 'commercial school' THEN avg_pm25 ELSE NULL END AS commercial_school, 
CASE WHEN location = 'No 5 middle school' THEN avg_pm25 ELSE NULL END AS No_5_middle_school,
CASE WHEN location = 'guangzhou monitor station' THEN avg_pm25 ELSE NULL END AS Guangzhou_monitor_station, 
CASE WHEN location = 'nansha street' THEN avg_pm25 ELSE NULL END AS Nansha_street, 
CASE WHEN location = 'No 86 middle school' THEN avg_pm25 ELSE NULL END AS No_86_middle_school, 
CASE WHEN location = 'luhu' THEN avg_pm25 ELSE NULL END AS luhu, 
CASE WHEN location = 'nansha' THEN avg_pm25 ELSE NULL END AS nansha, 
CASE WHEN location = 'tiyu west' THEN avg_pm25 ELSE NULL END AS tiyu_west, 
CASE WHEN location = 'jiulong town' THEN avg_pm25 ELSE NULL END AS jiulong_town, 
CASE WHEN location = 'huangpu' THEN avg_pm25 ELSE NULL END AS Huangpu, 
CASE WHEN location = 'baiyun' THEN avg_pm25 ELSE NULL END AS Baiyun, 
CASE WHEN location = 'maofeng mountain' THEN avg_pm25 ELSE NULL END AS Maofeng_mountain, 
CASE WHEN location = 'chong hua' THEN avg_pm25 ELSE NULL END AS Chonghua, 
CASE WHEN location = 'huadu' THEN avg_pm25 ELSE NULL END AS huadu
from(
    SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Guangzhou'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)


Save Panel as Guangzhou PM2.5 analysis

Edit Panel Title:Guangzhou PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.4 Query the average PM2.5 at each monitoring station in Shenzhen

New Panel

SELECT CASE WHEN location = 'huaqiao city' THEN avg_pm25 ELSE NULL END AS Huaqiao_city,
CASE WHEN location = 'xixiang' THEN avg_pm25 ELSE NULL END AS xixiang,
CASE WHEN location = 'guanlan' THEN avg_pm25 ELSE NULL END AS guanlan,
CASE WHEN location = 'longgang' THEN avg_pm25 ELSE NULL END AS Longgang,
CASE WHEN location = 'honghu' THEN avg_pm25 ELSE NULL END AS Honghu,
CASE WHEN location = 'pingshan' THEN avg_pm25 ELSE NULL END AS Pingshan,
CASE WHEN location = 'henggang' THEN avg_pm25 ELSE NULL END AS Henggang,
CASE WHEN location = 'minzhi' THEN avg_pm25 ELSE NULL END AS Minzhi,
CASE WHEN location = 'lianhua' THEN avg_pm25 ELSE NULL END AS Lianhua,
CASE WHEN location = 'yantian' THEN avg_pm25 ELSE NULL END AS Yantian,
CASE WHEN location = 'nanou' THEN avg_pm25 ELSE NULL END AS Nanou,
CASE WHEN location = 'meisha' THEN avg_pm25 ELSE NULL END AS Meisha
From(
SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Shenzhen'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)


Save Panel as Shenzhen PM2.5 analysis

Edit Panel Title:Shenzhen PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.5 Time-series analysis for Huaqiao City, Shenzhen (PM2.5 over the last 5 minutes)

New Panel

select location, CREATE_TIME_SERIES(time, measure_value::bigint) as PM25 FROM iot.pm25
where measure_name='pm2.5' 
and location='huaqiao city'
and time >= ago(5m)
GROUP BY location
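CREATE_TIME_SERIES is an aggregate. For each location in the GROUP BY, it collects the (time, measure value) pairs into a single timeseries value, which Grafana can then plot as one line. The Python sketch below is an in-memory analogue of the query, including the time >= ago(5m) filter. The function name, the row format, and passing "now" explicitly are assumptions for illustration.

```python
from collections import defaultdict


def create_time_series(rows, now, window_sec=300):
    """Group (location, epoch_sec, value) rows into one time-ordered series per location.

    Mirrors: SELECT location, CREATE_TIME_SERIES(time, value) ... WHERE time >= ago(5m) GROUP BY location
    """
    series = defaultdict(list)
    for location, ts, value in rows:
        if ts >= now - window_sec:  # keep only the last window_sec seconds, like ago(5m)
            series[location].append((ts, value))
    return {location: sorted(points) for location, points in series.items()}
```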

Select Lines as the visualization, and enable Points.


Save Panel as Shen Zhen Huaqiao City PM2.5 analysis

Edit Panel Title: Shenzhen Huaqiao City PM2.5 over the last 5 minutes

Save Dashboard PM2.5 analysis 1

4.2.6 Find the average PM2.5 for Huaqiao City, Shenzhen in 30-second intervals over the past 2 hours (missing values filled by linear interpolation)

New Panel

WITH binned_timeseries AS (
    SELECT location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND location='huaqiao city'
        AND time > ago(2h)
    GROUP BY location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY location
)
SELECT time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25)
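INTERPOLATE_LINEAR fills each SEQUENCE timestamp that has no binned average. It uses the value on the straight line between the nearest known points before and after that timestamp. The Python sketch below shows that calculation for one location's binned series. It is a simplified model, not Timestream's implementation, and the function name is illustrative. It skips timestamps outside the known range. With SEQUENCE(min, max, 30s), as in the query, there are none.

```python
def interpolate_linear(points, start, stop, step):
    """Linearly interpolate a [(time, value)] series onto start..stop every `step`.

    Known timestamps keep their value; gaps get the value on the straight line
    between the nearest known points on each side. Timestamps without a known
    neighbour on both sides are skipped in this sketch.
    """
    known = dict(points)
    times = sorted(known)
    result = []
    t = start
    while t <= stop:
        if t in known:
            result.append((t, known[t]))
        else:
            left = max((x for x in times if x < t), default=None)
            right = min((x for x in times if x > t), default=None)
            if left is not None and right is not None:
                fraction = (t - left) / (right - left)
                value = known[left] + fraction * (known[right] - known[left])
                result.append((t, round(value, 2)))  # like ROUND(value, 2)
        t += step
    return result


if __name__ == "__main__":
    # Bins at 0 s and 90 s are known; 30 s and 60 s are missing.
    print(interpolate_linear([(0, 10.0), (90, 40.0)], 0, 90, 30))
    # → [(0, 10.0), (30, 20.0), (60, 30.0), (90, 40.0)]
```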

Select Lines as the visualization.


Save Panel as Shen Zhen Huaqiao City PM2.5 analysis 1

Edit Panel Title: Average PM2.5 for Shenzhen Huaqiao City over the past 2 hours (missing values filled by linear interpolation)

Save Dashboard PM2.5 analysis 1

4.2.7 Ranking of average PM2.5 across all cities over the past 5 minutes (linear interpolation)

New Panel

SELECT CASE WHEN city = 'Shanghai' THEN inter_avg_PM25 ELSE NULL END AS Shanghai,
CASE WHEN city = 'Beijing' THEN inter_avg_PM25 ELSE NULL END AS Beijing,
CASE WHEN city = 'Guangzhou' THEN inter_avg_PM25 ELSE NULL END AS Guangzhou,
CASE WHEN city = 'Shenzhen' THEN inter_avg_PM25 ELSE NULL END AS Shenzhen,
CASE WHEN city = 'Hangzhou' THEN inter_avg_PM25 ELSE NULL END AS Hangzhou,
CASE WHEN city = 'Nanjing' THEN inter_avg_PM25 ELSE NULL END AS Nanjing,
CASE WHEN city = 'Chengdu' THEN inter_avg_PM25 ELSE NULL END AS Chengdu,
CASE WHEN city = 'Chongqing' THEN inter_avg_PM25 ELSE NULL END AS Chongqing,
CASE WHEN city = 'Tianjin' THEN inter_avg_PM25 ELSE NULL END AS Tianjin,
CASE WHEN city = 'Shenyang' THEN inter_avg_PM25 ELSE NULL END AS Shenyang,
CASE WHEN city = 'Sanya' THEN inter_avg_PM25 ELSE NULL END AS Sanya,
CASE WHEN city = 'Lasa' THEN inter_avg_PM25 ELSE NULL END AS Lasa
from(
WITH binned_timeseries AS (
    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), all_location_interpolated as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,avg(interpolated_avg_PM25) AS inter_avg_PM25
from all_location_interpolated
group by city
order by avg(interpolated_avg_PM25) desc)
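The outer query averages every interpolated 30-second point per city and then pivots the cities into columns. Note what the average is taken over: all interpolated points of all stations in the city. A station with more points therefore carries more weight, which is not the same as averaging per-station averages. The Python sketch below reproduces the ranking step. The function name, the row format, and the station name station_a are hypothetical.

```python
from collections import defaultdict


def rank_cities(interpolated):
    """Average interpolated points per city and sort descending.

    interpolated: (city, location, time, value) rows, like the all_location_interpolated CTE.
    """
    totals = defaultdict(lambda: [0.0, 0])
    for city, _location, _time, value in interpolated:
        totals[city][0] += value
        totals[city][1] += 1
    averages = {city: total / count for city, (total, count) in totals.items()}
    return sorted(averages.items(), key=lambda kv: kv[1], reverse=True)


if __name__ == "__main__":
    rows = [
        ("Beijing", "dongsi", 0, 100.0),
        ("Beijing", "dongsi", 30, 80.0),
        ("Beijing", "tiantan", 0, 60.0),
        ("Sanya", "station_a", 0, 10.0),
    ]
    # Beijing: (100 + 80 + 60) / 3 = 80.0, not the 75.0 you would get by
    # averaging the two station means (90.0 and 60.0).
    print(rank_cities(rows))  # → [('Beijing', 80.0), ('Sanya', 10.0)]
```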

Select the panel visualization type.


Save Panel as all city analysis 1

Edit Panel Title: Average PM2.5 for all cities over the past 5 minutes

Save Dashboard PM2.5 analysis 1

4.2.8 The ten monitoring points with the highest PM2.5 over the past 5 minutes (linear interpolation)

New Panel

WITH binned_timeseries AS (
    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) 
                AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), interpolated_cross_join as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,location, avg(interpolated_avg_PM25) as avg_PM25_loc
from interpolated_cross_join
group by city,location
order by avg_PM25_loc desc
limit 10
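This query and the one in 4.2.9 differ only in sort direction (DESC versus ASC) before LIMIT 10. The Python sketch below averages the interpolated points per (city, location). It then picks the N highest or lowest with heapq, which gives the same result as a full sort followed by LIMIT. The function names and row format are assumptions for illustration.

```python
import heapq
from collections import defaultdict


def location_averages(interpolated):
    """Average interpolated (city, location, time, value) points per (city, location)."""
    totals = defaultdict(lambda: [0.0, 0])
    for city, location, _time, value in interpolated:
        totals[(city, location)][0] += value
        totals[(city, location)][1] += 1
    return {key: total / count for key, (total, count) in totals.items()}


def top_n(averages, n=10, highest=True):
    """ORDER BY avg DESC LIMIT n when highest=True, ORDER BY avg ASC LIMIT n otherwise."""
    pick = heapq.nlargest if highest else heapq.nsmallest
    return pick(n, averages.items(), key=lambda kv: kv[1])
```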

Select Table.


Save Panel as all city analysis 2

Edit Panel Title: The ten monitoring points with the highest PM2.5 over the past 5 minutes (linear interpolation)

Save Dashboard PM2.5 analysis 1

4.2.9 The ten monitoring points with the lowest PM2.5 over the past 5 minutes (linear interpolation)

New Panel

WITH binned_timeseries AS (
    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) 
                AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), interpolated_cross_join as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,location, avg(interpolated_avg_PM25) as avg_PM25_loc
from interpolated_cross_join
group by city,location
order by avg_PM25_loc asc
limit 10

Select Table.


Save Panel as all city analysis 3

Edit Panel Title: The ten monitoring points with the lowest PM2.5 over the past 5 minutes (linear interpolation)

Save Dashboard PM2.5 analysis 1

Set the dashboard to refresh every 5 seconds.


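This post showed how to collect, store, and analyze IoT time-series data in real time, using PM2.5 as the example scenario. It combined the managed services Timestream and Kinesis Data Streams with Grafana. It covered the deployment architecture, environment setup, data ingestion, data storage, and analysis. We hope it gives you ideas when you have similar IoT time-series storage and analysis needs, helping you manage massive volumes of IoT time-series data efficiently and uncover the patterns and value they contain.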


About the Author

Liu Bingbing (刘冰冰)

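Database Solutions Architect at Amazon Web Services (AWS), responsible for consulting on and designing AWS-based database solutions, and dedicated to research and advocacy in big data. Before joining AWS, spent many years at Oracle. Has extensive experience in cloud database planning, design, operations and tuning, DR solutions, big data and data warehousing, and enterprise applications.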
