
Memory leak in graphql subscriptions #4314

@snuderl

Description


Describe the bug
We are tracking down a memory leak in our GraphQL subscription server. Below is a heap dump of the Java process. It appears that per-level DataLoader dispatch state is allocated for each subscription message and is only released when the subscription ends.

[Image: heap dump of the Java process]

It seems the DataLoader dispatch strategy allocates a record per event that is never cleared for the duration of the subscription. Summary from Claude Code:

  PerLevelDataLoaderDispatchStrategy and ExhaustedDataLoaderDispatchStrategy both maintain a ConcurrentHashMap<AlternativeCallContext, CallStack> field called alternativeCallContextMap.
  For each subscription event, SubscriptionExecutionStrategy.executeSubscriptionEvent() calls newSubscriptionExecution(), which adds a new entry, but the entry is never removed after subscriptionEventCompletionDone() completes.
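The mechanism in the summary can be illustrated with a small, dependency-free Kotlin model. This is a sketch, not graphql-java's code: `EventContext`, `CallStack`, `LeakyStrategy`, `FixedStrategy` and `runEvents` are hypothetical stand-ins, and removing the entry in `subscriptionEventCompletionDone` is only an assumed remedy. It also assumes the entry is no longer needed once the event completes, which the real strategy would have to guarantee (for example, that no DataLoader dispatch for that event is still pending).

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins for AlternativeCallContext and CallStack.
// EventContext does not override equals/hashCode, so each instance is a distinct key.
class EventContext
class CallStack

// Simplified model of the reported behaviour: one map entry is created per subscription event.
open class LeakyStrategy {
    val alternativeCallContextMap = ConcurrentHashMap<EventContext, CallStack>()

    fun newSubscriptionExecution(ctx: EventContext) {
        alternativeCallContextMap.computeIfAbsent(ctx) { CallStack() }
    }

    // Reported bug: nothing is removed when the event completes.
    open fun subscriptionEventCompletionDone(ctx: EventContext) {}
}

// Assumed fix (hypothetical): drop the per-event entry once the event has completed.
class FixedStrategy : LeakyStrategy() {
    override fun subscriptionEventCompletionDone(ctx: EventContext) {
        alternativeCallContextMap.remove(ctx)
    }
}

// Drives `count` events through the strategy, the same way the reproducer below does,
// and returns how many entries are still retained afterwards.
fun runEvents(strategy: LeakyStrategy, count: Int): Int {
    repeat(count) {
        val ctx = EventContext()
        strategy.newSubscriptionExecution(ctx)
        strategy.subscriptionEventCompletionDone(ctx)
    }
    return strategy.alternativeCallContextMap.size
}
```

With these definitions, `runEvents(LeakyStrategy(), 1_000)` returns 1000, mirroring the reproducer's assertion, while `runEvents(FixedStrategy(), 1_000)` returns 0.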

To Reproduce

We are using graphql-java 25.0 through netflix-dgs. We have ticker mode enabled for dataloaders.

package com.opensea.materialization.services.graphqlapi.lib.execution

import graphql.ExecutionInput
import graphql.Profiler
import graphql.execution.CoercedVariables
import graphql.execution.ExecutionContextBuilder
import graphql.execution.ExecutionId
import graphql.execution.incremental.AlternativeCallContext
import graphql.execution.instrumentation.dataloader.PerLevelDataLoaderDispatchStrategy
import graphql.language.OperationDefinition
import graphql.parser.Parser
import graphql.schema.idl.RuntimeWiring
import graphql.schema.idl.SchemaGenerator
import graphql.schema.idl.SchemaParser
import io.kotest.matchers.shouldBe
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import org.dataloader.DataLoaderRegistry
import org.junit.jupiter.api.Test

/**
 * Reproducer for a memory leak in graphql-java v25's PerLevelDataLoaderDispatchStrategy.
 *
 * Each subscription event adds a CallStack entry to an internal ConcurrentHashMap
 * (`alternativeCallContextMap`) via [newSubscriptionExecution], but the entry is never
 * removed after [subscriptionEventCompletionDone] completes. This causes unbounded heap
 * growth for long-lived subscriptions.
 *
 * The test calls the strategy methods exactly as SubscriptionExecutionStrategy does,
 * then inspects the map size via reflection.
 */
class SubscriptionDispatchStrategyLeakReproducerTest {

    @Test
    fun `alternativeCallContextMap grows by one entry per subscription event and is never cleaned up`() {
        val eventCount = 1_000
        val strategy = createStrategy()

        // Simulate what SubscriptionExecutionStrategy.executeSubscriptionEvent() does
        // for each subscription event: it creates a fresh AlternativeCallContext(1, 1),
        // calls newSubscriptionExecution, executes the selection set, then calls
        // subscriptionEventCompletionDone.
        for (i in 1..eventCount) {
            val callContext = AlternativeCallContext(1, 1)
            strategy.newSubscriptionExecution(callContext)
            strategy.subscriptionEventCompletionDone(callContext)
        }

        // Inspect the internal map via reflection
        val map = getAlternativeCallContextMap(strategy)

        // BUG: the map should be empty after all events complete, but it retains
        // every entry — one per subscription event. This is the memory leak.
        //
        // Expected (fixed): map.size == 0
        // Actual (bug):     map.size == eventCount
        println("alternativeCallContextMap.size after $eventCount events: ${map.size}")
        println("Expected: 0 (entries should be removed after completion)")
        println("Actual:   ${map.size} (leaked — never cleaned up)")

        map.size shouldBe eventCount // documents the bug
    }

    @Test
    fun `leak scales linearly - each event adds ~1 CallStack with internal ConcurrentHashMaps`() {
        val strategy = createStrategy()

        // Count retained entries as a proxy for heap impact: each leaked CallStack holds
        // several ConcurrentHashMaps of its own (stateForLevelMap, dispatchedLevels,
        // chainedDLStack.stateMapPerLevel).
        val counts = listOf(100, 500, 1_000)
        var prevSize = 0

        for (count in counts) {
            for (i in prevSize until count) {
                val callContext = AlternativeCallContext(1, 1)
                strategy.newSubscriptionExecution(callContext)
                strategy.subscriptionEventCompletionDone(callContext)
            }
            val map = getAlternativeCallContextMap(strategy)
            println("After $count events: map.size = ${map.size}")
            map.size shouldBe count
            prevSize = count
        }
    }

    private fun createStrategy(): PerLevelDataLoaderDispatchStrategy {
        val sdl = "type Query { dummy: String }"
        val schema =
            SchemaGenerator()
                .makeExecutableSchema(
                    SchemaParser().parse(sdl),
                    RuntimeWiring.newRuntimeWiring()
                        .type("Query") { it.dataFetcher("dummy") { "ok" } }
                        .build(),
                )

        val document = Parser.parse("{ dummy }")
        val input =
            ExecutionInput.newExecutionInput()
                .query("{ dummy }")
                .dataLoaderRegistry(DataLoaderRegistry())
                .build()

        val executionContext =
            ExecutionContextBuilder.newExecutionContextBuilder()
                .executionId(ExecutionId.generate())
                .graphQLSchema(schema)
                .executionInput(input)
                .graphQLContext(input.graphQLContext)
                .dataLoaderRegistry(input.dataLoaderRegistry)
                .document(document)
                .operationDefinition(
                    document.definitions.first() as OperationDefinition
                )
                .coercedVariables(CoercedVariables.emptyVariables())
                .locale(Locale.getDefault())
                .profiler(Profiler.NO_OP)
                .build()

        return PerLevelDataLoaderDispatchStrategy(executionContext)
    }

    private fun getAlternativeCallContextMap(
        strategy: PerLevelDataLoaderDispatchStrategy,
    ): ConcurrentHashMap<*, *> {
        val field = strategy.javaClass.getDeclaredField("alternativeCallContextMap")
        field.isAccessible = true
        return field.get(strategy) as ConcurrentHashMap<*, *>
    }
}
