Sizing the memory of a Flink TaskManager
Preface
This article looks at how the memory size of a Flink TaskManager is configured.
flink-conf.yaml
flink-release-1.7.2/flink-dist/src/main/resources/flink-conf.yaml
    # The heap size for the TaskManager JVM

    taskmanager.heap.size: 1024m


    # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

    taskmanager.numberOfTaskSlots: 1

    # Specify whether TaskManager's managed memory should be allocated when starting
    # up (true) or when memory is requested.
    #
    # We recommend to set this value to 'true' only in setups for pure batch
    # processing (DataSet API). Streaming setups currently do not use the TaskManager's
    # managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
    # while the 'memory' and 'filesystem' backends explicitly keep data as objects
    # to save on serialization cost.
    #
    # taskmanager.memory.preallocate: false

    # The amount of memory going to the network stack. These numbers usually need
    # no tuning. Adjusting them may be necessary in case of an "Insufficient number
    # of network buffers" error. The default min is 64MB, the default max is 1GB.
    #
    # taskmanager.network.memory.fraction: 0.1
    # taskmanager.network.memory.min: 64mb
    # taskmanager.network.memory.max: 1gb
- flink-conf.yaml provides taskmanager.heap.size to set the total memory of the TaskManager (heap and off-heap).
- It provides the taskmanager.memory options (taskmanager.memory.fraction, taskmanager.memory.off-heap, taskmanager.memory.preallocate, taskmanager.memory.segment-size, taskmanager.memory.size) for configuring memory.
- It provides the taskmanager.network options (taskmanager.network.detailed-metrics, taskmanager.network.memory.buffers-per-channel, taskmanager.network.memory.floating-buffers-per-gate, taskmanager.network.memory.fraction, taskmanager.network.memory.max, taskmanager.network.memory.min) for configuring the memory of the TaskManager's network stack.
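The values above are written with unit suffixes such as 1024m, 64mb and 1gb. As a rough illustration of how such strings map to byte counts, here is a minimal sketch. The class MemorySizeSketch and its parseBytes method are hypothetical names for this article, not Flink's own parser, and treating a bare number as bytes is an assumption of the sketch.

```java
import java.util.Locale;

/** Simplified, hypothetical parser for size strings such as "1024m" or "1gb". */
public class MemorySizeSketch {

    /** Parses a size string into bytes; a bare number is treated as bytes (assumption of this sketch). */
    public static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        // Split the leading digits from the trailing unit.
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No number in size: " + text);
        }
        long value = Long.parseLong(s.substring(0, i));
        String unit = s.substring(i).trim();
        switch (unit) {
            case "":
            case "b":
                return value;
            case "k":
            case "kb":
                return value << 10;
            case "m":
            case "mb":
                return value << 20;
            case "g":
            case "gb":
                return value << 30;
            default:
                throw new IllegalArgumentException("Unknown unit in size: " + text);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("1024m")); // 1073741824
        System.out.println(parseBytes("64mb"));  // 67108864
        System.out.println(parseBytes("1gb"));   // 1073741824
    }
}
```

The results for 64mb and 1gb match the byte values 67108864 and 1073741824 that config.sh uses as defaults for the network buffer minimum and maximum.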
config.sh
flink-release-1.7.2/flink-dist/src/main/flink-bin/bin/config.sh
    #!/usr/bin/env bash

    # WARNING !!! , these values are only used if there is nothing else is specified in
    # conf/flink-conf.yaml

    DEFAULT_ENV_PID_DIR="/tmp"          # Directory to store *.pid files to
    DEFAULT_ENV_LOG_MAX=5               # Maximum number of old log files to keep
    DEFAULT_ENV_JAVA_OPTS=""            # Optional JVM args
    DEFAULT_ENV_JAVA_OPTS_JM=""         # Optional JVM args (JobManager)
    DEFAULT_ENV_JAVA_OPTS_TM=""         # Optional JVM args (TaskManager)
    DEFAULT_ENV_JAVA_OPTS_HS=""         # Optional JVM args (HistoryServer)
    DEFAULT_ENV_SSH_OPTS=""             # Optional SSH parameters running in cluster mode
    DEFAULT_YARN_CONF_DIR=""            # YARN Configuration Directory, if necessary
    DEFAULT_HADOOP_CONF_DIR=""          # Hadoop Configuration Directory, if necessary

    KEY_TASKM_MEM_SIZE="taskmanager.heap.size"
    KEY_TASKM_MEM_MB="taskmanager.heap.mb"
    KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
    KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
    KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
    KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"

    KEY_TASKM_NET_BUF_FRACTION="taskmanager.network.memory.fraction"
    KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min"
    KEY_TASKM_NET_BUF_MAX="taskmanager.network.memory.max"
    KEY_TASKM_NET_BUF_NR="taskmanager.network.numberOfBuffers" # fallback

    KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"

    # Define FLINK_TM_HEAP if it is not already set
    if [ -z "${FLINK_TM_HEAP}" ]; then
        FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
    fi

    # Try read old config key, if new key not exists
    if [ "${FLINK_TM_HEAP}" == 0 ]; then
        FLINK_TM_HEAP_MB=$(readFromConfig ${KEY_TASKM_MEM_MB} 0 "${YAML_CONF}")
    fi

    # Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
    if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
        FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")

        if hasUnit ${FLINK_TM_MEM_MANAGED_SIZE}; then
            FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}))
        else
            FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}"m"))
        fi
    fi

    # Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
    if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
        FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0.7 "${YAML_CONF}")
    fi

    # Define FLINK_TM_OFFHEAP if it is not already set
    if [ -z "${FLINK_TM_OFFHEAP}" ]; then
        FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} "false" "${YAML_CONF}")
    fi

    # Define FLINK_TM_MEM_PRE_ALLOCATE if it is not already set
    if [ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]; then
        FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")
    fi


    # Define FLINK_TM_NET_BUF_FRACTION if it is not already set
    if [ -z "${FLINK_TM_NET_BUF_FRACTION}" ]; then
        FLINK_TM_NET_BUF_FRACTION=$(readFromConfig ${KEY_TASKM_NET_BUF_FRACTION} 0.1 "${YAML_CONF}")
    fi

    # Define FLINK_TM_NET_BUF_MIN and FLINK_TM_NET_BUF_MAX if not already set (as a fallback)
    if [ -z "${FLINK_TM_NET_BUF_MIN}" -a -z "${FLINK_TM_NET_BUF_MAX}" ]; then
        FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_NR} -1 "${YAML_CONF}")
        if [ $FLINK_TM_NET_BUF_MIN != -1 ]; then
            FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
            FLINK_TM_NET_BUF_MAX=${FLINK_TM_NET_BUF_MIN}
        fi
    fi

    # Define FLINK_TM_NET_BUF_MIN if it is not already set
    if [ -z "${FLINK_TM_NET_BUF_MIN}" -o "${FLINK_TM_NET_BUF_MIN}" = "-1" ]; then
        # default: 64MB = 67108864 bytes (same as the previous default with 2048 buffers of 32k each)
        FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_MIN} 67108864 "${YAML_CONF}")
        FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
    fi

    # Define FLINK_TM_NET_BUF_MAX if it is not already set
    if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
        # default: 1GB = 1073741824 bytes
        FLINK_TM_NET_BUF_MAX=$(readFromConfig ${KEY_TASKM_NET_BUF_MAX} 1073741824 "${YAML_CONF}")
        FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX})
    fi
- For each variable that is not already set, config.sh initializes it from flink-conf.yaml or a built-in default: FLINK_TM_HEAP, FLINK_TM_MEM_MANAGED_SIZE, FLINK_TM_MEM_MANAGED_FRACTION, FLINK_TM_OFFHEAP, FLINK_TM_MEM_PRE_ALLOCATE, FLINK_TM_NET_BUF_FRACTION, and so on.
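The pattern config.sh repeats for every variable is a three-level lookup: an already-set environment variable wins, then the key in flink-conf.yaml, then a hard-coded default. The sketch below restates that precedence in Java for illustration only. ConfigPrecedenceSketch and resolve are hypothetical names, and plain maps stand in for the process environment and the parsed YAML file.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the "env, then config file, then default" lookup order. */
public class ConfigPrecedenceSketch {

    /**
     * Returns the environment value if it is set and non-empty (like the "-z" test in the script),
     * otherwise the configured value, otherwise the default.
     */
    public static String resolve(Map<String, String> env, String envName,
                                 Map<String, String> conf, String confKey,
                                 String defaultValue) {
        String fromEnv = env.get(envName);
        if (fromEnv != null && !fromEnv.isEmpty()) {
            return fromEnv;
        }
        return conf.getOrDefault(confKey, defaultValue);
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        Map<String, String> conf = new HashMap<>();
        conf.put("taskmanager.heap.size", "1024m");

        // Not set in the environment: the value from the config map is used.
        System.out.println(resolve(env, "FLINK_TM_HEAP", conf, "taskmanager.heap.size", "0"));

        // Set in the environment: it overrides the config map.
        env.put("FLINK_TM_HEAP", "512m");
        System.out.println(resolve(env, "FLINK_TM_HEAP", conf, "taskmanager.heap.size", "0"));

        // Neither set: the default applies.
        System.out.println(resolve(env, "FLINK_TM_OFFHEAP", conf, "taskmanager.memory.off-heap", "false"));
    }
}
```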
taskmanager.sh
flink-release-1.7.2/flink-dist/src/main/flink-bin/bin/taskmanager.sh
    #!/usr/bin/env bash
    # Start/stop a Flink TaskManager.
    USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

    STARTSTOP=$1

    ARGS=("${@:2}")

    if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
      echo $USAGE
      exit 1
    fi

    bin=`dirname "$0"`
    bin=`cd "$bin"; pwd`

    . "$bin"/config.sh

    ENTRYPOINT=taskexecutor

    if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then

        # if memory allocation mode is lazy and no other JVM options are set,
        # set the 'Concurrent Mark Sweep GC'
        if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
            export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
        fi

        if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
            echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
        else
            flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
            FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
        fi

        if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
            echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
            exit 1
        fi

        if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then

            TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
            # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
            TM_MAX_OFFHEAP_SIZE="8388607T"

            export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"

        fi

        # Add TaskManager-specific JVM options
        export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

        # Startup parameters
        ARGS+=("--configDir" "${FLINK_CONF_DIR}")
    fi

    if [[ $STARTSTOP == "start-foreground" ]]; then
        exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
    else
        if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
            # Start a single TaskManager
            "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        else
            # Example output from `numactl --show` on an AWS c4.8xlarge:
            # policy: default
            # preferred node: current
            # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
            # cpubind: 0 1
            # nodebind: 0 1
            # membind: 0 1
            read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
            for NODE_ID in "${NODE_LIST[@]:1}"; do
                # Start a TaskManager for each NUMA node
                numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
            done
        fi
    fi
- taskmanager.sh first sources config.sh to initialize the variables above, then computes and exports JVM_ARGS and FLINK_ENV_JAVA_OPTS, and finally launches the entry point through flink-console.sh (for start-foreground) or flink-daemon.sh (otherwise).
- If FLINK_TM_MEM_PRE_ALLOCATE is false and neither FLINK_ENV_JAVA_OPTS nor FLINK_ENV_JAVA_OPTS_TM is set, it appends -XX:+UseG1GC to JVM_ARGS. It then converts FLINK_TM_HEAP into FLINK_TM_HEAP_MB. If FLINK_TM_HEAP_MB is greater than 0, it computes TM_HEAP_SIZE via calculateTaskManagerHeapSizeMB, uses TM_HEAP_SIZE for -Xms and -Xmx and TM_MAX_OFFHEAP_SIZE for -XX:MaxDirectMemorySize, and appends these to JVM_ARGS. FLINK_ENV_JAVA_OPTS_TM is appended to FLINK_ENV_JAVA_OPTS.
- calculateTaskManagerHeapSizeMB is defined in config.sh; the corresponding Java code is TaskManagerServices.calculateHeapSizeMB.
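The way the script assembles JVM_ARGS can be restated as a small function. This is only a sketch of the branching described above, not code from Flink: JvmArgsSketch and buildJvmArgs are hypothetical names, and heapMb is assumed to be the already computed TM_HEAP_SIZE, with 0 meaning that no heap size was configured.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical restatement of how taskmanager.sh assembles its memory-related JVM arguments. */
public class JvmArgsSketch {

    public static List<String> buildJvmArgs(boolean preAllocate, String javaOpts,
                                            String javaOptsTm, long heapMb) {
        List<String> args = new ArrayList<>();
        // G1 is only chosen when allocation is lazy and the user supplied no JVM options at all.
        if (!preAllocate && javaOpts.isEmpty() && javaOptsTm.isEmpty()) {
            args.add("-XX:+UseG1GC");
        }
        // Heap flags are only added when a positive heap size is known.
        if (heapMb > 0) {
            args.add("-Xms" + heapMb + "M");
            args.add("-Xmx" + heapMb + "M");
            // Same upper bound as TM_MAX_OFFHEAP_SIZE in the script.
            args.add("-XX:MaxDirectMemorySize=8388607T");
        }
        return args;
    }

    public static void main(String[] args) {
        // prints: -XX:+UseG1GC -Xms922M -Xmx922M -XX:MaxDirectMemorySize=8388607T
        System.out.println(String.join(" ", buildJvmArgs(false, "", "", 922)));
    }
}
```

Because the script appends -Xms and -Xmx itself, passing heap flags through JVM_ARGS by hand would put two conflicting sets of flags on the command line.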
TaskManagerServices
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
    public class TaskManagerServices {
        //……

        /**
         * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
         * based on the total memory to use and the given configuration parameters.
         *
         * @param totalJavaMemorySizeMB
         *         overall available memory to use (heap and off-heap)
         * @param config
         *         configuration object
         *
         * @return heap memory to use (in megabytes)
         */
        public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
            Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

            // subtract the Java memory used for network buffers (always off-heap)
            final long networkBufMB =
                calculateNetworkBufferMemory(
                    totalJavaMemorySizeMB << 20, // megabytes to bytes
                    config) >> 20; // bytes to megabytes
            final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

            // split the available Java memory between heap and off-heap

            final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

            final long heapSizeMB;
            if (useOffHeap) {

                long offHeapSize;
                String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
                if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                    try {
                        offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                    } catch (IllegalArgumentException e) {
                        throw new IllegalConfigurationException(
                            "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                    }
                } else {
                    offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
                }

                if (offHeapSize <= 0) {
                    // calculate off-heap section via fraction
                    double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                    offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
                }

                TaskManagerServicesConfiguration
                    .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                        TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                        "Managed memory size too large for " + networkBufMB +
                            " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                            " MB JVM memory");

                heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
            } else {
                heapSizeMB = remainingJavaMemorySizeMB;
            }

            return heapSizeMB;
        }

        /**
         * Calculates the amount of memory used for network buffers based on the total memory to use and
         * the according configuration parameters.
         *
         * <p>The following configuration parameters are involved:
         * <ul>
         *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
         *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
         *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
         *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
         * </ul>.
         *
         * @param totalJavaMemorySize
         *         overall available memory to use (heap and off-heap, in bytes)
         * @param config
         *         configuration object
         *
         * @return memory to use for network buffers (in bytes); at least one memory segment
         */
        @SuppressWarnings("deprecation")
        public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
            Preconditions.checkArgument(totalJavaMemorySize > 0);

            int segmentSize =
                checkedDownCast(MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());

            final long networkBufBytes;
            if (TaskManagerServicesConfiguration.hasNewNetworkBufConf(config)) {
                // new configuration based on fractions of available memory with selectable min and max
                float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
                long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
                long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();

                TaskManagerServicesConfiguration
                    .checkNetworkBufferConfig(segmentSize, networkBufFraction, networkBufMin, networkBufMax);

                networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
                    (long) (networkBufFraction * totalJavaMemorySize)));

                TaskManagerServicesConfiguration
                    .checkConfigParameter(networkBufBytes < totalJavaMemorySize,
                        "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
                        "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
                            TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
                            TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
                        "Network buffer memory size too large: " + networkBufBytes + " >= " +
                            totalJavaMemorySize + " (total JVM memory size)");
                TaskManagerServicesConfiguration
                    .checkConfigParameter(networkBufBytes >= segmentSize,
                        "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
                        "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
                            TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
                            TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
                        "Network buffer memory size too small: " + networkBufBytes + " < " +
                            segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
            } else {
                // use old (deprecated) network buffers parameter
                int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
                networkBufBytes = (long) numNetworkBuffers * (long) segmentSize;

                TaskManagerServicesConfiguration.checkNetworkConfigOld(numNetworkBuffers);

                TaskManagerServicesConfiguration
                    .checkConfigParameter(networkBufBytes < totalJavaMemorySize,
                        networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
                        "Network buffer memory size too large: " + networkBufBytes + " >= " +
                            totalJavaMemorySize + " (total JVM memory size)");
                TaskManagerServicesConfiguration
                    .checkConfigParameter(networkBufBytes >= segmentSize,
                        networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
                        "Network buffer memory size too small: " + networkBufBytes + " < " +
                            segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
            }

            return networkBufBytes;
        }

        //……
    }
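- FLINK_TM_HEAP is the total memory of the TaskManager (heap and off-heap). Network buffers always live off-heap, so that portion is first subtracted from FLINK_TM_HEAP before -Xms and -Xmx are computed.
- calculateHeapSizeMB first calls calculateNetworkBufferMemory to obtain networkBufMB, then subtracts networkBufMB from totalJavaMemorySizeMB to get remainingJavaMemorySizeMB.
- It then reads taskmanager.memory.off-heap, which defaults to false. If false, it returns remainingJavaMemorySizeMB directly. If true, it computes offHeapSize (from taskmanager.memory.size if that is set, otherwise as taskmanager.memory.fraction times remainingJavaMemorySizeMB) and returns remainingJavaMemorySizeMB minus offHeapSize.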
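To make the arithmetic concrete, the following sketch reproduces the calculation with plain parameters instead of a Flink Configuration object. HeapSizeSketch and its method are hypothetical names, the validation checks and the deprecated numberOfBuffers path are left out, and the fraction is held as a double, so this is an approximation of the logic above rather than the actual implementation.

```java
/** Simplified, hypothetical restatement of the heap size calculation described above. */
public class HeapSizeSketch {

    /**
     * @param totalMb          total TaskManager memory in MB (heap and off-heap)
     * @param netFraction      fraction of the total reserved for network buffers
     * @param netMinBytes      lower bound for network buffer memory, in bytes
     * @param netMaxBytes      upper bound for network buffer memory, in bytes
     * @param useOffHeap       whether managed memory is allocated off-heap
     * @param managedSizeMb    explicit managed memory size in MB, or 0 to use the fraction
     * @param managedFraction  fraction of the remaining memory used as managed memory
     * @return heap size in MB, to be used for -Xms and -Xmx
     */
    public static long calculateHeapSizeMb(long totalMb, double netFraction,
                                           long netMinBytes, long netMaxBytes,
                                           boolean useOffHeap, long managedSizeMb,
                                           double managedFraction) {
        long totalBytes = totalMb << 20; // megabytes to bytes
        // Clamp fraction * total into [min, max].
        long netBytes = Math.min(netMaxBytes, Math.max(netMinBytes, (long) (netFraction * totalBytes)));
        long netMb = netBytes >> 20; // bytes to megabytes
        long remainingMb = totalMb - netMb;
        if (!useOffHeap) {
            return remainingMb;
        }
        long offHeapMb = managedSizeMb > 0 ? managedSizeMb : (long) (managedFraction * remainingMb);
        if (offHeapMb >= remainingMb) {
            throw new IllegalArgumentException("Managed memory size too large");
        }
        return remainingMb - offHeapMb;
    }

    public static void main(String[] args) {
        long min = 64L << 20; // 64 MB
        long max = 1L << 30;  // 1 GB
        // 1024 MB total, on-heap managed memory: 102 MB go to network buffers.
        System.out.println(calculateHeapSizeMb(1024, 0.1, min, max, false, 0, 0.7)); // 922
        // Same total with off-heap managed memory at fraction 0.7: 645 MB more leave the heap.
        System.out.println(calculateHeapSizeMb(1024, 0.1, min, max, true, 0, 0.7));  // 277
        // 256 MB total: 10% would be below the 64 MB minimum, so the minimum applies.
        System.out.println(calculateHeapSizeMb(256, 0.1, min, max, false, 0, 0.7));  // 192
    }
}
```

With the defaults shown earlier (taskmanager.heap.size: 1024m, network fraction 0.1, off-heap false), the sketch yields 922, meaning -Xms922M and -Xmx922M.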
Summary
- flink-conf.yaml provides taskmanager.heap.size to set the total memory of the TaskManager (heap and off-heap); the taskmanager.memory options (taskmanager.memory.fraction, taskmanager.memory.off-heap, taskmanager.memory.preallocate, taskmanager.memory.segment-size, taskmanager.memory.size) for configuring memory; and the taskmanager.network options (taskmanager.network.detailed-metrics, taskmanager.network.memory.buffers-per-channel, taskmanager.network.memory.floating-buffers-per-gate, taskmanager.network.memory.fraction, taskmanager.network.memory.max, taskmanager.network.memory.min) for configuring the memory of the network stack.
- taskmanager.sh first sources config.sh to initialize the relevant variables, then computes and exports JVM_ARGS and FLINK_ENV_JAVA_OPTS, and finally launches the entry point through flink-console.sh (for start-foreground) or flink-daemon.sh (otherwise). If FLINK_TM_MEM_PRE_ALLOCATE is false and neither FLINK_ENV_JAVA_OPTS nor FLINK_ENV_JAVA_OPTS_TM is set, it appends -XX:+UseG1GC to JVM_ARGS. It then converts FLINK_TM_HEAP into FLINK_TM_HEAP_MB; if that value is greater than 0, it computes TM_HEAP_SIZE via calculateTaskManagerHeapSizeMB, uses TM_HEAP_SIZE for -Xms and -Xmx and TM_MAX_OFFHEAP_SIZE for -XX:MaxDirectMemorySize, and appends these to JVM_ARGS. FLINK_ENV_JAVA_OPTS_TM is appended to FLINK_ENV_JAVA_OPTS. calculateTaskManagerHeapSizeMB is defined in config.sh, and its Java counterpart is TaskManagerServices.calculateHeapSizeMB.
- FLINK_TM_HEAP is the total memory of the TaskManager (heap and off-heap). Network buffers always live off-heap, so that portion is subtracted from FLINK_TM_HEAP before -Xms and -Xmx are computed. calculateHeapSizeMB calls calculateNetworkBufferMemory to obtain networkBufMB and subtracts it from totalJavaMemorySizeMB to get remainingJavaMemorySizeMB. It then reads taskmanager.memory.off-heap (default false): if false, it returns remainingJavaMemorySizeMB; if true, it computes offHeapSize and returns remainingJavaMemorySizeMB minus offHeapSize.
The final JVM parameters therefore depend on JVM_ARGS and FLINK_ENV_JAVA_OPTS. Do not put memory-related options into JVM_ARGS yourself: when FLINK_TM_HEAP_MB is greater than 0, taskmanager.sh uses it to compute TM_HEAP_SIZE and appends -Xms and -Xmx to JVM_ARGS, and FLINK_TM_HEAP_MB in turn comes from FLINK_TM_HEAP or the taskmanager.heap.size setting. FLINK_ENV_JAVA_OPTS comes from env.java.opts and env.java.opts.taskmanager. To size the TaskManager's memory (heap and off-heap), either set the FLINK_TM_HEAP environment variable (for example FLINK_TM_HEAP=512m) or set taskmanager.heap.size in flink-conf.yaml. The final -Xms and -Xmx are FLINK_TM_HEAP minus the off-heap portion: network buffers are always off-heap, and managed memory is off-heap only when taskmanager.memory.off-heap is enabled (default false).
doc
- TaskManager