From cd9f27ab0898babdfa2c4f4cbd8cf045fa5ac3cc Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+raj-k-singh@users.noreply.github.com> Date: Wed, 18 Dec 2024 10:42:14 +0530 Subject: [PATCH 1/6] Fix: QS: logs pipelines: better validation of pipelines being saved (#6652) * chore: add test validating invalid field paths in pipeline operators are rejected * chore: refactor posted pipelines validation to use a controller method * fix: run a collector simulation to validate pipeline config being saved * chore: minor cleanup --- pkg/query-service/app/http_handler.go | 7 ++-- .../app/logparsingpipeline/controller.go | 39 +++++++++++++++++++ .../integration/logparsingpipeline_test.go | 21 ++++++++++ 3 files changed, 63 insertions(+), 4 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index e14eec7ef6..ba16894438 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -4075,10 +4075,9 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) zap.L().Warn("found no pipelines in the http request, this will delete all the pipelines") } - for _, p := range postable { - if err := p.IsValid(); err != nil { - return nil, model.BadRequestStr(err.Error()) - } + validationErr := aH.LogsParsingPipelineController.ValidatePipelines(ctx, postable) + if validationErr != nil { + return nil, validationErr } return aH.LogsParsingPipelineController.ApplyPipelines(ctx, postable) diff --git a/pkg/query-service/app/logparsingpipeline/controller.go b/pkg/query-service/app/logparsingpipeline/controller.go index 2e6b0ba4d3..4929f72f78 100644 --- a/pkg/query-service/app/logparsingpipeline/controller.go +++ b/pkg/query-service/app/logparsingpipeline/controller.go @@ -94,6 +94,45 @@ func (ic *LogParsingPipelineController) ApplyPipelines( return ic.GetPipelinesByVersion(ctx, cfg.Version) } +func (ic *LogParsingPipelineController) ValidatePipelines( + ctx context.Context, + postedPipelines []PostablePipeline, +) *model.ApiError { + for _, p := range postedPipelines { + if err := p.IsValid(); err != nil { + return model.BadRequestStr(err.Error()) + } + } + + // Also run a collector simulation to ensure config is fit + // for e2e use with a collector + pipelines := []Pipeline{} + for _, pp := range postedPipelines { + pipelines = append(pipelines, Pipeline{ + Id: uuid.New().String(), + OrderId: pp.OrderId, + Enabled: pp.Enabled, + Name: pp.Name, + Alias: pp.Alias, + Description: &pp.Description, + Filter: pp.Filter, + Config: pp.Config, + }) + } + + sampleLogs := []model.SignozLog{{Body: ""}} + _, _, simulationErr := SimulatePipelinesProcessing( + ctx, pipelines, sampleLogs, + ) + if simulationErr != nil { + return model.BadRequest(fmt.Errorf( + "invalid pipelines config: %w", simulationErr.ToError(), + )) + } + + return nil +} + // Returns effective list of pipelines including user created // pipelines and pipelines for installed integrations func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion( diff --git a/pkg/query-service/tests/integration/logparsingpipeline_test.go b/pkg/query-service/tests/integration/logparsingpipeline_test.go index e06f7a280b..9e07b604d7 100644 --- a/pkg/query-service/tests/integration/logparsingpipeline_test.go +++ b/pkg/query-service/tests/integration/logparsingpipeline_test.go @@ -350,6 +350,27 @@ func TestLogPipelinesValidation(t *testing.T) { }, }, ExpectedResponseStatusCode: 400, + }, { + Name: "Invalid from field path", + Pipeline: 
logparsingpipeline.PostablePipeline{ + OrderId: 1, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: validPipelineFilterSet, + Config: []logparsingpipeline.PipelineOperator{ + { + OrderId: 1, + ID: "move", + Type: "move", + From: `attributes.temp_parsed_body."@l"`, + To: "attributes.test", + Enabled: true, + Name: "test move", + }, + }, + }, + ExpectedResponseStatusCode: 400, }, } From 7031c866e8cdcda52a7acf1d4a65e2e4af307ce9 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Wed, 18 Dec 2024 17:55:22 +0700 Subject: [PATCH 2/6] fix: add flags for using trace new schema (#6651) --- deploy/docker-swarm/clickhouse-setup/docker-compose.yaml | 3 ++- .../docker-swarm/clickhouse-setup/otel-collector-config.yaml | 1 + deploy/docker/clickhouse-setup/docker-compose-local.yaml | 3 ++- deploy/docker/clickhouse-setup/docker-compose-minimal.yaml | 3 ++- deploy/docker/clickhouse-setup/docker-compose.testing.yaml | 3 ++- deploy/docker/clickhouse-setup/otel-collector-config.yaml | 1 + 6 files changed, 10 insertions(+), 4 deletions(-) diff --git a/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml b/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml index 88453360f9..3887e223f7 100644 --- a/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml +++ b/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml @@ -150,7 +150,8 @@ services: command: [ "-config=/root/config/prometheus.yml", - "--use-logs-new-schema=true" + "--use-logs-new-schema=true", + "--use-trace-new-schema=true" ] # ports: # - "6060:6060" # pprof port diff --git a/deploy/docker-swarm/clickhouse-setup/otel-collector-config.yaml b/deploy/docker-swarm/clickhouse-setup/otel-collector-config.yaml index 8c0b30df61..1b81ea214a 100644 --- a/deploy/docker-swarm/clickhouse-setup/otel-collector-config.yaml +++ b/deploy/docker-swarm/clickhouse-setup/otel-collector-config.yaml @@ -110,6 +110,7 @@ exporters: clickhousetraces: datasource: tcp://clickhouse:9000/signoz_traces low_cardinal_exception_grouping: ${env:LOW_CARDINAL_EXCEPTION_GROUPING} + use_new_schema: true clickhousemetricswrite: endpoint: tcp://clickhouse:9000/signoz_metrics resource_to_telemetry_conversion: diff --git a/deploy/docker/clickhouse-setup/docker-compose-local.yaml b/deploy/docker/clickhouse-setup/docker-compose-local.yaml index 7effc129fe..7a4222ff8c 100644 --- a/deploy/docker/clickhouse-setup/docker-compose-local.yaml +++ b/deploy/docker/clickhouse-setup/docker-compose-local.yaml @@ -25,7 +25,8 @@ services: command: [ "-config=/root/config/prometheus.yml", - "--use-logs-new-schema=true" + "--use-logs-new-schema=true", + "--use-trace-new-schema=true" ] ports: - "6060:6060" diff --git a/deploy/docker/clickhouse-setup/docker-compose-minimal.yaml b/deploy/docker/clickhouse-setup/docker-compose-minimal.yaml index f737b7d440..37df9590d3 100644 --- a/deploy/docker/clickhouse-setup/docker-compose-minimal.yaml +++ b/deploy/docker/clickhouse-setup/docker-compose-minimal.yaml @@ -167,7 +167,8 @@ services: command: [ "-config=/root/config/prometheus.yml", - "--use-logs-new-schema=true" + "--use-logs-new-schema=true", + "--use-trace-new-schema=true" ] # ports: # - "6060:6060" # pprof port diff --git a/deploy/docker/clickhouse-setup/docker-compose.testing.yaml b/deploy/docker/clickhouse-setup/docker-compose.testing.yaml index d90773844e..bd00cf1702 100644 --- a/deploy/docker/clickhouse-setup/docker-compose.testing.yaml +++ b/deploy/docker/clickhouse-setup/docker-compose.testing.yaml @@ -173,7 +173,8 @@ services: [ "-config=/root/config/prometheus.yml", 
"-gateway-url=https://api.staging.signoz.cloud", - "--use-logs-new-schema=true" + "--use-logs-new-schema=true", + "--use-trace-new-schema=true" ] # ports: # - "6060:6060" # pprof port diff --git a/deploy/docker/clickhouse-setup/otel-collector-config.yaml b/deploy/docker/clickhouse-setup/otel-collector-config.yaml index cba7756d8e..b73acdea11 100644 --- a/deploy/docker/clickhouse-setup/otel-collector-config.yaml +++ b/deploy/docker/clickhouse-setup/otel-collector-config.yaml @@ -119,6 +119,7 @@ exporters: clickhousetraces: datasource: tcp://clickhouse:9000/signoz_traces low_cardinal_exception_grouping: ${env:LOW_CARDINAL_EXCEPTION_GROUPING} + use_new_schema: true clickhousemetricswrite: endpoint: tcp://clickhouse:9000/signoz_metrics resource_to_telemetry_conversion: From 83f6dea2db7a823a21a013d7d9c51ed5d7c7fb0f Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 18 Dec 2024 17:04:01 +0530 Subject: [PATCH 3/6] Add support for trace_v3 schema in messaging queues (#6663) feat: support trace v3 queries --- .../integrations/messagingQueues/kafka/sql.go | 180 +++++++++--------- pkg/query-service/constants/constants.go | 2 +- 2 files changed, 91 insertions(+), 91 deletions(-) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index bf07316bb2..67b32938f0 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -9,25 +9,25 @@ func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queu query := fmt.Sprintf(` WITH consumer_query AS ( SELECT - serviceName, + resource_string_service$$name, quantile(0.99)(durationNano) / 1000000 AS p99, COUNT(*) AS total_requests, - sumIf(1, statusCode = 2) AS error_count, - avg(CASE WHEN has(numberTagMap, 'messaging.message.body.size') THEN numberTagMap['messaging.message.body.size'] ELSE NULL END) AS avg_msg_size - FROM signoz_traces.distributed_signoz_index_v2 + sumIf(1, status_code = 2) AS error_count, + avg(CASE WHEN has(attributes_number, 'messaging.message.body.size') THEN attributes_number['messaging.message.body.size'] ELSE NULL END) AS avg_msg_size + FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d' AND kind = 5 - AND msgSystem = '%s' - AND stringTagMap['messaging.destination.name'] = '%s' - AND stringTagMap['messaging.destination.partition.id'] = '%s' - AND stringTagMap['messaging.kafka.consumer.group'] = '%s' - GROUP BY serviceName + AND attribute_string_messaging$$system = '%s' + AND attributes_string['messaging.destination.name'] = '%s' + AND attributes_string['messaging.destination.partition.id'] = '%s' + AND attributes_string['messaging.kafka.consumer.group'] = '%s' + GROUP BY resource_string_service$$name ) SELECT - serviceName AS service_name, + resource_string_service$$name AS service_name, p99, COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate, COALESCE(total_requests / %d, 0) AS throughput, @@ -35,7 +35,7 @@ SELECT FROM consumer_query ORDER BY - serviceName; + resource_string_service$$name; `, start, end, queueType, topic, partition, consumerGroup, timeRange) return query } @@ -48,14 +48,14 @@ WITH partition_query AS ( SELECT quantile(0.99)(durationNano) / 1000000 AS p99, count(*) AS total_requests, - stringTagMap['messaging.destination.name'] AS topic, - stringTagMap['messaging.destination.partition.id'] AS partition - FROM signoz_traces.distributed_signoz_index_v2 + 
attributes_string['messaging.destination.name'] AS topic, + attributes_string['messaging.destination.partition.id'] AS partition + FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d' AND kind = 4 - AND msgSystem = '%s' + AND attribute_string_messaging$$system = '%s' GROUP BY topic, partition ) @@ -78,25 +78,25 @@ func generateConsumerPartitionLatencySQL(start, end int64, topic, partition, que query := fmt.Sprintf(` WITH consumer_pl AS ( SELECT - stringTagMap['messaging.kafka.consumer.group'] AS consumer_group, - serviceName, + attributes_string['messaging.kafka.consumer.group'] AS consumer_group, + resource_string_service$$name, quantile(0.99)(durationNano) / 1000000 AS p99, COUNT(*) AS total_requests, - sumIf(1, statusCode = 2) AS error_count - FROM signoz_traces.distributed_signoz_index_v2 + sumIf(1, status_code = 2) AS error_count + FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d' AND kind = 5 - AND msgSystem = '%s' - AND stringTagMap['messaging.destination.name'] = '%s' - AND stringTagMap['messaging.destination.partition.id'] = '%s' - GROUP BY consumer_group, serviceName + AND attribute_string_messaging$$system = '%s' + AND attributes_string['messaging.destination.name'] = '%s' + AND attributes_string['messaging.destination.partition.id'] = '%s' + GROUP BY consumer_group, resource_string_service$$name ) SELECT consumer_group, - serviceName AS service_name, + resource_string_service$$name AS service_name, p99, COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate, COALESCE(total_requests / %d, 0) AS throughput @@ -115,23 +115,23 @@ func generateProducerPartitionThroughputSQL(start, end int64, queueType string) query := fmt.Sprintf(` WITH producer_latency AS ( SELECT - serviceName, + resource_string_service$$name, quantile(0.99)(durationNano) / 1000000 AS p99, - stringTagMap['messaging.destination.name'] AS topic, + attributes_string['messaging.destination.name'] AS topic, COUNT(*) AS total_requests, - sumIf(1, statusCode = 2) AS error_count - FROM signoz_traces.distributed_signoz_index_v2 + sumIf(1, status_code = 2) AS error_count + FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d' AND kind = 4 - AND msgSystem = '%s' - GROUP BY topic, serviceName + AND attribute_string_messaging$$system = '%s' + GROUP BY topic, resource_string_service$$name ) SELECT topic, - serviceName AS service_name, + resource_string_service$$name AS service_name, p99, COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate, COALESCE(total_requests / %d, 0) AS throughput @@ -148,17 +148,17 @@ func generateProducerTopicLatencySQL(start, end int64, topic, service, queueType WITH consumer_latency AS ( SELECT quantile(0.99)(durationNano) / 1000000 AS p99, - stringTagMap['messaging.destination.partition.id'] AS partition, + attributes_string['messaging.destination.partition.id'] AS partition, COUNT(*) AS total_requests, - sumIf(1, statusCode = 2) AS error_count - FROM signoz_traces.distributed_signoz_index_v2 + sumIf(1, status_code = 2) AS error_count + FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d' AND kind = 4 - AND serviceName = '%s' - AND msgSystem = '%s' - AND stringTagMap['messaging.destination.name'] = '%s' + AND resource_string_service$$name = '%s' + AND attribute_string_messaging$$system = '%s' + AND attributes_string['messaging.destination.name'] = '%s' GROUP BY partition ) @@ -179,24 +179,24 @@ func 
generateConsumerLatencySQL(start, end int64, queueType string) string { query := fmt.Sprintf(` WITH consumer_latency AS ( SELECT - serviceName, - stringTagMap['messaging.destination.name'] AS topic, + resource_string_service$$name, + attributes_string['messaging.destination.name'] AS topic, quantile(0.99)(durationNano) / 1000000 AS p99, COUNT(*) AS total_requests, - sumIf(1, statusCode = 2) AS error_count, - SUM(numberTagMap['messaging.message.body.size']) AS total_bytes - FROM signoz_traces.distributed_signoz_index_v2 + sumIf(1, status_code = 2) AS error_count, + SUM(attributes_number['messaging.message.body.size']) AS total_bytes + FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d' AND kind = 5 - AND msgSystem = '%s' - GROUP BY topic, serviceName + AND attribute_string_messaging$$system = '%s' + GROUP BY topic, resource_string_service$$name ) SELECT topic, - serviceName AS service_name, + resource_string_service$$name AS service_name, p99, COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate, COALESCE(total_requests / %d, 0) AS ingestion_rate, @@ -216,17 +216,17 @@ func generateConsumerServiceLatencySQL(start, end int64, topic, service, queueTy WITH consumer_latency AS ( SELECT quantile(0.99)(durationNano) / 1000000 AS p99, - stringTagMap['messaging.destination.partition.id'] AS partition, + attributes_string['messaging.destination.partition.id'] AS partition, COUNT(*) AS total_requests, - sumIf(1, statusCode = 2) AS error_count - FROM signoz_traces.distributed_signoz_index_v2 + sumIf(1, status_code = 2) AS error_count + FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d' AND kind = 5 - AND serviceName = '%s' - AND msgSystem = '%s' - AND stringTagMap['messaging.destination.name'] = '%s' + AND resource_string_service$$name = '%s' + AND attribute_string_messaging$$system = '%s' + AND attributes_string['messaging.destination.name'] = '%s' GROUP BY partition ) @@ -246,26 +246,26 @@ func generateProducerConsumerEvalSQL(start, end int64, queueType string, evalTim query := fmt.Sprintf(` WITH trace_data AS ( SELECT - p.serviceName AS producer_service, - c.serviceName AS consumer_service, - p.traceID, + p.resource_string_service$$name AS producer_service, + c.resource_string_service$$name AS consumer_service, + p.trace_id, p.timestamp AS producer_timestamp, c.timestamp AS consumer_timestamp, p.durationNano AS durationNano, (toUnixTimestamp64Nano(c.timestamp) - toUnixTimestamp64Nano(p.timestamp)) + p.durationNano AS time_difference FROM - signoz_traces.distributed_signoz_index_v2 p + signoz_traces.distributed_signoz_index_v3 p INNER JOIN - signoz_traces.distributed_signoz_index_v2 c - ON p.traceID = c.traceID - AND c.parentSpanID = p.spanID + signoz_traces.distributed_signoz_index_v3 c + ON p.trace_id = c.trace_id + AND c.parent_span_id = p.span_id WHERE p.kind = 4 AND c.kind = 5 AND toUnixTimestamp64Nano(p.timestamp) BETWEEN '%d' AND '%d' AND toUnixTimestamp64Nano(c.timestamp) BETWEEN '%d' AND '%d' - AND c.msgSystem = '%s' - AND p.msgSystem = '%s' + AND c.attribute_string_messaging$$system = '%s' + AND p.attribute_string_messaging$$system = '%s' ) SELECT @@ -278,7 +278,7 @@ SELECT arrayMap(x -> x.1, arraySort( x -> -x.2, - groupArrayIf((traceID, time_difference), time_difference > '%d') + groupArrayIf((trace_id, time_difference), time_difference > '%d') ) ), 1, 10 @@ -296,30 +296,30 @@ func generateProducerSQL(start, end int64, topic, partition, queueType string) s query := fmt.Sprintf(` WITH 
producer_query AS ( SELECT - serviceName, + resource_string_service$$name, quantile(0.99)(durationNano) / 1000000 AS p99, count(*) AS total_count, - sumIf(1, statusCode = 2) AS error_count - FROM signoz_traces.distributed_signoz_index_v2 + sumIf(1, status_code = 2) AS error_count + FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d' AND kind = 4 - AND msgSystem = '%s' - AND stringTagMap['messaging.destination.name'] = '%s' - AND stringTagMap['messaging.destination.partition.id'] = '%s' - GROUP BY serviceName + AND attribute_string_messaging$$system = '%s' + AND attributes_string['messaging.destination.name'] = '%s' + AND attributes_string['messaging.destination.partition.id'] = '%s' + GROUP BY resource_string_service$$name ) SELECT - serviceName AS service_name, + resource_string_service$$name AS service_name, p99, COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage, COALESCE(total_count / %d, 0) AS throughput FROM producer_query ORDER BY - serviceName; + resource_string_service$$name; `, start, end, queueType, topic, partition, timeRange) return query } @@ -328,18 +328,18 @@ func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partit timeRange := (end - start) / 1000000000 query := fmt.Sprintf(` SELECT - stringTagMap['messaging.client_id'] AS client_id, - stringTagMap['service.instance.id'] AS service_instance_id, - serviceName AS service_name, + attributes_string['messaging.client_id'] AS client_id, + attributes_string['service.instance.id'] AS service_instance_id, + resource_string_service$$name AS service_name, count(*) / %d AS throughput -FROM signoz_traces.distributed_signoz_index_v2 +FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d' AND kind = 5 - AND msgSystem = '%s' - AND stringTagMap['messaging.kafka.consumer.group'] = '%s' - AND stringTagMap['messaging.destination.partition.id'] = '%s' + AND attribute_string_messaging$$system = '%s' + AND attributes_string['messaging.kafka.consumer.group'] = '%s' + AND attributes_string['messaging.destination.partition.id'] = '%s' GROUP BY service_name, client_id, service_instance_id ORDER BY throughput DESC `, timeRange, start, end, queueType, consumerGroup, partitionID) @@ -350,12 +350,12 @@ func onboardProducersSQL(start, end int64, queueType string) string { query := fmt.Sprintf(` SELECT COUNT(*) = 0 AS entries, - COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue, + COUNT(IF(attribute_string_messaging$$system = '%s', 1, NULL)) = 0 AS queue, COUNT(IF(kind = 4, 1, NULL)) = 0 AS kind, - COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 1, NULL)) = 0 AS destination, - COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition + COUNT(IF(has(attributes_string, 'messaging.destination.name'), 1, NULL)) = 0 AS destination, + COUNT(IF(has(attributes_string, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition FROM - signoz_traces.distributed_signoz_index_v2 + signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d';`, queueType, start, end) @@ -366,16 +366,16 @@ func onboardConsumerSQL(start, end int64, queueType string) string { query := fmt.Sprintf(` SELECT COUNT(*) = 0 AS entries, - COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue, + COUNT(IF(attribute_string_messaging$$system = '%s', 1, NULL)) = 0 AS queue, COUNT(IF(kind = 5, 1, NULL)) = 0 AS kind, - COUNT(serviceName) = 0 AS svc, - COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 
1, NULL)) = 0 AS destination, - COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition, - COUNT(IF(has(stringTagMap, 'messaging.kafka.consumer.group'), 1, NULL)) = 0 AS cgroup, - COUNT(IF(has(numberTagMap, 'messaging.message.body.size'), 1, NULL)) = 0 AS bodysize, - COUNT(IF(has(stringTagMap, 'messaging.client_id'), 1, NULL)) = 0 AS clientid, - COUNT(IF(has(stringTagMap, 'service.instance.id'), 1, NULL)) = 0 AS instanceid -FROM signoz_traces.distributed_signoz_index_v2 + COUNT(resource_string_service$$name) = 0 AS svc, + COUNT(IF(has(attributes_string, 'messaging.destination.name'), 1, NULL)) = 0 AS destination, + COUNT(IF(has(attributes_string, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition, + COUNT(IF(has(attributes_string, 'messaging.kafka.consumer.group'), 1, NULL)) = 0 AS cgroup, + COUNT(IF(has(attributes_number, 'messaging.message.body.size'), 1, NULL)) = 0 AS bodysize, + COUNT(IF(has(attributes_string, 'messaging.client_id'), 1, NULL)) = 0 AS clientid, + COUNT(IF(has(attributes_string, 'service.instance.id'), 1, NULL)) = 0 AS instanceid +FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d';`, queueType, start, end) diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 7d6f087188..33caecb0d6 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -90,7 +90,7 @@ func EnableHostsInfraMonitoring() bool { return GetOrDefaultEnv("ENABLE_INFRA_METRICS", "true") == "true" } -var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "false") +var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "true") func IsDurationSortFeatureEnabled() bool { isDurationSortFeatureEnabledStr := DurationSortFeature From 83aa48c7215591470e68f1f09f814bc329a76ee3 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 18 Dec 2024 19:06:22 +0530 Subject: [PATCH 4/6] update service.instance.id (#6665) * nit: update resource id and revert the flag --- .../app/integrations/messagingQueues/kafka/sql.go | 4 ++-- pkg/query-service/constants/constants.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index 67b32938f0..05577ab8b9 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -329,7 +329,7 @@ func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partit query := fmt.Sprintf(` SELECT attributes_string['messaging.client_id'] AS client_id, - attributes_string['service.instance.id'] AS service_instance_id, + resources_string['service.instance.id'] AS service_instance_id, resource_string_service$$name AS service_name, count(*) / %d AS throughput FROM signoz_traces.distributed_signoz_index_v3 @@ -374,7 +374,7 @@ SELECT COUNT(IF(has(attributes_string, 'messaging.kafka.consumer.group'), 1, NULL)) = 0 AS cgroup, COUNT(IF(has(attributes_number, 'messaging.message.body.size'), 1, NULL)) = 0 AS bodysize, COUNT(IF(has(attributes_string, 'messaging.client_id'), 1, NULL)) = 0 AS clientid, - COUNT(IF(has(attributes_string, 'service.instance.id'), 1, NULL)) = 0 AS instanceid + COUNT(IF(has(resources_string, 'service.instance.id'), 1, NULL)) = 0 AS instanceid FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' diff --git 
a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 33caecb0d6..7d6f087188 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -90,7 +90,7 @@ func EnableHostsInfraMonitoring() bool { return GetOrDefaultEnv("ENABLE_INFRA_METRICS", "true") == "true" } -var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "true") +var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "false") func IsDurationSortFeatureEnabled() bool { isDurationSortFeatureEnabledStr := DurationSortFeature From 85cf4f4e2ef6cfc69c912e567cbf03b2efda647c Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Wed, 18 Dec 2024 21:07:31 +0700 Subject: [PATCH 5/6] fix: use placehold in limit and use proper exists (#6667) --- pkg/query-service/app/querier/helper.go | 2 +- pkg/query-service/app/querier/v2/helper.go | 2 +- pkg/query-service/app/traces/v4/enrich.go | 4 ++-- .../app/traces/v4/query_builder.go | 19 ++++++++++++++++--- .../app/traces/v4/query_builder_test.go | 8 +++++--- 5 files changed, 25 insertions(+), 10 deletions(-) diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index 1b2acbab8b..a82b3f815b 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -190,7 +190,7 @@ func (q *querier) runBuilderQuery( ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} return } - query = fmt.Sprintf(placeholderQuery, limitQuery) + query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1) } else { query, err = tracesQueryBuilder( start, diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index b62fffe106..4846aa3f3b 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -190,7 +190,7 @@ func (q *querier) runBuilderQuery( ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} return } - query = fmt.Sprintf(placeholderQuery, limitQuery) + query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1) } else { query, err = tracesQueryBuilder( start, diff --git a/pkg/query-service/app/traces/v4/enrich.go b/pkg/query-service/app/traces/v4/enrich.go index 848e489e86..c0974ea0b0 100644 --- a/pkg/query-service/app/traces/v4/enrich.go +++ b/pkg/query-service/app/traces/v4/enrich.go @@ -34,8 +34,8 @@ func enrichKeyWithMetadata(key v3.AttributeKey, keys map[string]v3.AttributeKey) return v } - for _, key := range utils.GenerateEnrichmentKeys(key) { - if val, ok := keys[key]; ok { + for _, tkey := range utils.GenerateEnrichmentKeys(key) { + if val, ok := keys[tkey]; ok { return val } } diff --git a/pkg/query-service/app/traces/v4/query_builder.go b/pkg/query-service/app/traces/v4/query_builder.go index d650e1cdaa..80d39b9016 100644 --- a/pkg/query-service/app/traces/v4/query_builder.go +++ b/pkg/query-service/app/traces/v4/query_builder.go @@ -74,6 +74,19 @@ func getSelectLabels(groupBy []v3.AttributeKey) string { return strings.Join(labels, ",") } +// TODO(nitya): use the _exists columns as well in the future similar to logs +func existsSubQueryForFixedColumn(key v3.AttributeKey, op v3.FilterOperator) (string, error) { + if key.DataType == v3.AttributeKeyDataTypeString { + if op == v3.FilterOperatorExists { + return fmt.Sprintf("%s %s ''", getColumnName(key), tracesOperatorMappingV3[v3.FilterOperatorNotEqual]), nil + } else { + return fmt.Sprintf("%s %s ''", getColumnName(key), 
tracesOperatorMappingV3[v3.FilterOperatorEqual]), nil + } + } else { + return "", fmt.Errorf("unsupported operation, exists and not exists can only be applied on custom attributes or string type columns") + } +} + func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) { var conditions []string @@ -110,7 +123,7 @@ func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) { conditions = append(conditions, fmt.Sprintf(operator, columnName, fmtVal)) case v3.FilterOperatorExists, v3.FilterOperatorNotExists: if item.Key.IsColumn { - subQuery, err := tracesV3.ExistsSubQueryForFixedColumn(item.Key, item.Operator) + subQuery, err := existsSubQueryForFixedColumn(item.Key, item.Operator) if err != nil { return "", err } @@ -312,7 +325,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3. } if options.GraphLimitQtype == constants.SecondQueryGraphLimit { - filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", tracesV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "%s)" + filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", tracesV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)" } switch mq.AggregateOperator { @@ -350,7 +363,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3. case v3.AggregateOperatorCount: if mq.AggregateAttribute.Key != "" { if mq.AggregateAttribute.IsColumn { - subQuery, err := tracesV3.ExistsSubQueryForFixedColumn(mq.AggregateAttribute, v3.FilterOperatorExists) + subQuery, err := existsSubQueryForFixedColumn(mq.AggregateAttribute, v3.FilterOperatorExists) if err == nil { filterSubQuery = fmt.Sprintf("%s AND %s", filterSubQuery, subQuery) } diff --git a/pkg/query-service/app/traces/v4/query_builder_test.go b/pkg/query-service/app/traces/v4/query_builder_test.go index 9db2e815f9..a9b055b67f 100644 --- a/pkg/query-service/app/traces/v4/query_builder_test.go +++ b/pkg/query-service/app/traces/v4/query_builder_test.go @@ -265,9 +265,11 @@ func Test_buildTracesFilterQuery(t *testing.T) { {Key: v3.AttributeKey{Key: "isDone", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag}, Operator: v3.FilterOperatorNotExists}, {Key: v3.AttributeKey{Key: "host1", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Operator: v3.FilterOperatorNotExists}, {Key: v3.AttributeKey{Key: "path", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Operator: v3.FilterOperatorNotExists}, + {Key: v3.AttributeKey{Key: "http_url", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Operator: v3.FilterOperatorNotExists}, + {Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Operator: v3.FilterOperatorNotExists}, }}, }, - want: "mapContains(attributes_string, 'host') AND mapContains(attributes_number, 'duration') AND NOT mapContains(attributes_bool, 'isDone') AND NOT mapContains(attributes_string, 'host1') AND path = ''", + want: "mapContains(attributes_string, 'host') AND mapContains(attributes_number, 'duration') AND NOT mapContains(attributes_bool, 'isDone') AND NOT mapContains(attributes_string, 'host1') AND `attribute_string_path` = '' AND http_url = '' AND `attribute_string_http$$route` = ''", }, } for _, tt := range tests { @@ -683,7 +685,7 @@ func TestPrepareTracesQuery(t *testing.T) { }, }, want: "SELECT attributes_string['function'] as `function`, 
toFloat64(count(distinct(name))) as value from signoz_traces.distributed_signoz_index_v3 where " + - "(timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND mapContains(attributes_string, 'function') AND (`function`) GLOBAL IN (%s) group by `function` order by value DESC", + "(timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND mapContains(attributes_string, 'function') AND (`function`) GLOBAL IN (#LIMIT_PLACEHOLDER) group by `function` order by value DESC", }, { name: "test with limit with resources- first", @@ -766,7 +768,7 @@ func TestPrepareTracesQuery(t *testing.T) { want: "SELECT `attribute_string_function` as `function`, serviceName as `serviceName`, toFloat64(count(distinct(name))) as value from signoz_traces.distributed_signoz_index_v3 " + "where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_number['line'] = 100 " + "AND (resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) " + - "AND simpleJSONExtractString(labels, 'hostname') = 'server1' AND labels like '%hostname%server1%')) AND (`function`,`serviceName`) GLOBAL IN (%s) group by `function`,`serviceName` order by value DESC", + "AND simpleJSONExtractString(labels, 'hostname') = 'server1' AND labels like '%hostname%server1%')) AND (`function`,`serviceName`) GLOBAL IN (#LIMIT_PLACEHOLDER) group by `function`,`serviceName` order by value DESC", }, } From 60dc479a195cab7419528096e82bad6b9bc1e78f Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 18 Dec 2024 19:57:33 +0530 Subject: [PATCH 6/6] fix: add bucketing (#6669) Co-authored-by: Nityananda Gohain --- .../integrations/messagingQueues/kafka/sql.go | 67 +++++++++++++++---- 1 file changed, 55 insertions(+), 12 deletions(-) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index 05577ab8b9..9b943acbc8 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -6,6 +6,8 @@ import ( func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH consumer_query AS ( SELECT @@ -18,6 +20,8 @@ WITH consumer_query AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 5 AND attribute_string_messaging$$system = '%s' AND attributes_string['messaging.destination.name'] = '%s' @@ -36,13 +40,15 @@ FROM consumer_query ORDER BY resource_string_service$$name; -`, start, end, queueType, topic, partition, consumerGroup, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, consumerGroup, timeRange) return query } // S1 landing func generatePartitionLatencySQL(start, end int64, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH partition_query AS ( SELECT @@ -54,6 
+60,8 @@ WITH partition_query AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 4 AND attribute_string_messaging$$system = '%s' GROUP BY topic, partition @@ -68,13 +76,15 @@ FROM partition_query ORDER BY topic; -`, start, end, queueType, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange) return query } // S1 consumer func generateConsumerPartitionLatencySQL(start, end int64, topic, partition, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH consumer_pl AS ( SELECT @@ -87,6 +97,8 @@ WITH consumer_pl AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 5 AND attribute_string_messaging$$system = '%s' AND attributes_string['messaging.destination.name'] = '%s' @@ -104,14 +116,15 @@ FROM consumer_pl ORDER BY consumer_group; -`, start, end, queueType, topic, partition, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, timeRange) return query } // S3, producer overview func generateProducerPartitionThroughputSQL(start, end int64, queueType string) string { timeRange := (end - start) / 1000000000 - // t, svc, rps, byte*, p99, err + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 // t, svc, rps, byte*, p99, err query := fmt.Sprintf(` WITH producer_latency AS ( SELECT @@ -124,6 +137,8 @@ WITH producer_latency AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 4 AND attribute_string_messaging$$system = '%s' GROUP BY topic, resource_string_service$$name @@ -137,13 +152,15 @@ SELECT COALESCE(total_requests / %d, 0) AS throughput FROM producer_latency -`, start, end, queueType, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange) return query } // S3, producer topic/service overview func generateProducerTopicLatencySQL(start, end int64, topic, service, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH consumer_latency AS ( SELECT @@ -155,6 +172,8 @@ WITH consumer_latency AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 4 AND resource_string_service$$name = '%s' AND attribute_string_messaging$$system = '%s' @@ -169,13 +188,15 @@ SELECT COALESCE(total_requests / %d, 0) AS throughput FROM consumer_latency -`, start, end, service, queueType, topic, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, service, queueType, topic, timeRange) return query } // S3 consumer overview func generateConsumerLatencySQL(start, end int64, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH consumer_latency AS ( SELECT @@ -189,6 +210,8 @@ WITH consumer_latency AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 5 AND attribute_string_messaging$$system = '%s' GROUP BY topic, resource_string_service$$name @@ -205,13 +228,15 @@ FROM consumer_latency ORDER BY topic; -`, start, end, queueType, timeRange, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange, timeRange) 
return query } // S3 consumer topic/service func generateConsumerServiceLatencySQL(start, end int64, topic, service, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH consumer_latency AS ( SELECT @@ -223,6 +248,8 @@ WITH consumer_latency AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 5 AND resource_string_service$$name = '%s' AND attribute_string_messaging$$system = '%s' @@ -237,7 +264,7 @@ SELECT COALESCE(total_requests / %d, 0) AS throughput FROM consumer_latency -`, start, end, service, queueType, topic, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, service, queueType, topic, timeRange) return query } @@ -293,6 +320,8 @@ GROUP BY func generateProducerSQL(start, end int64, topic, partition, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH producer_query AS ( SELECT @@ -304,6 +333,8 @@ WITH producer_query AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 4 AND attribute_string_messaging$$system = '%s' AND attributes_string['messaging.destination.name'] = '%s' @@ -320,12 +351,14 @@ FROM producer_query ORDER BY resource_string_service$$name; -`, start, end, queueType, topic, partition, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, timeRange) return query } func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partitionID, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` SELECT attributes_string['messaging.client_id'] AS client_id, @@ -336,17 +369,21 @@ FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 5 AND attribute_string_messaging$$system = '%s' AND attributes_string['messaging.kafka.consumer.group'] = '%s' AND attributes_string['messaging.destination.partition.id'] = '%s' GROUP BY service_name, client_id, service_instance_id ORDER BY throughput DESC -`, timeRange, start, end, queueType, consumerGroup, partitionID) +`, timeRange, start, end, tsBucketStart, tsBucketEnd, queueType, consumerGroup, partitionID) return query } func onboardProducersSQL(start, end int64, queueType string) string { + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` SELECT COUNT(*) = 0 AS entries, @@ -358,11 +395,15 @@ FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' - AND timestamp <= '%d';`, queueType, start, end) + AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d';`, queueType, start, end, tsBucketStart, tsBucketEnd) return query } func onboardConsumerSQL(start, end int64, queueType string) string { + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` SELECT COUNT(*) = 0 AS entries, @@ -378,6 +419,8 @@ SELECT FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' - AND timestamp <= '%d';`, queueType, start, end) + AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' ;`, queueType, start, end, tsBucketStart, tsBucketEnd) 
return query
}
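
For reference, the trace schema migration in patches 3 and 4 is mechanical: every query generator swaps the signoz_index_v2 column names for their v3 equivalents. The map below is a hypothetical summary of those renames — the patches rewrite the SQL strings directly and define no such table. Patch 4 additionally corrects one entry from patch 3: service.instance.id is a resource attribute, so it is read from resources_string rather than attributes_string.

// Hypothetical summary of the v2 -> v3 renames applied in patches 3 and 4;
// the patches edit generated SQL in place and no such map exists upstream.
var traceV2ToV3 = map[string]string{
	"distributed_signoz_index_v2": "distributed_signoz_index_v3",
	"serviceName":                 "resource_string_service$$name",
	"statusCode":                  "status_code",
	"msgSystem":                   "attribute_string_messaging$$system",
	"traceID":                     "trace_id",
	"spanID":                      "span_id",
	"parentSpanID":                "parent_span_id",
	"stringTagMap[...]":           "attributes_string[...]",
	"numberTagMap[...]":           "attributes_number[...]",
	// patch 4: resource-scoped attributes come from the resources map
	"stringTagMap['service.instance.id']": "resources_string['service.instance.id']",
}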
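
The placeholder change in patch 5 is worth spelling out: the outer query produced by buildTracesQuery can contain literal '%' characters — the updated test expects labels like '%hostname%server1%' — so substituting the inner limit query with fmt.Sprintf(placeholderQuery, limitQuery) would misinterpret those as format verbs. Replacing a literal #LIMIT_PLACEHOLDER token avoids that. A minimal, self-contained sketch with abbreviated stand-in queries:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Abbreviated stand-ins for the queries built in runBuilderQuery; the
	// LIKE pattern's '%' is what made Sprintf-based substitution fragile.
	placeholderQuery := "SELECT ... WHERE labels like '%hostname%server1%' AND (`function`) GLOBAL IN (#LIMIT_PLACEHOLDER) GROUP BY `function`"
	limitQuery := "SELECT `function` FROM ... ORDER BY value DESC LIMIT 10"

	query := strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1)
	fmt.Println(query)
}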
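
Patch 5 also adds a local existsSubQueryForFixedColumn (replacing calls to the shared tracesV3.ExistsSubQueryForFixedColumn) so that EXISTS checks on fixed string columns resolve to v3 column names and compare against the empty string, while map-backed attributes keep using mapContains. Per the updated Test_buildTracesFilterQuery expectations, the generated predicates look like this:

  map attribute, EXISTS            ->  mapContains(attributes_string, 'host')
  map attribute, NOT EXISTS        ->  NOT mapContains(attributes_string, 'host1')
  materialized column, NOT EXISTS  ->  `attribute_string_path` = ''
  plain column, NOT EXISTS         ->  http_url = ''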
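
Every generator touched by patch 6 repeats the same bucket arithmetic: start and end arrive as epoch nanoseconds, ts_bucket_start is stored in epoch seconds, and the lower bound is widened by 1800 s — presumably because buckets cover 30-minute windows, so a span inside the query window can live in a bucket that opened up to 30 minutes earlier. The same computation as a helper (hypothetical — the patch inlines these two lines in each function):

// bucketBounds converts a [start, end] window in epoch nanoseconds into the
// inclusive ts_bucket_start range used to prune ClickHouse parts. The lower
// bound is pulled back 1800s so spans whose bucket opened shortly before
// the window still match.
func bucketBounds(startNano, endNano int64) (tsBucketStart, tsBucketEnd int64) {
	tsBucketStart = (startNano / 1000000000) - 1800
	tsBucketEnd = endNano / 1000000000
	return tsBucketStart, tsBucketEnd
}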