酒店排序场景中模型实时AUC计算实践

用户在OTA平台查询酒店到返回查询结果,大致会经历下图所示过程,首先后台会在候选集中进行召回,选出数千个Item,排序模型会对这些Item进行排序。其中排序又分为粗排,精排和重排,粗排模块会结合模型离线计算的结果,线上的简单模型和一些规则对召回的Item进行初次排序,取初次排序结果排名前1000或前1500家酒店进入精排模块,精排模块使用大规模机器学习模型(主要是DNN模型)结合一些机制对item进行二次排序,最后为了一些业务目标、B端的生态建设和用户体验等因素会对二次排序的结果进行重排,再将排序结果返回前端。

排序流程图

在精排模块,我们使用大规模离散DNN模型对Item的CVR(转化率)和CTR(点击率)等指标进行预估,然后根据DNN模型预测的这些数据结合一些机制对进入精排的Item进行排序。在排序的整个过程中,模型效果好差对最后的流量转化起着至关重要的作用,所以我们需要对模型的输出进行监控。

AUC(Area under the Curve of ROC)是衡量机器学习模型效果的非常重要的一个指标,也是模型上线后算法同学较为关心的一个指标,它能评判模型预测的准确度。为了提升模型效果,结合DNN模型可以进行增量训练的特点,我们提高了模型的迭代更新频率,有些模型甚至可以做到一天一更新。原来一天计算一次AUC的方案已经难以满足现在的需求,算法同学希望能看到模型的实时AUC,及时评估线上模型的效果,于是我们开始探索如何实时计算线上模型的AUC。

ClickHouse

ClickHouse是当前较为流行OLAP数据库,广泛使用于日志查询和分析场景中。与SparkSQL、Hive和Elasticsearch等OLAP数据库相比,ClickHouse具有明显优势,例如,SparkSQL与Hive这类系统无法保障90%的查询在1秒内返回,在大数据量下的复杂查询可能会需要分钟级的响应时间;而Elasticsearch这类搜索引擎在处理亿级数据聚合查询时则显得捉襟见肘。相比较来说,ClickHouse非常适合实时查询。

Flink是当前主流的流式计算引擎之一,具有低时延、高并发等特性,且兼具可靠性。其社区活跃,迭代更新快,支持将数据写入ClickHouse。

难点

  1. 数据量大,酒店列表页一天的模型日志数据会达到千亿级别,曝光的酒店数据量也会达到数千万,如何在秒级返回AUC的实时计算结果。
  2. Clickhouse执行join的查询语句性能较差,如何将所需要的数据整合到一张宽表中去。
  3. 通过Flink将数据流写入Clickhouse时,使用的JDBC只能写入Append-only流,如何将Flink中”left join”(Retract流)的结果”写入”Clickhouse

AUC计算方法

  • AUC计算方法,我们从其名称就可以得出,计算ROC曲线下方的面积,就可以得到AUC的值,事实上,这也是在早期 Machine Learning文献中常见的AUC计算方法。我们先要画出ROC曲线,然后计算其面积,可以看出这是比较麻烦的。
  • AUC有一个性质是,它与Mann–Whitney U test是等价,该测试就是任意给一个正类样本和一个负类样本,正类样本的score有多大的概率大于负类样本的score,而这个概率值就是AUC。统计学中,我们可以通过统计频率来估计概率。取所有的正负样本对,计算正样本score大于负样本score的数量(正负样本score相等时取0.5),然后除以样本对数,便可得到AUC。
    这种方法经过推导可以得到如下公式, 其中M是正样本的数目,N为负样本的数目

    AUC计算公式

在酒店排序场景中,模型会输出的CVR(订单转化率)和CTR(点击率),我们就计算这两个指标的AUC。

首先我们要先获取酒店曝光、点击和订购的数据,根据订购表标记曝光数据是否被订购可获得计算CVR的AUC样本。此外,我们还需获得当次曝光模型输出的该酒店的预估CVR作为score,通过上述公式便可计算得出CVR的AUC指标,同理可计算CTR。

离线计算方法

  • 第一步,构建样本表。

    在hive中,我们可以通过如下SQL语句获取计算AUC所需的样本数据,其中modelVersion字段是模型的版本,hotelId是酒店的id,cvr是模型本次计算该酒店的CVR,booking_bool是表示用户本次会话中(sessionId相同表示同一会话),是否订购了该酒店,即样本的正负属性。
