来源:互联网 更新时间:2026-06-23 07:25
摘要:本文介绍如何使用 MindSpore 国产 AI 框架与 TDengine 时序数据库结合,实现工业数据的智能分析和预测。

信创浪潮之下,国产AI框架正从“能用”走向“好用”,MindSpore就是其中的代表。但光有框架还不够,真正落地到工业场景,还得配套一个拿得出手的时序数据引擎。TDengine恰好补上了这一环——两个国产基础设施一搭,一套完整的、不依赖海外技术的分析栈就成形了。说白了,这不是简单的工具组合,而是为工业企业提供了一条从数据采集到智能决策的自主可控路径。
要开始分析,第一步当然是把数据从TDengine里捞出来。下面这段代码定义了一个数据加载器,直接用taos连接库,执行查询后把结果转成MindSpore能吃的数据集格式:
import taos
import mindspore as ms
import numpy as np
class TDengineDataLoader:
def __init__(self, query):
self.conn = taos.connect(host="localhost", database="industry")
self.cursor = self.conn.cursor()
self.cursor.execute(query)
self.data = self.cursor.fetchall()
def to_mindspore_dataset(self):
features = []
labels = []
for row in self.data:
features.append(row[1:-1])
labels.append(row[-1])
return ms.dataset.NumpySlicesDataset(
{"features": np.array(features), "labels": np.array(labels)},
shuffle=True
)
这里有个小技巧:to_mindspore_dataset 方法把每一行数据拆成特征和标签,返回一个支持随机打乱的 NumpySlicesDataset,后续训练直接喂进去就行。
拿到的原始数据通常需要做归一化,不然模型训练容易不稳定。MindSpore 的 dataset 模块自带变换操作,直接 map 上归一化然后分个批,32 一个 batch,顺手又高效:
from mindspore import dataset as ds
def preprocess_data(dataset):
# 归一化
dataset = dataset.map(operations=ds.transforms.Normalize(
mean=[0.5, 0.5, 0.5],
std=[0.5, 0.5, 0.5]
), input_columns=["features"])
# 批量处理
dataset = dataset.batch(batch_size=32)
return dataset
工业设备的状态往往表现为时间序列上的波动,所以用 LSTM 来做异常检测是很自然的思路。下面这个模型结构很简单:两层 LSTM 提取时序特征,再接一个全连接层输出二分类概率:
import mindspore.nn as nn
class AnomalyDetectionModel(nn.Cell):
def __init__(self):
super().__init__()
self.lstm = nn.LSTM(input_size=3, hidden_size=64, num_layers=2)
self.fc = nn.Dense(64, 1)
self.sigmoid = nn.Sigmoid()
def construct(self, x):
output, _ = self.lstm(x)
output = self.fc(output[:, -1, :])
return self.sigmoid(output)
注意这里 construct 是 MindSpore 的前向方法,和 PyTorch 的 forward 一个意思。模型输出是一个介于 0 到 1 之间的概率值,超过 0.8 就算异常。
训练流程没什么花哨的,先加载过去 30 天的设备数据(温度、压力、振动以及是否异常标签),然后建模型、选损失函数、设优化器,最后调用 train 接口跑 10 个 epoch:
from mindspore import Model, nn
# 加载数据
loader = TDengineDataLoader("""
SELECT ts, temperature, pressure, vibration, anomaly
FROM device_data
WHERE ts > NOW() - 30d
""")
dataset = loader.to_mindspore_dataset()
dataset = preprocess_data(dataset)
# 定义模型
model = AnomalyDetectionModel()
loss_fn = nn.BCELoss()
optimizer = nn.Adam(model.trainable_params(), learning_rate=0.001)
# 训练
ms_model = Model(model, loss_fn, optimizer, metrics={"accuracy"})
ms_model.train(epoch=10, train_dataset=dataset)
实际项目中可以把 epoch 调大一些,或者加入早停机制,这里只是为了演示整个链路跑通。
模型训练好后,关键是怎么在生产中实时跑起来。TDengine 的 Topic 机制让这件事变得很自然——只要创建一个订阅,就能持续接收最新一分钟的流数据:
CREATE TOPIC realtime_anomaly AS
SELECT ts, temperature, pressure, vibration
FROM device_data
WHERE ts > NOW() - 1m;
订阅建立后,用 taos 的 subscribe 接口持续 poll 新数据,每来一批就把特征塞进模型,如果预测概率超过 0.8 就触发报警。代码里省去了发警报的具体逻辑,实际可以集成邮件、信息或工单系统:
import taos
class RealtimePredictor:
def __init__(self, model):
self.model = model
self.conn = taos.connect(host="localhost", database="industry")
self.consumer = self.conn.subscribe("realtime_anomaly")
def predict(self):
rows = self.consumer.poll(1000)
for row in rows:
features = ms.Tensor([row[1:]], ms.float32)
prediction = self.model(features)
if prediction > 0.8:
self.send_alert(row[0], prediction)
MindSpore 与 TDengine 的组合,本质上解决了两件事:一是用国产框架替代了海外依赖,二是让时序数据的存储与分析在同一个技术栈里完成闭环。对企业来说,这意味着更低的集成成本、更高的数据安全性,以及一个可长期自主演进的智能分析基座。在信创落地的关键窗口期,这种“国产框架+国产数据库”的搭配,恐怕会越来越常见。
关键词:时序数据库、TDengine、MindSpore、国产 AI、异常检测
《Off Campus》第二季官宣:这对CP还在,但不再是主角
币安Binance虚拟货币交易平台 币安官方APP安卓苹果下载入口
客单价碾压宝马奥迪!极氪5月交付新车34377辆:连续4个月双增长
HBO 奇幻剧《龙之家族》第三季定档 6 月 22 日,最终预告片曝光喉道海战
帅气继父网名女生可爱英文(精选100个)
帅到极致的网名女生霸气(精选100个)
折后价近千元 澳洲一店主将真老鼠缝到内裤上当时尚单品卖
如何在夸克浏览器中开启网页视频的倍速播放功能?
蒙古上单是什么梗
DOTA2 TI时隔七年重返上海!门票6月10日开抢,国服享受优先购买!
作家助手如何上传自制封面 作家助手如何设置小说的封面
韦一敏是什么梗
韩漫小少爷网名大全女生(精选100个)
archiveofourown 实战指南:常见用法整理
网络热词聊污是什么意思
抖音最火沙雕男生网名(精选100个)
有寓意的易经网名男生(精选100个)
欧易OKX官方网站直达入口 2026欧易官方App安卓版v7.1.0下载安装
阿里发布Qwen3.7-Max大模型,全球第五、国产第一
小众游戏抖音网名男生(精选100个)
手机号码测吉凶
本站所有软件,都由网友上传,如有侵犯你的版权,请发邮件haolingcc@hotmail.com 联系删除。 版权所有 Copyright@2012-2013 haoling.cc