> ## Documentation Index
> Fetch the complete documentation index at: https://developer.upsun.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka (Message queue service)

> Apache Kafka is an open-source stream-processing software platform.


export const DynamicCodeBlock = ({language = 'yaml', filename, icon, lines, wrap, expandable, highlight, focus, children}) => {
  const STORAGE_KEY = 'upsun_versions_cache';
  const COMPOSABLE_STORAGE_KEY = 'upsun_composable_cache';
  const CACHE_TTL = 5 * 60 * 1000;
  const API_URL = 'https://meta.upsun.com/images';
  const COMPOSABLE_API_URL = 'https://meta.upsun.com/composable';
  const DEBUG_PREFIX = '[DynamicCodeBlock cache]';
  const [versionData, setVersionData] = useState(null);
  const [versionError, setVersionError] = useState(false);
  const [composableData, setComposableData] = useState(null);
  const [composableError, setComposableError] = useState(false);
  useEffect(() => {
    const fetchData = async () => {
      let cachedData = null;
      let cachedEtag = null;
      if (typeof localStorage !== 'undefined') {
        try {
          const cached = localStorage.getItem(STORAGE_KEY);
          if (cached) {
            const parsed = JSON.parse(cached);
            cachedData = parsed?.data || null;
            cachedEtag = parsed?.etag || null;
            if (cachedData && Date.now() - parsed.timestamp < CACHE_TTL) {
              return cachedData;
            }
          }
        } catch (err) {
          console.error('Failed to load from cache:', err);
        }
      }
      const requestHeaders = cachedEtag ? {
        'If-None-Match': cachedEtag
      } : {};
      console.debug(`${DEBUG_PREFIX} revalidating`, {
        storageKey: STORAGE_KEY,
        hasCachedData: Boolean(cachedData),
        hasCachedEtag: Boolean(cachedEtag)
      });
      const response = await fetch(API_URL, {
        headers: requestHeaders
      });
      if (response.status === 304 && cachedData) {
        console.debug(`${DEBUG_PREFIX} revalidated (304)`, {
          storageKey: STORAGE_KEY
        });
        if (typeof localStorage !== 'undefined') {
          try {
            const etag = response.headers.get('etag') || cachedEtag;
            localStorage.setItem(STORAGE_KEY, JSON.stringify({
              data: cachedData,
              etag,
              timestamp: Date.now()
            }));
          } catch (err) {
            console.error('Failed to refresh cache metadata:', err);
          }
        }
        return cachedData;
      }
      if (!response.ok) throw new Error(`API request failed: ${response.statusText}`);
      const data = await response.json();
      const etag = response.headers.get('etag');
      console.debug(`${DEBUG_PREFIX} refreshed (200)`, {
        storageKey: STORAGE_KEY,
        etag
      });
      if (typeof localStorage !== 'undefined') {
        try {
          localStorage.setItem(STORAGE_KEY, JSON.stringify({
            data,
            etag,
            timestamp: Date.now()
          }));
        } catch (err) {
          console.error('Failed to cache data:', err);
        }
      }
      return data;
    };
    fetchData().then(data => setVersionData(data)).catch(err => console.error('Failed to fetch version data:', err));
  }, []);
  const findHighestVersion = versionsMap => {
    if (!versionsMap || Object.keys(versionsMap).length === 0) return null;
    const entries = Object.entries(versionsMap);
    const active = entries.filter(([, v]) => v.upsun && v.upsun.status === 'supported' || v.upsun && v.upsun.status === 'deprecated');
    const candidates = active.length > 0 ? active : entries;
    let [highestName] = candidates[0];
    for (let i = 1; i < candidates.length; i++) {
      const [currentName] = candidates[i];
      const cp = currentName.split('.').map(Number);
      const hp = highestName.split('.').map(Number);
      for (let j = 0; j < Math.max(cp.length, hp.length); j++) {
        if ((cp[j] || 0) > (hp[j] || 0)) {
          highestName = currentName;
          break;
        } else if ((cp[j] || 0) < (hp[j] || 0)) {
          break;
        }
      }
    }
    return highestName;
  };
  const getVersion = (lang, requestedVersion = 'latest') => {
    if (lang === 'composable') {
      if (!composableData || !composableData.versions || Object.keys(composableData.versions).length === 0) return null;
      if (requestedVersion && requestedVersion !== 'latest') {
        return (requestedVersion in composableData.versions) ? requestedVersion : null;
      }
      return findHighestVersion(composableData.versions);
    }
    if (!versionData) return null;
    const imageData = versionData[lang];
    if (!imageData || !imageData.versions || Object.keys(imageData.versions).length === 0) {
      return null;
    }
    if (requestedVersion && requestedVersion !== 'latest') {
      return (requestedVersion in imageData.versions) ? requestedVersion : null;
    }
    return findHighestVersion(imageData.versions);
  };
  let code = typeof children === 'string' ? children : String(children || '');
  const codeLines = code.split('\n');
  while (codeLines.length > 0 && codeLines[0].trim() === '') codeLines.shift();
  while (codeLines.length > 0 && codeLines[codeLines.length - 1].trim() === '') codeLines.pop();
  if (codeLines.length > 0) {
    const indents = codeLines.filter(line => line.trim().length > 0).map(line => line.match(/^[ \t]*/)[0].length);
    const minIndent = Math.min(...indents);
    code = codeLines.map(line => line.slice(minIndent)).join('\n');
  }
  code = code.replace(/\{\{version:(.*?)\}\}/g, (match, params) => {
    const parts = params.split(':');
    const lang = parts[0];
    const ver = parts[1] || 'latest';
    const isComposable = lang === 'composable';
    const hasError = isComposable ? composableError : versionError;
    const dataReady = isComposable ? composableData !== null : versionData !== null;
    if (hasError) return '(unavailable)';
    if (dataReady) {
      const resolvedVersion = getVersion(lang, ver);
      return resolvedVersion || match;
    }
    return '...';
  });
  const codeBlockProps = {
    language,
    ...filename && ({
      filename
    }),
    ...icon && ({
      icon
    }),
    ...lines !== undefined && ({
      lines
    }),
    ...wrap !== undefined && ({
      wrap
    }),
    ...expandable !== undefined && ({
      expandable
    }),
    ...highlight && ({
      highlight
    }),
    ...focus && ({
      focus
    })
  };
  return <CodeBlock {...codeBlockProps}>{code}</CodeBlock>;
};