1
2
3
4
5
6
7
create table dx.tmp_check_auc_data as
select modelVersion, sessionId, hotelId, cvr, ctr, booking_bool, click_bool
from logtable
join impressiontable on logtable.sessionId=impressiontable.sessionId and logtable.hotelId=impressiontable.hotelId
left join ordertable on impressiontable.sessionId=ordertable.sessionId
left join clicktable on impressiontable.sessionId=clicktable.sessionId

  • 第二步,计算。
    在获得样本数据后,通过如下sql可以计算得到不同模型版本CVR的AUC数据, 同理可算CTR。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
select modelVersion, (ry - 0.5*n1*(n1+1))/n0/n1 as auc
from
(
select modelVersion
,sum(if(booking_bool=0, 1, 0)) as n0
,sum(if(booking_bool=1, 1, 0)) as n1
,sum(if(booking_bool=1, cvr_rank, 0)) as ry
from
(
select modelVersion, booking_bool, cvr
, row_number() over(partition by modelVersion order by cvr asc) as cvr_rank
from dx.tmp_check_auc_data
where modelVersion is not null
)
group by modelVersion
)

实时计算方法

显然,在离线数据库,计算模型AUC并不复杂。但在实时场景,由于下单动作相对曝光在时间上有一定延迟,且延迟跨度较大,甚至可能达到一个小时,所以在构建样本表时,无法像hive那样简单join一次就可获得正确的样本数据。同时,为了保证实时性和性能,我们还需借助一些实时计算引擎帮助我们获得样本数据。

假设现有多个实时数据流,包括日志数据流log,曝光数据流impression,点击数据流click,订单数据量order。为了计算CVR和CTR的AUC指标,需要将日志流和曝光流inner join之后再left join 点击流和订单流。
日志流的数据有 hotelId, sessionId, ctr, ctr, eventime.
曝光流的数据有hotelId, sessionId, impressTime.
点击流的数据有hotelId, sessionid, clickTime.
订单流的数据有hotelId, sessionId, orderTime.

查询时在ClickHouse实时join

这种方案是最容易实施的方案,将四个数据流的数据落地到ClickHouse,然后使用类似离线的SQL语句计算出AUC数据。

然而,ClickHouse在使用join的时候性能并不好,速度较慢。在数据量大时,会占用大量内存,当内存不足时,就会查询失败。仔细分析就会发现,计算AUC只需要曝光酒店的CTR和CVR,不需要将日志流的全部数据落地ClickHouse, 我们可以将日志流和曝光流的数据在流式计算引擎Flink中join好之后再写入ClickHouse, 这样写入的数据量可降低一个量级,也可以减少一次在ClickHouse中的join。

事实上,ClickHouse也更加推荐使用大宽表查询,所以写入之前最好能将数据在流计算引擎中join好再写入ClickHouse。日志流和曝光流的数据可以在Flink中inner join之后写入,但由于Flink不能将Retract流写入ClickHouse, 所以曝光流和点击流、订单流的left join的过程不能简单的在Flink中提前join好再写入。

日志&曝光join表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
create table auc.hotel_impress
(
hotelId Int32,
ctr Float32,
cvr Float32,
sessionId String,
modelVersion LowCardinality(String),
impressTime DateTime,
datachange_lasttime DateTime default now()
)
engine = ReplicatedMergeTree()
PARTITION BY toYYYYMMDD(impressTime)
ORDER BY (impressTime, sessionId, hotelId)
TTL datachange_lasttime + toIntervalDay(7)
SETTINGS index_granularity = 8192;

点击表:

1
2
3
4
5
6
7
8
9
10
11
12
create table auc.hotel_click
(
hotelId Int32,
sessionId String,
eventTime DateTime,
datachange_lasttime DateTime default now()
)
engine = ReplicatedMergeTree()
PARTITION BY toYYYYMMDD(eventTime)
ORDER BY (eventTime, sessionId, hotelId)
TTL datachange_lasttime + toIntervalDay(7)
SETTINGS index_granularity = 8192;

订单表:

1
2
3
4
5
6
7
8
9
10
11
12
create table auc.hotel_order
(
hotelId Int32,
sessionId String,
eventTime DateTime,
datachange_lasttime DateTime default now()
)
engine = ReplicatedMergeTree()
PARTITION BY toYYYYMMDD(eventTime)
ORDER BY (eventTime, sessionId, hotelId)
TTL datachange_lasttime + toIntervalDay(7)
SETTINGS index_granularity = 8192;

