使用Java脚本节点实现MongoDB的聚合查询及统计数据并传入后继节点

苛子 苛子   |    193   |    2022-07-31 18:22

目前MongoDB的输入节点只能读取集合中的行列数据,而不能对集合中的数据进行聚合运算再传入后续节点,通过脚本节点可以快速实现这一需求。

e7d945b51bd14f7bab339f8b83d2e288

从Mongodb中读数据并分组然后输出到日志中脚本代码如下:

package cn.restcloud.etl.rule.ext;

import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import java.sql.Connection;
import cn.restcloud.framework.core.context.*;
import cn.restcloud.etl.base.IETLBaseEvent;
import cn.restcloud.etl.base.IETLBaseProcessEngine;
import cn.restcloud.framework.core.util.*;
import cn.restcloud.framework.core.util.db.rdb.*;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.AggregateIterable;
import cn.restcloud.etl.base.IETLBaseProcessEngine;
import cn.restcloud.framework.core.util.db.mongo.MongoUtil;


/**
indoc为流数据
执行成功必须返回字符1,返回0表示终止流程
*/

public class ETL_T00002_1EJ0RJSV8LZ implements IETLBaseEvent {

	public String execute(IETLBaseProcessEngine engine, Document modelNodeDoc, Document indoc,String fieldId,String params) throws Exception {
        List<Bson> filters=new ArrayList<Bson>();
        filters.add(BsonDocument.parse("{'$match':{\"actionTime\":{$gte:'2022-06-30 00:00',$lte:'2022-07-30 00:00'},appId:{$eq:'core'}}}")); //数据查询条件,相当于sql where条件
        filters.add(BsonDocument.parse("{$group :" + 
        		"  {" + 
        		"      _id : {mapUrl:\"$actionMapUrl\",configName:'$actionName',appId:\"$appId\",methodType:'$methodType'}, \r\n" + 
        		"      total : {$sum : 1}," + 
        		"      avgTime:{$avg:'$runTotalTime'}" + 
        		"	   min:{$min:'$runTotalTime'},"+
        		"	   max:{$max:'$runTotalTime'},"+
        		"  }" + 
        		"}")); //增加一个聚合查询条件
        
        filters.add(BsonDocument.parse("{'$sort':{total:-1}}"));//指定排序方式
        filters.add(BsonDocument.parse("{ $limit : 300 }"));//指定最大返回数
        
        List<Document> docs=new ArrayList<Document>();
        AggregateIterable<Document> aggdocs=MongoUtil.getCollection("RC_ApiLog", "P_ActionUrlAccessLog").aggregate(filters); //执行查询
        for(Document doc:aggdocs) {
        	//把结果转换为二级结构的数据
        	Document newdoc=new Document();
        	newdoc.putAll((Document)doc.get("_id"));
        	newdoc.put("accessTotal", doc.get("total"));
        	newdoc.put("avgTime", doc.get("avgTime"));
        	newdoc.put("min", doc.get("min"));
        	newdoc.put("max",doc.get("max"));
        	docs.add(newdoc);
        }
        
        indoc.put("data", docs); //传到后面的节点中去
        
        return "1";
	}

}
    
文章标签: 其他
推荐指数:

真诚点赞 诚不我欺

使用Java脚本节点实现MongoDB的聚合查询及统计数据并传入后继节点

点赞 收藏 评论 分享