export const MetaImageVersionList = ({language, status}) => {
  const [versions, setVersions] = useState([]);
  const [loading, setLoading] = useState(true);
  const [error, setError] = useState(null);
  const STORAGE_KEY = 'upsun_versions_cache';
  const CACHE_TTL = 5 * 60 * 1000;
  const API_URL = 'https://meta.upsun.com/images';
  useEffect(() => {
    if (!language) {
      setLoading(false);
      return;
    }
    setLoading(true);
    setError(null);
    const fetchData = async () => {
      let cachedData = null;
      let cachedEtag = null;
      if (typeof localStorage !== 'undefined') {
        try {
          const cached = localStorage.getItem(STORAGE_KEY);
          if (cached) {
            const parsed = JSON.parse(cached);
            cachedData = parsed?.data || null;
            cachedEtag = parsed?.etag || null;
            if (cachedData && Date.now() - parsed.timestamp < CACHE_TTL) return cachedData;
          }
        } catch (error_) {
          console.error('Failed to load from cache:', error_);
        }
      }
      const requestHeaders = cachedEtag ? {
        'If-None-Match': cachedEtag
      } : {};
      const response = await fetch(API_URL, {
        headers: requestHeaders
      });
      if (response.status === 304 && cachedData) {
        if (typeof localStorage !== 'undefined') {
          try {
            const etag = response.headers.get('etag') || cachedEtag;
            localStorage.setItem(STORAGE_KEY, JSON.stringify({
              data: cachedData,
              etag,
              timestamp: Date.now()
            }));
          } catch (error_) {
            console.error('Failed to refresh cache metadata:', error_);
          }
        }
        return cachedData;
      }
      if (!response.ok) throw new Error(`API request failed: ${response.statusText}`);
      const data = await response.json();
      const etag = response.headers.get('etag');
      if (typeof localStorage !== 'undefined') {
        try {
          localStorage.setItem(STORAGE_KEY, JSON.stringify({
            data,
            etag,
            timestamp: Date.now()
          }));
        } catch (error_) {
          console.error('Failed to cache data:', error_);
        }
      }
      return data;
    };
    fetchData().then(data => {
      if (!data || !data[language]) {
        setVersions([]);
        setLoading(false);
        return;
      }
      const imageData = data[language];
      if (!imageData.versions) {
        setVersions([]);
        setLoading(false);
        return;
      }
      let versionList = Object.entries(imageData.versions).map(([name, v]) => ({
        name,
        status: v.upsun?.status || v.status
      })).sort((a, b) => {
        const aParts = a.name.split('.').map(Number);
        const bParts = b.name.split('.').map(Number);
        const max = Math.max(aParts.length, bParts.length);
        for (let i = 0; i < max; i++) {
          const av = aParts[i] || 0;
          const bv = bParts[i] || 0;
          if (av !== bv) return bv - av;
        }
        return 0;
      });
      if (status) {
        versionList = versionList.filter(v => v.status === status);
      }
      setVersions(versionList);
      setLoading(false);
    }).catch(error_ => {
      console.error('MetaImageVersionList error:', error_);
      setError(error_.message);
      setLoading(false);
    });
  }, [language, status]);
  if (loading) return <p>Loading...</p>;
  if (error) return <p>Error: {error}</p>;
  if (!versions || versions.length === 0) {
    if (status === 'incoming') return null;
    return <p>No versions available! Contact support.</p>;
  }
  let incomingBlock = null;
  if (status === 'incoming' && versions.length > 0) {
    incomingBlock = `These versions are not yet available but are expected to be released soon.`;
  }
  return incomingBlock ? <Note>
      <p>{incomingBlock}</p>
      <ul>
        {versions.map(version => <li className="image-version" key={version.name}>
            {version.name} {version.status === 'beta' && <span className="badge">Beta</span>}
          </li>)}
      </ul>
    </Note> : <ul>
      {versions.map(version => <li className="image-version" key={version.name}>
          {version.name} {version.status === 'beta' && <span className="badge">Beta</span>}
        </li>)}
    </ul>;
};