AUC计算方法(以CVR为例,CTR同理可得)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
select t, modelVersion,  arrayAUC(cvr_arr, order_arr) as auc
from (
select modelVersion, t, groupArray(cvr) as cvr_arr, groupArray(book_bool) as order_arr
from (SELECT a.hotelId,
cvr,
modelVersion,
a.sessionId as sessionId,
toUInt32(toStartOfFifteenMinutes(impressTime)) * 1000 as t,
case when e.sessionId != '' then 1 else 0 end as book_bool
FROM ( SELECT hotelId, cvr, modelVersion, sessionId, impressTime
FROM auc.hotel_impress_all
WHERE
and impressTime >= $from
AND impressTime < $to ) a
GLOBAL
left OUTER join ( SELECT sessionId, hotelId
FROM auc.hotel_click_all
WHERE eventTime >=$from
AND eventTime < $to ) d
on a.sessionId = d.sessionId and a.hotelId = d.hotelId
GLOBAL
left OUTER join ( SELECT sessionId, hotelId
FROM auc.hotel_order_all
WHERE eventTime >= $from
AND eventTime < $to ) e
on a.sessionId = e.sessionId and a.hotelId = e.hotelId)
group by t, modelVersion
)
order by t

上述计算方法借助了ClickHouse内置的AUC函数,我们也可通过SQL自己实现AUC的计算,由于使用的ClickHouse版本不支持row_number()函数,我们使用arrayEnumerate函数实现排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
select t, modelVersion, (ry - 0.5 * n1 * (n1 + 1)) / n0 / n1
from (select modelVersion,
toUInt32(toStartOfFifteenMinutes(eventTime)) * 1000 as t,
sum(if(book_bool = 0, 1, 0)) as n0,
sum(if(book_bool = 1, 1, 0)) as n1,
sum(if(book_bool = 1, cvr_rank, 0)) as ry
FROM (SELECT modelVersion,
cvr_rank,
eventTime,
book_bool
from (SELECT modelVersion,
groupArray(book_bool) as arr_book_bool,
groupArray(sessionId) as arr_sessionid,
groupArray(hotelId) as arr_htlId,
groupArray(cvr) as arr_cvr,
groupArray(eventTime) as arr_eventTime,
arrayEnumerate(arr_cvr) AS cvr_rank
FROM (SELECT hotelId,
cvr,
modelVersion,
a.sessionId as sessionId,
impressTime as eventTime,
case when e.sessionId != '' then 1 else 0 end as book_bool
FROM ( SELECT hotelId, cvr, modelVersion, sessionId, impressTime
FROM auc.hotel_impress_all
WHERE impressTime >= $from
AND impressTime < $to ) a
GLOBAL
left OUTER join ( SELECT sessionId, hotelId
FROM auc.hotel_order_all
WHERE eventTime >= $from
AND eventTime < $to ) e
on a.sessionId = e.sessionId and a.hotelId = e.hotelId

ORDER BY cvr )
GROUP BY modelVersion, toUInt32(toStartOfFifteenMinutes(eventTime)) * 1000 as t) ARRAY JOIN
arr_book_bool as book_bool,
arr_sessionid as sessionId,
arr_htlId as htlId,
arr_cvr as cvr,
arr_eventTime as eventTime,
cvr_rank)
group by modelVersion, toUInt32(toStartOfFifteenMinutes(eventTime)) * 1000 as t)

ORDER BY t

实时auc监控

上图是查询结果图,数据每十五分钟聚合一次,可以清楚的看到各实验版本的AUC变化趋势和对比。

使用物化视图 left join

当查询的范围变大,数据量也会变大,使用实时Join的方式查询AUC查询速度会变得很慢,甚至会因为Join的数据量过大,使得内存溢出,从而造成查询失败。不能满足查询过去24小时或者过去2天等长时间跨度AUC的需求。

为了能支持更大时间范围的查询,我们需要构建一张大宽表,其中包含日志流,曝光流,点击流和订单流的所有数据。这里我们就需要用到ClickHouse的物化视图功能。

ClickHouse的物化视图类似于MySql的触发器(trigger),如果是A join B, 那么当有一批A表的数据写入时,则会触发A join B的计算,并将该批次数据的Join计算结果写入物化视图(注意,B表的更新不会触发计算)。

