| 授权 | 开源 | 
| 大小 | 40.11MB | 
| 语言 | Java | 
ksqlDB允许您在流和表上定义实例化视图。物化视图由所谓的“持久查询”定义。这些查询被称为持久查询,因为它们使用表维护其增量更新的结果。
CREATE TABLE hourly_metrics AS
  SELECT url, COUNT(*)
  FROM page_views
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY url EMIT CHANGES;SELECT  *  FROM  hourly_metrics 
WHERE URL = 'http://myurl.com' AND WINDOWSTART = '2019-11-20T19:00' ;SELECT  *  FROM  hourly_metrics  EMIT  CHANGES ;CREATE STREAM vip_actions AS
  SELECT userid, page, action
  FROM clickstream c
  LEFT JOIN users u ON c.userid = u.user_id
  WHERE u.level = 'Platinum' EMIT CHANGES;ksqlDB非常适合识别实时数据的模式或异常。通过在数据到达时处理流,您可以识别并适当地以毫秒级延迟掩盖异常事件。
CREATE TABLE possible_fraud AS
  SELECT card_number, count(*)
  FROM authorization_attempts
  WINDOW TUMBLING (SIZE 5 SECONDS)
  GROUP BY card_number
  HAVING count(*) > 3 EMIT CHANGES;CREATE TABLE error_counts AS
  SELECT error_code, count(*)
  FROM monitoring_stream
  WINDOW TUMBLING (SIZE 1 MINUTE)
  WHERE  type = 'ERROR'
  GROUP BY error_code EMIT CHANGES;CREATE STREAM clicks_transformed AS
  SELECT userid, page, action
  FROM clickstream c
  LEFT JOIN users u ON c.userid = u.user_id EMIT CHANGES; CREATE SINK CONNECTOR es_sink WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'key.converter'   = 'org.apache.kafka.connect.storage.StringConverter',
  'topics'          = 'clicks_transformed',
  'key.ignore'      = 'true',
  'schema.ignore'   = 'true',
  'type.name'       = '',
  'connection.url'  = 'http://elasticsearch:9200');相关源码