export const MetaImageVersion = ({language, version}) => {
  const [selectedVersion, setSelectedVersion] = useState(null);
  const [loading, setLoading] = useState(true);
  const [error, setError] = useState(null);
  const isComposable = language === 'composable';
  const STORAGE_KEY = isComposable ? 'upsun_composable_cache' : 'upsun_versions_cache';
  const CACHE_TTL = 5 * 60 * 1000;
  const API_URL = isComposable ? 'https://meta.upsun.com/composable' : 'https://meta.upsun.com/images';
  const findHighestVersion = versionsMap => {
    if (!versionsMap || Object.keys(versionsMap).length === 0) return null;
    const entries = Object.entries(versionsMap);
    const active = entries.filter(([, v]) => v.upsun && v.upsun.status === 'supported' || v.upsun && v.upsun.status === 'deprecated');
    const candidates = active.length > 0 ? active : entries;
    let [highestName] = candidates[0];
    for (let i = 1; i < candidates.length; i++) {
      const [currentName] = candidates[i];
      const cp = currentName.split('.').map(Number);
      const hp = highestName.split('.').map(Number);
      for (let j = 0; j < Math.max(cp.length, hp.length); j++) {
        if ((cp[j] || 0) > (hp[j] || 0)) {
          highestName = currentName;
          break;
        } else if ((cp[j] || 0) < (hp[j] || 0)) {
          break;
        }
      }
    }
    return highestName;
  };
  useEffect(() => {
    if (!language) {
      setLoading(false);
      return;
    }
    setLoading(true);
    setError(null);
    const fetchData = async () => {
      let cachedData = null;
      let cachedEtag = null;
      if (typeof localStorage !== 'undefined') {
        try {
          const cached = localStorage.getItem(STORAGE_KEY);
          if (cached) {
            const parsed = JSON.parse(cached);
            cachedData = parsed?.data || null;
            cachedEtag = parsed?.etag || null;
            if (cachedData && Date.now() - parsed.timestamp < CACHE_TTL) return cachedData;
          }
        } catch (error_) {
          console.error('Failed to load from cache:', error_);
        }
      }
      const requestHeaders = cachedEtag ? {
        'If-None-Match': cachedEtag
      } : {};
      const response = await fetch(API_URL, {
        headers: requestHeaders
      });
      if (response.status === 304 && cachedData) {
        if (typeof localStorage !== 'undefined') {
          try {
            const etag = response.headers.get('etag') || cachedEtag;
            localStorage.setItem(STORAGE_KEY, JSON.stringify({
              data: cachedData,
              etag,
              timestamp: Date.now()
            }));
          } catch (error_) {
            console.error('Failed to refresh cache metadata:', error_);
          }
        }
        return cachedData;
      }
      if (!response.ok) throw new Error(`API request failed: ${response.statusText}`);
      const data = await response.json();
      const etag = response.headers.get('etag');
      if (typeof localStorage !== 'undefined') {
        try {
          localStorage.setItem(STORAGE_KEY, JSON.stringify({
            data,
            etag,
            timestamp: Date.now()
          }));
        } catch (error_) {
          console.error('Failed to cache data:', error_);
        }
      }
      return data;
    };
    fetchData().then(data => {
      if (!data) {
        setSelectedVersion(null);
        setLoading(false);
        return;
      }
      const imageData = isComposable ? data : data[language];
      if (!imageData || !imageData.versions || Object.keys(imageData.versions).length === 0) {
        setSelectedVersion(null);
        setLoading(false);
        return;
      }
      let versionName = null;
      if (version && version !== 'latest') {
        versionName = (version in imageData.versions) ? version : null;
      } else {
        versionName = findHighestVersion(imageData.versions);
      }
      setSelectedVersion(versionName);
      setLoading(false);
    }).catch(error_ => {
      console.error('MetaImageVersion error:', error_);
      setError(error_.message);
      setLoading(false);
    });
  }, [language, version]);
  if (loading) return <span>…</span>;
  if (error) return <span title={error}>⚠ unavailable</span>;
  if (!selectedVersion) return <span>No version found</span>;
  return <span>{selectedVersion}</span>;
};