物化视图建表语句:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
CREATE MATERIALIZED VIEW auc_mv.impress_click_order_join_mv
(
`hotelId` Int32 COMMENT 'hotelId',
`ctr` Float64 COMMENT 'ctr',
`cvr` Float64 COMMENT 'cvr',
`modelVersion` LowCardinality(String) COMMENT 'modelVersion',
`sessionId` String COMMENT 'sessionId',
`queryTime` DateTime COMMENT 'queryTime',
`click_bool` Int8 COMMENT 'click_bool',
`book_bool` Int8 COMMENT 'book_bool'
)
ENGINE = ReplicatedReplacingMergeTree()
PARTITION BY toYYYYMMDD(queryTime)
ORDER BY (queryTime, sessionId, hotelId)
TTL (queryTime + toIntervalDay(7)) + toIntervalHour(6)
SETTINGS index_granularity = 8192
AS
SELECT hotelId,
ctr,
cvr,
modelVersion,
queryTime,
sessionId,
click_bool,
book_bool
FROM (
SELECT a.hotelId AS hotelId,
ctr,
cvr,
modelVersion,
queryTime,
a.sessionId AS sessionId,
multiIf((d.sessionId != '') OR (e.sessionId != ''), 1, 0) AS click_bool,
multiIf(e.sessionId != '', 1, 0) AS book_bool
FROM (
SELECT hotelId,
ctr,
cvr,
modelVersion,
scene,
sessionId,
queryTime
FROM auc.hotel_impress_delay
) AS a
GLOBAL
LEFT JOIN
(
SELECT sessionId,
hotelId
FROM auc.hotel_click_all
WHERE eventTime >= toDateTime(now() - 1800)
) AS d ON (a.sessionId = d.sessionId) AND (a.hotelId = d.hotelId)
GLOBAL
LEFT JOIN
(
SELECT sessionId,
hotelId
FROM auc.hotel_order_all
WHERE eventTime >= toDateTime(now() - 1800)
) AS e ON (a.sessionId = e.sessionId) AND (a.hotelId = e.hotelId)
);

在实际运行时,我们会发现按照原来的flink实时写入数据流的方式,物化视图里有点击和订购的曝光正样本数据量非常少,甚至接近0。这是因为曝光是发生在点击和订购之前的,也就是说在曝光数据写入时,点击和订购表中还没有与之相关的记录(在不考虑处理延迟的情况下)。这时原本为正样本的,现在就会被标记成负样本,AUC的计算结果与实时join的方法相比,会有非常大差距。

为了解决这一问题,我们需要将曝光数据延迟写入,待与之对应的点击和订购数据写入后,再写入相应的曝光数据。
经过数据分析,我们发现在同一个会话(session)中,90%以上的用户会在曝光后的10分钟内下单。所以我们将曝光的数据延迟10分钟写入,就能大大提高正负样本标记的准确度。

Flink可以通过重写processElement和onTimer方法实现延迟处理数据,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
//所有消息延迟10分钟
long timer = ctx.timerService().currentWatermark() + 600;
ctx.timerService().registerEventTimeTimer(timer);
state.update(value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
if (state.value() == null) {
return;
}
out.collect(state.value());
state.clear();
}

查询方法(一段时间内不同实验版本的AUC对比):

1
2
3
4
5
6
7
8
select modelVersion,arrayAUC(cvr_arr,order_arr) as auc from (
select modelVersion,groupArray(cvr) as cvr_arr,groupArray(book_bool) as order_arr from(
select cvr, book_bool, modelVersion
from auc_mv.impress_click_order_join_mv_all
WHERE queryTime >= $from
AND queryTime < $to)
group by modelVersion)

查询结果如图:
一段时间内AUC比较

这个方法在数据写入时就将数据提前join好,相当于把查询时需要join的计算提前完成,减少查询时ClickHouse的计算量,同时曝光、点击和订购的数据都写入了一张宽表内,还可以做一些其它关于用户行为的一些分析。
经过我们测试,这种方法和实时join计算的AUC结果误差在0.01上下,能够满足不同实验版本之间的AUC对比,此方法的查询计算速度大大提高,且能够查询的时间跨度也大大增加,即使超过24小时的数据也很快返回。同样的2000w条数据,此方法对比上述的实时join的计算方法查询时间从18s降低到了2s,节约了90%的时间。

使用物化视图 right join

使用物化视图left join构造大宽表的方法,因为需要延迟写入曝光数据 ,事实上牺牲了一些实时性。如果想提升一些实时性,也愿意牺牲一些性能,那么使用物化视图right join的方法就是前两种方法的折中。

