admin 管理员组文章数量: 887007
行情订阅分钟合成
海风/go_real_md
/
订阅
版本 v0.6.5-20220722
quote.go :181
1// 订阅行情
2var insts = make([]string, 0)
3t.Instruments.Range(func(k, v interface{}) bool {
4 if len(v.(*goctp.InstrumentField).ProductID) == 0 { // 过滤 非正常合约
5 return true
6 }
7 insts = append(insts, k.(string))
8 if len(insts) > 200 { // 一次订阅 200个合约
9 q.ReqSubscript(insts)
10 insts = make([]string, 0)
11 }
12 i++
13 return true
14})
15q.ReqSubscript(insts)
合成分钟数据
处理 Actionday
OnRspLogin
DCE 的 actionDay 可能为 tradingDay,夜盘数据不对。
trade.go :69
1 // 交易日/自然日
2 t.TradingDay = login.TradingDay
3 if t.Weekday() == time.Monday { // 周一
4 actionDay = t.AddDate(0, 0, -3).Format("20060102") // 上周五
5 actionDayNext = t.AddDate(0, 0, -2).Format("20060102") // 上周六
6 } else {
7 actionDay = t.AddDate(0, 0, -1).Format("20060102") // 上一天
8 actionDayNext = login.TradingDay // 本日
9 }
OnTick
quote.go
1 // 取tick的分钟构造当前分钟时间
2 var action = t.TradingDay
3 // 夜盘
4 hour, _ := strconv.Atoi(tick.UpdateTime[0:2])
5 if hour <= 3 {
6 action = actionDayNext
7 } else if hour >= 20 {
8 action = actionDay
9 }
10 // 分钟 yyyy-mm-dd hh:mm:00
11 minDateTime := fmt.Sprintf("%s-%s-%s %s:00", action[0:4], action[4:6], action[6:], tick.UpdateTime[0:5])
首个 Tick
开高低收值为 lastPrice,Volume当前Volume为总成交量,需要减去上一分钟最后一个tick的总成交量。
1preVol := bar.PreVol + bar.Volume
2&Bar{
3 ID: minDateTime,
4 InstrumentID: tick.InstrumentID,
5 OpenInterest: tick.OpenInterest,
6 TradingDay: t.TradingDay,
7 Open: tick.LastPrice,
8 High: tick.LastPrice,
9 Low: tick.LastPrice,
10 Close: tick.LastPrice,
11 PreVol: preVol,
12 Volume: tick.Volume - preVol,
13}
K线更新
最高/最低/收盘价/成交量/持仓量
1const E = 0.000001
2if tick.LastPrice-bar.High > E {
3 bar.High = tick.LastPrice
4}
5if tick.LastPrice-bar.Low < E {
6 bar.Low = tick.LastPrice
7}
8bar.Close = tick.LastPrice
9bar.Volume = tick.Volume - bar.PreVol
10bar.OpenInterest = tick.OpenInterest
数据存储
分钟 tick 数 ≥ 3
开盘前竞价 tick
小节收盘时只收到 1 个tick
json
1// 当前分钟未被记录
2if bar.Ticks == 3 { // 控制分钟最小tick数量;避免盘歇的数据
3 jsonStr, _ := json.Marshal(&bar)
4 err := rdb.RPush(ctx, tick.InstrumentID, jsonStr).Err()
5 if err != nil {
6 logrus.Errorf("redis rpush error: %s %v", tick.InstrumentID, err)
7 }
8 // 发布分钟数据
9 rdb.Publish(ctx, "md."+tick.InstrumentID, jsonStr)
10} else if bar.Ticks > 3 {
11 jsonStr, _ := json.Marshal(&bar)
12 err := rdb.LSet(ctx, tick.InstrumentID, -1, jsonStr).Err()
13 if err != nil {
14 logrus.Errorf("redis lset error: %s %v", tick.InstrumentID, err)
15 }
16 // 发布分钟数据
17 rdb.Publish(ctx, "md."+tick.InstrumentID, jsonStr)
18}
收盘时写入 postgres
读取 redis 数据 (json)
写入 pg
1func InserrtPg() (err error) {
2 // 取交易日
3 tradingDay, err := rdb.HGet(ctx, "tradingday", "curday").Result()
4 if err != nil {
5 return errors.Wrap(err, "get curday")
6 }
7 // 删除当前交易日数据(实现重复入库)
8 err = GORMpg.Where(&Bar{TradingDay: tradingDay}).Delete(&Bar{}).Error
9 if err != nil {
10 return errors.Wrap(err, "delete")
11 }
12 // 取所有 key
13 var bars = make([]*Bar, 0)
14 insts, err := rdb.Keys(ctx, "*").Result()
15 if err != nil {
16 return errors.Wrap(err, "get redis keys")
17 }
18 // 按合约key入库
19 for _, inst := range insts {
20 if inst == "tradingday" {
21 continue
22 }
23 var mins = []string{}
24 if mins, err = rdb.LRange(ctx, inst, 0, -1).Result(); err != nil {
25 logrus.Error("取redis数据错误:", inst, err)
26 continue
27 }
28 for _, bsMin := range mins {
29 var bar Bar
30 if err = json.Unmarshal([]byte(bsMin), &bar); err != nil {
31 logrus.Error("解析bar错误:", bar, " ", err)
32 continue
33 }
34 bars = append(bars, &bar)
35 if len(bars) > 100000 { // 10W
36 err = GORMpg.CreateInBatches(bars, 2000).Error
37 if err != nil {
38 return errors.Wrap(err, "create batch")
39 }
40 bars = make([]*Bar, 0)
41 }
42 }
43 }
44 err = GORMpg.CreateInBatches(bars, 2000).Error
45 if err != nil {
46 return errors.Wrap(err, "create batch")
47 }
48 return
49}
部署
方式一
修改 Makefile 中的环境变量
前置、登录参数
redis IP
pg IP 不配置则不会在收盘时写入当日分钟数据
1make docker
2make local 或者 make srv
方式二
docker-compose
1docker-compose --compatibility up -d
查看数据
redis
1# 进入 docker redis cli
2docker exec -it ${redis_docker} redis-cli
3# 合约列表
4> KEYS *
5# 查看存储的行情
6> LRANGE ${合约} 0 -1
7# 查看存储的交易日
8> HGET tradingday curday
实时分钟数据可用 pubsub 订阅
1# pubsub 是一个查看订阅与发布系统状态的内省命令, 它由数个不同格式的子命令组成pubsub
2pubsub channels [pattern] # 查询系统中符合模式的频道信息,pattern为空,则查询系统中所有存在的频道
3pubsub numsub [channel] # 查询一个或多个频道的订阅数
4pubsub numpat #查询当前客户端订阅了多少频道
5
6# subscribe 订阅一个或多个频道
7subscribe channel [channel...]
8
9# psubscribe 订阅符合一个或多个匹配模式的所有频道,psubscribe new.* 则是订阅所有new.开头的频道(new.log,new.studnet,etc…)
10psubscribe pattern [pattern …]
本文标签: 行情订阅分钟合成
版权声明:本文标题:行情订阅分钟合成 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/jishu/1732351444h1533183.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论