admin 管理员组

文章数量: 887007

行情订阅分钟合成

海风/go_real_md

/

订阅

版本 v0.6.5-20220722

quote.go :181

 1// 订阅行情
2var insts = make([]string, 0)
3t.Instruments.Range(func(k, v interface{}) bool {
4  if len(v.(*goctp.InstrumentField).ProductID) == 0 { // 过滤 非正常合约
5    return true
6  }
7  insts = append(insts, k.(string))
8  if len(insts) > 200 { // 一次订阅 200个合约
9    q.ReqSubscript(insts)
10    insts = make([]string, 0)
11  }
12  i++
13  return true
14})
15q.ReqSubscript(insts)

合成分钟数据

处理 Actionday

OnRspLogin

DCE 的 actionDay 可能为 tradingDay,夜盘数据不对。

trade.go :69

1    // 交易日/自然日
2    t.TradingDay = login.TradingDay
3    if t.Weekday() == time.Monday { // 周一
4          actionDay = t.AddDate(0, 0, -3).Format("20060102")     // 上周五
5          actionDayNext = t.AddDate(0, 0, -2).Format("20060102") // 上周六
6    } else {
7          actionDay = t.AddDate(0, 0, -1).Format("20060102") // 上一天
8          actionDayNext = login.TradingDay                   // 本日
9    }
OnTick

quote.go

 1    // 取tick的分钟构造当前分钟时间
2    var action = t.TradingDay
3    // 夜盘
4    hour, _ := strconv.Atoi(tick.UpdateTime[0:2])
5    if hour <= 3 {
6        action = actionDayNext
7    } else if hour >= 20 {
8        action = actionDay
9    }
10    // 分钟 yyyy-mm-dd hh:mm:00
11    minDateTime := fmt.Sprintf("%s-%s-%s %s:00", action[0:4], action[4:6], action[6:], tick.UpdateTime[0:5])

首个 Tick

开高低收值为 lastPrice,Volume当前Volume为总成交量,需要减去上一分钟最后一个tick的总成交量。

 1preVol := bar.PreVol + bar.Volume
2&Bar{
3  ID:           minDateTime,
4  InstrumentID: tick.InstrumentID,
5  OpenInterest: tick.OpenInterest,
6  TradingDay:   t.TradingDay,
7  Open:         tick.LastPrice,
8  High:         tick.LastPrice,
9  Low:          tick.LastPrice,
10  Close:        tick.LastPrice,
11  PreVol:       preVol,
12  Volume:       tick.Volume - preVol,
13}

K线更新

最高/最低/收盘价/成交量/持仓量

 1const E = 0.000001
2if tick.LastPrice-bar.High > E {
3  bar.High = tick.LastPrice
4}
5if tick.LastPrice-bar.Low < E {
6  bar.Low = tick.LastPrice
7}
8bar.Close = tick.LastPrice
9bar.Volume = tick.Volume - bar.PreVol
10bar.OpenInterest = tick.OpenInterest

数据存储

分钟 tick 数 ≥ 3

  • 开盘前竞价 tick

  • 小节收盘时只收到 1 个tick

  • json

 1// 当前分钟未被记录
2if bar.Ticks == 3 { // 控制分钟最小tick数量;避免盘歇的数据
3  jsonStr, _ := json.Marshal(&bar)
4  err := rdb.RPush(ctx, tick.InstrumentID, jsonStr).Err()
5  if err != nil {
6    logrus.Errorf("redis rpush error: %s %v", tick.InstrumentID, err)
7  }
8  // 发布分钟数据
9  rdb.Publish(ctx, "md."+tick.InstrumentID, jsonStr)
10} else if bar.Ticks > 3 {
11  jsonStr, _ := json.Marshal(&bar)
12  err := rdb.LSet(ctx, tick.InstrumentID, -1, jsonStr).Err()
13  if err != nil {
14    logrus.Errorf("redis lset error: %s %v", tick.InstrumentID, err)
15  }
16  // 发布分钟数据
17  rdb.Publish(ctx, "md."+tick.InstrumentID, jsonStr)
18}

收盘时写入 postgres

  • 读取 redis 数据 (json)

  • 写入 pg

 1func InserrtPg() (err error) {
2    // 取交易日
3    tradingDay, err := rdb.HGet(ctx, "tradingday", "curday").Result()
4    if err != nil {
5        return errors.Wrap(err, "get curday")
6    }
7    // 删除当前交易日数据(实现重复入库)
8    err = GORMpg.Where(&Bar{TradingDay: tradingDay}).Delete(&Bar{}).Error
9    if err != nil {
10        return errors.Wrap(err, "delete")
11    }
12    // 取所有 key
13    var bars = make([]*Bar, 0)
14    insts, err := rdb.Keys(ctx, "*").Result()
15    if err != nil {
16        return errors.Wrap(err, "get redis keys")
17    }
18    // 按合约key入库
19    for _, inst := range insts {
20        if inst == "tradingday" {
21            continue
22        }
23        var mins = []string{}
24        if mins, err = rdb.LRange(ctx, inst, 0, -1).Result(); err != nil {
25            logrus.Error("取redis数据错误:", inst, err)
26            continue
27        }
28        for _, bsMin := range mins {
29            var bar Bar
30            if err = json.Unmarshal([]byte(bsMin), &bar); err != nil {
31                logrus.Error("解析bar错误:", bar, " ", err)
32                continue
33            }
34            bars = append(bars, &bar)
35            if len(bars) > 100000 { // 10W
36                err = GORMpg.CreateInBatches(bars, 2000).Error
37                if err != nil {
38                    return errors.Wrap(err, "create batch")
39                }
40                bars = make([]*Bar, 0)
41            }
42        }
43    }
44    err = GORMpg.CreateInBatches(bars, 2000).Error
45    if err != nil {
46        return errors.Wrap(err, "create batch")
47    }
48    return
49}

部署

方式一

修改 Makefile 中的环境变量
  • 前置、登录参数

  • redis IP

  • pg IP 不配置则不会在收盘时写入当日分钟数据

1make docker
2make local 或者 make srv

方式二

docker-compose
1docker-compose --compatibility up -d

查看数据

redis

1# 进入 docker redis cli
2docker exec -it ${redis_docker} redis-cli
3# 合约列表
4> KEYS *
5# 查看存储的行情
6> LRANGE ${合约} 0 -1
7# 查看存储的交易日
8> HGET tradingday curday

实时分钟数据可用 pubsub 订阅

 1# pubsub 是一个查看订阅与发布系统状态的内省命令, 它由数个不同格式的子命令组成pubsub
2pubsub channels [pattern] # 查询系统中符合模式的频道信息,pattern为空,则查询系统中所有存在的频道
3pubsub numsub [channel] # 查询一个或多个频道的订阅数
4pubsub numpat  #查询当前客户端订阅了多少频道
5
6# subscribe 订阅一个或多个频道
7subscribe channel [channel...]
8
9# psubscribe 订阅符合一个或多个匹配模式的所有频道,psubscribe new.* 则是订阅所有new.开头的频道(new.log,new.studnet,etc…)
10psubscribe pattern [pattern …]

本文标签: 行情订阅分钟合成