因为订购和点击肯定是发生在曝光之后的,所以理论上点击和订购数据在写入时,其对应的曝光数据已经存在在数据表中了,使用点击和订购去right join曝光数据是不需要延迟写入的,这样就提高的实时性。(考虑到现实情况可能存在的处理延迟,可以将点击和曝光延迟1分钟写入,提高正负样本的标记的准确率)。

使用这种方法,订购和点击需要分别right join 曝光,需要构造两种物化视图表,同时right join的右表——曝光表是大表,数据量较大,比较耗费资源。为了减小join次数,应该控制Flink写入的批次,订购和点击写入的批次越少,那么join的次数也越少。

在这种物化视图right join构建宽表的场景中,同一条曝光的正样本数据会被join多次,但只有当与其对应的订购数据写入的那次join才会被正确标记成正样本。假设这条数据被join了十次,那么物化视图中,这条数据就会出现十次,只有其中一条才是我们需要的正确数据。这时候就要用到ClickHouse的ReplacingMergeTree引擎的特性,我们将订单时间也写入物化视图中,并使用此字段作为引擎的参数。数据合并时,若排序键相同,ClickHouse就会留下ordertime最新的数据。如果某次right join中,没有该曝光数据对应的订购数据,那么ordertime字段会填充默认的1970-01-01 00:00:00的数据;如果该批次right join有对应的订购数据,那么ordertime字段会填充为该订单发生的时间,这个时间一定是大于默认的时间,那么在ClickHouse合并数据时,就会只留下正确的数据。计算点击数据也是同理。

建表语句:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
CREATE table auc.impress_order_join
(
`hotelId` Int32,
`cvr` Float64,
`modelversion` LowCardinality(String),
`sessionId` String,
`eventTime` DateTime,
`orderId` String,
`if_order` String,
orderTime DateTime
)
ENGINE = ReplacingMergeTree(orderTime)
PARTITION BY toYYYYMMDD(eventTime)
ORDER BY (eventTime, hotelId, sessionId)
TTL eventTime + toIntervalDay(3)
SETTINGS index_granularity = 8192;


CREATE MATERIALIZED VIEW
auc.impress_order_join_mv to auc.impress_order_join
(
`hotelId` Int32,
`cvr` Float64,
`modelversion` LowCardinality (String),
`sessionId` String,
`eventTime` DateTime,
`orderId` String,
`if_order` String,
orderTime DateTime
)

AS
SELECT impress.hotelId AS hotelId,
cvr,
modelversion,
impress.sessionId AS sessionId,
impress. eventTime as eventTime,
orderId,
a.sessionId as if_order,
a.eventTime as orderTime
FROM (
SELECT distinct sessionId,
hotelId,
orderId,
eventTime
FROM auc.hotel_order
) a
GLOBAL
right join (
SELECT hotelId,
cvr,
modelversion,
sessionId,
eventTime
FROM auc.hotel_impress_all
where eventTime >= toDateTime(now() - 1800)
) as impress on impress.hotelId = a.hotelId and impress.sessionId = a.sessionId;

查询方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
select modelversion, arrayAUC(cvr_arr, order_arr) as auc
from (
select modelversion,
groupArray(cvr) as cvr_arr,
groupArray(order_bool) as order_arr
from (
select modelversion,
cvr,
case when if_order != '' then 1 else 0 end as order_bool
from (
select distinct cvr, modelversion, eventTime, if_order
from auc.impress_order_join
WHERE eventTime >= $from
AND eventTime < $to
)
)
group by modelversion
)

物化视图right join的方法在写入时计算量更大,但最终查询结果相对left join的方式更准确,且数据延迟更低,适合服务器性能较好的情况。

结语

为了监控模型的预测效果,本文详细介绍了如何借助ClickHouse和Flink实时计算线上模型的AUC,由于无法在Flink中完成left join然后写入ClickHouse,为了将曝光、点击和订购数据写入同一张宽表内,我们使用了ClickHouse的物化视图功能,并介绍了Flink如何延迟写入数据。

物化视图本质上是将查询的部分计算逻辑前移到数据写入时进行,物化视图写入时的计算量越大,ClickHouse写入时压力也越大,如果ClickHouse服务器的性能受限的话,很有可能造成Flink写入任务的背压,所以要根据服务器性能、需求、查询频次等多方面考虑选择AUC计算方法。