It is a framework for storing, reading and analyzing streaming data. See the [Kafka documentation](https://kafka.apache.org/documentation) for more information.

## Supported versions

You can select the major and minor version.

Patch versions are applied periodically for bug fixes and the like.
When you deploy your app, you always get the latest available patches.

<MetaImageVersionList language="kafka" status="supported" platform="grid" />

<MetaImageVersionList language="kafka" status="incoming" platform="grid" />

## Deprecated versions

The following versions are still available in your projects,
but they're at their end of life and are no longer receiving security updates from upstream.

<MetaImageVersionList language="kafka" status="deprecated" platform="grid" />

To ensure your project remains stable in the future,
switch to a [supported version](#supported-versions).

## Retired versions

The following versions have been retired and are no longer available.
If your project uses a retired version, you must update to a [supported version](#supported-versions).

<MetaImageVersionList language="kafka" status="retired" platform="grid" />

## Relationship reference

For each service [defined via a relationship](#usage-example) to your application,
Upsun automatically generates corresponding environment variables within your application container,
in the `$<RELATIONSHIP-NAME>_<SERVICE-PROPERTY>` format.

Here is example information available through the [service environment variables](/docs/development/variables#service-environment-variables) themselves,
or through the [`PLATFORM_RELATIONSHIPS` environment variable](/docs/development/variables/use-variables#use-provided-variables).

You can obtain the complete list of available service environment variables in your app container by running `upsun ssh env`.

Note that the information about the relationship can change when an app is redeployed or restarted or the relationship is changed. So your apps should only rely on the [service environment variables](/docs/development/variables#service-environment-variables) directly rather than hard coding any values.

<Tabs>
  <Tab title="Service environment variables">
    <DynamicCodeBlock language="bash">
      {`
              KAFKA_SERVICE=kafka
              KAFKA_IP=123.456.78.90
              KAFKA_HOSTNAME=azertyuiopqsdfghjklm.kafka.service._.eu-1.platformsh.site
              KAFKA_CLUSTER=azertyuiop-main-7rqtwti
              KAFKA_HOST=kafka.internal
              KAFKA_REL=kafka
              KAFKA_SCHEME=kafka
              KAFKA_TYPE=kafka:{{version:kafka:latest}}
              KAFKA_PORT=9092
            `
          }
    </DynamicCodeBlock>
  </Tab>

  <Tab title="`PLATFORM_RELATIONSHIPS` environment variable">
    For some advanced use cases, you can use the [`PLATFORM_RELATIONSHIPS` environment variable](/docs/development/variables/use-variables#use-provided-variables).
    The structure of the `PLATFORM_RELATIONSHIPS` environment variable can be obtained by running `upsun relationships` in your terminal:

    <DynamicCodeBlock language="json">
      {`
              {
                "service": "kafka",
                "ip": "123.456.78.90",
                "hostname": "azertyuiopqsdfghjklm.kafka.service._.eu-1.platformsh.site",
                "cluster": "azertyuiop-main-7rqtwti",
                "host": "kafka.internal",
                "rel": "kafka",
                "scheme": "kafka",
                "type": "kafka:{{version:kafka:latest}}",
                "port": 9092
              }
            `
          }
    </DynamicCodeBlock>

    Here is an example of how to gather [`PLATFORM_RELATIONSHIPS` environment variable](/docs/development/variables/use-variables#use-provided-variables) information in a [`.environment` file](/docs/development/variables/set-variables#when-to-use-env-files):

    ```bash .environment theme={null}
    # Decode the built-in credentials object variable.
    export RELATIONSHIPS_JSON="$(echo "$PLATFORM_RELATIONSHIPS" | base64 --decode)"

    # Set environment variables for individual credentials.
    export APP_KAFKA_HOST="$(echo "$RELATIONSHIPS_JSON" | jq -r '.kafka[0].host')"
    ```
  </Tab>
</Tabs>

## Usage example

### 1. Configure the service

To define the service, use the `kafka` type:

<DynamicCodeBlock language="yaml" filename=".upsun/config.yaml">
  {`
      services:
        # The name of the service container. Must be unique within a project.
        <SERVICE_NAME>:
          type: kafka:<VERSION>
    `
  }
</DynamicCodeBlock>

Note that changing the name of the service replaces it with a brand new service and all existing data is lost. Back up your data before changing the service.

### 2. Define the relationship

To define the relationship, use the following configuration:

<Tabs>
  <Tab title="Using default endpoints">
    <DynamicCodeBlock language="yaml" filename=".upsun/config.yaml">
      {`
              applications:
                # The name of the app container. Must be unique within a project.
                <APP_NAME>:
                  # Relationships enable access from this app to a given service.
                  # The example below shows simplified configuration leveraging a default service
                  # (identified from the relationship name) and a default endpoint.
                  # See the Application reference for all options for defining relationships and endpoints.
                  relationships:
                    <SERVICE_NAME>:
            `
          }
    </DynamicCodeBlock>

    You can define `SERVICE_NAME` as you like, so long as it's unique between all defined services
    and matches in both the application and services configuration.

    The example above leverages [default endpoint](/docs/configure-apps/image-properties/relationships) configuration for relationships.
    That is, it uses default endpoints behind the scenes, providing a [relationship](/docs/configure-apps/image-properties/relationships)
    (the network address a service is accessible from) that is identical to the *name* of that service.

    Depending on your needs, instead of default endpoint configuration,
    you can use [explicit endpoint configuration](/docs/configure-apps/image-properties/relationships).

    With the above definition, the application container (`APP_NAME`) now has [access to the service](/docs/add-services/kafka) via the relationship `SERVICE_NAME` and its corresponding [service environment variables](/docs/development/variables#service-environment-variables)
  </Tab>

  <Tab title="Using explicit endpoints">
    <DynamicCodeBlock language="yaml" filename=".upsun/config.yaml">
      {`
              applications:
                # The name of the app container. Must be unique within a project.
                <APP_NAME>:
                  # Relationships enable access from this app to a given service.
                  # The example below shows configuration with an explicitly set service name and endpoint.
                  # See the Application reference for all options for defining relationships and endpoints.
                  relationships:
                    <RELATIONSHIP_NAME>:
                      service: <SERVICE_NAME>
                      endpoint: kafka
            `
          }
    </DynamicCodeBlock>

    You can define `SERVICE_NAME` and `<RELATIONSHIP_NAME>` as you like, so long as it's unique between all defined services and relationships
    and matches in both the application and services configuration.

    The example above leverages [explicit endpoint](/docs/configure-apps/image-properties/relationships) configuration for relationships.

    Depending on your needs, instead of explicit endpoint configuration,
    you can use [default endpoint configuration](/docs/configure-apps/image-properties/relationships).

    With the above definition, the application container now has [access to the service](#usage-example) via the relationship `<RELATIONSHIP_NAME>` and its corresponding [service environment variables](/docs/development/variables#service-environment-variables).
  </Tab>
</Tabs>

### Example configuration

<Tabs>
  <Tab title="Using default endpoints">
    <DynamicCodeBlock language="yaml" filename=".upsun/config.yaml">
      {`
              applications:
                # The name of the app container. Must be unique within a project.
                myapp:
                  # Relationships enable access from this app to a given service.
                  # The example below shows simplified configuration leveraging a default service
                  # (identified from the relationship name) and a default endpoint.
                  # See the Application reference for all options for defining relationships and endpoints.
                  relationships:
                    kafka:
              services:
                # The name of the service container. Must be unique within a project.
                kafka:
                  type: kafka:{{version:kafka:latest}}`
          }
    </DynamicCodeBlock>
  </Tab>

  <Tab title="Using explicit endpoints">
    <DynamicCodeBlock language="yaml" filename=".upsun/config.yaml">
      {`
              applications:
                # The name of the app container. Must be unique within a project.
                myapp:
                  # Relationships enable access from this app to a given service.
                  # The example below shows configuration with an explicitly set service name and endpoint.
                  # See the Application reference for all options for defining relationships and endpoints.
                  relationships:
                    kafka:
                      service: kafka
                      endpoint: kafka
              services:
                # The name of the service container. Must be unique within a project.
                kafka:
                  type: kafka:{{version:kafka:latest}}`
          }
    </DynamicCodeBlock>
  </Tab>
</Tabs>

## Exporting Data

Kafka stores messages in on-disk topic partitions. You can export messages using the
`kafka-console-consumer` tool over an SSH session.

1. Open an SSH session to your app container (Kafka is not directly accessible via `tunnel:single`):

```bash Terminal theme={null}
upsun ssh
```

2. From the app container, list available topics:

```bash Terminal theme={null}
kafka-topics.sh \
  --bootstrap-server $KAFKA_HOST:$KAFKA_PORT \
  --list
```

3. Export all messages from a topic to a file:

```bash Terminal theme={null}
kafka-console-consumer.sh \
  --bootstrap-server $KAFKA_HOST:$KAFKA_PORT \
  --topic <TOPIC_NAME> \
  --from-beginning \
  --max-messages <NUMBER> \
  --timeout-ms 5000 \
  > /tmp/kafka-export-<TOPIC_NAME>.txt
```

4. Download the exported file:

```bash Terminal theme={null}
# From your local machine
upsun scp remote:/tmp/kafka-export-<TOPIC_NAME>.txt ./kafka-export-<TOPIC_NAME>.txt
```

The Kafka connection credentials (`KAFKA_HOST`, `KAFKA_PORT`) are available as
[service environment variables](/docs/development/variables#service-environment-variables)
inside the app container.

Note that Kafka is designed as a streaming platform, not a primary data store.
In most architectures, the authoritative copy of your data exists elsewhere (for example in a database),
and Kafka topics can be replayed or re-populated from that source.
